sdf_metadata/metadata/operator/
transform_operator.rs

1use anyhow::Result;
2
3use sdf_common::constants::{
4    FILTER_MAP_OPERATOR_ID, FILTER_OPERATOR_ID, FLAT_MAP_OPERATOR_ID, MAP_OPERATOR_ID,
5};
6
7use crate::{
8    metadata::io::topic::KVSchemaType,
9    util::{sdf_types_map::SdfTypesMap, validation_failure::ValidationFailure},
10    wit::{
11        dataflow::PackageImport,
12        operator::{OperatorType, StepInvocation, TransformOperator},
13    },
14};
15
16impl TransformOperator {
17    pub(crate) fn new(
18        operator_type: OperatorType,
19        step_invocation: StepInvocation,
20    ) -> Option<TransformOperator> {
21        match operator_type {
22            OperatorType::Map => Some(TransformOperator::Map(step_invocation)),
23            OperatorType::FilterMap => Some(TransformOperator::FilterMap(step_invocation)),
24            OperatorType::Filter => Some(TransformOperator::Filter(step_invocation)),
25            OperatorType::FlatMap => Some(TransformOperator::FlatMap(step_invocation)),
26            _ => None,
27        }
28    }
29
30    pub fn is_imported(&self, imports: &[PackageImport]) -> bool {
31        self.inner().is_imported(imports)
32    }
33
34    pub fn name(&self) -> &str {
35        match self {
36            Self::Map(step_inv)
37            | Self::FilterMap(step_inv)
38            | Self::Filter(step_inv)
39            | Self::FlatMap(step_inv) => &step_inv.uses,
40        }
41    }
42
43    pub(crate) fn operator_str(&self) -> &str {
44        match self {
45            Self::Map(_) => MAP_OPERATOR_ID,
46            Self::Filter(_) => FILTER_OPERATOR_ID,
47            Self::FilterMap(_) => FILTER_MAP_OPERATOR_ID,
48            Self::FlatMap(_) => FLAT_MAP_OPERATOR_ID,
49        }
50    }
51
52    pub fn inner(&self) -> &StepInvocation {
53        match self {
54            Self::Map(step_inv)
55            | Self::FilterMap(step_inv)
56            | Self::Filter(step_inv)
57            | Self::FlatMap(step_inv) => step_inv,
58        }
59    }
60
61    pub fn output_type(&self) -> Option<KVSchemaType> {
62        match self {
63            Self::Map(step_inv)
64            | Self::FilterMap(step_inv)
65            | Self::Filter(step_inv)
66            | Self::FlatMap(step_inv) => {
67                let parameter = step_inv.output.clone();
68
69                parameter.map(|p| p.type_.into())
70            }
71        }
72    }
73
74    pub fn input_type(&self) -> Option<KVSchemaType> {
75        match self {
76            Self::Map(step_inv)
77            | Self::FilterMap(step_inv)
78            | Self::Filter(step_inv)
79            | Self::FlatMap(step_inv) => {
80                if step_inv.requires_key_param() {
81                    let key = step_inv.inputs.first();
82                    let value = step_inv.inputs.get(1);
83
84                    key.map(|k| {
85                        (
86                            Some(k.type_.clone()),
87                            value.map(|v| v.type_.clone()).unwrap_or_else(|| {
88                                panic!("Missing value parameter for operator: {}", self.name())
89                            }),
90                        )
91                            .into()
92                    })
93                } else {
94                    let parameter = step_inv.inputs.first();
95                    parameter.map(|p| (None, p.type_.clone()).into())
96                }
97            }
98        }
99    }
100
101    pub fn validate(&self, types: &SdfTypesMap) -> Result<(), ValidationFailure> {
102        match self {
103            Self::Map(function) => function.validate_map(types),
104            Self::FilterMap(function) => function.validate_filter_map(types),
105            Self::Filter(function) => function.validate_filter(types),
106            Self::FlatMap(function) => function.validate_flat_map(types),
107        }
108    }
109
110    #[cfg(feature = "parser")]
111    pub fn update_signature_from_code(&mut self) -> Result<()> {
112        match self {
113            Self::Map(function) => function.update_signature_from_code(),
114            Self::FilterMap(function) => function.update_signature_from_code(),
115            Self::Filter(function) => function.update_signature_from_code(),
116            Self::FlatMap(function) => function.update_signature_from_code(),
117        }
118    }
119}