sdf_metadata/metadata/operator/
post_transforms.rs1use anyhow::Result;
2
3use crate::{
4 metadata::io::topic::KVSchemaType,
5 util::{
6 sdf_types_map::SdfTypesMap, validation_error::ValidationError,
7 validation_failure::ValidationFailure,
8 },
9 wit::{
10 dataflow::PostTransforms,
11 operator::{OperatorType, StepInvocation},
12 },
13};
14
15impl PostTransforms {
16 pub fn operators(&self) -> Vec<(StepInvocation, OperatorType)> {
17 match self {
18 PostTransforms::AssignTimestamp(window) => window.operators(),
19 PostTransforms::Partition(partition) => partition.operators(),
20 }
21 }
22 pub fn output_type(&self, input_type: KVSchemaType) -> Result<KVSchemaType, ValidationError> {
23 match self {
24 PostTransforms::AssignTimestamp(window) => window.output_type(input_type),
25 PostTransforms::Partition(partition) => partition.output_type(input_type),
26 }
27 }
28
29 pub fn validate(
30 &self,
31 types: &SdfTypesMap,
32 expected_input_type: &KVSchemaType,
33 ) -> Result<(), ValidationFailure> {
34 let mut errors = ValidationFailure::new();
35
36 match self {
37 PostTransforms::AssignTimestamp(window) => {
38 if let Err(window_error) =
39 window.validate(types, expected_input_type, "transforms block")
40 {
41 errors.concat_with_context("Window", &window_error)
42 }
43 }
44 PostTransforms::Partition(partition) => {
45 if let Err(partition_error) =
46 partition.validate(types, expected_input_type, "transforms block")
47 {
48 errors.concat_with_context("Partition", &partition_error)
49 }
50 }
51 }
52
53 if errors.any() {
54 Err(errors)
55 } else {
56 Ok(())
57 }
58 }
59
60 #[cfg(feature = "parser")]
61 pub fn update_inline_operators(&mut self) -> Result<()> {
62 match self {
63 PostTransforms::AssignTimestamp(window) => window.update_inline_operators(),
64 PostTransforms::Partition(partition) => partition.update_inline_operators(),
65 }
66 }
67}
68
69#[cfg(test)]
70mod test {
71 use crate::wit::operator::{
72 PartitionOperator, TransformOperator, PostTransforms, StepInvocation, Transforms, Window,
73 };
74
75 #[test]
76 fn test_operators() {
77 let post_transforms = PostTransforms::AssignTimestamp(Window {
78 partition: Some(PartitionOperator {
79 assign_key: StepInvocation {
80 uses: "my-assign".into(),
81 ..Default::default()
82 },
83 transforms: Transforms {
84 steps: vec![TransformOperator::Map(StepInvocation {
85 uses: "my-map".into(),
86 ..Default::default()
87 })],
88 },
89 update_state: None,
90 }),
91 flush: Some(StepInvocation {
92 uses: "my-flush".into(),
93 ..Default::default()
94 }),
95 assign_timestamp: StepInvocation {
96 uses: "my-assign-timestamp".into(),
97 ..Default::default()
98 },
99 ..Default::default()
100 });
101
102 let op = post_transforms.operators();
103 assert_eq!(op.len(), 4);
104 assert_eq!(op[0].0.uses, "my-assign-timestamp");
105 assert_eq!(op[1].0.uses, "my-assign");
106 assert_eq!(op[2].0.uses, "my-map");
107 assert_eq!(op[3].0.uses, "my-flush");
108 }
109}