sdf_parser_core/config/transform/
mod.rs

1use schemars::JsonSchema;
2use serde::{Deserialize, Serialize};
3
4use super::{import::StateImport, types::MetadataType};
5use self::code::Dependency;
6pub use self::code::{FunctionDefinition, StepInvocationDefinition, Lang};
7
8pub mod code;
9
10pub type TransformsWrapperV0_5_0 = Vec<TransformOperator>;
11
12#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
13pub struct WindowOperatorWrapper {
14    #[serde(flatten)]
15    pub properties: WindowProperties,
16    #[serde(alias = "assign-timestamp")]
17    pub assign_timestamp: StepInvocationWrapperV0_5_0,
18    pub flush: Option<StepInvocationWrapperV0_5_0>,
19    #[serde(flatten)]
20    pub transforms: WindowTransforms,
21}
22
23#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
24pub struct WindowTransforms {
25    #[serde(default)]
26    pub transforms: TransformsWrapperV0_5_0,
27    #[serde(default)]
28    pub partition: Option<PartitionOperatorWrapper>,
29}
30
31#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
32pub struct PartitionOperatorWrapper {
33    #[serde(alias = "assign-key")]
34    pub assign_key: StepInvocationWrapperV0_5_0,
35    #[serde(default)]
36    pub transforms: TransformsWrapperV0_5_0,
37    #[serde(alias = "update-state", default)]
38    pub update_state: Option<StepInvocationWrapperV0_5_0>,
39}
40
41#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
42#[serde(untagged)]
43pub enum StateWrapper {
44    Ref(RefState),
45    System(SystemState),
46    Typed(TypedState),
47}
48
49impl StateWrapper {
50    pub fn inner_type(&self) -> Option<&MetadataType> {
51        match self {
52            Self::Typed(typed) => Some(&typed.inner_type),
53            Self::Ref(_) => None,
54            Self::System(_) => None,
55        }
56    }
57}
58#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
59pub struct TypedState {
60    #[serde(flatten)]
61    pub inner_type: MetadataType,
62}
63
64#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
65#[serde(rename_all = "kebab-case")]
66pub struct RefState {
67    pub from: String,
68}
69
70impl RefState {
71    pub fn into_pair(&self) -> (String, String) {
72        let mut iter = self.from.split('.');
73        let service = iter
74            .next()
75            .expect("Shouldn't fail if called after validate")
76            .to_string();
77        let state = iter
78            .next()
79            .expect("Shouldn't fail if called after validate")
80            .to_string();
81
82        (service, state)
83    }
84}
85
86#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
87#[serde(rename_all = "kebab-case")]
88pub struct SystemState {
89    pub system: String,
90}
91
92#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
93#[serde(rename_all = "kebab-case")]
94pub struct StepInvocationWrapperV0_5_0 {
95    #[serde(flatten)]
96    pub definition: StepInvocationDefinition,
97}
98
99impl StepInvocationWrapperV0_5_0 {
100    pub fn state_imports(&self) -> &Vec<StateImport> {
101        match &self.definition {
102            StepInvocationDefinition::Code(code) => &code.state_imports,
103            StepInvocationDefinition::Function(function) => &function.state_imports,
104        }
105    }
106}
107
108#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
109#[serde(tag = "operator")]
110#[serde(rename_all = "kebab-case")]
111pub enum TransformOperator {
112    Map(StepInvocationWrapperV0_5_0),
113    Filter(StepInvocationWrapperV0_5_0),
114    FilterMap(StepInvocationWrapperV0_5_0),
115    FlatMap(StepInvocationWrapperV0_5_0),
116}
117
118impl TransformOperator {
119    pub fn extra_deps(&self) -> Vec<Dependency> {
120        self.inner().definition.extra_deps()
121    }
122
123    pub fn inner(&self) -> &StepInvocationWrapperV0_5_0 {
124        match self {
125            Self::Map(map) => map,
126            Self::Filter(filter) => filter,
127            Self::FilterMap(filter_map) => filter_map,
128            Self::FlatMap(flat_map) => flat_map,
129        }
130    }
131}
132
133#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
134pub struct NamedParameterWrapper {
135    pub name: String,
136    #[serde(flatten)]
137    pub ty: MetadataType,
138    #[serde(default)]
139    pub optional: bool,
140    #[serde(default)]
141    pub kind: ParameterKindWrapper,
142}
143
144#[derive(Default, Serialize, Deserialize, Debug, Clone, JsonSchema)]
145#[serde(rename_all = "kebab-case")]
146pub enum ParameterKindWrapper {
147    Key,
148    #[default]
149    Value,
150}
151
152#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
153pub struct ParameterWrapper {
154    #[serde(flatten)]
155    pub ty: MetadataType,
156    #[serde(default)]
157    pub optional: bool,
158    #[serde(default)]
159    pub list: bool,
160}
161
162#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
163pub struct WindowProperties {
164    #[serde(flatten)]
165    pub kind: WindowKind,
166    #[serde(default)]
167    pub watermark: WatermarkConfig,
168}
169
170#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
171#[serde(rename_all = "kebab-case")]
172pub enum WindowKind {
173    Tumbling(TumblingWindow),
174    Sliding(SlidingWindow),
175}
176
177#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
178pub struct TumblingWindow {
179    pub duration: String,
180    pub offset: Option<String>,
181}
182
183#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
184pub struct SlidingWindow {
185    pub duration: String,
186    pub offset: Option<String>,
187    pub slide: String,
188}
189
190#[derive(Serialize, Deserialize, Default, Debug, Clone, JsonSchema)]
191#[serde(rename_all = "kebab-case")]
192pub struct WatermarkConfig {
193    pub idleness: Option<String>,
194    #[serde(alias = "grace_period")]
195    pub grace_period: Option<String>,
196}
197
198#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Copy, Clone)]
199#[serde(rename_all = "kebab-case")]
200pub enum OperatorAdaptor {
201    Http,
202}
203
204#[cfg(test)]
205mod yaml_test {
206    use std::collections::BTreeMap;
207
208    use super::StateWrapper;
209
210    #[test]
211    fn parse_test_system_wrapper() {
212        let yaml = r#"
213        system: cmd
214        name: my-state
215        "#;
216
217        let state: StateWrapper = serde_yaml::from_str(yaml).unwrap();
218
219        match state {
220            StateWrapper::System(sys_state) => {
221                assert_eq!(sys_state.system, "cmd");
222            }
223            _ => panic!("failed to parse state wrapper"),
224        }
225    }
226
227    #[test]
228    fn parse_ref_state_wrapper() {
229        let yaml = r#"
230        from: another.state
231        "#;
232
233        let state: StateWrapper = serde_yaml::from_str(yaml).unwrap();
234
235        match state {
236            StateWrapper::Ref(rf) => {
237                assert_eq!(rf.from, "another.state");
238            }
239            _ => panic!("failed to parse state wrapper"),
240        }
241    }
242
243    #[test]
244    fn parse_type_state_wrapper() {
245        let yaml = r#"
246        my-state:
247          type: keyed-state
248          properties:
249            key:
250              type: string
251            value:
252              type: string
253        "#;
254
255        let state: BTreeMap<String, StateWrapper> = serde_yaml::from_str(yaml).unwrap();
256
257        match state.get("my-state").unwrap() {
258            StateWrapper::Typed(_) => {}
259            StateWrapper::Ref(_) => panic!("parse as ref state"),
260            StateWrapper::System(_) => panic!("parse as system state"),
261        }
262    }
263}