sdf_metadata/metadata/operator/
transform_operator.rs1use anyhow::Result;
2
3use sdf_common::constants::{
4 FILTER_MAP_OPERATOR_ID, FILTER_OPERATOR_ID, FLAT_MAP_OPERATOR_ID, MAP_OPERATOR_ID,
5};
6
7use crate::{
8 metadata::io::topic::KVSchemaType,
9 util::{sdf_types_map::SdfTypesMap, validation_failure::ValidationFailure},
10 wit::{
11 dataflow::PackageImport,
12 operator::{OperatorType, StepInvocation, TransformOperator},
13 },
14};
15
16impl TransformOperator {
17 pub(crate) fn new(
18 operator_type: OperatorType,
19 step_invocation: StepInvocation,
20 ) -> Option<TransformOperator> {
21 match operator_type {
22 OperatorType::Map => Some(TransformOperator::Map(step_invocation)),
23 OperatorType::FilterMap => Some(TransformOperator::FilterMap(step_invocation)),
24 OperatorType::Filter => Some(TransformOperator::Filter(step_invocation)),
25 OperatorType::FlatMap => Some(TransformOperator::FlatMap(step_invocation)),
26 _ => None,
27 }
28 }
29
30 pub fn is_imported(&self, imports: &[PackageImport]) -> bool {
31 self.inner().is_imported(imports)
32 }
33
34 pub fn name(&self) -> &str {
35 match self {
36 Self::Map(step_inv)
37 | Self::FilterMap(step_inv)
38 | Self::Filter(step_inv)
39 | Self::FlatMap(step_inv) => &step_inv.uses,
40 }
41 }
42
43 pub(crate) fn operator_str(&self) -> &str {
44 match self {
45 Self::Map(_) => MAP_OPERATOR_ID,
46 Self::Filter(_) => FILTER_OPERATOR_ID,
47 Self::FilterMap(_) => FILTER_MAP_OPERATOR_ID,
48 Self::FlatMap(_) => FLAT_MAP_OPERATOR_ID,
49 }
50 }
51
52 pub fn inner(&self) -> &StepInvocation {
53 match self {
54 Self::Map(step_inv)
55 | Self::FilterMap(step_inv)
56 | Self::Filter(step_inv)
57 | Self::FlatMap(step_inv) => step_inv,
58 }
59 }
60
61 pub fn output_type(&self) -> Option<KVSchemaType> {
62 match self {
63 Self::Map(step_inv)
64 | Self::FilterMap(step_inv)
65 | Self::Filter(step_inv)
66 | Self::FlatMap(step_inv) => {
67 let parameter = step_inv.output.clone();
68
69 parameter.map(|p| p.type_.into())
70 }
71 }
72 }
73
74 pub fn input_type(&self) -> Option<KVSchemaType> {
75 match self {
76 Self::Map(step_inv)
77 | Self::FilterMap(step_inv)
78 | Self::Filter(step_inv)
79 | Self::FlatMap(step_inv) => {
80 if step_inv.requires_key_param() {
81 let key = step_inv.inputs.first();
82 let value = step_inv.inputs.get(1);
83
84 key.map(|k| {
85 (
86 Some(k.type_.clone()),
87 value.map(|v| v.type_.clone()).unwrap_or_else(|| {
88 panic!("Missing value parameter for operator: {}", self.name())
89 }),
90 )
91 .into()
92 })
93 } else {
94 let parameter = step_inv.inputs.first();
95 parameter.map(|p| (None, p.type_.clone()).into())
96 }
97 }
98 }
99 }
100
101 pub fn validate(&self, types: &SdfTypesMap) -> Result<(), ValidationFailure> {
102 match self {
103 Self::Map(function) => function.validate_map(types),
104 Self::FilterMap(function) => function.validate_filter_map(types),
105 Self::Filter(function) => function.validate_filter(types),
106 Self::FlatMap(function) => function.validate_flat_map(types),
107 }
108 }
109
110 #[cfg(feature = "parser")]
111 pub fn update_signature_from_code(&mut self) -> Result<()> {
112 match self {
113 Self::Map(function) => function.update_signature_from_code(),
114 Self::FilterMap(function) => function.update_signature_from_code(),
115 Self::Filter(function) => function.update_signature_from_code(),
116 Self::FlatMap(function) => function.update_signature_from_code(),
117 }
118 }
119}