sdf_metadata/metadata/operator/
post_transforms.rs

1use anyhow::Result;
2
3use crate::{
4    metadata::io::topic::KVSchemaType,
5    util::{
6        sdf_types_map::SdfTypesMap, validation_error::ValidationError,
7        validation_failure::ValidationFailure,
8    },
9    wit::{
10        dataflow::PostTransforms,
11        operator::{OperatorType, StepInvocation},
12    },
13};
14
15impl PostTransforms {
16    pub fn operators(&self) -> Vec<(StepInvocation, OperatorType)> {
17        match self {
18            PostTransforms::AssignTimestamp(window) => window.operators(),
19            PostTransforms::Partition(partition) => partition.operators(),
20        }
21    }
22    pub fn output_type(&self, input_type: KVSchemaType) -> Result<KVSchemaType, ValidationError> {
23        match self {
24            PostTransforms::AssignTimestamp(window) => window.output_type(input_type),
25            PostTransforms::Partition(partition) => partition.output_type(input_type),
26        }
27    }
28
29    pub fn validate(
30        &self,
31        types: &SdfTypesMap,
32        expected_input_type: &KVSchemaType,
33    ) -> Result<(), ValidationFailure> {
34        let mut errors = ValidationFailure::new();
35
36        match self {
37            PostTransforms::AssignTimestamp(window) => {
38                if let Err(window_error) =
39                    window.validate(types, expected_input_type, "transforms block")
40                {
41                    errors.concat_with_context("Window", &window_error)
42                }
43            }
44            PostTransforms::Partition(partition) => {
45                if let Err(partition_error) =
46                    partition.validate(types, expected_input_type, "transforms block")
47                {
48                    errors.concat_with_context("Partition", &partition_error)
49                }
50            }
51        }
52
53        if errors.any() {
54            Err(errors)
55        } else {
56            Ok(())
57        }
58    }
59
60    #[cfg(feature = "parser")]
61    pub fn update_inline_operators(&mut self) -> Result<()> {
62        match self {
63            PostTransforms::AssignTimestamp(window) => window.update_inline_operators(),
64            PostTransforms::Partition(partition) => partition.update_inline_operators(),
65        }
66    }
67}
68
69#[cfg(test)]
70mod test {
71    use crate::wit::operator::{
72        PartitionOperator, TransformOperator, PostTransforms, StepInvocation, Transforms, Window,
73    };
74
75    #[test]
76    fn test_operators() {
77        let post_transforms = PostTransforms::AssignTimestamp(Window {
78            partition: Some(PartitionOperator {
79                assign_key: StepInvocation {
80                    uses: "my-assign".into(),
81                    ..Default::default()
82                },
83                transforms: Transforms {
84                    steps: vec![TransformOperator::Map(StepInvocation {
85                        uses: "my-map".into(),
86                        ..Default::default()
87                    })],
88                },
89                update_state: None,
90            }),
91            flush: Some(StepInvocation {
92                uses: "my-flush".into(),
93                ..Default::default()
94            }),
95            assign_timestamp: StepInvocation {
96                uses: "my-assign-timestamp".into(),
97                ..Default::default()
98            },
99            ..Default::default()
100        });
101
102        let op = post_transforms.operators();
103        assert_eq!(op.len(), 4);
104        assert_eq!(op[0].0.uses, "my-assign-timestamp");
105        assert_eq!(op[1].0.uses, "my-assign");
106        assert_eq!(op[2].0.uses, "my-map");
107        assert_eq!(op[3].0.uses, "my-flush");
108    }
109}