1use std::{
2 collections::{BTreeMap, HashSet},
3 fmt::Display,
4};
5
6use anyhow::Result;
7use tracing::info;
8
9use sdf_common::{
10 constants::DATAFLOW_STABLE_VERSION,
11 version::{ApiVersion, SdfContextVersion},
12};
13
14use crate::{
15 importer::resolver::DependencyResolver,
16 metadata::{
17 io::topic::{validate_topic_name, TopicValidationError, TopicValidationFailure},
18 metadata::header::HeaderValidationError,
19 },
20 util::{
21 merge::merge_types_and_states,
22 config_error::{ConfigError, INDENT},
23 operator_placement::OperatorPlacement,
24 sdf_types_map::SdfTypesMap,
25 validate::MetadataTypeValidationFailure,
26 },
27 wit::{
28 dataflow::{DataflowDefinition, Header, Operations, PackageDefinition, State},
29 metadata::{MetadataType, SdfType, SdfTypeOrigin},
30 operator::OperatorType,
31 package_interface::{PackageImport, StepInvocation},
32 },
33};
34
35use super::{operations::ServiceValidationFailure, schedule_config::ScheduleValidationFailure};
36
/// Aggregate of every validation error found while checking a
/// [`DataflowDefinition`]; returned by [`DataflowDefinition::validate`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DataflowDefinitionValidationFailure {
    /// Individual validation errors, in the order they were discovered.
    pub errors: Vec<DataflowDefinitionValidationError>,
}
41
42impl Display for DataflowDefinitionValidationFailure {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 writeln!(f, "Dataflow Config failed validation\n")?;
45
46 for error in &self.errors {
47 writeln!(f, "{}", error.readable(1))?;
48 }
49
50 Ok(())
51 }
52}
53
/// A single validation problem detected in a dataflow definition.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum DataflowDefinitionValidationError {
    /// The `meta` header is invalid (e.g. empty name).
    Meta(Vec<HeaderValidationError>),
    /// A declared type failed validation against the known type map.
    Type(MetadataTypeValidationFailure),
    /// A topic name or topic schema failed validation.
    Topic(TopicValidationFailure),
    /// A service definition failed validation.
    Service(ServiceValidationFailure),
    /// Two inline operators share the same name; names must be unique.
    DuplicateOperator(String),
    /// A schedule entry failed validation.
    Schedule(ScheduleValidationFailure),
    /// A service references a state that no service defines.
    UndefinedState {
        /// Service containing the dangling reference.
        service_name: String,
        /// Fully-qualified `<service>.<state>` name that was not found.
        ref_state_name: String,
    },
    /// The api version is unsupported or incompatible with a used feature.
    Versioning(DataflowDefinitionVersionError),
    /// An imported package's name/namespace collides with the dataflow's own.
    PackageConflictName(String),
}
69
/// Version-related validation problems for a dataflow's `api_version`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum DataflowDefinitionVersionError {
    /// The version parsed but is neither of the supported major lines
    /// (see `validate_version`: v5 and v6 are accepted).
    UnsupportedVersion(String),
    /// A feature was used that the declared version does not support.
    InvalidVersionFeature {
        /// Version declared by the dataflow.
        version: String,
        /// Name of the offending configuration feature.
        feature: String,
        /// First version in which the feature is supported.
        supported_version: String,
    },
    /// The version string could not be parsed at all.
    FailedToParseVersion(String),
}
80
81impl ConfigError for DataflowDefinitionVersionError {
82 fn readable(&self, indents: usize) -> String {
83 let indent = INDENT.repeat(indents);
84
85 match self {
86 Self::UnsupportedVersion(version) => {
87 format!("{}Unsupported version: {}\n", indent, version)
88 }
89 Self::InvalidVersionFeature {
90 version,
91 feature,
92 supported_version,
93 } => {
94 format!(
95 "{}Version {} does not support configuration: {}, supported version: {}\n",
96 indent, version, feature, supported_version
97 )
98 }
99 Self::FailedToParseVersion(err) => {
100 format!("{}Failed to parse version: {}\n", indent, err)
101 }
102 }
103 }
104}
105
106impl ConfigError for DataflowDefinitionValidationError {
107 fn readable(&self, indents: usize) -> String {
108 let indent = INDENT.repeat(indents);
109
110 match self {
111 Self::Meta(errors) => {
112 let mut res = format!("{}Header is invalid:\n", indent);
113
114 for error in errors {
115 res.push_str(&error.readable(indents + 1));
116 }
117
118 res
119 }
120 Self::Type(failure) => failure.readable(indents),
121 Self::Topic(failure) => failure.readable(indents),
122 Self::Service(failure) => failure.readable(indents),
123 Self::Schedule(failure) => failure.readable(indents),
124 Self::DuplicateOperator(name) => {
125 format!(
126 "{}Duplicate inline operator with name: {} was found, inline operators must have unique names\n",
127 indent, name
128 )
129 }
130 Self::PackageConflictName(name) => {
131 format!(
132 "{}Package {} conflicts with dataflow name and namespace\n",
133 indent, name
134 )
135 }
136 Self::UndefinedState {
137 service_name,
138 ref_state_name,
139 } => {
140 format!(
141 "{}State with name {} is referenced in service {} but not defined in the dataflow\n",
142 indent, ref_state_name, service_name
143 )
144 }
145 Self::Versioning(err) => err.readable(indents),
146 }
147 }
148}
149
// Manual impl (not `#[derive(Default)]`) because `api_version` must default
// to the current stable dataflow version rather than an empty string.
#[allow(clippy::derivable_impls)]
impl Default for DataflowDefinition {
    fn default() -> Self {
        Self {
            api_version: DATAFLOW_STABLE_VERSION.to_string(),
            meta: Default::default(),
            imports: Default::default(),
            types: Default::default(),
            services: Default::default(),
            topics: Default::default(),
            dev: Default::default(),
            packages: Default::default(),
            schedule: Default::default(),
            default_config: Default::default(),
        }
    }
}
167
// NOTE(review): this impl is fully derivable (every field uses
// `Default::default()`); presumably it is hand-written because `Header` is a
// WIT-generated type that cannot carry the derive — confirm before changing.
#[allow(clippy::derivable_impls)]
impl Default for Header {
    fn default() -> Self {
        Self {
            name: Default::default(),
            version: Default::default(),
            namespace: Default::default(),
        }
    }
}
178
179impl DataflowDefinition {
    /// Human-readable identifier for this dataflow, delegating to the
    /// `Display` impl of the `meta` header (presumably combines
    /// namespace/name/version — confirm against `Header`'s `Display`).
    pub fn name(&self) -> String {
        self.meta.to_string()
    }
184
185 pub fn has_custom_types(&self) -> bool {
186 !self.types.is_empty() || self.services.iter().any(|s| !s.states.is_empty())
187 }
188
    /// Resolves all package imports, merges the dependencies' types into
    /// this definition, and wires imported operator configs into each
    /// service.
    ///
    /// Side effects: replaces `self.packages` with the resolved package set
    /// and rewrites `self.types` (via `merge_dependencies`).
    pub fn resolve_imports(&mut self, debug: bool) -> Result<()> {
        // Build the dependency graph from the declared imports and any
        // already-known package definitions.
        let dependency_resolver =
            DependencyResolver::build(self.imports.clone(), self.packages.clone(), debug)?;
        let package_configs = dependency_resolver.packages()?;

        // Pull the dependencies' types and states into this dataflow's
        // type table before services try to resolve against them.
        self.merge_dependencies(&package_configs)?;

        for service in self.services.iter_mut() {
            service.import_operator_configs(&self.imports, &package_configs)?;
        }

        self.packages = package_configs;

        Ok(())
    }
204
    /// Adds an operator backed by an imported package function.
    ///
    /// Merges `package_import` into `self.imports` first (deduplicating by
    /// package metadata), then inserts the operator into the service
    /// addressed by `operator_placement.service_id`.
    ///
    /// Errors if that service does not exist or the insertion fails.
    pub fn add_imported_operator(
        &mut self,
        function: StepInvocation,
        operator_type: OperatorType,
        operator_placement: OperatorPlacement,
        package_import: PackageImport,
    ) -> Result<()> {
        self.merge_package_import(package_import);

        let service = self.get_service_mut(&operator_placement.service_id)?;

        service.add_operator(operator_type, operator_placement, function)?;

        Ok(())
    }
220
221 pub fn add_inline_operator(
222 &mut self,
223 function: StepInvocation,
224 operator_type: OperatorType,
225 operator_placement: OperatorPlacement,
226 ) -> Result<()> {
227 if function.code_info.code.is_none() {
228 return Err(anyhow::anyhow!("inline operator must have code"));
229 }
230
231 let service = self.get_service_mut(&operator_placement.service_id)?;
232
233 service.add_operator(operator_type, operator_placement, function)?;
234
235 Ok(())
236 }
237
238 pub fn replace_inline_operator(
239 &mut self,
240 function: StepInvocation,
241 operator_type: OperatorType,
242 operator_placement: OperatorPlacement,
243 ) -> Result<()> {
244 if function.code_info.code.is_none() {
245 return Err(anyhow::anyhow!("inline operator must have code"));
246 }
247
248 self.delete_operator(operator_placement.clone())?;
249 self.add_inline_operator(function, operator_type, operator_placement)?;
250
251 Ok(())
252 }
253
254 pub fn delete_operator(&mut self, operator_placement: OperatorPlacement) -> Result<()> {
255 match self.get_service_mut(&operator_placement.service_id) {
256 Ok(service) => service.delete_operator(operator_placement),
257 Err(e) => Err(e),
258 }
259 }
260
261 fn get_service_mut(&mut self, service_id: &str) -> Result<&mut Operations> {
262 match self.services.iter_mut().find(|s| s.name == service_id) {
263 Some(s) => Ok(s),
264 None => Err(anyhow::anyhow!("Service with id {} not found", service_id)),
265 }
266 }
267
268 pub fn merge_dependencies(&mut self, package_configs: &[PackageDefinition]) -> Result<()> {
269 let mut all_types = self.types_map();
270 let mut all_states = BTreeMap::new();
271
272 merge_types_and_states(
273 &mut all_types,
274 &mut all_states,
275 &self.imports,
276 package_configs,
277 )?;
278
279 self.types = all_types
280 .iter()
281 .map(|(name, (ty, origin))| MetadataType {
282 name: name.clone(),
283 type_: ty.clone(),
284 origin: origin.to_owned(),
285 })
286 .collect();
287
288 Ok(())
289 }
290
    /// Runs all structural validations over the dataflow, collecting every
    /// failure rather than stopping at the first.
    ///
    /// Checks, in order: api version, header, declared types, topics (map
    /// key and schema), schedules, services, state references, duplicate
    /// inline operator names, and package-name conflicts. The order of
    /// `errors` follows this sequence.
    pub fn validate(&self) -> Result<(), DataflowDefinitionValidationFailure> {
        info!("validating dataflow");
        let mut errors: Vec<DataflowDefinitionValidationError> = vec![];

        if let Err(validate_version_errors) = self.validate_version() {
            errors.push(DataflowDefinitionValidationError::Versioning(
                validate_version_errors,
            ));
        }

        if let Err(header_errors) = self.meta.validate() {
            errors.push(DataflowDefinitionValidationError::Meta(header_errors));
        }

        // Locally declared types plus typed service states; shared by the
        // type, topic, and service checks below.
        let types_map = self.types_map();

        for metadata_type in &self.types {
            if let Err(type_validation_failure) = metadata_type.validate(&types_map) {
                errors.push(DataflowDefinitionValidationError::Type(
                    type_validation_failure,
                ));
            }
        }

        for (topic_name, topic) in self.topics.iter() {
            // The collection key is validated separately from the topic body.
            if let Err(name_errors) = validate_topic_name(topic_name) {
                errors.push(DataflowDefinitionValidationError::Topic(
                    TopicValidationFailure {
                        name: topic_name.to_string(),
                        errors: vec![TopicValidationError::Name(name_errors)],
                    },
                ));
            }

            // `default_config` is passed by value — presumably a Copy
            // config type; confirm against `Topic::validate`.
            if let Err(topic_validation_failure) = topic.validate(self.default_config, &types_map) {
                errors.push(DataflowDefinitionValidationError::Topic(
                    topic_validation_failure,
                ));
            }
        }

        if let Some(schedule) = &self.schedule {
            for schedule_config in schedule {
                if let Err(schedule_validation_failure) = schedule_config.validate() {
                    errors.push(DataflowDefinitionValidationError::Schedule(
                        schedule_validation_failure,
                    ))
                }
            }
        }

        for service in &self.services {
            if let Err(service_validation_failure) =
                service.validate(&types_map, &self.topics, self.schedule.as_deref())
            {
                errors.push(DataflowDefinitionValidationError::Service(
                    service_validation_failure,
                ))
            }
        }

        // State references must resolve to a typed state defined by some
        // service; `validate_states` reports only the first dangling one.
        if let Err(err) = self.validate_states() {
            errors.push(err);
        }

        if let Err(err) = self.validate_non_duplicates() {
            for name in err {
                errors.push(DataflowDefinitionValidationError::DuplicateOperator(name));
            }
        }

        if self.validate_pkg_names().is_err() {
            errors.push(DataflowDefinitionValidationError::PackageConflictName(
                format!("{}/{}", self.meta.namespace, self.meta.name),
            ));
        }

        if errors.is_empty() {
            Ok(())
        } else {
            Err(DataflowDefinitionValidationFailure { errors })
        }
    }
377
378 fn validate_pkg_names(&self) -> Result<(), String> {
379 for pkg in &self.imports {
381 if pkg.metadata.namespace == self.meta.namespace && pkg.metadata.name == self.meta.name
382 {
383 return Err(format!(
384 "Package {} with namespace {} conflicts with dataflow name and namespace",
385 pkg.metadata.name, pkg.metadata.namespace
386 ));
387 }
388 }
389 Ok(())
390 }
391
392 pub fn validate_version(&self) -> Result<(), DataflowDefinitionVersionError> {
393 let version = ApiVersion::from(&self.api_version)
396 .map_err(|err| DataflowDefinitionVersionError::FailedToParseVersion(err.to_string()))?;
397
398 if version.is_v5() {
399 if let Some(schedule) = &self.schedule {
400 if !schedule.is_empty() {
401 return Err(DataflowDefinitionVersionError::InvalidVersionFeature {
402 version: self.api_version.clone(),
403 feature: "schedule".to_string(),
404 supported_version: "0.6.0".to_string(),
405 });
406 } else {
407 return Ok(());
408 }
409 } else {
410 return Ok(());
411 }
412 }
413
414 if version.is_v6() {
415 return Ok(());
416 }
417
418 Err(DataflowDefinitionVersionError::UnsupportedVersion(
419 self.api_version.clone(),
420 ))
421 }
422
423 pub fn types_map(&self) -> SdfTypesMap {
424 SdfTypesMap {
425 map: self
426 .types
427 .iter()
428 .map(|ty| (ty.name.clone(), (ty.type_.clone(), ty.origin)))
429 .chain(self.services.iter().flat_map(|service| {
430 service.states.iter().filter_map(|state| {
431 if let State::Typed(state) = state {
432 Some((
433 state.name.clone(),
434 (
435 SdfType::KeyedState(state.type_.to_owned()),
436 SdfTypeOrigin::Local,
437 ),
438 ))
439 } else {
440 None
441 }
442 })
443 }))
444 .collect(),
445 }
446 }
447
    /// Parses the raw `api_version` string into a structured [`ApiVersion`].
    pub fn api_version(&self) -> Result<ApiVersion> {
        ApiVersion::from(&self.api_version)
    }
451
452 pub fn all_owned_states(&self) -> BTreeMap<String, State> {
453 let states = self
454 .services
455 .iter()
456 .flat_map(|service| service.states.iter().cloned());
457
458 states.into_iter().fold(BTreeMap::new(), |mut acc, state| {
459 if let State::Typed(ref state_ty) = state {
460 acc.insert(state_ty.name.clone(), state);
461 }
462
463 acc
464 })
465 }
466
467 pub fn validate_states(&self) -> Result<(), DataflowDefinitionValidationError> {
468 let (typed_states, ref_states): (Vec<_>, Vec<_>) = self
469 .services
470 .iter()
471 .flat_map(|service| service.states.iter().map(|state| (&service.name, state)))
472 .partition(|(_, state)| matches!(state, State::Typed(_)));
473
474 let mut typed_states_set = HashSet::new();
475 for (service_name, state) in typed_states {
476 if let State::Typed(state_ty) = state {
477 let ref_state_name = format!("{}.{}", service_name, state_ty.name);
478 typed_states_set.insert(ref_state_name);
479 } else {
480 unreachable!("state should be typed at this point");
481 }
482 }
483
484 for (service_name, state) in ref_states {
485 if let State::Reference(ref_state) = state {
486 let ref_service = ref_state.ref_service.as_str();
487 let ref_state_name = ref_state.name.as_str();
488 let ref_state_name = format!("{}.{}", ref_service, ref_state_name);
489 if !typed_states_set.contains(&ref_state_name) {
490 return Err(DataflowDefinitionValidationError::UndefinedState {
491 service_name: service_name.to_owned(),
492 ref_state_name,
493 });
494 }
495 }
496 }
497
498 Ok(())
499 }
500 pub fn merge_package_import(&mut self, package_import: PackageImport) {
501 if let Some(existing_import) = self
502 .imports
503 .iter_mut()
504 .find(|import| import.metadata == package_import.metadata)
505 {
506 existing_import.merge(&package_import);
507 } else {
508 self.imports.push(package_import);
509 }
510 }
511
    /// Re-processes inline operators in every service (parser feature only).
    ///
    /// Kept as an explicit loop so the `?` conversion on the callee's error
    /// type is preserved whatever error type it returns.
    #[cfg(feature = "parser")]
    pub fn update_inline_operators(&mut self) -> Result<()> {
        for service in self.services.iter_mut() {
            service.update_inline_operators()?;
        }

        Ok(())
    }
520
521 fn validate_non_duplicates(&self) -> Result<(), Vec<String>> {
522 let mut duplicate_names = vec![];
523 let mut op_names = HashSet::new();
524
525 for service in &self.services {
526 for source in &service.sources {
527 for step in &source.steps {
528 if step.is_imported(&self.imports) {
529 continue;
530 }
531
532 if !op_names.insert(step.name().to_owned()) {
533 duplicate_names.push(step.name().to_owned());
534 }
535 }
536 }
537
538 for sink in &service.sinks {
539 for step in &sink.steps {
540 if step.is_imported(&self.imports) {
541 continue;
542 }
543
544 if !op_names.insert(step.name().to_owned()) {
545 duplicate_names.push(step.name().to_owned());
546 }
547 }
548 }
549
550 for (step, _) in service.operators() {
551 if step.is_imported(&self.imports) {
552 continue;
553 }
554
555 if !op_names.insert(step.uses.to_owned()) {
556 duplicate_names.push(step.uses.to_owned());
557 }
558 }
559 }
560
561 if duplicate_names.is_empty() {
562 Ok(())
563 } else {
564 Err(duplicate_names)
565 }
566 }
567}
568
569#[cfg(test)]
570mod test {
571
572 use sdf_common::constants::DATAFLOW_STABLE_VERSION;
573
574 use crate::{
575 metadata::{
576 dataflow::{
577 dataflow_definition::{
578 DataflowDefinitionValidationError, DataflowDefinitionVersionError,
579 },
580 operations::{ServiceValidationError, ServiceValidationFailure},
581 },
582 io::topic::{TopicNameError, TopicValidationError, TopicValidationFailure},
583 metadata::{header::HeaderValidationError, sdf_type::SdfTypeValidationError},
584 },
585 util::{
586 operator_placement::OperatorPlacement,
587 validate::{MetadataTypeValidationError, MetadataTypeValidationFailure},
588 },
589 wit::{
590 dataflow::{
591 DataflowDefinition, IoRef, IoType, Operations, PostTransforms, ScheduleConfig,
592 Schedule, Topic, TransformOperator,
593 },
594 io::{SchemaSerDe, SerdeConverter, TopicSchema},
595 metadata::{
596 ArrowColumnKind, Header, MetadataType, NamedParameter, ObjectField, OutputType,
597 Parameter, ParameterKind, SdfArrowColumn, SdfArrowRow, SdfKeyedState,
598 SdfKeyedStateValue, SdfObject, SdfType, SdfTypeOrigin, SerdeConfig, TypeRef,
599 },
600 operator::{
601 CodeInfo, CodeLang, OperatorType, PartitionOperator, Transforms, TumblingWindow,
602 WatermarkConfig, Window, WindowKind, WindowProperties,
603 },
604 package_interface::{
605 FunctionImport, PackageDefinition, PackageImport, StateTyped, StepInvocation,
606 },
607 states::{State, StateRef},
608 },
609 };
610
611 fn first_package_definition() -> PackageDefinition {
612 PackageDefinition {
613 api_version: DATAFLOW_STABLE_VERSION.to_string(),
614 meta: Header {
615 namespace: "example".to_string(),
616 name: "bank-update".to_string(),
617 version: "0.1.0".to_string(),
618 },
619 imports: vec![PackageImport {
620 metadata: Header {
621 namespace: "example".to_string(),
622 name: "bank".to_string(),
623 version: "0.1.0".to_string(),
624 },
625 types: vec!["bank-event".to_string(), "bank-account".to_string()],
626 states: vec!["account-balance".to_string()],
627 path: Some("../bank-types".to_string()),
628 functions: vec![],
629 }],
630 functions: vec![(
631 StepInvocation {
632 uses: "filter-positive-events".to_string(),
633 inputs: vec![NamedParameter {
634 name: "event".to_string(),
635 type_: TypeRef {
636 name: "bank-event".to_string(),
637 },
638 optional: false,
639 kind: ParameterKind::Value,
640 }],
641 ..Default::default()
642 },
643 OperatorType::Filter,
644 )],
645 dev: None,
646 states: vec![],
647 types: vec![],
648 }
649 }
650
651 fn second_package_definition() -> PackageDefinition {
652 PackageDefinition {
653 api_version: DATAFLOW_STABLE_VERSION.to_string(),
654 meta: Header {
655 namespace: "example".to_string(),
656 name: "bank".to_string(),
657 version: "0.1.0".to_string(),
658 },
659 types: vec![
660 MetadataType {
661 name: "bank-event".to_string(),
662 type_: SdfType::Object(SdfObject {
663 fields: vec![
664 ObjectField {
665 name: "name".to_string(),
666 type_: TypeRef {
667 name: "string".to_string(),
668 },
669 optional: false,
670 serde_config: SerdeConfig {
671 serialize: None,
672 deserialize: None,
673 },
674 },
675 ObjectField {
676 name: "amount".to_string(),
677 type_: TypeRef {
678 name: "float32".to_string(),
679 },
680 optional: false,
681 serde_config: SerdeConfig {
682 serialize: None,
683 deserialize: None,
684 },
685 },
686 ],
687 }),
688 origin: SdfTypeOrigin::Local,
689 },
690 MetadataType {
691 name: "bank-account".to_string(),
692 type_: SdfType::ArrowRow(SdfArrowRow {
693 columns: vec![
694 SdfArrowColumn {
695 name: "balance".to_string(),
696 type_: ArrowColumnKind::Float32,
697 },
698 SdfArrowColumn {
699 name: "name".to_string(),
700 type_: ArrowColumnKind::String,
701 },
702 ],
703 ..Default::default()
704 }),
705 origin: SdfTypeOrigin::Local,
706 },
707 ],
708 states: vec![StateTyped {
709 name: "account-balance".to_string(),
710
711 type_: SdfKeyedState {
712 key: TypeRef {
713 name: "string".to_string(),
714 },
715 value: SdfKeyedStateValue::U32,
716 },
717 }],
718 imports: vec![],
719 functions: vec![],
720 dev: None,
721 }
722 }
723
724 fn dataflow() -> DataflowDefinition {
725 DataflowDefinition {
726 api_version: DATAFLOW_STABLE_VERSION.to_string(),
727 meta: Header {
728 name: "example".to_string(),
729 version: "0.1.0".to_string(),
730 namespace: "example".to_string(),
731 },
732 imports: vec![
733 PackageImport {
734 metadata: Header {
735 namespace: "example".to_string(),
736 name: "bank-update".to_string(),
737 version: "0.1.0".to_string(),
738 },
739 functions: vec![FunctionImport {
740 name: "filter-positive-events".to_string(),
741 alias: None,
742 }],
743 path: None,
744 types: vec![],
745 states: vec![],
746 },
747 PackageImport {
748 metadata: Header {
749 namespace: "example".to_string(),
750 name: "bank".to_string(),
751 version: "0.1.0".to_string(),
752 },
753 types: vec!["bank-event".to_string()],
754 functions: vec![],
755 states: vec![],
756 path: None,
757 },
758 ],
759 ..Default::default()
760 }
761 }
762
763 fn dataflow_b() -> DataflowDefinition {
764 DataflowDefinition {
765 meta: Header {
766 name: "my-df".to_string(),
767 version: "0.1.0".to_string(),
768 namespace: "inf-namespace".to_string(),
769 },
770 api_version: DATAFLOW_STABLE_VERSION.to_string(),
771 services: vec![service()],
772 ..Default::default()
773 }
774 }
775
776 fn service() -> Operations {
777 let sources = vec![IoRef {
778 type_: IoType::Topic,
779 id: "listing".to_string(),
780 steps: vec![],
781 }];
782 let sinks = vec![IoRef {
783 type_: IoType::Topic,
784 id: "prospect".to_string(),
785 steps: vec![],
786 }];
787 let transforms = Transforms {
788 steps: vec![
789 TransformOperator::FilterMap(StepInvocation {
790 uses: "listing_map_job".to_string(),
791 ..Default::default()
792 }),
793 TransformOperator::Map(StepInvocation {
794 uses: "job_map_prospect".to_string(),
795 ..Default::default()
796 }),
797 ],
798 };
799
800 let post_transforms = Some(PostTransforms::AssignTimestamp(Window {
801 assign_timestamp: StepInvocation {
802 uses: "assign_timestamp".to_string(),
803 ..Default::default()
804 },
805 transforms: Transforms {
806 steps: vec![TransformOperator::Map(StepInvocation {
807 uses: "prospect_map_prospect".to_string(),
808 ..Default::default()
809 })],
810 },
811 partition: Some(PartitionOperator {
812 assign_key: StepInvocation {
813 uses: "assign_key".to_string(),
814 ..Default::default()
815 },
816 transforms: Transforms {
817 steps: vec![TransformOperator::Map(StepInvocation {
818 uses: "prospect_map_prospect2".to_string(),
819 ..Default::default()
820 })],
821 },
822 update_state: None,
823 }),
824 flush: Some(StepInvocation {
825 uses: "job_aggregate".to_string(),
826 ..Default::default()
827 }),
828 properties: WindowProperties {
829 kind: WindowKind::Tumbling(TumblingWindow {
830 duration: 3600000,
831 offset: 0,
832 }),
833 watermark_config: WatermarkConfig::default(),
834 },
835 }));
836
837 Operations {
838 name: "listing-to-prospect-op".to_string(),
839 sources,
840 sinks,
841 transforms,
842 post_transforms,
843 states: vec![],
844 }
845 }
846
847 #[test]
848 fn test_merge_types_and_states() {
849 let mut dataflow = dataflow();
850 let package_configs = vec![first_package_definition(), second_package_definition()];
851 assert_eq!(dataflow.types.len(), 0);
852
853 dataflow.merge_dependencies(&package_configs).unwrap();
854
855 assert_eq!(dataflow.types.first().unwrap().name, "bank-event");
856 }
857
858 #[test]
859 fn test_validate_validates_states() {
860 let mut dataflow = dataflow();
861 dataflow.services = vec![Operations {
862 name: "basic".to_string(),
863 sources: vec![],
864 sinks: vec![],
865 transforms: Transforms { steps: vec![] },
866 post_transforms: None,
867 states: vec![State::Reference(StateRef {
868 ref_service: "other".to_string(),
869 name: "account-balance".to_string(),
870 })],
871 }];
872 let res = dataflow
873 .validate()
874 .expect_err("should error for undefined state");
875
876 assert!(res
877 .errors
878 .contains(&DataflowDefinitionValidationError::UndefinedState {
879 service_name: "basic".to_string(),
880 ref_state_name: "other.account-balance".to_string()
881 }));
882 }
883
884 #[test]
885 fn test_validate_validates_metadata() {
886 let mut dataflow = dataflow();
887 dataflow.meta.name = "".to_string();
888
889 let res = dataflow
890 .validate()
891 .expect_err("should error for empty name");
892
893 assert!(res
894 .errors
895 .contains(&DataflowDefinitionValidationError::Meta(vec![
896 HeaderValidationError::new("Name cannot be empty\n")
897 ])));
898
899 assert_eq!(
900 res.to_string(),
901 r#"Dataflow Config failed validation
902
903 Header is invalid:
904 Name cannot be empty
905
906"#
907 );
908 }
909
910 #[test]
911 fn test_validate_rejects_schedule_on_v5() {
912 let mut dataflow = dataflow();
913 dataflow.api_version = "0.5.0".to_string();
914 dataflow.schedule = Some(vec![ScheduleConfig {
915 name: "weekly".to_string(),
916 schedule: Schedule::Cron("0 0 * * 0".to_string()),
917 }]);
918
919 let res = dataflow
920 .validate()
921 .expect_err("should error for schedule on v5");
922
923 assert!(res
924 .errors
925 .contains(&DataflowDefinitionValidationError::Versioning(
926 DataflowDefinitionVersionError::InvalidVersionFeature {
927 version: "0.5.0".to_string(),
928 feature: "schedule".to_string(),
929 supported_version: "0.6.0".to_string()
930 }
931 )));
932 }
933 #[test]
934 fn test_validate_rejects_empty_type_names() {
935 let mut dataflow = dataflow();
936
937 dataflow.types = vec![MetadataType {
938 name: "".to_string(),
939 type_: SdfType::Object(SdfObject { fields: vec![] }),
940 origin: SdfTypeOrigin::Local,
941 }];
942
943 let res = dataflow
944 .validate()
945 .expect_err("should error for empty type name");
946
947 assert!(res
948 .errors
949 .contains(&DataflowDefinitionValidationError::Type(
950 MetadataTypeValidationFailure {
951 name: "".to_string(),
952 errors: vec![MetadataTypeValidationError::EmptyName]
953 }
954 )));
955
956 assert_eq!(
957 res.to_string(),
958 r#"Dataflow Config failed validation
959
960 Defined type `` is invalid:
961 Name cannot be empty
962
963"#
964 );
965 }
966
967 #[test]
968 fn test_validate_validates_types() {
969 let mut dataflow = dataflow();
970
971 dataflow.types = vec![MetadataType {
972 name: "my-type".to_string(),
973 type_: SdfType::Named(TypeRef {
974 name: "foobar".to_string(),
975 }),
976 origin: SdfTypeOrigin::Local,
977 }];
978
979 let res = dataflow
980 .validate()
981 .expect_err("should error for invalid type reference");
982
983 assert!(res
984 .errors
985 .contains(&DataflowDefinitionValidationError::Type(
986 MetadataTypeValidationFailure {
987 name: "my-type".to_string(),
988 errors: vec![MetadataTypeValidationError::SdfType(
989 SdfTypeValidationError::InvalidRef("foobar".to_string())
990 )]
991 }
992 )));
993
994 assert_eq!(
995 res.to_string(),
996 r#"Dataflow Config failed validation
997
998 Defined type `my-type` is invalid:
999 Referenced type `foobar` not found in config or imported types
1000
1001"#
1002 );
1003 }
1004
1005 #[test]
1006 fn test_validate_validates_topic_names() {
1007 let mut dataflow = dataflow();
1008
1009 dataflow.topics = vec![(
1010 "".to_string(),
1011 Topic {
1012 name: "my-topic".to_string(),
1013 schema: TopicSchema {
1014 key: None,
1015 value: SchemaSerDe {
1016 converter: None,
1017 type_: TypeRef {
1018 name: "u8".to_string(),
1019 },
1020 },
1021 },
1022 consumer: None,
1023 producer: None,
1024 profile: None,
1025 },
1026 )];
1027
1028 let res = dataflow
1029 .validate()
1030 .expect_err("should error for empty topic key");
1031
1032 assert!(res
1033 .errors
1034 .contains(&DataflowDefinitionValidationError::Topic(
1035 TopicValidationFailure {
1036 name: "".to_string(),
1037 errors: vec![TopicValidationError::Name(vec![TopicNameError::Empty])]
1038 }
1039 )));
1040
1041 assert!(res.to_string().contains(
1042 r#"Dataflow Config failed validation
1043
1044 Topic `` is invalid:
1045 Topic name is invalid:
1046 Name cannot be empty
1047"#
1048 ));
1049 }
1050
1051 #[test]
1052 fn test_validate_validates_topics() {
1053 let mut dataflow = dataflow();
1054
1055 dataflow.topics = vec![(
1056 "my-topic".to_string(),
1057 Topic {
1058 name: "my-topic".to_string(),
1059 schema: TopicSchema {
1060 key: None,
1061 value: SchemaSerDe {
1062 converter: Some(SerdeConverter::Raw),
1063 type_: TypeRef {
1064 name: "foobar".to_string(),
1065 },
1066 },
1067 },
1068 consumer: None,
1069 producer: None,
1070 profile: None,
1071 },
1072 )];
1073
1074 let res = dataflow
1075 .validate()
1076 .expect_err("should error for invalid topic value type");
1077
1078 assert!(res
1079 .errors
1080 .contains(&DataflowDefinitionValidationError::Topic(
1081 TopicValidationFailure {
1082 name: "my-topic".to_string(),
1083 errors: vec![TopicValidationError::InvalidValueRef("foobar".to_string())]
1084 }
1085 )));
1086
1087 assert_eq!(
1088 res.to_string(),
1089 r#"Dataflow Config failed validation
1090
1091 Topic `my-topic` is invalid:
1092 Referenced type `foobar` not found in config or imported types
1093
1094"#
1095 );
1096 }
1097
1098 #[test]
1099 fn test_validate_validates_services() {
1100 let mut dataflow = dataflow_b();
1101
1102 dataflow.services = vec![Operations {
1103 name: "".to_string(),
1104 sources: vec![],
1105 sinks: vec![],
1106 transforms: Transforms { steps: vec![] },
1107 post_transforms: None,
1108 states: vec![],
1109 }];
1110
1111 let res = dataflow
1112 .validate()
1113 .expect_err("should error for missing service name");
1114
1115 assert!(res
1116 .errors
1117 .contains(&DataflowDefinitionValidationError::Service(
1118 ServiceValidationFailure {
1119 name: "".to_string(),
1120 errors: vec![
1121 ServiceValidationError::NameEmpty,
1122 ServiceValidationError::NoSources
1123 ]
1124 }
1125 )));
1126
1127 assert_eq!(
1128 res.to_string(),
1129 r#"Dataflow Config failed validation
1130
1131 Service `` is invalid:
1132 Service name cannot be empty
1133 Service must have at least one source
1134
1135"#
1136 );
1137 }
1138
1139 #[test]
1140 fn test_validate_passes_valid_config() {
1141 let dataflow = dataflow();
1142
1143 assert!(dataflow.validate().is_ok());
1144 }
1145
1146 #[test]
1147 fn test_has_custom_types() {
1148 let mut dataflow = dataflow();
1149 assert!(!dataflow.has_custom_types());
1150
1151 dataflow.types = vec![MetadataType {
1152 name: "my-type".to_string(),
1153 type_: SdfType::Object(SdfObject { fields: vec![] }),
1154 origin: SdfTypeOrigin::Local,
1155 }];
1156
1157 assert!(dataflow.has_custom_types());
1158 }
1159
1160 #[test]
1161 fn test_has_custom_types_with_states() {
1162 let mut dataflow = dataflow();
1163 assert!(!dataflow.has_custom_types());
1164
1165 dataflow.services = vec![Operations {
1166 name: "my-service".to_string(),
1167 sources: vec![],
1168 sinks: vec![],
1169 transforms: Transforms { steps: vec![] },
1170 post_transforms: None,
1171 states: vec![State::Typed(StateTyped {
1172 name: "my-state".to_string(),
1173
1174 type_: SdfKeyedState {
1175 key: TypeRef {
1176 name: "string".to_string(),
1177 },
1178 value: SdfKeyedStateValue::U32,
1179 },
1180 })],
1181 }];
1182
1183 assert!(dataflow.has_custom_types());
1184 }
1185
1186 #[test]
1187 fn test_validate_rejects_dataflows_with_operator_name_duplicated() {
1188 let mut dataflow = dataflow();
1189
1190 dataflow.topics.push((
1191 "listing".to_string(),
1192 Topic {
1193 name: "listing".to_string(),
1194 schema: TopicSchema {
1195 key: None,
1196 value: SchemaSerDe {
1197 converter: Some(crate::wit::io::SerdeConverter::Json),
1198 type_: TypeRef {
1199 name: "string".to_string(),
1200 },
1201 },
1202 },
1203 consumer: None,
1204 producer: None,
1205 profile: None,
1206 },
1207 ));
1208
1209 dataflow.validate().expect("should validate first");
1210
1211 dataflow.services.push(Operations {
1212 name: "my-op".to_string(),
1213 sources: vec![IoRef {
1214 type_: IoType::Topic,
1215 id: "listing".to_string(),
1216 steps: vec![],
1217 }],
1218 sinks: vec![],
1219 transforms: Transforms {
1220 steps: vec![
1221 TransformOperator::Filter(StepInvocation {
1222 uses: "duplicated-fn".to_string(),
1223 inputs: vec![NamedParameter {
1224 name: "cat".to_string(),
1225 type_: TypeRef {
1226 name: "string".to_string(),
1227 },
1228 optional: false,
1229 kind: ParameterKind::Value,
1230 }],
1231 output: Some(Parameter {
1232 type_: OutputType::Ref(TypeRef {
1233 name: "bool".to_string(),
1234 }),
1235 ..Default::default()
1236 }),
1237 ..Default::default()
1238 }),
1239 TransformOperator::Filter(StepInvocation {
1240 uses: "duplicated-fn".to_string(),
1241 inputs: vec![NamedParameter {
1242 name: "cat".to_string(),
1243 type_: TypeRef {
1244 name: "string".to_string(),
1245 },
1246 optional: false,
1247 kind: ParameterKind::Value,
1248 }],
1249 output: Some(Parameter {
1250 type_: OutputType::Ref(TypeRef {
1251 name: "bool".to_string(),
1252 }),
1253 ..Default::default()
1254 }),
1255 ..Default::default()
1256 }),
1257 ],
1258 },
1259 ..Default::default()
1260 });
1261
1262 let res = dataflow
1263 .validate()
1264 .expect_err("should error for duplicated operator name");
1265
1266 assert!(res
1267 .errors
1268 .contains(&DataflowDefinitionValidationError::DuplicateOperator(
1269 "duplicated-fn".to_string()
1270 )));
1271
1272 assert_eq!(
1273 res.to_string(),
1274 r#"Dataflow Config failed validation
1275
1276 Duplicate inline operator with name: duplicated-fn was found, inline operators must have unique names
1277
1278"#
1279 );
1280 }
1281 #[test]
1282 fn test_add_imported_operator() {
1283 let mut dataflow = dataflow_b();
1284
1285 let operator_placement = OperatorPlacement {
1286 service_id: "listing-to-prospect-op".to_string(),
1287 transforms_index: Some(2),
1288 ..Default::default()
1289 };
1290
1291 let function = StepInvocation {
1292 uses: "cat_map_cat".to_string(),
1293 inputs: vec![NamedParameter {
1294 name: "cat".to_string(),
1295 type_: TypeRef {
1296 name: "string".to_string(),
1297 },
1298 optional: false,
1299 kind: ParameterKind::Value,
1300 }],
1301 output: Some(Parameter {
1302 type_: OutputType::Ref(TypeRef {
1303 name: "string".to_string(),
1304 }),
1305 ..Default::default()
1306 }),
1307 ..Default::default()
1308 };
1309
1310 dataflow
1311 .add_imported_operator(
1312 function.clone(),
1313 OperatorType::Map,
1314 operator_placement,
1315 package_import(),
1316 )
1317 .expect("Failed to add imported operator");
1318
1319 let result_operator = dataflow.services.first().unwrap().transforms.steps[2].clone();
1320
1321 assert_eq!(result_operator, TransformOperator::Map(function));
1322 assert_eq!(
1323 *dataflow
1324 .imports
1325 .first()
1326 .expect("Should be a package import"),
1327 package_import()
1328 );
1329 }
1330
1331 #[test]
1332 fn test_add_imported_operator_with_index_out_of_bounds() {
1333 let mut dataflow = dataflow_b();
1334
1335 let operator_placement = OperatorPlacement {
1336 service_id: "listing-to-prospect-op".to_string(),
1337 transforms_index: Some(3),
1338 ..Default::default()
1339 };
1340
1341 let function = StepInvocation {
1342 uses: "cat_map_cat".to_string(),
1343 ..Default::default()
1344 };
1345
1346 let res = dataflow.add_imported_operator(
1347 function,
1348 OperatorType::Map,
1349 operator_placement,
1350 package_import(),
1351 );
1352
1353 assert!(res.is_err());
1354 assert!(res.unwrap_err().to_string().contains(
1355 "cannot insert operator into transforms block, index is out of bounds, len = 2"
1356 ))
1357 }
1358
1359 #[test]
1360 fn test_add_imported_operator_merges_package_import() {
1361 let mut dataflow = dataflow_b();
1362
1363 let operator_placement = OperatorPlacement {
1364 service_id: "listing-to-prospect-op".to_string(),
1365 transforms_index: Some(2),
1366 ..Default::default()
1367 };
1368
1369 let function = StepInvocation {
1371 uses: "cat_map_cat".to_string(),
1372 ..Default::default()
1373 };
1374
1375 dataflow
1376 .add_imported_operator(
1377 function,
1378 OperatorType::Map,
1379 operator_placement,
1380 package_import(),
1381 )
1382 .expect("Failed to add imported operator");
1383
1384 let operator_placement = OperatorPlacement {
1386 service_id: "listing-to-prospect-op".to_string(),
1387 transforms_index: Some(3),
1388 ..Default::default()
1389 };
1390
1391 let second_function = StepInvocation {
1392 uses: "cat_map_dog".to_string(),
1393 ..Default::default()
1394 };
1395
1396 dataflow
1397 .add_imported_operator(
1398 second_function,
1399 OperatorType::Map,
1400 operator_placement,
1401 package_import_b(),
1402 )
1403 .expect("Failed to add imported operator");
1404
1405 assert_eq!(dataflow.imports.len(), 1);
1406 assert_eq!(
1407 *dataflow
1408 .imports
1409 .first()
1410 .expect("Should be a package import"),
1411 package_import_merged()
1412 );
1413 }
1414
1415 #[test]
1416 fn test_add_inline_operator() {
1417 let mut dataflow = dataflow_b();
1418
1419 let operator_placement = OperatorPlacement {
1420 service_id: "listing-to-prospect-op".to_string(),
1421 transforms_index: Some(2),
1422 ..Default::default()
1423 };
1424
1425 let function = StepInvocation {
1426 uses: "cat_map_cat".to_string(),
1427 code_info: CodeInfo {
1428 lang: CodeLang::Rust,
1429 code: Some("fn map_cat(cat: Cat) -> Cat { cat }".to_string()),
1430 ..Default::default()
1431 },
1432 ..Default::default()
1433 };
1434
1435 dataflow
1436 .add_inline_operator(function, OperatorType::Map, operator_placement)
1437 .expect("Failed to add imported operator");
1438
1439 let result_operator = dataflow.services.first().unwrap().transforms.steps[2].clone();
1440
1441 assert_eq!(
1442 result_operator,
1443 TransformOperator::Map(StepInvocation {
1444 uses: "cat_map_cat".to_string(),
1445 code_info: CodeInfo {
1446 lang: CodeLang::Rust,
1447 code: Some("fn map_cat(cat: Cat) -> Cat { cat }".to_string()),
1448 ..Default::default()
1449 },
1450 ..Default::default()
1451 })
1452 );
1453 }
1454
1455 #[test]
1456 fn test_add_inline_operator_has_no_code() {
1457 let mut dataflow = dataflow();
1458
1459 let function = StepInvocation {
1460 uses: "cat_map_cat".to_string(),
1461 code_info: CodeInfo {
1462 lang: CodeLang::Rust,
1463 code: None,
1464 ..Default::default()
1465 },
1466 ..Default::default()
1467 };
1468
1469 let res =
1470 dataflow.add_inline_operator(function, OperatorType::Map, OperatorPlacement::default());
1471
1472 assert!(res.is_err());
1473 assert!(res
1474 .unwrap_err()
1475 .to_string()
1476 .contains("inline operator must have code"))
1477 }
1478
1479 #[test]
1480 fn test_replace_inline_operator() {
1481 let mut dataflow = dataflow_b();
1482
1483 let operator_placement = OperatorPlacement {
1484 service_id: "listing-to-prospect-op".to_string(),
1485 transforms_index: Some(2),
1486 ..Default::default()
1487 };
1488
1489 let function = StepInvocation {
1490 uses: "cat_map_cat".to_string(),
1491 code_info: CodeInfo {
1492 lang: CodeLang::Rust,
1493 code: Some("fn map_cat(cat: Cat) -> Cat { cat }".to_string()),
1494 ..Default::default()
1495 },
1496 ..Default::default()
1497 };
1498
1499 let new_function = StepInvocation {
1500 uses: "cat_map_cat".to_string(),
1501 code_info: CodeInfo {
1502 lang: CodeLang::Rust,
1503 code: Some("fn map_dog(dog: Dog) -> Dog { dog }".to_string()),
1504 ..Default::default()
1505 },
1506 ..Default::default()
1507 };
1508
1509 dataflow
1510 .add_inline_operator(function, OperatorType::Map, operator_placement.clone())
1511 .expect("Failed to add imported operator");
1512
1513 dataflow
1514 .replace_inline_operator(new_function, OperatorType::FilterMap, operator_placement)
1515 .expect("Failed to replace inline operator");
1516
1517 let result_operator = dataflow.services.first().unwrap().transforms.steps[2].clone();
1518
1519 assert_eq!(
1520 result_operator,
1521 TransformOperator::FilterMap(StepInvocation {
1522 uses: "cat_map_cat".to_string(),
1523 code_info: CodeInfo {
1524 lang: CodeLang::Rust,
1525 code: Some("fn map_dog(dog: Dog) -> Dog { dog }".to_string()),
1526 ..Default::default()
1527 },
1528 ..Default::default()
1529 })
1530 );
1531 }
1532
1533 #[test]
1534 fn test_replace_inline_operator_has_no_code() {
1535 let mut dataflow = dataflow();
1536
1537 let function = StepInvocation {
1538 uses: "cat_map_cat".to_string(),
1539 code_info: CodeInfo {
1540 lang: CodeLang::Rust,
1541 code: None,
1542 ..Default::default()
1543 },
1544 ..Default::default()
1545 };
1546
1547 let res = dataflow.replace_inline_operator(
1548 function,
1549 OperatorType::Map,
1550 OperatorPlacement::default(),
1551 );
1552
1553 assert!(res.is_err());
1554 assert!(res
1555 .unwrap_err()
1556 .to_string()
1557 .contains("inline operator must have code"))
1558 }
1559
1560 #[test]
1561 fn test_delete_operator() {
1562 let mut dataflow = dataflow_b();
1563
1564 let operator_placement = OperatorPlacement {
1565 service_id: "listing-to-prospect-op".to_string(),
1566 transforms_index: Some(1),
1567 ..Default::default()
1568 };
1569
1570 let _ = dataflow.delete_operator(operator_placement);
1571
1572 let steps = &dataflow.services.first().unwrap().transforms.steps;
1573 let remaining_op = steps.first().unwrap();
1574 let function = match remaining_op {
1575 TransformOperator::FilterMap(f) => f,
1576 _ => panic!("Expected FilterMap operator"),
1577 };
1578
1579 assert_eq!(steps.len(), 1);
1580 assert_eq!(function.uses, "listing_map_job");
1581 }
1582
1583 #[test]
1584 fn test_delete_operator_fails_when_service_does_not_exist() {
1585 let mut dataflow = dataflow_b();
1586
1587 let operator_placement = OperatorPlacement {
1588 service_id: "my-missing-service".to_string(),
1589 transforms_index: Some(1),
1590 ..Default::default()
1591 };
1592
1593 let res = dataflow.delete_operator(operator_placement);
1594
1595 assert_eq!(
1596 res.unwrap_err().to_string(),
1597 "Service with id my-missing-service not found"
1598 );
1599 }
1600
1601 #[test]
1602 fn test_merge_package_import_merges_repeat_imports() {
1603 let mut dataflow = dataflow_b();
1604
1605 dataflow.merge_package_import(package_import());
1606 dataflow.merge_package_import(package_import_b());
1607
1608 assert_eq!(
1609 *dataflow
1610 .imports
1611 .first()
1612 .expect("Should be a package import"),
1613 package_import_merged()
1614 );
1615 }
1616
1617 #[test]
1618 fn test_merge_package_import_appends_new_imports() {
1619 let mut dataflow = dataflow_b();
1620
1621 let next_version_import = PackageImport {
1622 metadata: Header {
1623 name: "cats_package".to_string(),
1624 version: "0.1.1".to_string(),
1625 namespace: "inf-namespace".to_string(),
1626 },
1627 path: None,
1628 types: vec!["cat".to_string(), "dog".to_string()],
1629 states: vec![],
1630 functions: vec![FunctionImport {
1631 name: "cat_map_dog".to_string(),
1632 alias: None,
1633 }],
1634 };
1635
1636 dataflow.merge_package_import(package_import());
1637 dataflow.merge_package_import(next_version_import.clone());
1638
1639 assert_eq!(dataflow.imports.len(), 2);
1640 assert_eq!(
1641 *dataflow
1642 .imports
1643 .first()
1644 .expect("Should be a package import"),
1645 package_import()
1646 );
1647 assert_eq!(dataflow.imports[1], next_version_import);
1648 }
1649
1650 #[test]
1651 fn test_pkg_df_name_conflict() {
1652 let mut dataflow = dataflow();
1653
1654 dataflow.imports.push(PackageImport {
1656 metadata: Header {
1657 name: "example".to_string(),
1658 version: "0.1.0".to_string(),
1659 namespace: "example".to_string(),
1660 },
1661 path: None,
1662 types: vec![],
1663 states: vec![],
1664 functions: vec![],
1665 });
1666
1667 let res = dataflow.validate();
1668 assert!(res.is_err());
1669 let err = res.unwrap_err();
1670 assert!(err
1671 .to_string()
1672 .contains("Package example/example conflicts with dataflow name and namespace"),)
1673 }
1674
1675 fn package_import() -> PackageImport {
1676 PackageImport {
1677 metadata: Header {
1678 name: "cats_package".to_string(),
1679 version: "0.1.0".to_string(),
1680 namespace: "inf-namespace".to_string(),
1681 },
1682 path: None,
1683 types: vec!["cat".to_string()],
1684 states: vec![],
1685 functions: vec![FunctionImport {
1686 name: "cat_map_cat".to_string(),
1687 alias: None,
1688 }],
1689 }
1690 }
1691
1692 fn package_import_b() -> PackageImport {
1693 PackageImport {
1694 metadata: Header {
1695 name: "cats_package".to_string(),
1696 version: "0.1.0".to_string(),
1697 namespace: "inf-namespace".to_string(),
1698 },
1699 path: None,
1700 types: vec!["cat".to_string(), "dog".to_string()],
1701 states: vec![],
1702 functions: vec![FunctionImport {
1703 name: "cat_map_dog".to_string(),
1704 alias: None,
1705 }],
1706 }
1707 }
1708
1709 fn package_import_merged() -> PackageImport {
1710 PackageImport {
1711 metadata: Header {
1712 name: "cats_package".to_string(),
1713 version: "0.1.0".to_string(),
1714 namespace: "inf-namespace".to_string(),
1715 },
1716 path: None,
1717 types: vec!["cat".to_string(), "dog".to_string()],
1718 states: vec![],
1719 functions: vec![
1720 FunctionImport {
1721 name: "cat_map_cat".to_string(),
1722 alias: None,
1723 },
1724 FunctionImport {
1725 name: "cat_map_dog".to_string(),
1726 alias: None,
1727 },
1728 ],
1729 }
1730 }
1731}