sdf_parser_df/config/
mod.rs

1#[cfg(test)]
2mod tests;
3
4pub mod utils;
5
6use sdf_common::LATEST_STABLE_DATAFLOW;
7
/// Dataflow API version treated as stable for service definition configs.
///
/// This is a straight re-export of `sdf_common::LATEST_STABLE_DATAFLOW`.
pub const SERVICE_DEFINITION_CONFIG_STABLE_VERSION: &str = LATEST_STABLE_DATAFLOW;
/// Message reported when a dataflow declares an `apiVersion` that is not supported.
pub const VERSION_NOT_SUPPORTED_ERROR: &str = "ApiVersion not supported, try upgrading to 0.5.0";
10
11pub use wrapper::*;
12
13mod wrapper {
14
15    use std::{
16        collections::BTreeMap,
17        ops::{Deref, DerefMut},
18    };
19
20    use anyhow::{anyhow, Result};
21    use schemars::JsonSchema;
22    use sdf_parser_package::pkg::PackageWrapperV0_5_0;
23    use serde::{Deserialize, Serialize};
24
25    use sdf_parser_core::{
26        config::{
27            dev::DevConfig,
28            import::PackageImport,
29            types::{MetadataTypesMap, MetadataTypesMapWrapper},
30            DefaultConfigs, Metadata, TopicWrapper,
31        },
32        MaybeValid,
33    };
34
35    use super::*;
36
37    use sdf_parser_core::config::transform::{
38        TransformOperator, PartitionOperatorWrapper, StateWrapper, TransformsWrapperV0_5_0,
39        WindowOperatorWrapper,
40    };
41
    /// Current dataflow definition config.
    ///
    /// NOTE(review): the previous comment was cut off after "this should be";
    /// presumably this alias is meant to point at the newest supported schema
    /// wrapper — confirm and complete.
    pub type CurrentDataflowDefinitionConfig = DataflowDefinitionWrapperV0_5_0;
44
    /// Versioned dataflow definition, internally tagged by the `apiVersion` field.
    ///
    /// `0.5.0` and `0.6.0` both carry a [`CurrentDataflowDefinitionConfig`];
    /// `0.1.0` through `0.4.0` are routed to `Unsupported`.
    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
    #[serde(tag = "apiVersion")]
    pub enum DataflowDefinitionConfig {
        /// Definition declared with `apiVersion: 0.5.0`.
        #[serde(rename = "0.5.0")]
        V0_5_0(CurrentDataflowDefinitionConfig),
        /// Definition declared with `apiVersion: 0.6.0`; same payload type as `0.5.0`.
        #[serde(rename = "0.6.0")]
        V0_6_0(CurrentDataflowDefinitionConfig),
        /// Older API versions. Skipped in the generated JSON schema.
        #[schemars(skip)]
        #[serde(rename = "0.1.0", alias = "0.2.0", alias = "0.3.0", alias = "0.4.0")]
        Unsupported(DataflowDefinitionUnsupportedVersion),
    }
56
57    impl Deref for DataflowDefinitionConfig {
58        type Target = CurrentDataflowDefinitionConfig;
59
60        fn deref(&self) -> &Self::Target {
61            match self {
62                Self::V0_5_0(inner) => inner,
63                Self::V0_6_0(inner) => inner,
64                Self::Unsupported(_) => unreachable!("{VERSION_NOT_SUPPORTED_ERROR}"),
65            }
66        }
67    }
68
69    impl DerefMut for DataflowDefinitionConfig {
70        fn deref_mut(&mut self) -> &mut Self::Target {
71            match self {
72                Self::V0_5_0(inner) => inner,
73                Self::V0_6_0(inner) => inner,
74                Self::Unsupported(_) => unreachable!("{VERSION_NOT_SUPPORTED_ERROR}"),
75            }
76        }
77    }
78
79    impl CurrentDataflowDefinitionConfig {
80        pub fn name(&self) -> &str {
81            &self.meta.name
82        }
83
84        pub fn version(&self) -> &str {
85            &self.meta.version
86        }
87
88        pub fn namespace(&self) -> &str {
89            &self.meta.namespace
90        }
91
92        pub fn metadata(&self) -> &Metadata {
93            &self.meta
94        }
95
96        pub fn imports(&self) -> &Vec<PackageImport> {
97            &self.imports
98        }
99
100        pub fn services(&self) -> Result<Vec<(&String, OperationsWrapperV0_5_0)>> {
101            self.services
102                .iter()
103                .map(|(a, b)| match b.valid_data() {
104                    Some(b) => Ok((a, b.to_owned())),
105                    None => Err(anyhow!("Invalid service definition")),
106                })
107                .collect::<Result<_>>()
108        }
109
110        pub fn dev(&self) -> Option<&DevConfig> {
111            self.dev.as_ref()
112        }
113    }
114
    /// Metadata section of a dataflow definition; alias of the shared [`Metadata`] type.
    pub type DataflowMetadata = Metadata;
116
    /// Payload of `DataflowDefinitionConfig::Unsupported`.
    ///
    /// Only the `meta` section is declared for unsupported API versions.
    #[derive(Serialize, Deserialize, Debug, Clone)]
    pub struct DataflowDefinitionUnsupportedVersion {
        /// Metadata of the unsupported dataflow.
        pub meta: DataflowMetadata,
    }
121
    /// Serialized shape of a dataflow definition for the `0.5.0` schema.
    ///
    /// The map fields that go through `maps_duplicate_key_is_error` are
    /// expected to reject duplicate keys on deserialization (per `serde_with`).
    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
    pub struct DataflowDefinitionWrapperV0_5_0 {
        /// Dataflow metadata (name, version, namespace are read from here).
        pub meta: DataflowMetadata,
        /// Package imports; defaults to empty and is omitted from output when empty.
        #[serde(skip_serializing_if = "Vec::is_empty", default)]
        pub imports: Vec<PackageImport>,
        /// Services by name. Each entry may be invalid (`MaybeValid`).
        /// No `default` attribute is set on this field.
        #[serde(deserialize_with = "serde_with::rust::maps_duplicate_key_is_error::deserialize")]
        #[serde(serialize_with = "serde_with::rust::maps_duplicate_key_is_error::serialize")]
        pub services: BTreeMap<String, MaybeValid<OperationsWrapperV0_5_0>>,
        /// User-declared types; falls back to the type's `Default` when absent.
        #[serde(default)]
        pub types: MetadataTypesMapWrapper,
        /// Topics by name; defaults to empty and is omitted from output when empty.
        #[serde(
            skip_serializing_if = "BTreeMap::is_empty",
            deserialize_with = "serde_with::rust::maps_duplicate_key_is_error::deserialize",
            serialize_with = "serde_with::rust::maps_duplicate_key_is_error::serialize",
            default
        )]
        pub topics: BTreeMap<String, TopicWrapper>,
        /// Optional default configs; omitted from output when `None`.
        #[serde(skip_serializing_if = "Option::is_none", default)]
        pub config: Option<DefaultConfigs>,
        /// Optional dev configuration.
        /// NOTE(review): unlike `config`, this has no `skip_serializing_if` — confirm that is intended.
        pub dev: Option<DevConfig>,
        /// Inline packages; defaults to empty and is omitted from output when empty.
        #[serde(skip_serializing_if = "Vec::is_empty", default)]
        pub packages: Vec<PackageWrapperV0_5_0>,
        /// Schedules by name; defaults to empty and is omitted from output when empty.
        #[serde(
            skip_serializing_if = "BTreeMap::is_empty",
            deserialize_with = "serde_with::rust::maps_duplicate_key_is_error::deserialize",
            serialize_with = "serde_with::rust::maps_duplicate_key_is_error::serialize",
            default
        )]
        pub schedule: BTreeMap<String, ScheduleWrapper>,
    }
152
    /// Schedule definition; variant names are serialized in kebab-case (`cron`).
    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
    #[serde(rename_all = "kebab-case")]
    pub enum ScheduleWrapper {
        /// Schedule given as a string — presumably a cron expression; verify against consumers.
        Cron(String),
    }
158
159    impl DataflowDefinitionWrapperV0_5_0 {
160        pub fn types(&self) -> Result<MetadataTypesMap> {
161            let mut types: MetadataTypesMap = self.types.clone().try_into()?;
162
163            let services = self.services()?;
164
165            for (name, service) in services.iter() {
166                for (state_name, state) in &service.states {
167                    if state.is_invalid() {
168                        return Err(anyhow!(
169                            "Invalid state definition for service {}: {}",
170                            name,
171                            state_name
172                        ));
173                    }
174                }
175            }
176
177            services.iter().fold(&mut types, |acc, (_, op)| {
178                for (name, state) in &op.states {
179                    let Some(state) = state.valid_data() else {
180                        unreachable!();
181                    };
182                    if let Some(state_type) = state.inner_type() {
183                        acc.insert(name.into(), state_type.clone());
184                    }
185                }
186                acc
187            });
188            Ok(types)
189        }
190    }
191
    /// Serialized shape of a single service: its inputs, outputs, transforms and states.
    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
    pub struct OperationsWrapperV0_5_0 {
        /// Inputs of the service. No `default` attribute is set on this field.
        pub sources: Vec<IoRefWrapper>,
        /// Outputs of the service; when absent, filled by `IoRefWrapper::default_vec`.
        #[serde(default = "IoRefWrapper::default_vec")]
        pub sinks: Vec<IoRefWrapper>,
        /// Transform steps; falls back to the type's `Default` when absent.
        #[serde(default)]
        pub transforms: TransformsWrapperV0_5_0,
        /// Optional window/partition stage, flattened into this struct's own keys.
        #[serde(flatten)]
        pub post_transforms: Option<PostTransforms>,
        /// States by name, each possibly invalid; omitted from output when empty.
        #[serde(skip_serializing_if = "BTreeMap::is_empty", default)]
        pub states: BTreeMap<String, MaybeValid<StateWrapper>>,
    }
204
    /// Post-transform stage wrapped in `MaybeValid`, so an invalid definition can still be represented.
    pub type PostTransforms = MaybeValid<PostTransformsInner>;
206
    /// Post-transform stage of a service; variant names are serialized in
    /// kebab-case (`window`, `partition`). Payloads are boxed.
    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
    #[serde(rename_all = "kebab-case")]
    pub enum PostTransformsInner {
        /// Window operator.
        Window(Box<WindowOperatorWrapper>),
        /// Partition operator.
        Partition(Box<PartitionOperatorWrapper>),
    }
213
    /// Reference to a source or sink of a service.
    ///
    /// Internally tagged by `type`, with kebab-case variant names
    /// (`topic`, `no-target`, `schedule`).
    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
    #[serde(tag = "type", rename_all = "kebab-case")]
    pub enum IoRefWrapper {
        /// Reference to a topic by id.
        Topic(IoConfigRef),
        /// No target; carries no id and no transforms.
        NoTarget,
        /// Reference to a schedule by id.
        Schedule(IoConfigRef),
    }
221
222    impl Default for IoRefWrapper {
223        fn default() -> Self {
224            Self::NoTarget
225        }
226    }
227
228    impl IoRefWrapper {
229        fn default_vec() -> Vec<IoRefWrapper> {
230            vec![Self::default()]
231        }
232        pub fn id(&self) -> Result<&str> {
233            match self {
234                Self::Topic(topic) => Ok(&topic.id),
235                Self::NoTarget => Err(anyhow!("No target specified")),
236                Self::Schedule(schedule) => Ok(&schedule.id),
237            }
238        }
239
240        pub fn transforms(&self) -> Vec<TransformOperator> {
241            match self {
242                Self::Topic(topic) => topic.transforms.clone(),
243                Self::NoTarget => vec![],
244                Self::Schedule(schedule) => schedule.transforms.clone(),
245            }
246        }
247    }
248
    /// Payload of the `Topic` and `Schedule` variants of `IoRefWrapper`.
    #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
    pub struct IoConfigRef {
        /// Identifier of the referenced topic or schedule.
        pub id: String,
        /// Transforms attached to this reference; defaults to empty when absent.
        #[serde(default)]
        pub transforms: Vec<TransformOperator>,
    }
255}