1use serde::Deserialize;
2
3use crate::config::{
4 ArchiveTarget, CatalogsConfig, ColumnConfig, EntityConfig, EntityStateConfig, IncrementalMode,
5 LineageConfig, MergeOptionsConfig, PiiConfig, PolicyConfig, PolicySeverity, SchemaConfig,
6 SchemaMismatchConfig, SinkConfig, SinkOptions, SinkTarget, SourceConfig, SourceOptions,
7 StoragesConfig, WriteMode,
8};
9use crate::{ConfigError, FloeResult};
10
/// Top-level shape of a run manifest, deserialized from JSON by `config_from_manifest_json`.
///
/// `storages`, `catalogs` and `lineage` are kept as raw JSON here and are converted into
/// `StoragesConfig`, `CatalogsConfig` and `LineageConfig` when the run config is built.
#[derive(Debug, Deserialize)]
pub struct ManifestForRun {
    /// Copied into `RootConfig::version`.
    pub spec_version: String,
    /// Returned alongside the built config by `config_from_manifest_json`.
    pub report_base_uri: String,
    /// Each entry is converted into one `EntityConfig`.
    pub entities: Vec<ManifestEntityForRun>,
    /// Raw JSON for `StoragesConfig`.
    pub storages: Option<serde_json::Value>,
    /// Raw JSON for `CatalogsConfig`.
    pub catalogs: Option<serde_json::Value>,
    /// Raw JSON for `LineageConfig`.
    pub lineage: Option<serde_json::Value>,
}
22
/// One entity entry of a run manifest; converted into an `EntityConfig`.
#[derive(Debug, Deserialize)]
pub struct ManifestEntityForRun {
    pub name: String,
    pub domain: Option<String>,
    pub source: ManifestSourceForRun,
    pub sinks: ManifestSinksForRun,
    /// Policy severity name (`"warn"`, `"reject"`, `"abort"`); treated as `"warn"` when absent
    /// or unrecognized.
    pub policy_severity: Option<String>,
    /// Entity-level write mode name (`"overwrite"`, `"append"`, `"merge_scd1"`,
    /// `"merge_scd2"`); treated as `"overwrite"` when absent. It is also the default for sinks
    /// that do not set their own `write_mode`.
    pub write_mode: Option<String>,
    /// Incremental mode name (`"none"`, `"file"`, `"row"`, `"archive"`); treated as `"none"`
    /// when absent or unrecognized.
    pub incremental_mode: Option<String>,
    pub schema: ManifestEntitySchemaForRun,
    /// Raw JSON for `PiiConfig`.
    pub pii: Option<serde_json::Value>,
    /// When present, becomes the `path` of the entity's `EntityStateConfig`.
    pub state_path: Option<String>,
}
36
/// Source section of a manifest entity; converted into a `SourceConfig`.
#[derive(Debug, Deserialize)]
pub struct ManifestSourceForRun {
    pub format: String,
    /// Storage name; `"local"` maps to no named storage (`None`) in the config.
    pub storage: String,
    /// NOTE(review): not read when building `SourceConfig`; presumably the resolved source
    /// URI — confirm with the manifest producer.
    pub uri: String,
    pub path: String,
    pub cast_mode: Option<String>,
    /// Raw JSON for `SourceOptions`.
    pub options: Option<serde_json::Value>,
}
46
/// Sink section of a manifest entity: a required accepted target plus optional rejected and
/// archive targets.
#[derive(Debug, Deserialize)]
pub struct ManifestSinksForRun {
    pub accepted: ManifestSinkTargetForRun,
    pub rejected: Option<ManifestSinkTargetForRun>,
    pub archive: Option<ManifestArchiveTargetForRun>,
}
53
/// One sink target (accepted or rejected) of a manifest entity; converted into a `SinkTarget`.
#[derive(Debug, Deserialize)]
pub struct ManifestSinkTargetForRun {
    pub format: String,
    /// Storage name; `"local"` maps to no named storage (`None`) in the config.
    pub storage: String,
    pub path: String,
    /// Raw JSON for `SinkOptions`.
    pub options: Option<serde_json::Value>,
    pub partition_by: Option<Vec<String>>,
    /// Raw JSON for `MergeOptionsConfig`.
    pub merge: Option<serde_json::Value>,
    /// Raw JSON for the sink's iceberg settings.
    pub iceberg: Option<serde_json::Value>,
    /// Raw JSON for the sink's delta settings.
    pub delta: Option<serde_json::Value>,
    /// Overrides the entity-level write mode for this sink when present.
    pub write_mode: Option<String>,
}
66
/// Archive target of a manifest entity; converted into an `ArchiveTarget`.
#[derive(Debug, Deserialize)]
pub struct ManifestArchiveTargetForRun {
    /// Storage name; `"local"` maps to no named storage (`None`) in the config.
    pub storage: String,
    pub path: String,
}
72
/// Schema section of a manifest entity; converted into a `SchemaConfig`.
#[derive(Debug, Deserialize)]
pub struct ManifestEntitySchemaForRun {
    pub columns: Vec<ManifestColumnDefForRun>,
    /// Primary key column names; an empty list means "no primary key" (`None`).
    pub primary_key: Vec<String>,
    /// Unique key column groups; an empty list means "no unique keys" (`None`).
    pub unique_keys: Vec<Vec<String>>,
    /// Raw JSON for `NormalizeColumnsConfig`.
    pub normalize_columns: Option<serde_json::Value>,
    /// Raw JSON for `SchemaMismatchConfig`.
    pub mismatch: Option<serde_json::Value>,
    /// Raw JSON for the schema-evolution settings.
    pub schema_evolution: Option<serde_json::Value>,
}
82
/// One column definition of a manifest schema; copied field for field into a `ColumnConfig`.
#[derive(Debug, Deserialize)]
pub struct ManifestColumnDefForRun {
    pub name: String,
    pub column_type: String,
    pub source: Option<String>,
    pub nullable: Option<bool>,
    pub unique: Option<bool>,
    pub width: Option<u64>,
    pub trim: Option<bool>,
}
93
94pub fn config_from_manifest_json(json: &str) -> FloeResult<(crate::config::RootConfig, String)> {
97 let manifest: ManifestForRun =
98 serde_json::from_str(json).map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
99 Box::new(ConfigError(format!("manifest parse error: {err}")))
100 })?;
101
102 let storages = manifest
103 .storages
104 .as_ref()
105 .and_then(|v| serde_json::from_value::<StoragesConfig>(v.clone()).ok());
106 let catalogs = manifest
107 .catalogs
108 .as_ref()
109 .and_then(|v| serde_json::from_value::<CatalogsConfig>(v.clone()).ok());
110 let lineage = manifest
111 .lineage
112 .as_ref()
113 .and_then(|v| serde_json::from_value::<LineageConfig>(v.clone()).ok());
114
115 let entities = manifest
116 .entities
117 .iter()
118 .map(entity_from_manifest)
119 .collect::<FloeResult<Vec<_>>>()?;
120
121 let config = crate::config::RootConfig {
122 version: manifest.spec_version,
123 metadata: None,
124 storages,
125 catalogs,
126 env: None,
127 domains: Vec::new(),
128 report: None,
129 lineage,
130 entities,
131 };
132
133 Ok((config, manifest.report_base_uri))
134}
135
136fn entity_from_manifest(m: &ManifestEntityForRun) -> FloeResult<EntityConfig> {
137 let policy_severity = parse_policy_severity(m.policy_severity.as_deref().unwrap_or("warn"));
138 let write_mode = parse_write_mode(m.write_mode.as_deref().unwrap_or("overwrite"));
139 let incremental_mode = parse_incremental_mode(m.incremental_mode.as_deref().unwrap_or("none"));
140
141 let source_options: Option<SourceOptions> = m
142 .source
143 .options
144 .as_ref()
145 .and_then(|v| serde_json::from_value(v.clone()).ok());
146
147 let source = SourceConfig {
148 format: m.source.format.clone(),
149 path: m.source.path.clone(),
150 storage: if m.source.storage == "local" {
151 None
152 } else {
153 Some(m.source.storage.clone())
154 },
155 options: source_options,
156 cast_mode: m.source.cast_mode.clone(),
157 };
158
159 let accepted = sink_target_from_manifest(&m.sinks.accepted, write_mode);
160 let rejected = m
161 .sinks
162 .rejected
163 .as_ref()
164 .map(|t| sink_target_from_manifest(t, write_mode));
165 let archive = m.sinks.archive.as_ref().map(|a| ArchiveTarget {
166 path: a.path.clone(),
167 storage: if a.storage == "local" {
168 None
169 } else {
170 Some(a.storage.clone())
171 },
172 });
173
174 let schema = schema_from_manifest(&m.schema)?;
175 let pii: Option<PiiConfig> = m
176 .pii
177 .as_ref()
178 .and_then(|v| serde_json::from_value(v.clone()).ok());
179
180 let state = m.state_path.as_ref().map(|p| EntityStateConfig {
181 path: Some(p.clone()),
182 });
183
184 Ok(EntityConfig {
185 name: m.name.clone(),
186 metadata: None,
187 domain: m.domain.clone(),
188 incremental_mode,
189 state,
190 source,
191 sink: SinkConfig {
192 write_mode,
193 accepted,
194 rejected,
195 archive,
196 },
197 policy: PolicyConfig {
198 severity: policy_severity,
199 },
200 schema,
201 pii,
202 })
203}
204
205fn sink_target_from_manifest(
206 m: &ManifestSinkTargetForRun,
207 default_write_mode: WriteMode,
208) -> SinkTarget {
209 let write_mode = m
210 .write_mode
211 .as_deref()
212 .map(parse_write_mode)
213 .unwrap_or(default_write_mode);
214 let options: Option<SinkOptions> = m
215 .options
216 .as_ref()
217 .and_then(|v| serde_json::from_value(v.clone()).ok());
218 let merge: Option<MergeOptionsConfig> = m
219 .merge
220 .as_ref()
221 .and_then(|v| serde_json::from_value(v.clone()).ok());
222 let iceberg = m
223 .iceberg
224 .as_ref()
225 .and_then(|v| serde_json::from_value(v.clone()).ok());
226 let delta = m
227 .delta
228 .as_ref()
229 .and_then(|v| serde_json::from_value(v.clone()).ok());
230
231 SinkTarget {
232 format: m.format.clone(),
233 path: m.path.clone(),
234 storage: if m.storage == "local" {
235 None
236 } else {
237 Some(m.storage.clone())
238 },
239 options,
240 merge,
241 iceberg,
242 delta,
243 partition_by: m.partition_by.clone(),
244 partition_spec: None,
245 write_mode,
246 }
247}
248
249fn schema_from_manifest(m: &ManifestEntitySchemaForRun) -> FloeResult<SchemaConfig> {
250 let columns = m
251 .columns
252 .iter()
253 .map(|c| ColumnConfig {
254 name: c.name.clone(),
255 source: c.source.clone(),
256 column_type: c.column_type.clone(),
257 nullable: c.nullable,
258 unique: c.unique,
259 width: c.width,
260 trim: c.trim,
261 })
262 .collect();
263
264 let normalize_columns: Option<crate::config::NormalizeColumnsConfig> = m
265 .normalize_columns
266 .as_ref()
267 .and_then(|v| serde_json::from_value(v.clone()).ok());
268 let mismatch: Option<SchemaMismatchConfig> = m
269 .mismatch
270 .as_ref()
271 .and_then(|v| serde_json::from_value(v.clone()).ok());
272 let schema_evolution = m
273 .schema_evolution
274 .as_ref()
275 .and_then(|v| serde_json::from_value(v.clone()).ok());
276
277 Ok(SchemaConfig {
278 columns,
279 normalize_columns,
280 mismatch,
281 schema_evolution,
282 primary_key: if m.primary_key.is_empty() {
283 None
284 } else {
285 Some(m.primary_key.clone())
286 },
287 unique_keys: if m.unique_keys.is_empty() {
288 None
289 } else {
290 Some(m.unique_keys.clone())
291 },
292 })
293}
294
295fn parse_policy_severity(s: &str) -> PolicySeverity {
296 match s {
297 "reject" => PolicySeverity::Reject,
298 "abort" => PolicySeverity::Abort,
299 _ => PolicySeverity::Warn,
300 }
301}
302
303fn parse_write_mode(s: &str) -> WriteMode {
304 match s {
305 "append" => WriteMode::Append,
306 "merge_scd1" => WriteMode::MergeScd1,
307 "merge_scd2" => WriteMode::MergeScd2,
308 _ => WriteMode::Overwrite,
309 }
310}
311
312fn parse_incremental_mode(s: &str) -> IncrementalMode {
313 match s {
314 "archive" => IncrementalMode::Archive,
315 "file" => IncrementalMode::File,
316 "row" => IncrementalMode::Row,
317 _ => IncrementalMode::None,
318 }
319}