1use std::collections::BTreeMap;
2
3use anyhow::{Result, anyhow};
4
5use crate::{
6 importer::{function::imported_operator_config, states::inject_states},
7 metadata::{io::topic::KVSchemaType, operator::transforms::validate_transforms_steps},
8 util::{
9 config_error::{ConfigError, INDENT},
10 operator_placement::OperatorPlacement,
11 sdf_types_map::SdfTypesMap,
12 validate::validate_all,
13 validation_error::ValidationError,
14 validation_failure::ValidationFailure,
15 },
16 wit::{
17 dataflow::{
18 IoType, Operations, PackageDefinition, PackageImport, PostTransforms, ScheduleConfig,
19 Topic,
20 },
21 operator::StepInvocation,
22 package_interface::OperatorType,
23 },
24};
25
26use super::io_ref::IoRefValidationFailure;
27
28#[derive(Debug, Clone, Eq, PartialEq)]
29pub struct ServiceValidationFailure {
30 pub name: String,
31 pub errors: Vec<ServiceValidationError>,
32}
33
34impl ConfigError for ServiceValidationFailure {
35 fn readable(&self, indents: usize) -> String {
36 let mut result = format!(
37 "{}Service `{}` is invalid:\n",
38 INDENT.repeat(indents),
39 self.name
40 );
41
42 for error in &self.errors {
43 result.push_str(&error.readable(indents + 1));
44 }
45
46 result
47 }
48}
49
50#[derive(Debug, Clone, Eq, PartialEq)]
51pub enum ServiceValidationError {
52 NameEmpty,
53 MissingSourceTopic(String),
54 NoSources,
55 InvalidSource(IoRefValidationFailure),
56 InvalidSink(IoRefValidationFailure),
57 SourceTypeMismatch(String),
58 SinkTypeMismatch(String),
59 InvalidState(ValidationError),
60 InvalidTransformsSteps(ValidationFailure),
61 InvalidPostTransforms(ValidationFailure),
62}
63
64impl ConfigError for ServiceValidationError {
65 fn readable(&self, indents: usize) -> String {
66 let indent = INDENT.repeat(indents);
67
68 match self {
69 Self::NameEmpty => format!("{}Service name cannot be empty\n", indent),
70 Self::MissingSourceTopic(topic) => {
71 format!("{}Source topic `{}` not found\n", indent, topic)
72 }
73 Self::NoSources => format!("{}Service must have at least one source\n", indent),
74 Self::InvalidSource(error) => format!(
75 "{}Source `{}` is invalid:\n{}",
76 indent,
77 error.name,
78 error.readable(indents + 1),
79 ),
80 Self::InvalidSink(error) => format!(
81 "{}Sink `{}` is invalid:\n{}",
82 indent,
83 error.name,
84 error.readable(indents + 1),
85 ),
86 Self::SourceTypeMismatch(types) => format!(
87 "{}Sources for service must be identical, but the sources had the following types:\n{}{}{}\n",
88 indent, indent, INDENT, types
89 ),
90 Self::SinkTypeMismatch(types) => format!(
91 "{}Sinks for service must be identical, but the sinks had the following types:\n{}{}{}\n",
92 indent, indent, INDENT, types
93 ),
94 Self::InvalidState(error) => format!(
95 "{}State is invalid:\n{}",
96 indent,
97 error.readable(indents + 1)
98 ),
99 Self::InvalidTransformsSteps(error) => format!(
100 "{}Transforms block is invalid:\n{}",
101 indent,
102 error.readable(indents + 1)
103 ),
104 Self::InvalidPostTransforms(error) => error.readable(indents),
105 }
106 }
107}
108
109impl Default for Operations {
110 fn default() -> Self {
111 Operations {
112 name: "".to_string(),
113 sources: vec![],
114 sinks: vec![],
115 transforms: Default::default(),
116 post_transforms: None,
117 states: vec![],
118 }
119 }
120}
121
122impl Operations {
123 pub fn add_operator(
124 &mut self,
125 operator_type: OperatorType,
126 operator_placement: OperatorPlacement,
127 step_invocation: StepInvocation,
128 ) -> Result<()> {
129 if operator_placement.window {
130 match self.post_transforms {
131 Some(PostTransforms::AssignTimestamp(ref mut window)) => window.add_operator(
132 operator_placement.transforms_index,
133 operator_placement.partition,
134 operator_type,
135 step_invocation,
136 ),
137 _ => Err(anyhow!(
138 "Cannot add operator. Window was specified but service does not have a window"
139 )),
140 }
141 } else if operator_placement.partition {
142 match self.post_transforms {
143 Some(PostTransforms::Partition(ref mut partition)) => partition.add_operator(
144 operator_placement.transforms_index,
145 operator_type,
146 step_invocation,
147 ),
148 Some(PostTransforms::AssignTimestamp(_)) => Err(anyhow!(
149 "Cannot add operator. Service does not have top level partition. To delete operator from window partition, please specify window"
150 )),
151 None => Err(anyhow!(
152 "Cannot add operator. Parition was specified but service does not have a partition"
153 )),
154 }
155 } else {
156 self.transforms.insert_operator(
157 operator_placement.transforms_index,
158 operator_type,
159 step_invocation,
160 )
161 }
162 }
163
164 pub fn delete_operator(&mut self, operator_placement: OperatorPlacement) -> Result<()> {
165 if operator_placement.window {
166 match self.post_transforms {
167 Some(PostTransforms::AssignTimestamp(ref mut window)) => window.delete_operator(
168 operator_placement.transforms_index,
169 operator_placement.partition,
170 ),
171 _ => Err(anyhow!(
172 "Cannot delete operator. Window was specified but service does not have a window"
173 )),
174 }
175 } else if operator_placement.partition {
176 match self.post_transforms {
177 Some(PostTransforms::Partition(ref mut partition)) => {
178 partition.delete_operator(operator_placement.transforms_index)
179 }
180 Some(PostTransforms::AssignTimestamp(_)) => Err(anyhow!(
181 "Cannot delete operator. Service does not have top level partition. To delete operator from window partition, please specify window"
182 )),
183 None => Err(anyhow!(
184 "Cannot delete operator. Parition was specified but service does not have a partition"
185 )),
186 }
187 } else {
188 match operator_placement.transforms_index {
189 Some(index) => self.transforms.delete_operator(index),
190 None => Err(anyhow!(
191 "Transforms index required to delete operator from transforms"
192 )),
193 }
194 }
195 }
196
197 pub fn import_operator_configs(
198 &mut self,
199 imports: &[PackageImport],
200 packages: &[PackageDefinition],
201 ) -> Result<()> {
202 let mut service_states: BTreeMap<_, _> = self
203 .states
204 .iter()
205 .map(|s| (s.name().to_owned(), s.clone()))
206 .collect();
207 for source in &mut self.sources {
208 if let IoType::Topic = source.type_ {
209 for step in &mut source.steps {
210 if step.is_imported(imports) {
211 *step = imported_operator_config(step, imports, packages)?;
212 }
213 }
214 }
215 }
216
217 for sink in &mut self.sinks {
218 if let IoType::Topic = sink.type_ {
219 for step in &mut sink.steps {
220 if step.is_imported(imports) {
221 *step = imported_operator_config(step, imports, packages)?;
222
223 inject_states(&mut service_states, &step.inner().states)?;
224 }
225 }
226 }
227 }
228
229 for step in &mut self.transforms.steps {
230 if step.is_imported(imports) {
231 *step = imported_operator_config(step, imports, packages)?;
232 inject_states(&mut service_states, &step.inner().states)?;
233 }
234 }
235
236 if let Some(ref mut post_transforms) = self.post_transforms {
237 match post_transforms {
238 PostTransforms::AssignTimestamp(window) => {
239 window.import_operator_configs(imports, packages, &mut service_states)?;
240 }
241 PostTransforms::Partition(partition) => {
242 partition.import_operator_configs(imports, packages, &mut service_states)?;
243 }
244 }
245 }
246
247 self.states = service_states.into_values().collect();
248
249 Ok(())
250 }
251
252 #[cfg(feature = "parser")]
253 pub fn update_inline_operators(&mut self) -> Result<()> {
255 for source in &mut self.sources {
256 if let IoType::Topic = source.type_ {
257 for step in &mut source.steps {
258 step.update_signature_from_code()?;
259 }
260 }
261 }
262
263 for sink in &mut self.sinks {
264 if let IoType::Topic = sink.type_ {
265 for step in &mut sink.steps {
266 step.update_signature_from_code()?;
267 }
268 }
269 }
270
271 for step in &mut self.transforms.steps {
272 step.update_signature_from_code()?;
273 }
274
275 if let Some(ref mut post_transforms) = self.post_transforms {
276 post_transforms.update_inline_operators()?;
277 }
278 Ok(())
279 }
280
281 pub(crate) fn validate(
282 &self,
283 types: &SdfTypesMap,
284 topics: &[(String, Topic)],
285 schedules: Option<&[ScheduleConfig]>,
286 ) -> Result<(), ServiceValidationFailure> {
287 let mut failure = ServiceValidationFailure {
288 name: self.name.clone(),
289 errors: vec![],
290 };
291
292 if self.name.is_empty() {
293 failure.errors.push(ServiceValidationError::NameEmpty);
294 }
295
296 let service_input_type = match self.service_input_type(topics) {
297 Ok(ty) => ty,
298 Err(e) => {
299 failure.errors.push(e);
300 return Err(failure);
301 }
302 };
303
304 if let Err(e) = self.validate_sources(types, topics, schedules) {
305 failure.errors = [failure.errors, e].concat();
306 }
307
308 if let Err(e) = self.validate_states() {
309 for error in e.errors {
310 failure
311 .errors
312 .push(ServiceValidationError::InvalidState(error));
313 }
314 }
315
316 if let Err(transforms_validation_errors) = validate_transforms_steps(
317 &self.transforms.steps,
318 types,
319 service_input_type.clone(),
320 "sources".to_string(),
321 ) {
322 failure
323 .errors
324 .push(ServiceValidationError::InvalidTransformsSteps(
325 transforms_validation_errors,
326 ));
327 }
328
329 let transforms_output_type = match self.transforms.output_type(service_input_type) {
330 Ok(ty) => ty,
331 Err(e) => {
332 failure
333 .errors
334 .push(ServiceValidationError::InvalidTransformsSteps(
335 ValidationFailure { errors: vec![e] },
336 ));
337
338 return Err(failure);
339 }
340 };
341
342 if let Some(post_transforms) = &self.post_transforms {
343 if let Err(post_transforms_error) =
344 post_transforms.validate(types, &transforms_output_type)
345 {
346 failure
347 .errors
348 .push(ServiceValidationError::InvalidPostTransforms(
349 post_transforms_error,
350 ));
351 }
352 }
353
354 let service_output_type = match self.post_transforms_output_type(transforms_output_type) {
355 Ok(ty) => ty,
356 Err(_e) => {
357 return Err(failure);
359 }
360 };
361
362 if let Err(e) = self.validate_sinks(types, topics, service_output_type) {
363 failure.errors = [failure.errors, e].concat();
364 }
365
366 if failure.errors.is_empty() {
367 Ok(())
368 } else {
369 Err(failure)
370 }
371 }
372
373 fn validate_states(&self) -> Result<(), ValidationFailure> {
374 validate_all(&self.states)
375 }
376
377 fn validate_sources(
378 &self,
379 types: &SdfTypesMap,
380 topics: &[(String, Topic)],
381 schedules: Option<&[ScheduleConfig]>,
382 ) -> Result<(), Vec<ServiceValidationError>> {
383 let mut errors = vec![];
384 let mut source_types = vec![];
385
386 for source in &self.sources {
387 if let Err(source_error) = source.validate_source(types, topics, schedules) {
388 errors.push(ServiceValidationError::InvalidSource(source_error));
389 }
390
391 if let Ok(source_type) = source.source_type(topics) {
392 source_types.push((source.id.clone(), source_type));
393 }
394 }
395
396 if !types_are_identical(&source_types) {
398 errors.push(ServiceValidationError::SourceTypeMismatch(
399 source_types
400 .iter()
401 .map(|(source_name, type_)| {
402 if let Some(key) = &type_.key {
403 format!(
404 "{}: {}(key) - {}(value)",
405 source_name, key.name, type_.value.name
406 )
407 } else {
408 format!("{}: {}(value)", source_name, type_.value.name)
409 }
410 })
411 .collect::<Vec<_>>()
412 .join(", "),
413 ))
414 }
415
416 if errors.is_empty() {
417 Ok(())
418 } else {
419 Err(errors)
420 }
421 }
422
423 fn validate_sinks(
424 &self,
425 types: &SdfTypesMap,
426 topics: &[(String, Topic)],
427 service_output_type: KVSchemaType,
428 ) -> Result<(), Vec<ServiceValidationError>> {
429 let mut errors = vec![];
430 let mut sink_types = vec![];
431
432 for sink in &self.sinks {
433 if let Err(sink_error) = sink.validate_sink(types, topics, &service_output_type) {
434 errors.push(ServiceValidationError::InvalidSink(sink_error));
435 }
436
437 if let Ok(Some(source_type)) = sink.sink_type(topics) {
438 sink_types.push((sink.id.clone(), source_type));
439 }
440 }
441
442 if !types_are_identical(&sink_types) {
443 errors.push(ServiceValidationError::SinkTypeMismatch(
444 sink_types
445 .iter()
446 .map(|(sink_name, type_)| {
447 if let Some(key) = &type_.key {
448 format!(
449 "{}: {}(key) - {}(value)",
450 sink_name, key.name, type_.value.name
451 )
452 } else {
453 format!("{}: {}(value)", sink_name, type_.value.name)
454 }
455 })
456 .collect::<Vec<_>>()
457 .join(", "),
458 ));
459 }
460
461 if errors.is_empty() {
462 Ok(())
463 } else {
464 Err(errors)
465 }
466 }
467
468 fn service_input_type(
469 &self,
470 topics: &[(String, Topic)],
471 ) -> Result<KVSchemaType, ServiceValidationError> {
472 match self.sources.first() {
473 Some(source) => match source.source_type(topics) {
474 Ok(ty) => Ok(ty),
475 _ => Err(ServiceValidationError::MissingSourceTopic(
476 source.id.clone(),
477 )),
478 },
479 None => Err(ServiceValidationError::NoSources),
480 }
481 }
482
483 fn post_transforms_output_type(
484 &self,
485 transforms_output_type: KVSchemaType,
486 ) -> Result<KVSchemaType, ValidationError> {
487 if let Some(post_transforms) = &self.post_transforms {
488 post_transforms.output_type(transforms_output_type)
489 } else {
490 Ok(transforms_output_type)
491 }
492 }
493
494 pub fn operators(&self) -> Vec<(StepInvocation, OperatorType)> {
495 let ops = self
496 .transforms
497 .steps
498 .iter()
499 .map(|op| (op.inner().to_owned(), op.clone().into()))
500 .chain(
501 self.post_transforms
502 .iter()
503 .flat_map(|post_transforms| post_transforms.operators()),
504 )
505 .collect();
506
507 ops
508 }
509}
510
511fn types_are_identical(types: &[(String, KVSchemaType)]) -> bool {
512 let Some((_, mut sample_ty)) = types.first().cloned() else {
513 return true;
514 };
515
516 for (_, current_ty) in types.iter().skip(1) {
517 if current_ty.value != sample_ty.value {
518 return false;
519 }
520
521 match (¤t_ty.key, &sample_ty.key) {
522 (Some(k), Some(fk)) => {
523 if k != fk {
524 return false;
525 }
526 }
527 (Some(k), None) => sample_ty.key = Some(k.clone()),
528 _ => (),
529 }
530 }
531 true
532}
533
534#[cfg(test)]
535mod test {
536
537 use sdf_common::constants::DATAFLOW_STABLE_VERSION;
538
539 use crate::{
540 metadata::dataflow::{
541 io_ref::{IoRefValidationError, IoRefValidationFailure},
542 operations::ServiceValidationError,
543 },
544 util::{
545 config_error::ConfigError, operator_placement::OperatorPlacement,
546 sdf_types_map::SdfTypesMap, validation_error::ValidationError,
547 validation_failure::ValidationFailure,
548 },
549 wit::{
550 dataflow::{
551 IoRef, IoType, Operations, PackageDefinition, PackageImport, PostTransforms, Topic,
552 },
553 io::{SchemaSerDe, TopicSchema, TypeRef},
554 metadata::{
555 NamedParameter, OutputType, Parameter, ParameterKind, SdfKeyedState,
556 SdfKeyedStateValue,
557 },
558 operator::{
559 PartitionOperator, StepInvocation, StepState, TransformOperator, Transforms,
560 TumblingWindow, Window, WindowProperties, WindowKind, WatermarkConfig,
561 },
562 package_interface::{FunctionImport, Header, OperatorType, StateTyped},
563 states::{State, StateRef},
564 },
565 };
566
567 fn packages() -> Vec<PackageDefinition> {
568 vec![PackageDefinition {
569 api_version: DATAFLOW_STABLE_VERSION.to_owned(),
570 meta: Header {
571 name: "my-pkg".to_string(),
572 namespace: "my-ns".to_string(),
573 version: "0.1.0".to_string(),
574 },
575 functions: vec![my_fn()],
576 imports: vec![],
577 types: vec![],
578 states: vec![StateTyped {
579 name: "map-state".to_string(),
580 type_: SdfKeyedState {
581 key: TypeRef {
582 name: "string".to_string(),
583 },
584 value: SdfKeyedStateValue::U32,
585 },
586 }],
587 dev: None,
588 }]
589 }
590
591 fn my_fn() -> (StepInvocation, OperatorType) {
592 (
593 StepInvocation {
594 uses: "map-fn".to_string(),
595 inputs: vec![NamedParameter {
596 name: "map-input".to_string(),
597 type_: TypeRef {
598 name: "u8".to_string(),
599 },
600 optional: false,
601 kind: ParameterKind::Value,
602 }],
603 output: Some(Parameter {
604 type_: TypeRef {
605 name: "u8".to_string(),
606 }
607 .into(),
608 ..Default::default()
609 }),
610 states: vec![StepState::Resolved(StateTyped {
611 name: "map-state".to_string(),
612 type_: SdfKeyedState {
613 key: TypeRef {
614 name: "string".to_string(),
615 },
616 value: SdfKeyedStateValue::U32,
617 },
618 })],
619 ..Default::default()
620 },
621 OperatorType::Map,
622 )
623 }
624
625 fn imports() -> Vec<PackageImport> {
626 vec![PackageImport {
627 metadata: Header {
628 name: "my-pkg".to_string(),
629 namespace: "my-ns".to_string(),
630 version: "0.1.0".to_string(),
631 },
632 functions: vec![FunctionImport {
633 name: "map-fn".to_string(),
634 alias: None,
635 }],
636 path: Some("path/to/my-pkg".to_string()),
637 types: vec![],
638 states: vec![],
639 }]
640 }
641
642 fn operations() -> Operations {
643 Operations {
644 name: "my-service".to_string(),
645 sources: vec![IoRef {
646 type_: IoType::Topic,
647 id: "my-source".to_string(),
648 steps: vec![TransformOperator::Map(StepInvocation {
649 uses: "map-fn".to_string(),
650 ..Default::default()
651 })],
652 }],
653 sinks: vec![IoRef {
654 type_: IoType::Topic,
655 id: "my-source".to_string(),
656 steps: vec![TransformOperator::Map(StepInvocation {
657 uses: "map-fn".to_string(),
658 ..Default::default()
659 })],
660 }],
661 transforms: Transforms {
662 steps: vec![
663 TransformOperator::Map(StepInvocation {
664 uses: "map-fn".to_string(),
665 ..Default::default()
666 }),
667 TransformOperator::Map(StepInvocation {
668 uses: "map-fn".to_string(),
669 ..Default::default()
670 }),
671 ],
672 },
673 post_transforms: None,
674 states: vec![],
675 }
676 }
677
678 fn topics() -> Vec<(String, Topic)> {
679 vec![
680 (
681 "my-topic".to_string(),
682 Topic {
683 name: "my-topic".to_string(),
684 schema: TopicSchema {
685 key: None,
686 value: SchemaSerDe {
687 converter: None,
688 type_: TypeRef {
689 name: "u8".to_string(),
690 },
691 },
692 },
693 consumer: None,
694 producer: None,
695 profile: None,
696 },
697 ),
698 (
699 "my-other-topic".to_string(),
700 Topic {
701 name: "my-other-topic".to_string(),
702 schema: TopicSchema {
703 key: None,
704 value: SchemaSerDe {
705 converter: None,
706 type_: TypeRef {
707 name: "u16".to_string(),
708 },
709 },
710 },
711 consumer: None,
712 producer: None,
713 profile: None,
714 },
715 ),
716 (
717 "my-third-topic".to_string(),
718 Topic {
719 name: "my-other-topic".to_string(),
720 schema: TopicSchema {
721 key: None,
722 value: SchemaSerDe {
723 converter: None,
724 type_: TypeRef {
725 name: "u16".to_string(),
726 },
727 },
728 },
729 consumer: None,
730 producer: None,
731 profile: None,
732 },
733 ),
734 (
735 "my-topic-with-key".to_string(),
736 Topic {
737 name: "my-topic-with-key".to_string(),
738 schema: TopicSchema {
739 key: Some(SchemaSerDe {
740 converter: None,
741 type_: TypeRef {
742 name: "string".to_string(),
743 },
744 }),
745 value: SchemaSerDe {
746 converter: None,
747 type_: TypeRef {
748 name: "u8".to_string(),
749 },
750 },
751 },
752 consumer: None,
753 producer: None,
754 profile: None,
755 },
756 ),
757 (
758 "my-topic-with-another-key".to_string(),
759 Topic {
760 name: "my-topic-with-another-key".to_string(),
761 schema: TopicSchema {
762 key: Some(SchemaSerDe {
763 converter: None,
764 type_: TypeRef {
765 name: "bytes".to_string(),
766 },
767 }),
768 value: SchemaSerDe {
769 converter: None,
770 type_: TypeRef {
771 name: "u8".to_string(),
772 },
773 },
774 },
775 consumer: None,
776 producer: None,
777 profile: None,
778 },
779 ),
780 ]
781 }
782
783 fn function() -> StepInvocation {
784 StepInvocation {
785 uses: "cat_map_cat".to_string(),
786 inputs: vec![NamedParameter {
787 name: "cat".to_string(),
788 type_: TypeRef {
789 name: "string".to_string(),
790 },
791 optional: false,
792 kind: ParameterKind::Value,
793 }],
794 output: Some(Parameter {
795 type_: OutputType::Ref(TypeRef {
796 name: "string".to_string(),
797 }),
798 ..Default::default()
799 }),
800 ..Default::default()
801 }
802 }
803
804 fn service_with_window() -> Operations {
805 let sources = vec![IoRef {
806 type_: IoType::Topic,
807 id: "listing".to_string(),
808 steps: vec![],
809 }];
810 let sinks = vec![IoRef {
811 type_: IoType::Topic,
812 id: "prospect".to_string(),
813 steps: vec![],
814 }];
815 let transforms = Transforms {
816 steps: vec![
817 TransformOperator::FilterMap(StepInvocation {
818 uses: "listing_map_job".to_string(),
819 ..Default::default()
820 }),
821 TransformOperator::Map(StepInvocation {
822 uses: "job_map_prospect".to_string(),
823 ..Default::default()
824 }),
825 ],
826 };
827
828 let post_transforms = Some(PostTransforms::AssignTimestamp(Window {
829 assign_timestamp: StepInvocation {
830 uses: "assign_timestamp".to_string(),
831 ..Default::default()
832 },
833 transforms: Transforms {
834 steps: vec![TransformOperator::Map(StepInvocation {
835 uses: "prospect_map_prospect".to_string(),
836 ..Default::default()
837 })],
838 },
839 partition: Some(PartitionOperator {
840 assign_key: StepInvocation {
841 uses: "assign_key".to_string(),
842 ..Default::default()
843 },
844 transforms: Transforms {
845 steps: vec![TransformOperator::Map(StepInvocation {
846 uses: "prospect_map_prospect2".to_string(),
847 ..Default::default()
848 })],
849 },
850 update_state: None,
851 }),
852 flush: Some(StepInvocation {
853 uses: "job_aggregate".to_string(),
854 ..Default::default()
855 }),
856 properties: WindowProperties {
857 kind: WindowKind::Tumbling(TumblingWindow {
858 duration: 3600000,
859 offset: 0,
860 }),
861 watermark_config: WatermarkConfig::default(),
862 },
863 }));
864
865 Operations {
866 name: "listing-to-prospect-op".to_string(),
867 sources,
868 sinks,
869 transforms,
870 post_transforms,
871 states: vec![],
872 }
873 }
874
875 fn service_with_partition() -> Operations {
876 let sources = vec![IoRef {
877 type_: IoType::Topic,
878 id: "listing".to_string(),
879 steps: vec![],
880 }];
881 let sinks = vec![IoRef {
882 type_: IoType::Topic,
883 id: "prospect".to_string(),
884 steps: vec![],
885 }];
886 let transforms = Transforms {
887 steps: vec![
888 TransformOperator::FilterMap(StepInvocation {
889 uses: "listing_map_job".to_string(),
890 ..Default::default()
891 }),
892 TransformOperator::Map(StepInvocation {
893 uses: "job_map_prospect".to_string(),
894 ..Default::default()
895 }),
896 ],
897 };
898
899 let post_transforms = Some(PostTransforms::Partition(PartitionOperator {
900 assign_key: StepInvocation {
901 uses: "assign_key".to_string(),
902 ..Default::default()
903 },
904 transforms: Transforms {
905 steps: vec![TransformOperator::Map(StepInvocation {
906 uses: "prospect_map_prospect2".to_string(),
907 ..Default::default()
908 })],
909 },
910 update_state: None,
911 }));
912
913 Operations {
914 name: "listing-to-prospect-op".to_string(),
915 sources,
916 sinks,
917 transforms,
918 post_transforms,
919 states: vec![],
920 }
921 }
922
923 #[test]
924 fn test_import_operator_configs_merges_operator_signatures() {
925 let mut operations = operations();
926
927 let source_steps = &operations.sources.first().unwrap().steps;
928 assert!(source_steps.first().unwrap().inner().inputs.is_empty());
929 assert!(source_steps.first().unwrap().inner().output.is_none());
930 assert!(source_steps.first().unwrap().inner().states.is_empty());
931
932 let sink_steps = &operations.sinks.first().unwrap().steps;
933 assert!(sink_steps.first().unwrap().inner().inputs.is_empty());
934 assert!(sink_steps.first().unwrap().inner().output.is_none());
935 assert!(sink_steps.first().unwrap().inner().states.is_empty());
936
937 let steps = &operations.transforms.steps;
938 assert!(steps.first().unwrap().inner().inputs.is_empty());
939 assert!(steps.first().unwrap().inner().output.is_none());
940 assert!(steps.first().unwrap().inner().states.is_empty());
941
942 assert!(operations.states.is_empty());
943
944 operations
945 .import_operator_configs(&imports(), &packages())
946 .unwrap();
947
948 let source_steps = &operations.sources.first().unwrap().steps;
949 assert_eq!(source_steps.first().unwrap().inner().inputs.len(), 1);
950 assert!(source_steps.first().unwrap().inner().output.is_some());
951 assert_eq!(source_steps.first().unwrap().inner().states.len(), 1);
952
953 let sink_steps = &operations.sinks.first().unwrap().steps;
954 assert_eq!(sink_steps.first().unwrap().inner().inputs.len(), 1);
955 assert!(sink_steps.first().unwrap().inner().output.is_some());
956 assert_eq!(sink_steps.first().unwrap().inner().states.len(), 1);
957
958 let steps = &operations.transforms.steps;
959 assert_eq!(steps.first().unwrap().inner().inputs.len(), 1);
960 assert!(steps.first().unwrap().inner().output.is_some());
961 assert_eq!(steps.first().unwrap().inner().states.len(), 1);
962
963 assert_eq!(operations.states.len(), 1);
964 }
965
966 #[test]
967 fn test_validate_rejects_service_without_name() {
968 let types = SdfTypesMap::default();
969 let service = Operations {
970 name: "".to_string(),
971 sources: vec![],
972 sinks: vec![],
973 transforms: Transforms { steps: vec![] },
974 post_transforms: None,
975 states: vec![],
976 };
977
978 let res = service
979 .validate(&types, &[], None)
980 .expect_err("should error for missing service name");
981
982 assert!(res.errors.contains(&ServiceValidationError::NameEmpty));
983
984 assert!(res.readable(0).contains(
985 r#"Service `` is invalid:
986 Service name cannot be empty
987"#
988 ));
989 }
990
991 #[test]
992 fn test_validate_validates_states() {
993 let types = SdfTypesMap::default();
994 let service = Operations {
995 name: "my-service".to_string(),
996 sources: vec![IoRef {
997 type_: IoType::Topic,
998 id: "my-topic".to_string(),
999 steps: vec![],
1000 }],
1001 sinks: vec![],
1002 transforms: Transforms { steps: vec![] },
1003 post_transforms: None,
1004 states: vec![State::Reference(StateRef {
1005 name: "my-state".to_string(),
1006 ref_service: "".to_string(),
1007 })],
1008 };
1009
1010 let res = service
1011 .validate(&types, &topics(), None)
1012 .expect_err("should error for invalid ref state");
1013
1014 assert!(res.errors.contains(&ServiceValidationError::InvalidState(ValidationError::new(
1015 "service name missing for state reference. state reference must be of the form <service>.<state>"
1016 ))));
1017 assert_eq!(
1018 res.readable(0),
1019 r#"Service `my-service` is invalid:
1020 State is invalid:
1021 service name missing for state reference. state reference must be of the form <service>.<state>
1022"#
1023 );
1024 }
1025
1026 #[test]
1027 fn test_validate_rejects_service_without_sources() {
1028 let types = SdfTypesMap::default();
1029 let service = Operations {
1030 name: "my-service".to_string(),
1031 sources: vec![],
1032 sinks: vec![],
1033 transforms: Transforms { steps: vec![] },
1034 post_transforms: None,
1035 states: vec![],
1036 };
1037
1038 let res = service
1039 .validate(&types, &[], None)
1040 .expect_err("should error for missing sources");
1041
1042 assert!(res.errors.contains(&ServiceValidationError::NoSources));
1043 assert_eq!(
1044 res.readable(0),
1045 r#"Service `my-service` is invalid:
1046 Service must have at least one source
1047"#
1048 );
1049 }
1050
1051 #[test]
1052 fn test_validate_sources_validates_each_source() {
1053 let types = SdfTypesMap::default();
1054 let service = Operations {
1055 name: "my-service".to_string(),
1056 sources: vec![IoRef {
1057 type_: IoType::Topic,
1058 id: "my-source-topic".to_string(),
1059 steps: vec![],
1060 }],
1061 sinks: vec![],
1062 transforms: Transforms { steps: vec![] },
1063 post_transforms: None,
1064 states: vec![],
1065 };
1066
1067 let res = service
1068 .validate(&types, &[], None)
1069 .expect_err("should error for missing sources");
1070
1071 assert!(res
1072 .errors
1073 .contains(&ServiceValidationError::MissingSourceTopic(
1074 "my-source-topic".to_string()
1075 )));
1076 assert_eq!(
1077 res.readable(0),
1078 r#"Service `my-service` is invalid:
1079 Source topic `my-source-topic` not found
1080"#
1081 );
1082 }
1083
1084 #[test]
1085 fn test_validate_rejects_sources_when_key_types_differ() {
1086 let topics = topics();
1087 let types = SdfTypesMap::default();
1088
1089 let service = Operations {
1090 name: "my-service".to_string(),
1091 sources: vec![
1092 IoRef {
1093 type_: IoType::Topic,
1094 id: "my-topic".to_string(),
1095 steps: vec![],
1096 },
1097 IoRef {
1098 type_: IoType::Topic,
1099 id: "my-topic-with-key".to_string(),
1100 steps: vec![],
1101 },
1102 IoRef {
1103 type_: IoType::Topic,
1104 id: "my-topic-with-another-key".to_string(),
1105 steps: vec![],
1106 },
1107 ],
1108 sinks: vec![],
1109 transforms: Transforms { steps: vec![] },
1110 post_transforms: None,
1111 states: vec![],
1112 };
1113
1114 let res = service
1115 .validate(&types, &topics, None)
1116 .expect_err("should error for source type mismatch");
1117
1118 assert!(res.errors.contains(&ServiceValidationError::SourceTypeMismatch(
1119 "my-topic: u8(value), my-topic-with-key: string(key) - u8(value), my-topic-with-another-key: bytes(key) - u8(value)".to_string()
1120 )));
1121 assert_eq!(
1122 res.readable(0),
1123 r#"Service `my-service` is invalid:
1124 Sources for service must be identical, but the sources had the following types:
1125 my-topic: u8(value), my-topic-with-key: string(key) - u8(value), my-topic-with-another-key: bytes(key) - u8(value)
1126"#
1127 );
1128 }
1129
1130 #[test]
1131 fn test_validate_rejects_sources_when_type_values_differ() {
1132 let topics = topics();
1133 let types = SdfTypesMap::default();
1134
1135 let service = Operations {
1136 name: "my-service".to_string(),
1137 sources: vec![
1138 IoRef {
1139 type_: IoType::Topic,
1140 id: "my-topic".to_string(),
1141 steps: vec![],
1142 },
1143 IoRef {
1144 type_: IoType::Topic,
1145 id: "my-other-topic".to_string(),
1146 steps: vec![],
1147 },
1148 ],
1149 sinks: vec![],
1150 transforms: Transforms { steps: vec![] },
1151 post_transforms: None,
1152 states: vec![],
1153 };
1154
1155 let res = service
1156 .validate(&types, &topics, None)
1157 .expect_err("should error for source type mismatch");
1158
1159 assert!(res
1160 .errors
1161 .contains(&ServiceValidationError::SourceTypeMismatch(
1162 "my-topic: u8(value), my-other-topic: u16(value)".to_string()
1163 )));
1164
1165 let service = Operations {
1167 name: "my-service".to_string(),
1168 sources: vec![
1169 IoRef {
1170 type_: IoType::Topic,
1171 id: "my-topic".to_string(),
1172 steps: vec![],
1173 },
1174 IoRef {
1175 type_: IoType::Topic,
1176 id: "my-other-topic".to_string(),
1177 steps: vec![TransformOperator::Map(StepInvocation {
1178 uses: "my-function".to_string(),
1179 inputs: vec![NamedParameter {
1180 name: "input".to_string(),
1181 type_: TypeRef {
1182 name: "u16".to_string(),
1183 },
1184 optional: false,
1185 kind: ParameterKind::Value,
1186 }],
1187 output: Some(Parameter {
1188 type_: TypeRef {
1189 name: "string".to_string(),
1190 }
1191 .into(),
1192 ..Default::default()
1193 }),
1194 ..Default::default()
1195 })],
1196 },
1197 ],
1198 sinks: vec![],
1199 transforms: Transforms { steps: vec![] },
1200 post_transforms: None,
1201 states: vec![],
1202 };
1203
1204 let res = service
1205 .validate(&types, &topics, None)
1206 .expect_err("should error for source type mismatch");
1207
1208 assert!(res
1209 .errors
1210 .contains(&ServiceValidationError::SourceTypeMismatch(
1211 "my-topic: u8(value), my-other-topic: string(value)".to_string()
1212 )));
1213
1214 assert_eq!(
1215 res.readable(0),
1216 r#"Service `my-service` is invalid:
1217 Sources for service must be identical, but the sources had the following types:
1218 my-topic: u8(value), my-other-topic: string(value)
1219"#
1220 );
1221 }
1222
1223 #[test]
1224 fn test_validate_accepts_valid_sources() {
1225 let topics = topics();
1226 let types = SdfTypesMap::default();
1227
1228 let service = Operations {
1229 name: "my-service".to_string(),
1230 sources: vec![
1231 IoRef {
1232 type_: IoType::Topic,
1233 id: "my-topic".to_string(),
1234 steps: vec![],
1235 },
1236 IoRef {
1237 type_: IoType::Topic,
1238 id: "my-other-topic".to_string(),
1239 steps: vec![TransformOperator::Map(StepInvocation {
1240 uses: "my-function".to_string(),
1241 inputs: vec![NamedParameter {
1242 name: "input".to_string(),
1243 type_: TypeRef {
1244 name: "u16".to_string(),
1245 },
1246 optional: false,
1247 kind: ParameterKind::Value,
1248 }],
1249 output: Some(Parameter {
1250 type_: TypeRef {
1251 name: "u8".to_string(),
1252 }
1253 .into(),
1254 ..Default::default()
1255 }),
1256 ..Default::default()
1257 })],
1258 },
1259 ],
1260 sinks: vec![],
1261 transforms: Transforms { steps: vec![] },
1262 post_transforms: None,
1263 states: vec![],
1264 };
1265
1266 service
1267 .validate(&types, &topics, None)
1268 .expect("should validate")
1269 }
1270
1271 #[test]
1272 fn test_validate_sinks_validates_each_sink() {
1273 let types = SdfTypesMap::default();
1274 let service = Operations {
1275 name: "my-service".to_string(),
1276 sources: vec![IoRef {
1277 type_: IoType::Topic,
1278 id: "my-topic".to_string(),
1279 steps: vec![],
1280 }],
1281 sinks: vec![IoRef {
1282 type_: IoType::Topic,
1283 id: "my-sink-topic".to_string(),
1284 steps: vec![],
1285 }],
1286 transforms: Transforms { steps: vec![] },
1287 post_transforms: None,
1288 states: vec![],
1289 };
1290
1291 let res = service
1292 .validate(&types, &topics(), None)
1293 .expect_err("should error for missing sink topic");
1294
1295 assert!(res.errors.contains(&ServiceValidationError::InvalidSink(
1296 IoRefValidationFailure {
1297 name: "my-sink-topic".to_string(),
1298 errors: vec![IoRefValidationError::InvalidRef(
1299 "my-sink-topic".to_string()
1300 )]
1301 }
1302 )));
1303
1304 assert_eq!(
1305 res.readable(0),
1306 r#"Service `my-service` is invalid:
1307 Sink `my-sink-topic` is invalid:
1308 Referenced topic `my-sink-topic` not found
1309"#
1310 );
1311 }
1312
1313 #[test]
1314 fn test_validate_rejects_sinks_when_key_types_differ() {
1315 let topics = topics();
1316 let types = SdfTypesMap::default();
1317 let service = Operations {
1318 name: "my-service".to_string(),
1319 sources: vec![IoRef {
1320 type_: IoType::Topic,
1321 id: "my-topic".to_string(),
1322 steps: vec![],
1323 }],
1324 sinks: vec![
1325 IoRef {
1326 type_: IoType::Topic,
1327 id: "my-topic-with-key".to_string(),
1328 steps: vec![],
1329 },
1330 IoRef {
1331 type_: IoType::Topic,
1332 id: "my-topic-with-another-key".to_string(),
1333 steps: vec![],
1334 },
1335 ],
1336 transforms: Transforms { steps: vec![] },
1337 post_transforms: None,
1338 states: vec![],
1339 };
1340
1341 let res = service
1342 .validate(&types, &topics, None)
1343 .expect_err("should error for sink type mismatch");
1344
1345 assert!(res.errors.contains(&ServiceValidationError::SinkTypeMismatch(
1346 "my-topic-with-key: string(key) - u8(value), my-topic-with-another-key: bytes(key) - u8(value)".to_string()
1347 )));
1348
1349 assert_eq!(
1350 res.readable(0),
1351 r#"Service `my-service` is invalid:
1352 Sinks for service must be identical, but the sinks had the following types:
1353 my-topic-with-key: string(key) - u8(value), my-topic-with-another-key: bytes(key) - u8(value)
1354"#
1355 );
1356 }
1357
1358 #[test]
1359 fn test_validate_rejects_sinks_when_value_types_differ() {
1360 let topics = topics();
1361 let types = SdfTypesMap::default();
1362
1363 let service = Operations {
1364 name: "my-service".to_string(),
1365 sources: vec![IoRef {
1366 type_: IoType::Topic,
1367 id: "my-topic".to_string(),
1368 steps: vec![],
1369 }],
1370 sinks: vec![
1371 IoRef {
1372 type_: IoType::Topic,
1373 id: "my-topic".to_string(),
1374 steps: vec![],
1375 },
1376 IoRef {
1377 type_: IoType::Topic,
1378 id: "my-other-topic".to_string(),
1379 steps: vec![],
1380 },
1381 ],
1382 transforms: Transforms { steps: vec![] },
1383 post_transforms: None,
1384 states: vec![],
1385 };
1386
1387 let res = service
1388 .validate(&types, &topics, None)
1389 .expect_err("should error for sink type mismatch");
1390
1391 assert!(res
1392 .errors
1393 .contains(&ServiceValidationError::SinkTypeMismatch(
1394 "my-topic: u8(value), my-other-topic: u16(value)".to_string()
1395 )));
1396
1397 assert_eq!(
1398 res.readable(0),
1399 r#"Service `my-service` is invalid:
1400 Sink `my-other-topic` is invalid:
1401 Transforms block is invalid:
1402 service output type `u8` does not match sink input type `u16`
1403 Sinks for service must be identical, but the sinks had the following types:
1404 my-topic: u8(value), my-other-topic: u16(value)
1405"#
1406 );
1407 }
1408
1409 #[test]
1410 fn test_validate_accepts_valid_sinks() {
1411 let topics = topics();
1412 let types = SdfTypesMap::default();
1413
1414 let service = Operations {
1415 name: "my-service".to_string(),
1416 sources: vec![IoRef {
1417 type_: IoType::Topic,
1418 id: "my-topic".to_string(),
1419 steps: vec![],
1420 }],
1421 sinks: vec![
1422 IoRef {
1423 type_: IoType::Topic,
1424 id: "my-topic".to_string(),
1425 steps: vec![],
1426 },
1427 IoRef {
1428 type_: IoType::Topic,
1429 id: "my-other-topic".to_string(),
1430 steps: vec![TransformOperator::Map(StepInvocation {
1431 uses: "my-function".to_string(),
1432 inputs: vec![NamedParameter {
1433 name: "input".to_string(),
1434 type_: TypeRef {
1435 name: "u8".to_string(),
1436 },
1437 optional: false,
1438 kind: ParameterKind::Value,
1439 }],
1440 output: Some(Parameter {
1441 type_: TypeRef {
1442 name: "u16".to_string(),
1443 }
1444 .into(),
1445 ..Default::default()
1446 }),
1447 ..Default::default()
1448 })],
1449 },
1450 ],
1451 transforms: Transforms { steps: vec![] },
1452 post_transforms: None,
1453 states: vec![],
1454 };
1455
1456 service
1457 .validate(&types, &topics, None)
1458 .expect("should validate")
1459 }
1460
1461 #[test]
1462 fn test_validate_validates_transforms() {
1463 let topics = topics();
1464 let types = SdfTypesMap::default();
1465
1466 let service = Operations {
1467 name: "my-service".to_string(),
1468 sources: vec![IoRef {
1469 type_: IoType::Topic,
1470 id: "my-topic".to_string(),
1471 steps: vec![],
1472 }],
1473 transforms: Transforms {
1474 steps: vec![TransformOperator::Map(StepInvocation {
1475 uses: "my-function".to_string(),
1476 inputs: vec![NamedParameter {
1477 name: "input".to_string(),
1478 type_: TypeRef {
1479 name: "u8".to_string(),
1480 },
1481 optional: false,
1482 kind: ParameterKind::Value,
1483 }],
1484 output: Some(Parameter {
1485 type_: TypeRef {
1486 name: "foobar".to_string(),
1487 }
1488 .into(),
1489 ..Default::default()
1490 }),
1491 ..Default::default()
1492 })],
1493 },
1494 sinks: vec![],
1495 post_transforms: None,
1496 states: vec![],
1497 };
1498
1499 let res = service
1500 .validate(&types, &topics, None)
1501 .expect_err("should fail for output type not in scope");
1502
1503 assert!(res.errors.contains(&ServiceValidationError::InvalidTransformsSteps(
1504 ValidationFailure {
1505 errors: vec![ValidationError::new("function `my-function` has invalid output type, Referenced type `foobar` not found in config or imported types")]
1506 }
1507 )));
1508
1509 assert_eq!(
1510 res.readable(0),
1511 r#"Service `my-service` is invalid:
1512 Transforms block is invalid:
1513 function `my-function` has invalid output type, Referenced type `foobar` not found in config or imported types
1514"#
1515 );
1516 }
1517
1518 #[test]
1519 fn test_validate_transforms_with_different_keys() {
1520 let topics = topics();
1521 let types = SdfTypesMap::default();
1522 let service = Operations {
1523 name: "my-service".to_string(),
1524 sources: vec![IoRef {
1525 type_: IoType::Topic,
1526 id: "my-topic-with-key".to_string(),
1527 steps: vec![],
1528 }],
1529 sinks: vec![],
1530 transforms: Transforms {
1531 steps: vec![
1532 TransformOperator::Map(StepInvocation {
1533 uses: "my-function".to_string(),
1534 inputs: vec![NamedParameter {
1535 name: "input".to_string(),
1536 type_: TypeRef {
1537 name: "u8".to_string(),
1538 },
1539 optional: false,
1540 kind: ParameterKind::Value,
1541 }],
1542 output: Some(Parameter {
1543 type_: TypeRef {
1544 name: "string".to_string(),
1545 }
1546 .into(),
1547 ..Default::default()
1548 }),
1549 ..Default::default()
1550 }),
1551 TransformOperator::Filter(StepInvocation {
1552 uses: "my-other-function".to_string(),
1553 inputs: vec![
1554 NamedParameter {
1555 name: "key".to_string(),
1556 type_: TypeRef {
1557 name: "bytes".to_string(),
1558 },
1559 optional: false,
1560 kind: ParameterKind::Key,
1561 },
1562 NamedParameter {
1563 name: "input".to_string(),
1564 type_: TypeRef {
1565 name: "string".to_string(),
1566 },
1567 optional: false,
1568 kind: ParameterKind::Value,
1569 },
1570 ],
1571 output: Some(Parameter {
1572 type_: TypeRef {
1573 name: "bool".to_string(),
1574 }
1575 .into(),
1576 ..Default::default()
1577 }),
1578 ..Default::default()
1579 }),
1580 ],
1581 },
1582
1583 states: vec![],
1584 post_transforms: None,
1585 };
1586
1587 let res = service
1588 .validate(&types, &topics, None)
1589 .expect_err("should fail for steps with different keys");
1590
1591 assert!(res
1592 .errors
1593 .contains(&ServiceValidationError::InvalidTransformsSteps(
1594 ValidationFailure {
1595 errors: vec![ValidationError::new(
1596 "in `my-other-function`, key type does not match expected key type. bytes != string"
1597 )],
1598 }
1599 )));
1600
1601 assert_eq!(
1602 res.readable(0),
1603 r#"Service `my-service` is invalid:
1604 Transforms block is invalid:
1605 in `my-other-function`, key type does not match expected key type. bytes != string
1606"#
1607 );
1608 }
1609
1610 #[test]
1611 fn test_validate_validates_partition() {
1612 let topics = topics();
1613 let types = SdfTypesMap::default();
1614
1615 let service = Operations {
1616 name: "my-service".to_string(),
1617 sources: vec![IoRef {
1618 type_: IoType::Topic,
1619 id: "my-topic".to_string(),
1620 steps: vec![],
1621 }],
1622 transforms: Transforms { steps: vec![] },
1623 sinks: vec![],
1624 post_transforms: Some(PostTransforms::Partition(PartitionOperator {
1625 assign_key: StepInvocation {
1626 uses: "my-assign-key".to_string(),
1627 inputs: vec![NamedParameter {
1628 name: "input".to_string(),
1629 type_: TypeRef {
1630 name: "u8".to_string(),
1631 },
1632 optional: false,
1633 kind: ParameterKind::Value,
1634 }],
1635 output: Some(Parameter {
1636 type_: TypeRef {
1637 name: "string".to_string(),
1638 }
1639 .into(),
1640 ..Default::default()
1641 }),
1642 ..Default::default()
1643 },
1644 transforms: Transforms {
1645 steps: vec![TransformOperator::Map(StepInvocation {
1646 uses: "my-function".to_string(),
1647 inputs: vec![NamedParameter {
1648 name: "input".to_string(),
1649 type_: TypeRef {
1650 name: "u8".to_string(),
1651 },
1652 optional: false,
1653 kind: ParameterKind::Value,
1654 }],
1655 output: Some(Parameter {
1656 type_: TypeRef {
1657 name: "foobar".to_string(),
1658 }
1659 .into(),
1660 ..Default::default()
1661 }),
1662 ..Default::default()
1663 })],
1664 },
1665 update_state: None,
1666 })),
1667 states: vec![],
1668 };
1669
1670 let res = service
1671 .validate(&types, &topics, None)
1672 .expect_err("should fail for output type not in scope");
1673
1674 assert!(res.errors.contains(&ServiceValidationError::InvalidPostTransforms(
1675 ValidationFailure {
1676 errors: vec![ValidationError::new("Partition transforms block is invalid: function `my-function` has invalid output type, Referenced type `foobar` not found in config or imported types")]
1677 }
1678 )));
1679
1680 assert_eq!(
1681 res.readable(0),
1682 r#"Service `my-service` is invalid:
1683 Partition transforms block is invalid: function `my-function` has invalid output type, Referenced type `foobar` not found in config or imported types
1684"#
1685 );
1686 }
1687
1688 #[test]
1689 fn test_validate_validates_window() {
1690 let topics = topics();
1691 let types = SdfTypesMap::default();
1692
1693 let service = Operations {
1694 name: "my-service".to_string(),
1695 sources: vec![IoRef {
1696 type_: IoType::Topic,
1697 id: "my-topic".to_string(),
1698 steps: vec![],
1699 }],
1700 transforms: Transforms { steps: vec![] },
1701 sinks: vec![],
1702 post_transforms: Some(PostTransforms::AssignTimestamp(Window {
1703 transforms: Transforms {
1704 steps: vec![TransformOperator::Map(StepInvocation {
1705 uses: "my-function".to_string(),
1706 inputs: vec![NamedParameter {
1707 name: "input".to_string(),
1708 type_: TypeRef {
1709 name: "u8".to_string(),
1710 },
1711 optional: false,
1712 kind: ParameterKind::Value,
1713 }],
1714 output: Some(Parameter {
1715 type_: TypeRef {
1716 name: "foobar".to_string(),
1717 }
1718 .into(),
1719 ..Default::default()
1720 }),
1721 ..Default::default()
1722 })],
1723 },
1724 ..Default::default()
1725 })),
1726 states: vec![],
1727 };
1728
1729 let res = service
1730 .validate(&types, &topics, None)
1731 .expect_err("should fail for output type not in scope");
1732 println!("{:#?}", res);
1733
1734 assert!(res.errors.iter().any(|e|{
1735 if let ServiceValidationError::InvalidPostTransforms(failure) = e {
1736 if failure.errors.contains(
1737 &ValidationError::new("Window transforms block is invalid: function `my-function` has invalid output type, Referenced type `foobar` not found in config or imported types")
1738 ) {
1739 return true;
1740 }
1741 }
1742
1743 false
1744 }));
1745
1746 assert_eq!(
1747 res.readable(0),
1748 r#"Service `my-service` is invalid:
1749 Window assign-timestamp type function `` should have exactly 2 input type, found 0
1750 Window assign-timestamp type function `` requires an output type
1751 Window transforms block is invalid: function `my-function` has invalid output type, Referenced type `foobar` not found in config or imported types
1752"#
1753 );
1754 }
1755
1756 fn map_fn(name: String, input_type: String, output_type: String) -> TransformOperator {
1757 TransformOperator::Map(StepInvocation {
1758 uses: name,
1759 inputs: vec![NamedParameter {
1760 name: "input".to_string(),
1761 type_: TypeRef { name: input_type },
1762 optional: false,
1763 kind: ParameterKind::Value,
1764 }],
1765 output: Some(Parameter {
1766 type_: TypeRef { name: output_type }.into(),
1767 ..Default::default()
1768 }),
1769 ..Default::default()
1770 })
1771 }
1772
1773 fn assign_timestamp_fn(input_type: String) -> StepInvocation {
1774 StepInvocation {
1775 uses: "assign-timestamp".to_string(),
1776 inputs: vec![
1777 NamedParameter {
1778 name: "input".to_string(),
1779 type_: TypeRef { name: input_type },
1780 optional: false,
1781 kind: ParameterKind::Value,
1782 },
1783 NamedParameter {
1784 name: "timestamp".to_string(),
1785 type_: TypeRef {
1786 name: "i64".to_string(),
1787 },
1788 optional: false,
1789 kind: ParameterKind::Value,
1790 },
1791 ],
1792 output: Some(Parameter {
1793 type_: TypeRef {
1794 name: "i64".to_string(),
1795 }
1796 .into(),
1797 ..Default::default()
1798 }),
1799 ..Default::default()
1800 }
1801 }
1802
1803 fn assign_key_fn(input_type: String) -> StepInvocation {
1804 StepInvocation {
1805 uses: "assign-timestamp".to_string(),
1806 inputs: vec![NamedParameter {
1807 name: "input".to_string(),
1808 type_: TypeRef { name: input_type },
1809 optional: false,
1810 kind: ParameterKind::Value,
1811 }],
1812 output: Some(Parameter {
1813 type_: TypeRef {
1814 name: "string".to_string(),
1815 }
1816 .into(),
1817 ..Default::default()
1818 }),
1819 ..Default::default()
1820 }
1821 }
1822
1823 fn window_aggregate_fn(output_type: String) -> StepInvocation {
1824 StepInvocation {
1825 uses: "window-aggregate".to_string(),
1826 inputs: vec![],
1827 output: Some(Parameter {
1828 type_: TypeRef { name: output_type }.into(),
1829 ..Default::default()
1830 }),
1831 ..Default::default()
1832 }
1833 }
1834
1835 #[test]
1836 fn test_types_validate_throughout() {
1837 let topics = topics();
1838 let types = SdfTypesMap::default();
1839
1840 let service = Operations {
1841 name: "my-service".to_string(),
1842 sources: vec![IoRef {
1843 type_: IoType::Topic,
1844 id: "my-topic".to_string(),
1845 steps: vec![map_fn("1".to_string(), "u8".to_string(), "u32".to_string())],
1846 }],
1847 transforms: Transforms {
1848 steps: vec![map_fn(
1849 "2".to_string(),
1850 "u32".to_string(),
1851 "u64".to_string(),
1852 )],
1853 },
1854 post_transforms: Some(PostTransforms::AssignTimestamp(Window {
1855 assign_timestamp: assign_timestamp_fn("u64".to_string()),
1856 transforms: Transforms {
1857 steps: vec![map_fn("3".to_string(), "u64".to_string(), "i8".to_string())],
1858 },
1859 partition: Some(PartitionOperator {
1860 assign_key: assign_key_fn("i8".to_string()),
1861 transforms: Transforms {
1862 steps: vec![map_fn("4".to_string(), "i8".to_string(), "i16".to_string())],
1863 },
1864 update_state: None,
1865 }),
1866 flush: Some(window_aggregate_fn("u16".to_string())),
1867 ..Default::default()
1868 })),
1869 sinks: vec![IoRef {
1870 type_: IoType::Topic,
1871 id: "my-other-topic".to_string(),
1872 steps: vec![],
1873 }],
1874 states: vec![],
1875 };
1876
1877 service
1878 .validate(&types, &topics, None)
1879 .expect("should validate");
1880 }
1881
1882 #[test]
1883 fn test_operators() {
1884 let operations = Operations {
1885 name: "my-service".to_string(),
1886 sources: vec![IoRef {
1887 type_: IoType::Topic,
1888 id: "my-topic".to_string(),
1889 steps: vec![map_fn(
1890 "my-step".to_string(),
1891 "u8".to_string(),
1892 "u32".to_string(),
1893 )],
1894 }],
1895 transforms: Transforms {
1896 steps: vec![map_fn(
1897 "my-map".to_string(),
1898 "u32".to_string(),
1899 "u64".to_string(),
1900 )],
1901 },
1902 post_transforms: None,
1903 sinks: vec![IoRef {
1904 type_: IoType::Topic,
1905 id: "my-other-topic".to_string(),
1906 steps: vec![],
1907 }],
1908 states: vec![],
1909 };
1910
1911 let ops = operations.operators();
1912
1913 assert_eq!(ops.len(), 1);
1914 assert_eq!(ops[0].0.uses, "my-map");
1915 }
1916
1917 #[test]
1918 fn test_validate_types_when_there_are_no_transforms_steps() {
1919 let topics = topics();
1920 let types = SdfTypesMap::default();
1921
1922 let service = Operations {
1923 name: "my-service".to_string(),
1924 sources: vec![IoRef {
1925 type_: IoType::Topic,
1926 id: "my-topic".to_string(),
1927 steps: vec![],
1928 }],
1929 transforms: Transforms { steps: vec![] },
1930 post_transforms: None,
1931 sinks: vec![IoRef {
1932 type_: IoType::Topic,
1933 id: "my-other-topic".to_string(),
1934 steps: vec![],
1935 }],
1936 states: vec![],
1937 };
1938
1939 let res = service
1940 .validate(&types, &topics, None)
1941 .expect_err("should validate");
1942
1943 assert!(res.errors.contains(&ServiceValidationError::InvalidSink(
1944 IoRefValidationFailure {
1945 name: "my-other-topic".to_string(),
1946 errors: vec![IoRefValidationError::InvalidTransformsBlock(vec![
1947 ValidationError::new(
1948 "service output type `u8` does not match sink input type `u16`"
1949 )
1950 ])]
1951 }
1952 )));
1953
1954 assert_eq!(
1955 res.readable(0),
1956 r#"Service `my-service` is invalid:
1957 Sink `my-other-topic` is invalid:
1958 Transforms block is invalid:
1959 service output type `u8` does not match sink input type `u16`
1960"#
1961 );
1962 }
1963
1964 #[test]
1965 fn test_validate_topic_without_key_with_function_that_requires_key() {
1966 let topics = topics();
1967 let types = SdfTypesMap::default();
1968
1969 let service = Operations {
1970 name: "my-service".to_string(),
1971 sources: vec![IoRef {
1972 type_: IoType::Topic,
1973 id: "my-topic".to_string(),
1974 steps: vec![],
1975 }],
1976 transforms: Transforms {
1977 steps: vec![TransformOperator::Map(StepInvocation {
1978 uses: "my-function".to_string(),
1979 inputs: vec![
1980 NamedParameter {
1981 name: "k".to_string(),
1982 type_: TypeRef {
1983 name: "string".to_string(),
1984 },
1985 optional: false,
1986 kind: ParameterKind::Key,
1987 },
1988 NamedParameter {
1989 name: "value".to_string(),
1990 type_: TypeRef {
1991 name: "u8".to_string(),
1992 },
1993 optional: false,
1994 kind: ParameterKind::Value,
1995 },
1996 ],
1997 output: Some(Parameter {
1998 type_: TypeRef {
1999 name: "u8".to_string(),
2000 }
2001 .into(),
2002 ..Default::default()
2003 }),
2004 ..Default::default()
2005 })],
2006 },
2007 post_transforms: None,
2008 sinks: vec![IoRef {
2009 type_: IoType::Topic,
2010 id: "my-other-topic".to_string(),
2011 steps: vec![],
2012 }],
2013 states: vec![],
2014 };
2015
2016 let resp = service
2017 .validate(&types, &topics, None)
2018 .expect_err("should validate");
2019
2020 assert!(
2021 resp.errors.contains(&ServiceValidationError::InvalidTransformsSteps(
2022 ValidationFailure {
2023 errors: vec![ValidationError::new(
2024 "my-function function requires a key, but none was found. Make sure that you define the right key in the topic configuration"
2025 )]
2026 }
2027 ))
2028 );
2029 }
2030
2031 #[test]
2032 fn test_add_operator() {
2033 let mut service = service_with_window();
2034
2035 let operator_placement = OperatorPlacement {
2036 service_id: "listing-to-prospect-op".to_string(),
2037 transforms_index: Some(2),
2038 ..Default::default()
2039 };
2040
2041 let function = function();
2042
2043 service
2044 .add_operator(OperatorType::Map, operator_placement, function.clone())
2045 .expect("Failed to add imported operator");
2046
2047 let result_operator = service.transforms.steps[2].clone();
2048
2049 assert_eq!(result_operator, TransformOperator::Map(function));
2050 }
2051
2052 #[test]
2053 fn test_add_partition_transforms_operator() {
2054 let mut service = service_with_partition();
2055
2056 let operator_placement = OperatorPlacement {
2057 service_id: "listing-to-prospect-op".to_string(),
2058 transforms_index: Some(1),
2059 partition: true,
2060 window: false,
2061 };
2062
2063 let function = function();
2064
2065 service
2066 .add_operator(OperatorType::Map, operator_placement, function.clone())
2067 .expect("Failed to add imported operator");
2068
2069 let result_operator = match service.post_transforms {
2070 Some(PostTransforms::Partition(partition)) => partition.transforms.steps[1].clone(),
2071 _ => panic!("expected partition"),
2072 };
2073
2074 assert_eq!(result_operator, TransformOperator::Map(function));
2075 }
2076
2077 #[test]
2078 fn test_add_partition_operator_when_window_incorrectly_specified() {
2079 let mut service = service_with_window();
2080
2081 let operator_placement = OperatorPlacement {
2082 service_id: "listing-to-prospect-op".to_string(),
2083 window: false,
2084 partition: true,
2085 transforms_index: Some(0),
2086 };
2087
2088 let function = function();
2089
2090 let res = service.add_operator(OperatorType::Map, operator_placement, function);
2091
2092 assert!(res.is_err());
2093 assert_eq!(
2094 res.unwrap_err().to_string(),
2095 "Cannot add operator. Service does not have top level partition. To delete operator from window partition, please specify window"
2096 )
2097 }
2098
2099 #[test]
2100 fn test_add_partition_operator_with_no_partition() {
2101 let mut service = Operations {
2102 name: "listing-to-prospect-op".to_string(),
2103 sources: vec![],
2104 sinks: vec![],
2105 transforms: Transforms { steps: vec![] },
2106 post_transforms: None,
2107 states: vec![],
2108 };
2109
2110 let operator_placement = OperatorPlacement {
2111 service_id: "listing-to-prospect-op".to_string(),
2112 window: false,
2113 partition: true,
2114 transforms_index: Some(0),
2115 };
2116
2117 let function = function();
2118
2119 let res = service.add_operator(OperatorType::Map, operator_placement, function);
2120
2121 assert!(res.is_err());
2122 assert_eq!(
2123 res.unwrap_err().to_string(),
2124 "Cannot add operator. Parition was specified but service does not have a partition"
2125 )
2126 }
2127
2128 #[test]
2129 fn test_add_window_operator() {
2130 let mut service = service_with_window();
2131
2132 let operator_placement = OperatorPlacement {
2133 service_id: "listing-to-prospect-op".to_string(),
2134 window: true,
2135 partition: false,
2136 transforms_index: Some(0),
2137 };
2138
2139 let function = function();
2140 let res = service.add_operator(OperatorType::Map, operator_placement, function);
2141
2142 let post_transforms = service.post_transforms.as_ref().unwrap();
2143 let window = match post_transforms {
2144 PostTransforms::AssignTimestamp(w) => w,
2145 _ => panic!("expected window"),
2146 };
2147
2148 assert!(res.is_ok());
2149 assert_eq!(window.transforms.steps.len(), 2);
2150 }
2151
2152 #[test]
2153 fn test_add_window_operator_when_partition_but_no_window() {
2154 let mut service = service_with_partition();
2155
2156 let operator_placement = OperatorPlacement {
2157 service_id: "listing-to-prospect-op".to_string(),
2158 window: true,
2159 partition: false,
2160 transforms_index: Some(0),
2161 };
2162
2163 let function = function();
2164 let res = service.add_operator(OperatorType::Map, operator_placement, function);
2165
2166 assert!(res.is_err());
2167 assert_eq!(
2168 res.unwrap_err().to_string(),
2169 "Cannot add operator. Window was specified but service does not have a window"
2170 )
2171 }
2172
2173 #[test]
2174 fn test_add_window_operator_when_no_window() {
2175 let mut service = Operations {
2176 name: "listing-to-prospect-op".to_string(),
2177 sources: vec![],
2178 sinks: vec![],
2179 transforms: Transforms { steps: vec![] },
2180 post_transforms: None,
2181 states: vec![],
2182 };
2183
2184 let operator_placement = OperatorPlacement {
2185 service_id: "listing-to-prospect-op".to_string(),
2186 window: true,
2187 partition: false,
2188 transforms_index: Some(0),
2189 };
2190
2191 let function = function();
2192 let res = service.add_operator(OperatorType::Map, operator_placement, function);
2193
2194 assert!(res.is_err());
2195 assert_eq!(
2196 res.unwrap_err().to_string(),
2197 "Cannot add operator. Window was specified but service does not have a window"
2198 )
2199 }
2200
2201 #[test]
2223 fn test_delete_operator_deletes_op_from_transforms() {
2224 let mut service = service_with_window();
2225
2226 let operator_placement = OperatorPlacement {
2227 service_id: "listing-to-prospect-op".to_string(),
2228 window: false,
2229 partition: false,
2230 transforms_index: Some(0),
2231 };
2232
2233 let res = service.delete_operator(operator_placement);
2234
2235 assert!(res.is_ok());
2236 assert_eq!(service.transforms.steps.len(), 1);
2237 }
2238
2239 #[test]
2240 fn test_delete_operator_deletes_op_from_partition() {
2241 let mut service = service_with_partition();
2242
2243 let operator_placement = OperatorPlacement {
2244 service_id: "listing-to-prospect-op".to_string(),
2245 window: false,
2246 partition: true,
2247 transforms_index: Some(0),
2248 };
2249
2250 let res = service.delete_operator(operator_placement);
2251
2252 let post_transforms = service.post_transforms.as_ref().unwrap();
2253 let partition = match post_transforms {
2254 PostTransforms::Partition(p) => p,
2255 _ => panic!("expected partition"),
2256 };
2257
2258 assert!(res.is_ok());
2259 assert_eq!(partition.transforms.steps.len(), 0);
2260
2261 assert_eq!(service.transforms.steps.len(), 2)
2263 }
2264
2265 #[test]
2266 fn test_delete_partition_operator_when_window_incorrectly_specified() {
2267 let mut service = service_with_window();
2268
2269 let operator_placement = OperatorPlacement {
2270 service_id: "listing-to-prospect-op".to_string(),
2271 window: false,
2272 partition: true,
2273 transforms_index: Some(0),
2274 };
2275
2276 let res = service.delete_operator(operator_placement);
2277
2278 assert!(res.is_err());
2279 assert_eq!(
2280 res.unwrap_err().to_string(),
2281 "Cannot delete operator. Service does not have top level partition. To delete operator from window partition, please specify window"
2282 )
2283 }
2284
2285 #[test]
2286 fn test_delete_partition_operator_with_no_partition() {
2287 let mut service = Operations {
2288 name: "listing-to-prospect-op".to_string(),
2289 sources: vec![],
2290 sinks: vec![],
2291 transforms: Transforms { steps: vec![] },
2292 post_transforms: None,
2293 states: vec![],
2294 };
2295
2296 let operator_placement = OperatorPlacement {
2297 service_id: "listing-to-prospect-op".to_string(),
2298 window: false,
2299 partition: true,
2300 transforms_index: Some(0),
2301 };
2302
2303 let res = service.delete_operator(operator_placement);
2304
2305 assert!(res.is_err());
2306 assert_eq!(
2307 res.unwrap_err().to_string(),
2308 "Cannot delete operator. Parition was specified but service does not have a partition"
2309 )
2310 }
2311
2312 #[test]
2313 fn test_delete_operator_deletes_op_from_window() {
2314 let mut service = service_with_window();
2315
2316 let operator_placement = OperatorPlacement {
2317 service_id: "listing-to-prospect-op".to_string(),
2318 window: true,
2319 partition: false,
2320 transforms_index: Some(0),
2321 };
2322
2323 let res = service.delete_operator(operator_placement);
2324
2325 let post_transforms = service.post_transforms.as_ref().unwrap();
2326 let window = match post_transforms {
2327 PostTransforms::AssignTimestamp(w) => w,
2328 _ => panic!("expected window"),
2329 };
2330
2331 assert!(res.is_ok());
2332 assert_eq!(window.transforms.steps.len(), 0);
2333
2334 assert_eq!(service.transforms.steps.len(), 2)
2336 }
2337
2338 #[test]
2339 fn test_delete_window_operator_when_partition_but_no_window() {
2340 let mut service = service_with_partition();
2341
2342 let operator_placement = OperatorPlacement {
2343 service_id: "listing-to-prospect-op".to_string(),
2344 window: true,
2345 partition: false,
2346 transforms_index: Some(0),
2347 };
2348
2349 let res = service.delete_operator(operator_placement);
2350
2351 assert!(res.is_err());
2352 assert_eq!(
2353 res.unwrap_err().to_string(),
2354 "Cannot delete operator. Window was specified but service does not have a window"
2355 )
2356 }
2357
2358 #[test]
2359 fn test_delete_window_operator_when_no_window() {
2360 let mut service = Operations {
2361 name: "listing-to-prospect-op".to_string(),
2362 sources: vec![],
2363 sinks: vec![],
2364 transforms: Transforms { steps: vec![] },
2365 post_transforms: None,
2366 states: vec![],
2367 };
2368
2369 let operator_placement = OperatorPlacement {
2370 service_id: "listing-to-prospect-op".to_string(),
2371 window: true,
2372 partition: false,
2373 transforms_index: Some(0),
2374 };
2375
2376 let res = service.delete_operator(operator_placement);
2377
2378 assert!(res.is_err());
2379 assert_eq!(
2380 res.unwrap_err().to_string(),
2381 "Cannot delete operator. Window was specified but service does not have a window"
2382 )
2383 }
2384}