sdf_metadata/metadata/dataflow/
dataflow_definition.rs

1use std::{
2    collections::{BTreeMap, HashSet},
3    fmt::Display,
4};
5
6use anyhow::Result;
7use tracing::info;
8
9use sdf_common::{
10    constants::DATAFLOW_STABLE_VERSION,
11    version::{ApiVersion, SdfContextVersion},
12};
13
14use crate::{
15    importer::resolver::DependencyResolver,
16    metadata::{
17        io::topic::{validate_topic_name, TopicValidationError, TopicValidationFailure},
18        metadata::header::HeaderValidationError,
19    },
20    util::{
21        merge::merge_types_and_states,
22        config_error::{ConfigError, INDENT},
23        operator_placement::OperatorPlacement,
24        sdf_types_map::SdfTypesMap,
25        validate::MetadataTypeValidationFailure,
26    },
27    wit::{
28        dataflow::{DataflowDefinition, Header, Operations, PackageDefinition, State},
29        metadata::{MetadataType, SdfType, SdfTypeOrigin},
30        operator::OperatorType,
31        package_interface::{PackageImport, StepInvocation},
32    },
33};
34
35use super::{operations::ServiceValidationFailure, schedule_config::ScheduleValidationFailure};
36
37#[derive(Debug, Clone, Eq, PartialEq)]
38pub struct DataflowDefinitionValidationFailure {
39    pub errors: Vec<DataflowDefinitionValidationError>,
40}
41
42impl Display for DataflowDefinitionValidationFailure {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        writeln!(f, "Dataflow Config failed validation\n")?;
45
46        for error in &self.errors {
47            writeln!(f, "{}", error.readable(1))?;
48        }
49
50        Ok(())
51    }
52}
53
54#[derive(Debug, Clone, Eq, PartialEq)]
55pub enum DataflowDefinitionValidationError {
56    Meta(Vec<HeaderValidationError>),
57    Type(MetadataTypeValidationFailure),
58    Topic(TopicValidationFailure),
59    Service(ServiceValidationFailure),
60    DuplicateOperator(String),
61    Schedule(ScheduleValidationFailure),
62    UndefinedState {
63        service_name: String,
64        ref_state_name: String,
65    },
66    Versioning(DataflowDefinitionVersionError),
67    PackageConflictName(String),
68}
69
70#[derive(Debug, Clone, Eq, PartialEq)]
71pub enum DataflowDefinitionVersionError {
72    UnsupportedVersion(String),
73    InvalidVersionFeature {
74        version: String,
75        feature: String,
76        supported_version: String,
77    },
78    FailedToParseVersion(String),
79}
80
81impl ConfigError for DataflowDefinitionVersionError {
82    fn readable(&self, indents: usize) -> String {
83        let indent = INDENT.repeat(indents);
84
85        match self {
86            Self::UnsupportedVersion(version) => {
87                format!("{}Unsupported version: {}\n", indent, version)
88            }
89            Self::InvalidVersionFeature {
90                version,
91                feature,
92                supported_version,
93            } => {
94                format!(
95                    "{}Version {} does not support configuration: {}, supported version: {}\n",
96                    indent, version, feature, supported_version
97                )
98            }
99            Self::FailedToParseVersion(err) => {
100                format!("{}Failed to parse version: {}\n", indent, err)
101            }
102        }
103    }
104}
105
106impl ConfigError for DataflowDefinitionValidationError {
107    fn readable(&self, indents: usize) -> String {
108        let indent = INDENT.repeat(indents);
109
110        match self {
111            Self::Meta(errors) => {
112                let mut res = format!("{}Header is invalid:\n", indent);
113
114                for error in errors {
115                    res.push_str(&error.readable(indents + 1));
116                }
117
118                res
119            }
120            Self::Type(failure) => failure.readable(indents),
121            Self::Topic(failure) => failure.readable(indents),
122            Self::Service(failure) => failure.readable(indents),
123            Self::Schedule(failure) => failure.readable(indents),
124            Self::DuplicateOperator(name) => {
125                format!(
126                    "{}Duplicate inline operator with name: {} was found, inline operators must have unique names\n",
127                    indent, name
128                )
129            }
130            Self::PackageConflictName(name) => {
131                format!(
132                    "{}Package {} conflicts with dataflow name and namespace\n",
133                    indent, name
134                )
135            }
136            Self::UndefinedState {
137                service_name,
138                ref_state_name,
139            } => {
140                format!(
141                    "{}State with name {} is referenced in service {} but not defined in the dataflow\n",
142                    indent, ref_state_name, service_name
143                )
144            }
145            Self::Versioning(err) => err.readable(indents),
146        }
147    }
148}
149
150#[allow(clippy::derivable_impls)]
151impl Default for DataflowDefinition {
152    fn default() -> Self {
153        Self {
154            api_version: DATAFLOW_STABLE_VERSION.to_string(),
155            meta: Default::default(),
156            imports: Default::default(),
157            types: Default::default(),
158            services: Default::default(),
159            topics: Default::default(),
160            dev: Default::default(),
161            packages: Default::default(),
162            schedule: Default::default(),
163            default_config: Default::default(),
164        }
165    }
166}
167
168#[allow(clippy::derivable_impls)]
169impl Default for Header {
170    fn default() -> Self {
171        Self {
172            name: Default::default(),
173            version: Default::default(),
174            namespace: Default::default(),
175        }
176    }
177}
178
179impl DataflowDefinition {
180    /// readable name
181    pub fn name(&self) -> String {
182        self.meta.to_string()
183    }
184
185    pub fn has_custom_types(&self) -> bool {
186        !self.types.is_empty() || self.services.iter().any(|s| !s.states.is_empty())
187    }
188
189    pub fn resolve_imports(&mut self, debug: bool) -> Result<()> {
190        let dependency_resolver =
191            DependencyResolver::build(self.imports.clone(), self.packages.clone(), debug)?;
192        let package_configs = dependency_resolver.packages()?;
193
194        self.merge_dependencies(&package_configs)?;
195
196        for service in self.services.iter_mut() {
197            service.import_operator_configs(&self.imports, &package_configs)?;
198        }
199
200        self.packages = package_configs;
201
202        Ok(())
203    }
204
205    pub fn add_imported_operator(
206        &mut self,
207        function: StepInvocation,
208        operator_type: OperatorType,
209        operator_placement: OperatorPlacement,
210        package_import: PackageImport,
211    ) -> Result<()> {
212        self.merge_package_import(package_import);
213
214        let service = self.get_service_mut(&operator_placement.service_id)?;
215
216        service.add_operator(operator_type, operator_placement, function)?;
217
218        Ok(())
219    }
220
221    pub fn add_inline_operator(
222        &mut self,
223        function: StepInvocation,
224        operator_type: OperatorType,
225        operator_placement: OperatorPlacement,
226    ) -> Result<()> {
227        if function.code_info.code.is_none() {
228            return Err(anyhow::anyhow!("inline operator must have code"));
229        }
230
231        let service = self.get_service_mut(&operator_placement.service_id)?;
232
233        service.add_operator(operator_type, operator_placement, function)?;
234
235        Ok(())
236    }
237
238    pub fn replace_inline_operator(
239        &mut self,
240        function: StepInvocation,
241        operator_type: OperatorType,
242        operator_placement: OperatorPlacement,
243    ) -> Result<()> {
244        if function.code_info.code.is_none() {
245            return Err(anyhow::anyhow!("inline operator must have code"));
246        }
247
248        self.delete_operator(operator_placement.clone())?;
249        self.add_inline_operator(function, operator_type, operator_placement)?;
250
251        Ok(())
252    }
253
254    pub fn delete_operator(&mut self, operator_placement: OperatorPlacement) -> Result<()> {
255        match self.get_service_mut(&operator_placement.service_id) {
256            Ok(service) => service.delete_operator(operator_placement),
257            Err(e) => Err(e),
258        }
259    }
260
261    fn get_service_mut(&mut self, service_id: &str) -> Result<&mut Operations> {
262        match self.services.iter_mut().find(|s| s.name == service_id) {
263            Some(s) => Ok(s),
264            None => Err(anyhow::anyhow!("Service with id {} not found", service_id)),
265        }
266    }
267
268    pub fn merge_dependencies(&mut self, package_configs: &[PackageDefinition]) -> Result<()> {
269        let mut all_types = self.types_map();
270        let mut all_states = BTreeMap::new();
271
272        merge_types_and_states(
273            &mut all_types,
274            &mut all_states,
275            &self.imports,
276            package_configs,
277        )?;
278
279        self.types = all_types
280            .iter()
281            .map(|(name, (ty, origin))| MetadataType {
282                name: name.clone(),
283                type_: ty.clone(),
284                origin: origin.to_owned(),
285            })
286            .collect();
287
288        Ok(())
289    }
290
291    pub fn validate(&self) -> Result<(), DataflowDefinitionValidationFailure> {
292        info!("validating dataflow");
293        let mut errors: Vec<DataflowDefinitionValidationError> = vec![];
294
295        if let Err(validate_version_errors) = self.validate_version() {
296            errors.push(DataflowDefinitionValidationError::Versioning(
297                validate_version_errors,
298            ));
299        }
300
301        if let Err(header_errors) = self.meta.validate() {
302            errors.push(DataflowDefinitionValidationError::Meta(header_errors));
303        }
304
305        let types_map = self.types_map();
306
307        for metadata_type in &self.types {
308            if let Err(type_validation_failure) = metadata_type.validate(&types_map) {
309                errors.push(DataflowDefinitionValidationError::Type(
310                    type_validation_failure,
311                ));
312            }
313        }
314
315        // TODO!: need to have a discussion on whether wit definition for topics, states, types, etc..
316        // should be maps or arrays in both dataflow and package configs. This tuple should not exist
317        // since the Topic struct has a name field as well.
318        for (topic_name, topic) in self.topics.iter() {
319            if let Err(name_errors) = validate_topic_name(topic_name) {
320                errors.push(DataflowDefinitionValidationError::Topic(
321                    TopicValidationFailure {
322                        name: topic_name.to_string(),
323                        errors: vec![TopicValidationError::Name(name_errors)],
324                    },
325                ));
326            }
327
328            if let Err(topic_validation_failure) = topic.validate(self.default_config, &types_map) {
329                errors.push(DataflowDefinitionValidationError::Topic(
330                    topic_validation_failure,
331                ));
332            }
333        }
334
335        if let Some(schedule) = &self.schedule {
336            for schedule_config in schedule {
337                if let Err(schedule_validation_failure) = schedule_config.validate() {
338                    errors.push(DataflowDefinitionValidationError::Schedule(
339                        schedule_validation_failure,
340                    ))
341                }
342            }
343        }
344
345        for service in &self.services {
346            if let Err(service_validation_failure) =
347                service.validate(&types_map, &self.topics, self.schedule.as_deref())
348            {
349                errors.push(DataflowDefinitionValidationError::Service(
350                    service_validation_failure,
351                ))
352            }
353        }
354
355        if let Err(err) = self.validate_states() {
356            errors.push(err);
357        }
358
359        if let Err(err) = self.validate_non_duplicates() {
360            for name in err {
361                errors.push(DataflowDefinitionValidationError::DuplicateOperator(name));
362            }
363        }
364
365        if self.validate_pkg_names().is_err() {
366            errors.push(DataflowDefinitionValidationError::PackageConflictName(
367                format!("{}/{}", self.meta.namespace, self.meta.name),
368            ));
369        }
370
371        if errors.is_empty() {
372            Ok(())
373        } else {
374            Err(DataflowDefinitionValidationFailure { errors })
375        }
376    }
377
378    fn validate_pkg_names(&self) -> Result<(), String> {
379        // fail if the namespace and name of the package is the same as the dataflow
380        for pkg in &self.imports {
381            if pkg.metadata.namespace == self.meta.namespace && pkg.metadata.name == self.meta.name
382            {
383                return Err(format!(
384                    "Package {} with namespace {} conflicts with dataflow name and namespace",
385                    pkg.metadata.name, pkg.metadata.namespace
386                ));
387            }
388        }
389        Ok(())
390    }
391
392    pub fn validate_version(&self) -> Result<(), DataflowDefinitionVersionError> {
393        // only valid 0.5.0 and 0.6.0
394        // schedule, only valid on 0.6.0
395        let version = ApiVersion::from(&self.api_version)
396            .map_err(|err| DataflowDefinitionVersionError::FailedToParseVersion(err.to_string()))?;
397
398        if version.is_v5() {
399            if let Some(schedule) = &self.schedule {
400                if !schedule.is_empty() {
401                    return Err(DataflowDefinitionVersionError::InvalidVersionFeature {
402                        version: self.api_version.clone(),
403                        feature: "schedule".to_string(),
404                        supported_version: "0.6.0".to_string(),
405                    });
406                } else {
407                    return Ok(());
408                }
409            } else {
410                return Ok(());
411            }
412        }
413
414        if version.is_v6() {
415            return Ok(());
416        }
417
418        Err(DataflowDefinitionVersionError::UnsupportedVersion(
419            self.api_version.clone(),
420        ))
421    }
422
423    pub fn types_map(&self) -> SdfTypesMap {
424        SdfTypesMap {
425            map: self
426                .types
427                .iter()
428                .map(|ty| (ty.name.clone(), (ty.type_.clone(), ty.origin)))
429                .chain(self.services.iter().flat_map(|service| {
430                    service.states.iter().filter_map(|state| {
431                        if let State::Typed(state) = state {
432                            Some((
433                                state.name.clone(),
434                                (
435                                    SdfType::KeyedState(state.type_.to_owned()),
436                                    SdfTypeOrigin::Local,
437                                ),
438                            ))
439                        } else {
440                            None
441                        }
442                    })
443                }))
444                .collect(),
445        }
446    }
447
448    pub fn api_version(&self) -> Result<ApiVersion> {
449        ApiVersion::from(&self.api_version)
450    }
451
452    pub fn all_owned_states(&self) -> BTreeMap<String, State> {
453        let states = self
454            .services
455            .iter()
456            .flat_map(|service| service.states.iter().cloned());
457
458        states.into_iter().fold(BTreeMap::new(), |mut acc, state| {
459            if let State::Typed(ref state_ty) = state {
460                acc.insert(state_ty.name.clone(), state);
461            }
462
463            acc
464        })
465    }
466
467    pub fn validate_states(&self) -> Result<(), DataflowDefinitionValidationError> {
468        let (typed_states, ref_states): (Vec<_>, Vec<_>) = self
469            .services
470            .iter()
471            .flat_map(|service| service.states.iter().map(|state| (&service.name, state)))
472            .partition(|(_, state)| matches!(state, State::Typed(_)));
473
474        let mut typed_states_set = HashSet::new();
475        for (service_name, state) in typed_states {
476            if let State::Typed(state_ty) = state {
477                let ref_state_name = format!("{}.{}", service_name, state_ty.name);
478                typed_states_set.insert(ref_state_name);
479            } else {
480                unreachable!("state should be typed at this point");
481            }
482        }
483
484        for (service_name, state) in ref_states {
485            if let State::Reference(ref_state) = state {
486                let ref_service = ref_state.ref_service.as_str();
487                let ref_state_name = ref_state.name.as_str();
488                let ref_state_name = format!("{}.{}", ref_service, ref_state_name);
489                if !typed_states_set.contains(&ref_state_name) {
490                    return Err(DataflowDefinitionValidationError::UndefinedState {
491                        service_name: service_name.to_owned(),
492                        ref_state_name,
493                    });
494                }
495            }
496        }
497
498        Ok(())
499    }
500    pub fn merge_package_import(&mut self, package_import: PackageImport) {
501        if let Some(existing_import) = self
502            .imports
503            .iter_mut()
504            .find(|import| import.metadata == package_import.metadata)
505        {
506            existing_import.merge(&package_import);
507        } else {
508            self.imports.push(package_import);
509        }
510    }
511
512    #[cfg(feature = "parser")]
513    pub fn update_inline_operators(&mut self) -> Result<()> {
514        for service in self.services.iter_mut() {
515            service.update_inline_operators()?;
516        }
517
518        Ok(())
519    }
520
521    fn validate_non_duplicates(&self) -> Result<(), Vec<String>> {
522        let mut duplicate_names = vec![];
523        let mut op_names = HashSet::new();
524
525        for service in &self.services {
526            for source in &service.sources {
527                for step in &source.steps {
528                    if step.is_imported(&self.imports) {
529                        continue;
530                    }
531
532                    if !op_names.insert(step.name().to_owned()) {
533                        duplicate_names.push(step.name().to_owned());
534                    }
535                }
536            }
537
538            for sink in &service.sinks {
539                for step in &sink.steps {
540                    if step.is_imported(&self.imports) {
541                        continue;
542                    }
543
544                    if !op_names.insert(step.name().to_owned()) {
545                        duplicate_names.push(step.name().to_owned());
546                    }
547                }
548            }
549
550            for (step, _) in service.operators() {
551                if step.is_imported(&self.imports) {
552                    continue;
553                }
554
555                if !op_names.insert(step.uses.to_owned()) {
556                    duplicate_names.push(step.uses.to_owned());
557                }
558            }
559        }
560
561        if duplicate_names.is_empty() {
562            Ok(())
563        } else {
564            Err(duplicate_names)
565        }
566    }
567}
568
569#[cfg(test)]
570mod test {
571
572    use sdf_common::constants::DATAFLOW_STABLE_VERSION;
573
574    use crate::{
575        metadata::{
576            dataflow::{
577                dataflow_definition::{
578                    DataflowDefinitionValidationError, DataflowDefinitionVersionError,
579                },
580                operations::{ServiceValidationError, ServiceValidationFailure},
581            },
582            io::topic::{TopicNameError, TopicValidationError, TopicValidationFailure},
583            metadata::{header::HeaderValidationError, sdf_type::SdfTypeValidationError},
584        },
585        util::{
586            operator_placement::OperatorPlacement,
587            validate::{MetadataTypeValidationError, MetadataTypeValidationFailure},
588        },
589        wit::{
590            dataflow::{
591                DataflowDefinition, IoRef, IoType, Operations, PostTransforms, ScheduleConfig,
592                Schedule, Topic, TransformOperator,
593            },
594            io::{SchemaSerDe, SerdeConverter, TopicSchema},
595            metadata::{
596                ArrowColumnKind, Header, MetadataType, NamedParameter, ObjectField, OutputType,
597                Parameter, ParameterKind, SdfArrowColumn, SdfArrowRow, SdfKeyedState,
598                SdfKeyedStateValue, SdfObject, SdfType, SdfTypeOrigin, SerdeConfig, TypeRef,
599            },
600            operator::{
601                CodeInfo, CodeLang, OperatorType, PartitionOperator, Transforms, TumblingWindow,
602                WatermarkConfig, Window, WindowKind, WindowProperties,
603            },
604            package_interface::{
605                FunctionImport, PackageDefinition, PackageImport, StateTyped, StepInvocation,
606            },
607            states::{State, StateRef},
608        },
609    };
610
611    fn first_package_definition() -> PackageDefinition {
612        PackageDefinition {
613            api_version: DATAFLOW_STABLE_VERSION.to_string(),
614            meta: Header {
615                namespace: "example".to_string(),
616                name: "bank-update".to_string(),
617                version: "0.1.0".to_string(),
618            },
619            imports: vec![PackageImport {
620                metadata: Header {
621                    namespace: "example".to_string(),
622                    name: "bank".to_string(),
623                    version: "0.1.0".to_string(),
624                },
625                types: vec!["bank-event".to_string(), "bank-account".to_string()],
626                states: vec!["account-balance".to_string()],
627                path: Some("../bank-types".to_string()),
628                functions: vec![],
629            }],
630            functions: vec![(
631                StepInvocation {
632                    uses: "filter-positive-events".to_string(),
633                    inputs: vec![NamedParameter {
634                        name: "event".to_string(),
635                        type_: TypeRef {
636                            name: "bank-event".to_string(),
637                        },
638                        optional: false,
639                        kind: ParameterKind::Value,
640                    }],
641                    ..Default::default()
642                },
643                OperatorType::Filter,
644            )],
645            dev: None,
646            states: vec![],
647            types: vec![],
648        }
649    }
650
651    fn second_package_definition() -> PackageDefinition {
652        PackageDefinition {
653            api_version: DATAFLOW_STABLE_VERSION.to_string(),
654            meta: Header {
655                namespace: "example".to_string(),
656                name: "bank".to_string(),
657                version: "0.1.0".to_string(),
658            },
659            types: vec![
660                MetadataType {
661                    name: "bank-event".to_string(),
662                    type_: SdfType::Object(SdfObject {
663                        fields: vec![
664                            ObjectField {
665                                name: "name".to_string(),
666                                type_: TypeRef {
667                                    name: "string".to_string(),
668                                },
669                                optional: false,
670                                serde_config: SerdeConfig {
671                                    serialize: None,
672                                    deserialize: None,
673                                },
674                            },
675                            ObjectField {
676                                name: "amount".to_string(),
677                                type_: TypeRef {
678                                    name: "float32".to_string(),
679                                },
680                                optional: false,
681                                serde_config: SerdeConfig {
682                                    serialize: None,
683                                    deserialize: None,
684                                },
685                            },
686                        ],
687                    }),
688                    origin: SdfTypeOrigin::Local,
689                },
690                MetadataType {
691                    name: "bank-account".to_string(),
692                    type_: SdfType::ArrowRow(SdfArrowRow {
693                        columns: vec![
694                            SdfArrowColumn {
695                                name: "balance".to_string(),
696                                type_: ArrowColumnKind::Float32,
697                            },
698                            SdfArrowColumn {
699                                name: "name".to_string(),
700                                type_: ArrowColumnKind::String,
701                            },
702                        ],
703                        ..Default::default()
704                    }),
705                    origin: SdfTypeOrigin::Local,
706                },
707            ],
708            states: vec![StateTyped {
709                name: "account-balance".to_string(),
710
711                type_: SdfKeyedState {
712                    key: TypeRef {
713                        name: "string".to_string(),
714                    },
715                    value: SdfKeyedStateValue::U32,
716                },
717            }],
718            imports: vec![],
719            functions: vec![],
720            dev: None,
721        }
722    }
723
724    fn dataflow() -> DataflowDefinition {
725        DataflowDefinition {
726            api_version: DATAFLOW_STABLE_VERSION.to_string(),
727            meta: Header {
728                name: "example".to_string(),
729                version: "0.1.0".to_string(),
730                namespace: "example".to_string(),
731            },
732            imports: vec![
733                PackageImport {
734                    metadata: Header {
735                        namespace: "example".to_string(),
736                        name: "bank-update".to_string(),
737                        version: "0.1.0".to_string(),
738                    },
739                    functions: vec![FunctionImport {
740                        name: "filter-positive-events".to_string(),
741                        alias: None,
742                    }],
743                    path: None,
744                    types: vec![],
745                    states: vec![],
746                },
747                PackageImport {
748                    metadata: Header {
749                        namespace: "example".to_string(),
750                        name: "bank".to_string(),
751                        version: "0.1.0".to_string(),
752                    },
753                    types: vec!["bank-event".to_string()],
754                    functions: vec![],
755                    states: vec![],
756                    path: None,
757                },
758            ],
759            ..Default::default()
760        }
761    }
762
763    fn dataflow_b() -> DataflowDefinition {
764        DataflowDefinition {
765            meta: Header {
766                name: "my-df".to_string(),
767                version: "0.1.0".to_string(),
768                namespace: "inf-namespace".to_string(),
769            },
770            api_version: DATAFLOW_STABLE_VERSION.to_string(),
771            services: vec![service()],
772            ..Default::default()
773        }
774    }
775
776    fn service() -> Operations {
777        let sources = vec![IoRef {
778            type_: IoType::Topic,
779            id: "listing".to_string(),
780            steps: vec![],
781        }];
782        let sinks = vec![IoRef {
783            type_: IoType::Topic,
784            id: "prospect".to_string(),
785            steps: vec![],
786        }];
787        let transforms = Transforms {
788            steps: vec![
789                TransformOperator::FilterMap(StepInvocation {
790                    uses: "listing_map_job".to_string(),
791                    ..Default::default()
792                }),
793                TransformOperator::Map(StepInvocation {
794                    uses: "job_map_prospect".to_string(),
795                    ..Default::default()
796                }),
797            ],
798        };
799
800        let post_transforms = Some(PostTransforms::AssignTimestamp(Window {
801            assign_timestamp: StepInvocation {
802                uses: "assign_timestamp".to_string(),
803                ..Default::default()
804            },
805            transforms: Transforms {
806                steps: vec![TransformOperator::Map(StepInvocation {
807                    uses: "prospect_map_prospect".to_string(),
808                    ..Default::default()
809                })],
810            },
811            partition: Some(PartitionOperator {
812                assign_key: StepInvocation {
813                    uses: "assign_key".to_string(),
814                    ..Default::default()
815                },
816                transforms: Transforms {
817                    steps: vec![TransformOperator::Map(StepInvocation {
818                        uses: "prospect_map_prospect2".to_string(),
819                        ..Default::default()
820                    })],
821                },
822                update_state: None,
823            }),
824            flush: Some(StepInvocation {
825                uses: "job_aggregate".to_string(),
826                ..Default::default()
827            }),
828            properties: WindowProperties {
829                kind: WindowKind::Tumbling(TumblingWindow {
830                    duration: 3600000,
831                    offset: 0,
832                }),
833                watermark_config: WatermarkConfig::default(),
834            },
835        }));
836
837        Operations {
838            name: "listing-to-prospect-op".to_string(),
839            sources,
840            sinks,
841            transforms,
842            post_transforms,
843            states: vec![],
844        }
845    }
846
847    #[test]
848    fn test_merge_types_and_states() {
849        let mut dataflow = dataflow();
850        let package_configs = vec![first_package_definition(), second_package_definition()];
851        assert_eq!(dataflow.types.len(), 0);
852
853        dataflow.merge_dependencies(&package_configs).unwrap();
854
855        assert_eq!(dataflow.types.first().unwrap().name, "bank-event");
856    }
857
858    #[test]
859    fn test_validate_validates_states() {
860        let mut dataflow = dataflow();
861        dataflow.services = vec![Operations {
862            name: "basic".to_string(),
863            sources: vec![],
864            sinks: vec![],
865            transforms: Transforms { steps: vec![] },
866            post_transforms: None,
867            states: vec![State::Reference(StateRef {
868                ref_service: "other".to_string(),
869                name: "account-balance".to_string(),
870            })],
871        }];
872        let res = dataflow
873            .validate()
874            .expect_err("should error for undefined state");
875
876        assert!(res
877            .errors
878            .contains(&DataflowDefinitionValidationError::UndefinedState {
879                service_name: "basic".to_string(),
880                ref_state_name: "other.account-balance".to_string()
881            }));
882    }
883
884    #[test]
885    fn test_validate_validates_metadata() {
886        let mut dataflow = dataflow();
887        dataflow.meta.name = "".to_string();
888
889        let res = dataflow
890            .validate()
891            .expect_err("should error for empty name");
892
893        assert!(res
894            .errors
895            .contains(&DataflowDefinitionValidationError::Meta(vec![
896                HeaderValidationError::new("Name cannot be empty\n")
897            ])));
898
899        assert_eq!(
900            res.to_string(),
901            r#"Dataflow Config failed validation
902
903    Header is invalid:
904        Name cannot be empty
905
906"#
907        );
908    }
909
910    #[test]
911    fn test_validate_rejects_schedule_on_v5() {
912        let mut dataflow = dataflow();
913        dataflow.api_version = "0.5.0".to_string();
914        dataflow.schedule = Some(vec![ScheduleConfig {
915            name: "weekly".to_string(),
916            schedule: Schedule::Cron("0 0 * * 0".to_string()),
917        }]);
918
919        let res = dataflow
920            .validate()
921            .expect_err("should error for schedule on v5");
922
923        assert!(res
924            .errors
925            .contains(&DataflowDefinitionValidationError::Versioning(
926                DataflowDefinitionVersionError::InvalidVersionFeature {
927                    version: "0.5.0".to_string(),
928                    feature: "schedule".to_string(),
929                    supported_version: "0.6.0".to_string()
930                }
931            )));
932    }
933    #[test]
934    fn test_validate_rejects_empty_type_names() {
935        let mut dataflow = dataflow();
936
937        dataflow.types = vec![MetadataType {
938            name: "".to_string(),
939            type_: SdfType::Object(SdfObject { fields: vec![] }),
940            origin: SdfTypeOrigin::Local,
941        }];
942
943        let res = dataflow
944            .validate()
945            .expect_err("should error for empty type name");
946
947        assert!(res
948            .errors
949            .contains(&DataflowDefinitionValidationError::Type(
950                MetadataTypeValidationFailure {
951                    name: "".to_string(),
952                    errors: vec![MetadataTypeValidationError::EmptyName]
953                }
954            )));
955
956        assert_eq!(
957            res.to_string(),
958            r#"Dataflow Config failed validation
959
960    Defined type `` is invalid:
961        Name cannot be empty
962
963"#
964        );
965    }
966
967    #[test]
968    fn test_validate_validates_types() {
969        let mut dataflow = dataflow();
970
971        dataflow.types = vec![MetadataType {
972            name: "my-type".to_string(),
973            type_: SdfType::Named(TypeRef {
974                name: "foobar".to_string(),
975            }),
976            origin: SdfTypeOrigin::Local,
977        }];
978
979        let res = dataflow
980            .validate()
981            .expect_err("should error for invalid type reference");
982
983        assert!(res
984            .errors
985            .contains(&DataflowDefinitionValidationError::Type(
986                MetadataTypeValidationFailure {
987                    name: "my-type".to_string(),
988                    errors: vec![MetadataTypeValidationError::SdfType(
989                        SdfTypeValidationError::InvalidRef("foobar".to_string())
990                    )]
991                }
992            )));
993
994        assert_eq!(
995            res.to_string(),
996            r#"Dataflow Config failed validation
997
998    Defined type `my-type` is invalid:
999        Referenced type `foobar` not found in config or imported types
1000
1001"#
1002        );
1003    }
1004
1005    #[test]
1006    fn test_validate_validates_topic_names() {
1007        let mut dataflow = dataflow();
1008
1009        dataflow.topics = vec![(
1010            "".to_string(),
1011            Topic {
1012                name: "my-topic".to_string(),
1013                schema: TopicSchema {
1014                    key: None,
1015                    value: SchemaSerDe {
1016                        converter: None,
1017                        type_: TypeRef {
1018                            name: "u8".to_string(),
1019                        },
1020                    },
1021                },
1022                consumer: None,
1023                producer: None,
1024                profile: None,
1025            },
1026        )];
1027
1028        let res = dataflow
1029            .validate()
1030            .expect_err("should error for empty topic key");
1031
1032        assert!(res
1033            .errors
1034            .contains(&DataflowDefinitionValidationError::Topic(
1035                TopicValidationFailure {
1036                    name: "".to_string(),
1037                    errors: vec![TopicValidationError::Name(vec![TopicNameError::Empty])]
1038                }
1039            )));
1040
1041        assert!(res.to_string().contains(
1042            r#"Dataflow Config failed validation
1043
1044    Topic `` is invalid:
1045        Topic name is invalid:
1046            Name cannot be empty
1047"#
1048        ));
1049    }
1050
1051    #[test]
1052    fn test_validate_validates_topics() {
1053        let mut dataflow = dataflow();
1054
1055        dataflow.topics = vec![(
1056            "my-topic".to_string(),
1057            Topic {
1058                name: "my-topic".to_string(),
1059                schema: TopicSchema {
1060                    key: None,
1061                    value: SchemaSerDe {
1062                        converter: Some(SerdeConverter::Raw),
1063                        type_: TypeRef {
1064                            name: "foobar".to_string(),
1065                        },
1066                    },
1067                },
1068                consumer: None,
1069                producer: None,
1070                profile: None,
1071            },
1072        )];
1073
1074        let res = dataflow
1075            .validate()
1076            .expect_err("should error for invalid topic value type");
1077
1078        assert!(res
1079            .errors
1080            .contains(&DataflowDefinitionValidationError::Topic(
1081                TopicValidationFailure {
1082                    name: "my-topic".to_string(),
1083                    errors: vec![TopicValidationError::InvalidValueRef("foobar".to_string())]
1084                }
1085            )));
1086
1087        assert_eq!(
1088            res.to_string(),
1089            r#"Dataflow Config failed validation
1090
1091    Topic `my-topic` is invalid:
1092        Referenced type `foobar` not found in config or imported types
1093
1094"#
1095        );
1096    }
1097
1098    #[test]
1099    fn test_validate_validates_services() {
1100        let mut dataflow = dataflow_b();
1101
1102        dataflow.services = vec![Operations {
1103            name: "".to_string(),
1104            sources: vec![],
1105            sinks: vec![],
1106            transforms: Transforms { steps: vec![] },
1107            post_transforms: None,
1108            states: vec![],
1109        }];
1110
1111        let res = dataflow
1112            .validate()
1113            .expect_err("should error for missing service name");
1114
1115        assert!(res
1116            .errors
1117            .contains(&DataflowDefinitionValidationError::Service(
1118                ServiceValidationFailure {
1119                    name: "".to_string(),
1120                    errors: vec![
1121                        ServiceValidationError::NameEmpty,
1122                        ServiceValidationError::NoSources
1123                    ]
1124                }
1125            )));
1126
1127        assert_eq!(
1128            res.to_string(),
1129            r#"Dataflow Config failed validation
1130
1131    Service `` is invalid:
1132        Service name cannot be empty
1133        Service must have at least one source
1134
1135"#
1136        );
1137    }
1138
1139    #[test]
1140    fn test_validate_passes_valid_config() {
1141        let dataflow = dataflow();
1142
1143        assert!(dataflow.validate().is_ok());
1144    }
1145
1146    #[test]
1147    fn test_has_custom_types() {
1148        let mut dataflow = dataflow();
1149        assert!(!dataflow.has_custom_types());
1150
1151        dataflow.types = vec![MetadataType {
1152            name: "my-type".to_string(),
1153            type_: SdfType::Object(SdfObject { fields: vec![] }),
1154            origin: SdfTypeOrigin::Local,
1155        }];
1156
1157        assert!(dataflow.has_custom_types());
1158    }
1159
1160    #[test]
1161    fn test_has_custom_types_with_states() {
1162        let mut dataflow = dataflow();
1163        assert!(!dataflow.has_custom_types());
1164
1165        dataflow.services = vec![Operations {
1166            name: "my-service".to_string(),
1167            sources: vec![],
1168            sinks: vec![],
1169            transforms: Transforms { steps: vec![] },
1170            post_transforms: None,
1171            states: vec![State::Typed(StateTyped {
1172                name: "my-state".to_string(),
1173
1174                type_: SdfKeyedState {
1175                    key: TypeRef {
1176                        name: "string".to_string(),
1177                    },
1178                    value: SdfKeyedStateValue::U32,
1179                },
1180            })],
1181        }];
1182
1183        assert!(dataflow.has_custom_types());
1184    }
1185
1186    #[test]
1187    fn test_validate_rejects_dataflows_with_operator_name_duplicated() {
1188        let mut dataflow = dataflow();
1189
1190        dataflow.topics.push((
1191            "listing".to_string(),
1192            Topic {
1193                name: "listing".to_string(),
1194                schema: TopicSchema {
1195                    key: None,
1196                    value: SchemaSerDe {
1197                        converter: Some(crate::wit::io::SerdeConverter::Json),
1198                        type_: TypeRef {
1199                            name: "string".to_string(),
1200                        },
1201                    },
1202                },
1203                consumer: None,
1204                producer: None,
1205                profile: None,
1206            },
1207        ));
1208
1209        dataflow.validate().expect("should validate first");
1210
1211        dataflow.services.push(Operations {
1212            name: "my-op".to_string(),
1213            sources: vec![IoRef {
1214                type_: IoType::Topic,
1215                id: "listing".to_string(),
1216                steps: vec![],
1217            }],
1218            sinks: vec![],
1219            transforms: Transforms {
1220                steps: vec![
1221                    TransformOperator::Filter(StepInvocation {
1222                        uses: "duplicated-fn".to_string(),
1223                        inputs: vec![NamedParameter {
1224                            name: "cat".to_string(),
1225                            type_: TypeRef {
1226                                name: "string".to_string(),
1227                            },
1228                            optional: false,
1229                            kind: ParameterKind::Value,
1230                        }],
1231                        output: Some(Parameter {
1232                            type_: OutputType::Ref(TypeRef {
1233                                name: "bool".to_string(),
1234                            }),
1235                            ..Default::default()
1236                        }),
1237                        ..Default::default()
1238                    }),
1239                    TransformOperator::Filter(StepInvocation {
1240                        uses: "duplicated-fn".to_string(),
1241                        inputs: vec![NamedParameter {
1242                            name: "cat".to_string(),
1243                            type_: TypeRef {
1244                                name: "string".to_string(),
1245                            },
1246                            optional: false,
1247                            kind: ParameterKind::Value,
1248                        }],
1249                        output: Some(Parameter {
1250                            type_: OutputType::Ref(TypeRef {
1251                                name: "bool".to_string(),
1252                            }),
1253                            ..Default::default()
1254                        }),
1255                        ..Default::default()
1256                    }),
1257                ],
1258            },
1259            ..Default::default()
1260        });
1261
1262        let res = dataflow
1263            .validate()
1264            .expect_err("should error for duplicated operator name");
1265
1266        assert!(res
1267            .errors
1268            .contains(&DataflowDefinitionValidationError::DuplicateOperator(
1269                "duplicated-fn".to_string()
1270            )));
1271
1272        assert_eq!(
1273            res.to_string(),
1274            r#"Dataflow Config failed validation
1275
1276    Duplicate inline operator with name: duplicated-fn was found, inline operators must have unique names
1277
1278"#
1279        );
1280    }
1281    #[test]
1282    fn test_add_imported_operator() {
1283        let mut dataflow = dataflow_b();
1284
1285        let operator_placement = OperatorPlacement {
1286            service_id: "listing-to-prospect-op".to_string(),
1287            transforms_index: Some(2),
1288            ..Default::default()
1289        };
1290
1291        let function = StepInvocation {
1292            uses: "cat_map_cat".to_string(),
1293            inputs: vec![NamedParameter {
1294                name: "cat".to_string(),
1295                type_: TypeRef {
1296                    name: "string".to_string(),
1297                },
1298                optional: false,
1299                kind: ParameterKind::Value,
1300            }],
1301            output: Some(Parameter {
1302                type_: OutputType::Ref(TypeRef {
1303                    name: "string".to_string(),
1304                }),
1305                ..Default::default()
1306            }),
1307            ..Default::default()
1308        };
1309
1310        dataflow
1311            .add_imported_operator(
1312                function.clone(),
1313                OperatorType::Map,
1314                operator_placement,
1315                package_import(),
1316            )
1317            .expect("Failed to add imported operator");
1318
1319        let result_operator = dataflow.services.first().unwrap().transforms.steps[2].clone();
1320
1321        assert_eq!(result_operator, TransformOperator::Map(function));
1322        assert_eq!(
1323            *dataflow
1324                .imports
1325                .first()
1326                .expect("Should be a package import"),
1327            package_import()
1328        );
1329    }
1330
1331    #[test]
1332    fn test_add_imported_operator_with_index_out_of_bounds() {
1333        let mut dataflow = dataflow_b();
1334
1335        let operator_placement = OperatorPlacement {
1336            service_id: "listing-to-prospect-op".to_string(),
1337            transforms_index: Some(3),
1338            ..Default::default()
1339        };
1340
1341        let function = StepInvocation {
1342            uses: "cat_map_cat".to_string(),
1343            ..Default::default()
1344        };
1345
1346        let res = dataflow.add_imported_operator(
1347            function,
1348            OperatorType::Map,
1349            operator_placement,
1350            package_import(),
1351        );
1352
1353        assert!(res.is_err());
1354        assert!(res.unwrap_err().to_string().contains(
1355            "cannot insert operator into transforms block, index is out of bounds, len = 2"
1356        ))
1357    }
1358
1359    #[test]
1360    fn test_add_imported_operator_merges_package_import() {
1361        let mut dataflow = dataflow_b();
1362
1363        let operator_placement = OperatorPlacement {
1364            service_id: "listing-to-prospect-op".to_string(),
1365            transforms_index: Some(2),
1366            ..Default::default()
1367        };
1368
1369        // import an operator
1370        let function = StepInvocation {
1371            uses: "cat_map_cat".to_string(),
1372            ..Default::default()
1373        };
1374
1375        dataflow
1376            .add_imported_operator(
1377                function,
1378                OperatorType::Map,
1379                operator_placement,
1380                package_import(),
1381            )
1382            .expect("Failed to add imported operator");
1383
1384        // import another operator from the same package
1385        let operator_placement = OperatorPlacement {
1386            service_id: "listing-to-prospect-op".to_string(),
1387            transforms_index: Some(3),
1388            ..Default::default()
1389        };
1390
1391        let second_function = StepInvocation {
1392            uses: "cat_map_dog".to_string(),
1393            ..Default::default()
1394        };
1395
1396        dataflow
1397            .add_imported_operator(
1398                second_function,
1399                OperatorType::Map,
1400                operator_placement,
1401                package_import_b(),
1402            )
1403            .expect("Failed to add imported operator");
1404
1405        assert_eq!(dataflow.imports.len(), 1);
1406        assert_eq!(
1407            *dataflow
1408                .imports
1409                .first()
1410                .expect("Should be a package import"),
1411            package_import_merged()
1412        );
1413    }
1414
1415    #[test]
1416    fn test_add_inline_operator() {
1417        let mut dataflow = dataflow_b();
1418
1419        let operator_placement = OperatorPlacement {
1420            service_id: "listing-to-prospect-op".to_string(),
1421            transforms_index: Some(2),
1422            ..Default::default()
1423        };
1424
1425        let function = StepInvocation {
1426            uses: "cat_map_cat".to_string(),
1427            code_info: CodeInfo {
1428                lang: CodeLang::Rust,
1429                code: Some("fn map_cat(cat: Cat) -> Cat { cat }".to_string()),
1430                ..Default::default()
1431            },
1432            ..Default::default()
1433        };
1434
1435        dataflow
1436            .add_inline_operator(function, OperatorType::Map, operator_placement)
1437            .expect("Failed to add imported operator");
1438
1439        let result_operator = dataflow.services.first().unwrap().transforms.steps[2].clone();
1440
1441        assert_eq!(
1442            result_operator,
1443            TransformOperator::Map(StepInvocation {
1444                uses: "cat_map_cat".to_string(),
1445                code_info: CodeInfo {
1446                    lang: CodeLang::Rust,
1447                    code: Some("fn map_cat(cat: Cat) -> Cat { cat }".to_string()),
1448                    ..Default::default()
1449                },
1450                ..Default::default()
1451            })
1452        );
1453    }
1454
1455    #[test]
1456    fn test_add_inline_operator_has_no_code() {
1457        let mut dataflow = dataflow();
1458
1459        let function = StepInvocation {
1460            uses: "cat_map_cat".to_string(),
1461            code_info: CodeInfo {
1462                lang: CodeLang::Rust,
1463                code: None,
1464                ..Default::default()
1465            },
1466            ..Default::default()
1467        };
1468
1469        let res =
1470            dataflow.add_inline_operator(function, OperatorType::Map, OperatorPlacement::default());
1471
1472        assert!(res.is_err());
1473        assert!(res
1474            .unwrap_err()
1475            .to_string()
1476            .contains("inline operator must have code"))
1477    }
1478
1479    #[test]
1480    fn test_replace_inline_operator() {
1481        let mut dataflow = dataflow_b();
1482
1483        let operator_placement = OperatorPlacement {
1484            service_id: "listing-to-prospect-op".to_string(),
1485            transforms_index: Some(2),
1486            ..Default::default()
1487        };
1488
1489        let function = StepInvocation {
1490            uses: "cat_map_cat".to_string(),
1491            code_info: CodeInfo {
1492                lang: CodeLang::Rust,
1493                code: Some("fn map_cat(cat: Cat) -> Cat { cat }".to_string()),
1494                ..Default::default()
1495            },
1496            ..Default::default()
1497        };
1498
1499        let new_function = StepInvocation {
1500            uses: "cat_map_cat".to_string(),
1501            code_info: CodeInfo {
1502                lang: CodeLang::Rust,
1503                code: Some("fn map_dog(dog: Dog) -> Dog { dog }".to_string()),
1504                ..Default::default()
1505            },
1506            ..Default::default()
1507        };
1508
1509        dataflow
1510            .add_inline_operator(function, OperatorType::Map, operator_placement.clone())
1511            .expect("Failed to add imported operator");
1512
1513        dataflow
1514            .replace_inline_operator(new_function, OperatorType::FilterMap, operator_placement)
1515            .expect("Failed to replace inline operator");
1516
1517        let result_operator = dataflow.services.first().unwrap().transforms.steps[2].clone();
1518
1519        assert_eq!(
1520            result_operator,
1521            TransformOperator::FilterMap(StepInvocation {
1522                uses: "cat_map_cat".to_string(),
1523                code_info: CodeInfo {
1524                    lang: CodeLang::Rust,
1525                    code: Some("fn map_dog(dog: Dog) -> Dog { dog }".to_string()),
1526                    ..Default::default()
1527                },
1528                ..Default::default()
1529            })
1530        );
1531    }
1532
1533    #[test]
1534    fn test_replace_inline_operator_has_no_code() {
1535        let mut dataflow = dataflow();
1536
1537        let function = StepInvocation {
1538            uses: "cat_map_cat".to_string(),
1539            code_info: CodeInfo {
1540                lang: CodeLang::Rust,
1541                code: None,
1542                ..Default::default()
1543            },
1544            ..Default::default()
1545        };
1546
1547        let res = dataflow.replace_inline_operator(
1548            function,
1549            OperatorType::Map,
1550            OperatorPlacement::default(),
1551        );
1552
1553        assert!(res.is_err());
1554        assert!(res
1555            .unwrap_err()
1556            .to_string()
1557            .contains("inline operator must have code"))
1558    }
1559
1560    #[test]
1561    fn test_delete_operator() {
1562        let mut dataflow = dataflow_b();
1563
1564        let operator_placement = OperatorPlacement {
1565            service_id: "listing-to-prospect-op".to_string(),
1566            transforms_index: Some(1),
1567            ..Default::default()
1568        };
1569
1570        let _ = dataflow.delete_operator(operator_placement);
1571
1572        let steps = &dataflow.services.first().unwrap().transforms.steps;
1573        let remaining_op = steps.first().unwrap();
1574        let function = match remaining_op {
1575            TransformOperator::FilterMap(f) => f,
1576            _ => panic!("Expected FilterMap operator"),
1577        };
1578
1579        assert_eq!(steps.len(), 1);
1580        assert_eq!(function.uses, "listing_map_job");
1581    }
1582
1583    #[test]
1584    fn test_delete_operator_fails_when_service_does_not_exist() {
1585        let mut dataflow = dataflow_b();
1586
1587        let operator_placement = OperatorPlacement {
1588            service_id: "my-missing-service".to_string(),
1589            transforms_index: Some(1),
1590            ..Default::default()
1591        };
1592
1593        let res = dataflow.delete_operator(operator_placement);
1594
1595        assert_eq!(
1596            res.unwrap_err().to_string(),
1597            "Service with id my-missing-service not found"
1598        );
1599    }
1600
1601    #[test]
1602    fn test_merge_package_import_merges_repeat_imports() {
1603        let mut dataflow = dataflow_b();
1604
1605        dataflow.merge_package_import(package_import());
1606        dataflow.merge_package_import(package_import_b());
1607
1608        assert_eq!(
1609            *dataflow
1610                .imports
1611                .first()
1612                .expect("Should be a package import"),
1613            package_import_merged()
1614        );
1615    }
1616
1617    #[test]
1618    fn test_merge_package_import_appends_new_imports() {
1619        let mut dataflow = dataflow_b();
1620
1621        let next_version_import = PackageImport {
1622            metadata: Header {
1623                name: "cats_package".to_string(),
1624                version: "0.1.1".to_string(),
1625                namespace: "inf-namespace".to_string(),
1626            },
1627            path: None,
1628            types: vec!["cat".to_string(), "dog".to_string()],
1629            states: vec![],
1630            functions: vec![FunctionImport {
1631                name: "cat_map_dog".to_string(),
1632                alias: None,
1633            }],
1634        };
1635
1636        dataflow.merge_package_import(package_import());
1637        dataflow.merge_package_import(next_version_import.clone());
1638
1639        assert_eq!(dataflow.imports.len(), 2);
1640        assert_eq!(
1641            *dataflow
1642                .imports
1643                .first()
1644                .expect("Should be a package import"),
1645            package_import()
1646        );
1647        assert_eq!(dataflow.imports[1], next_version_import);
1648    }
1649
1650    #[test]
1651    fn test_pkg_df_name_conflict() {
1652        let mut dataflow = dataflow();
1653
1654        // conflict with the dataflow name
1655        dataflow.imports.push(PackageImport {
1656            metadata: Header {
1657                name: "example".to_string(),
1658                version: "0.1.0".to_string(),
1659                namespace: "example".to_string(),
1660            },
1661            path: None,
1662            types: vec![],
1663            states: vec![],
1664            functions: vec![],
1665        });
1666
1667        let res = dataflow.validate();
1668        assert!(res.is_err());
1669        let err = res.unwrap_err();
1670        assert!(err
1671            .to_string()
1672            .contains("Package example/example conflicts with dataflow name and namespace"),)
1673    }
1674
1675    fn package_import() -> PackageImport {
1676        PackageImport {
1677            metadata: Header {
1678                name: "cats_package".to_string(),
1679                version: "0.1.0".to_string(),
1680                namespace: "inf-namespace".to_string(),
1681            },
1682            path: None,
1683            types: vec!["cat".to_string()],
1684            states: vec![],
1685            functions: vec![FunctionImport {
1686                name: "cat_map_cat".to_string(),
1687                alias: None,
1688            }],
1689        }
1690    }
1691
1692    fn package_import_b() -> PackageImport {
1693        PackageImport {
1694            metadata: Header {
1695                name: "cats_package".to_string(),
1696                version: "0.1.0".to_string(),
1697                namespace: "inf-namespace".to_string(),
1698            },
1699            path: None,
1700            types: vec!["cat".to_string(), "dog".to_string()],
1701            states: vec![],
1702            functions: vec![FunctionImport {
1703                name: "cat_map_dog".to_string(),
1704                alias: None,
1705            }],
1706        }
1707    }
1708
1709    fn package_import_merged() -> PackageImport {
1710        PackageImport {
1711            metadata: Header {
1712                name: "cats_package".to_string(),
1713                version: "0.1.0".to_string(),
1714                namespace: "inf-namespace".to_string(),
1715            },
1716            path: None,
1717            types: vec!["cat".to_string(), "dog".to_string()],
1718            states: vec![],
1719            functions: vec![
1720                FunctionImport {
1721                    name: "cat_map_cat".to_string(),
1722                    alias: None,
1723                },
1724                FunctionImport {
1725                    name: "cat_map_dog".to_string(),
1726                    alias: None,
1727                },
1728            ],
1729        }
1730    }
1731}