sdf_parser_core/config/transform/
mod.rs1use schemars::JsonSchema;
2use serde::{Deserialize, Serialize};
3
4use super::{import::StateImport, types::MetadataType};
5use self::code::Dependency;
6pub use self::code::{FunctionDefinition, StepInvocationDefinition, Lang};
7
8pub mod code;
9
10pub type TransformsWrapperV0_5_0 = Vec<TransformOperator>;
11
12#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
13pub struct WindowOperatorWrapper {
14 #[serde(flatten)]
15 pub properties: WindowProperties,
16 #[serde(alias = "assign-timestamp")]
17 pub assign_timestamp: StepInvocationWrapperV0_5_0,
18 pub flush: Option<StepInvocationWrapperV0_5_0>,
19 #[serde(flatten)]
20 pub transforms: WindowTransforms,
21}
22
23#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
24pub struct WindowTransforms {
25 #[serde(default)]
26 pub transforms: TransformsWrapperV0_5_0,
27 #[serde(default)]
28 pub partition: Option<PartitionOperatorWrapper>,
29}
30
31#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
32pub struct PartitionOperatorWrapper {
33 #[serde(alias = "assign-key")]
34 pub assign_key: StepInvocationWrapperV0_5_0,
35 #[serde(default)]
36 pub transforms: TransformsWrapperV0_5_0,
37 #[serde(alias = "update-state", default)]
38 pub update_state: Option<StepInvocationWrapperV0_5_0>,
39}
40
41#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
42#[serde(untagged)]
43pub enum StateWrapper {
44 Ref(RefState),
45 System(SystemState),
46 Typed(TypedState),
47}
48
49impl StateWrapper {
50 pub fn inner_type(&self) -> Option<&MetadataType> {
51 match self {
52 Self::Typed(typed) => Some(&typed.inner_type),
53 Self::Ref(_) => None,
54 Self::System(_) => None,
55 }
56 }
57}
58#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
59pub struct TypedState {
60 #[serde(flatten)]
61 pub inner_type: MetadataType,
62}
63
64#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
65#[serde(rename_all = "kebab-case")]
66pub struct RefState {
67 pub from: String,
68}
69
70impl RefState {
71 pub fn into_pair(&self) -> (String, String) {
72 let mut iter = self.from.split('.');
73 let service = iter
74 .next()
75 .expect("Shouldn't fail if called after validate")
76 .to_string();
77 let state = iter
78 .next()
79 .expect("Shouldn't fail if called after validate")
80 .to_string();
81
82 (service, state)
83 }
84}
85
86#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
87#[serde(rename_all = "kebab-case")]
88pub struct SystemState {
89 pub system: String,
90}
91
92#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
93#[serde(rename_all = "kebab-case")]
94pub struct StepInvocationWrapperV0_5_0 {
95 #[serde(flatten)]
96 pub definition: StepInvocationDefinition,
97}
98
99impl StepInvocationWrapperV0_5_0 {
100 pub fn state_imports(&self) -> &Vec<StateImport> {
101 match &self.definition {
102 StepInvocationDefinition::Code(code) => &code.state_imports,
103 StepInvocationDefinition::Function(function) => &function.state_imports,
104 }
105 }
106}
107
108#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
109#[serde(tag = "operator")]
110#[serde(rename_all = "kebab-case")]
111pub enum TransformOperator {
112 Map(StepInvocationWrapperV0_5_0),
113 Filter(StepInvocationWrapperV0_5_0),
114 FilterMap(StepInvocationWrapperV0_5_0),
115 FlatMap(StepInvocationWrapperV0_5_0),
116}
117
118impl TransformOperator {
119 pub fn extra_deps(&self) -> Vec<Dependency> {
120 self.inner().definition.extra_deps()
121 }
122
123 pub fn inner(&self) -> &StepInvocationWrapperV0_5_0 {
124 match self {
125 Self::Map(map) => map,
126 Self::Filter(filter) => filter,
127 Self::FilterMap(filter_map) => filter_map,
128 Self::FlatMap(flat_map) => flat_map,
129 }
130 }
131}
132
133#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
134pub struct NamedParameterWrapper {
135 pub name: String,
136 #[serde(flatten)]
137 pub ty: MetadataType,
138 #[serde(default)]
139 pub optional: bool,
140 #[serde(default)]
141 pub kind: ParameterKindWrapper,
142}
143
144#[derive(Default, Serialize, Deserialize, Debug, Clone, JsonSchema)]
145#[serde(rename_all = "kebab-case")]
146pub enum ParameterKindWrapper {
147 Key,
148 #[default]
149 Value,
150}
151
152#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
153pub struct ParameterWrapper {
154 #[serde(flatten)]
155 pub ty: MetadataType,
156 #[serde(default)]
157 pub optional: bool,
158 #[serde(default)]
159 pub list: bool,
160}
161
162#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
163pub struct WindowProperties {
164 #[serde(flatten)]
165 pub kind: WindowKind,
166 #[serde(default)]
167 pub watermark: WatermarkConfig,
168}
169
170#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
171#[serde(rename_all = "kebab-case")]
172pub enum WindowKind {
173 Tumbling(TumblingWindow),
174 Sliding(SlidingWindow),
175}
176
177#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
178pub struct TumblingWindow {
179 pub duration: String,
180 pub offset: Option<String>,
181}
182
183#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
184pub struct SlidingWindow {
185 pub duration: String,
186 pub offset: Option<String>,
187 pub slide: String,
188}
189
190#[derive(Serialize, Deserialize, Default, Debug, Clone, JsonSchema)]
191#[serde(rename_all = "kebab-case")]
192pub struct WatermarkConfig {
193 pub idleness: Option<String>,
194 #[serde(alias = "grace_period")]
195 pub grace_period: Option<String>,
196}
197
198#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Copy, Clone)]
199#[serde(rename_all = "kebab-case")]
200pub enum OperatorAdaptor {
201 Http,
202}
203
204#[cfg(test)]
205mod yaml_test {
206 use std::collections::BTreeMap;
207
208 use super::StateWrapper;
209
210 #[test]
211 fn parse_test_system_wrapper() {
212 let yaml = r#"
213 system: cmd
214 name: my-state
215 "#;
216
217 let state: StateWrapper = serde_yaml::from_str(yaml).unwrap();
218
219 match state {
220 StateWrapper::System(sys_state) => {
221 assert_eq!(sys_state.system, "cmd");
222 }
223 _ => panic!("failed to parse state wrapper"),
224 }
225 }
226
227 #[test]
228 fn parse_ref_state_wrapper() {
229 let yaml = r#"
230 from: another.state
231 "#;
232
233 let state: StateWrapper = serde_yaml::from_str(yaml).unwrap();
234
235 match state {
236 StateWrapper::Ref(rf) => {
237 assert_eq!(rf.from, "another.state");
238 }
239 _ => panic!("failed to parse state wrapper"),
240 }
241 }
242
243 #[test]
244 fn parse_type_state_wrapper() {
245 let yaml = r#"
246 my-state:
247 type: keyed-state
248 properties:
249 key:
250 type: string
251 value:
252 type: string
253 "#;
254
255 let state: BTreeMap<String, StateWrapper> = serde_yaml::from_str(yaml).unwrap();
256
257 match state.get("my-state").unwrap() {
258 StateWrapper::Typed(_) => {}
259 StateWrapper::Ref(_) => panic!("parse as ref state"),
260 StateWrapper::System(_) => panic!("parse as system state"),
261 }
262 }
263}