1#[cfg(test)]
2mod tests;
3
4pub mod utils;
5
6use sdf_common::LATEST_STABLE_DATAFLOW;
7
8pub const SERVICE_DEFINITION_CONFIG_STABLE_VERSION: &str = LATEST_STABLE_DATAFLOW;
9pub const VERSION_NOT_SUPPORTED_ERROR: &str = "ApiVersion not supported, try upgrading to 0.5.0";
10
11pub use wrapper::*;
12
13mod wrapper {
14
15 use std::{
16 collections::BTreeMap,
17 ops::{Deref, DerefMut},
18 };
19
20 use anyhow::{anyhow, Result};
21 use schemars::JsonSchema;
22 use sdf_parser_package::pkg::PackageWrapperV0_5_0;
23 use serde::{Deserialize, Serialize};
24
25 use sdf_parser_core::{
26 config::{
27 dev::DevConfig,
28 import::PackageImport,
29 types::{MetadataTypesMap, MetadataTypesMapWrapper},
30 DefaultConfigs, Metadata, TopicWrapper,
31 },
32 MaybeValid,
33 };
34
35 use super::*;
36
37 use sdf_parser_core::config::transform::{
38 TransformOperator, PartitionOperatorWrapper, StateWrapper, TransformsWrapperV0_5_0,
39 WindowOperatorWrapper,
40 };
41
42 pub type CurrentDataflowDefinitionConfig = DataflowDefinitionWrapperV0_5_0;
44
45 #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
46 #[serde(tag = "apiVersion")]
47 pub enum DataflowDefinitionConfig {
48 #[serde(rename = "0.5.0")]
49 V0_5_0(CurrentDataflowDefinitionConfig),
50 #[serde(rename = "0.6.0")]
51 V0_6_0(CurrentDataflowDefinitionConfig),
52 #[schemars(skip)]
53 #[serde(rename = "0.1.0", alias = "0.2.0", alias = "0.3.0", alias = "0.4.0")]
54 Unsupported(DataflowDefinitionUnsupportedVersion),
55 }
56
57 impl Deref for DataflowDefinitionConfig {
58 type Target = CurrentDataflowDefinitionConfig;
59
60 fn deref(&self) -> &Self::Target {
61 match self {
62 Self::V0_5_0(inner) => inner,
63 Self::V0_6_0(inner) => inner,
64 Self::Unsupported(_) => unreachable!("{VERSION_NOT_SUPPORTED_ERROR}"),
65 }
66 }
67 }
68
69 impl DerefMut for DataflowDefinitionConfig {
70 fn deref_mut(&mut self) -> &mut Self::Target {
71 match self {
72 Self::V0_5_0(inner) => inner,
73 Self::V0_6_0(inner) => inner,
74 Self::Unsupported(_) => unreachable!("{VERSION_NOT_SUPPORTED_ERROR}"),
75 }
76 }
77 }
78
79 impl CurrentDataflowDefinitionConfig {
80 pub fn name(&self) -> &str {
81 &self.meta.name
82 }
83
84 pub fn version(&self) -> &str {
85 &self.meta.version
86 }
87
88 pub fn namespace(&self) -> &str {
89 &self.meta.namespace
90 }
91
92 pub fn metadata(&self) -> &Metadata {
93 &self.meta
94 }
95
96 pub fn imports(&self) -> &Vec<PackageImport> {
97 &self.imports
98 }
99
100 pub fn services(&self) -> Result<Vec<(&String, OperationsWrapperV0_5_0)>> {
101 self.services
102 .iter()
103 .map(|(a, b)| match b.valid_data() {
104 Some(b) => Ok((a, b.to_owned())),
105 None => Err(anyhow!("Invalid service definition")),
106 })
107 .collect::<Result<_>>()
108 }
109
110 pub fn dev(&self) -> Option<&DevConfig> {
111 self.dev.as_ref()
112 }
113 }
114
115 pub type DataflowMetadata = Metadata;
116
117 #[derive(Serialize, Deserialize, Debug, Clone)]
118 pub struct DataflowDefinitionUnsupportedVersion {
119 pub meta: DataflowMetadata,
120 }
121
122 #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
123 pub struct DataflowDefinitionWrapperV0_5_0 {
124 pub meta: DataflowMetadata,
125 #[serde(skip_serializing_if = "Vec::is_empty", default)]
126 pub imports: Vec<PackageImport>,
127 #[serde(deserialize_with = "serde_with::rust::maps_duplicate_key_is_error::deserialize")]
128 #[serde(serialize_with = "serde_with::rust::maps_duplicate_key_is_error::serialize")]
129 pub services: BTreeMap<String, MaybeValid<OperationsWrapperV0_5_0>>,
130 #[serde(default)]
131 pub types: MetadataTypesMapWrapper,
132 #[serde(
133 skip_serializing_if = "BTreeMap::is_empty",
134 deserialize_with = "serde_with::rust::maps_duplicate_key_is_error::deserialize",
135 serialize_with = "serde_with::rust::maps_duplicate_key_is_error::serialize",
136 default
137 )]
138 pub topics: BTreeMap<String, TopicWrapper>,
139 #[serde(skip_serializing_if = "Option::is_none", default)]
140 pub config: Option<DefaultConfigs>,
141 pub dev: Option<DevConfig>,
142 #[serde(skip_serializing_if = "Vec::is_empty", default)]
143 pub packages: Vec<PackageWrapperV0_5_0>,
144 #[serde(
145 skip_serializing_if = "BTreeMap::is_empty",
146 deserialize_with = "serde_with::rust::maps_duplicate_key_is_error::deserialize",
147 serialize_with = "serde_with::rust::maps_duplicate_key_is_error::serialize",
148 default
149 )]
150 pub schedule: BTreeMap<String, ScheduleWrapper>,
151 }
152
153 #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
154 #[serde(rename_all = "kebab-case")]
155 pub enum ScheduleWrapper {
156 Cron(String),
157 }
158
159 impl DataflowDefinitionWrapperV0_5_0 {
160 pub fn types(&self) -> Result<MetadataTypesMap> {
161 let mut types: MetadataTypesMap = self.types.clone().try_into()?;
162
163 let services = self.services()?;
164
165 for (name, service) in services.iter() {
166 for (state_name, state) in &service.states {
167 if state.is_invalid() {
168 return Err(anyhow!(
169 "Invalid state definition for service {}: {}",
170 name,
171 state_name
172 ));
173 }
174 }
175 }
176
177 services.iter().fold(&mut types, |acc, (_, op)| {
178 for (name, state) in &op.states {
179 let Some(state) = state.valid_data() else {
180 unreachable!();
181 };
182 if let Some(state_type) = state.inner_type() {
183 acc.insert(name.into(), state_type.clone());
184 }
185 }
186 acc
187 });
188 Ok(types)
189 }
190 }
191
192 #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
193 pub struct OperationsWrapperV0_5_0 {
194 pub sources: Vec<IoRefWrapper>,
195 #[serde(default = "IoRefWrapper::default_vec")]
196 pub sinks: Vec<IoRefWrapper>,
197 #[serde(default)]
198 pub transforms: TransformsWrapperV0_5_0,
199 #[serde(flatten)]
200 pub post_transforms: Option<PostTransforms>,
201 #[serde(skip_serializing_if = "BTreeMap::is_empty", default)]
202 pub states: BTreeMap<String, MaybeValid<StateWrapper>>,
203 }
204
205 pub type PostTransforms = MaybeValid<PostTransformsInner>;
206
207 #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
208 #[serde(rename_all = "kebab-case")]
209 pub enum PostTransformsInner {
210 Window(Box<WindowOperatorWrapper>),
211 Partition(Box<PartitionOperatorWrapper>),
212 }
213
214 #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
215 #[serde(tag = "type", rename_all = "kebab-case")]
216 pub enum IoRefWrapper {
217 Topic(IoConfigRef),
218 NoTarget,
219 Schedule(IoConfigRef),
220 }
221
222 impl Default for IoRefWrapper {
223 fn default() -> Self {
224 Self::NoTarget
225 }
226 }
227
228 impl IoRefWrapper {
229 fn default_vec() -> Vec<IoRefWrapper> {
230 vec![Self::default()]
231 }
232 pub fn id(&self) -> Result<&str> {
233 match self {
234 Self::Topic(topic) => Ok(&topic.id),
235 Self::NoTarget => Err(anyhow!("No target specified")),
236 Self::Schedule(schedule) => Ok(&schedule.id),
237 }
238 }
239
240 pub fn transforms(&self) -> Vec<TransformOperator> {
241 match self {
242 Self::Topic(topic) => topic.transforms.clone(),
243 Self::NoTarget => vec![],
244 Self::Schedule(schedule) => schedule.transforms.clone(),
245 }
246 }
247 }
248
249 #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
250 pub struct IoConfigRef {
251 pub id: String,
252 #[serde(default)]
253 pub transforms: Vec<TransformOperator>,
254 }
255}