// floe_core/manifest/reconstruct.rs

use serde::de::DeserializeOwned;
use serde::Deserialize;

use crate::config::{
    ArchiveTarget, CatalogsConfig, ColumnConfig, EntityConfig, EntityStateConfig, IncrementalMode,
    LineageConfig, MergeOptionsConfig, PiiConfig, PolicyConfig, PolicySeverity, SchemaConfig,
    SchemaMismatchConfig, SinkConfig, SinkOptions, SinkTarget, SourceConfig, SourceOptions,
    StoragesConfig, WriteMode,
};
use crate::{ConfigError, FloeResult};

11// Minimal deserializable mirror of CommonManifest — only the fields needed to reconstruct
12// a RootConfig and run an entity.
13#[derive(Debug, Deserialize)]
14pub struct ManifestForRun {
15    pub spec_version: String,
16    pub report_base_uri: String,
17    pub entities: Vec<ManifestEntityForRun>,
18    pub storages: Option<serde_json::Value>,
19    pub catalogs: Option<serde_json::Value>,
20    pub lineage: Option<serde_json::Value>,
21}
22
23#[derive(Debug, Deserialize)]
24pub struct ManifestEntityForRun {
25    pub name: String,
26    pub domain: Option<String>,
27    pub source: ManifestSourceForRun,
28    pub sinks: ManifestSinksForRun,
29    pub policy_severity: Option<String>,
30    pub write_mode: Option<String>,
31    pub incremental_mode: Option<String>,
32    pub schema: ManifestEntitySchemaForRun,
33    pub pii: Option<serde_json::Value>,
34    pub state_path: Option<String>,
35}
36
37#[derive(Debug, Deserialize)]
38pub struct ManifestSourceForRun {
39    pub format: String,
40    pub storage: String,
41    pub uri: String,
42    pub path: String,
43    pub cast_mode: Option<String>,
44    pub options: Option<serde_json::Value>,
45}
46
47#[derive(Debug, Deserialize)]
48pub struct ManifestSinksForRun {
49    pub accepted: ManifestSinkTargetForRun,
50    pub rejected: Option<ManifestSinkTargetForRun>,
51    pub archive: Option<ManifestArchiveTargetForRun>,
52}
53
54#[derive(Debug, Deserialize)]
55pub struct ManifestSinkTargetForRun {
56    pub format: String,
57    pub storage: String,
58    pub path: String,
59    pub options: Option<serde_json::Value>,
60    pub partition_by: Option<Vec<String>>,
61    pub merge: Option<serde_json::Value>,
62    pub iceberg: Option<serde_json::Value>,
63    pub delta: Option<serde_json::Value>,
64    pub write_mode: Option<String>,
65}
66
67#[derive(Debug, Deserialize)]
68pub struct ManifestArchiveTargetForRun {
69    pub storage: String,
70    pub path: String,
71}
72
73#[derive(Debug, Deserialize)]
74pub struct ManifestEntitySchemaForRun {
75    pub columns: Vec<ManifestColumnDefForRun>,
76    pub primary_key: Vec<String>,
77    pub unique_keys: Vec<Vec<String>>,
78    pub normalize_columns: Option<serde_json::Value>,
79    pub mismatch: Option<serde_json::Value>,
80    pub schema_evolution: Option<serde_json::Value>,
81}
82
83#[derive(Debug, Deserialize)]
84pub struct ManifestColumnDefForRun {
85    pub name: String,
86    pub column_type: String,
87    pub source: Option<String>,
88    pub nullable: Option<bool>,
89    pub unique: Option<bool>,
90    pub width: Option<u64>,
91    pub trim: Option<bool>,
92}
93
94/// Parse a manifest JSON string and reconstruct a minimal RootConfig.
95/// Returns (config, report_base_uri).
96pub fn config_from_manifest_json(json: &str) -> FloeResult<(crate::config::RootConfig, String)> {
97    let manifest: ManifestForRun =
98        serde_json::from_str(json).map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
99            Box::new(ConfigError(format!("manifest parse error: {err}")))
100        })?;
101
102    let storages = manifest
103        .storages
104        .as_ref()
105        .and_then(|v| serde_json::from_value::<StoragesConfig>(v.clone()).ok());
106    let catalogs = manifest
107        .catalogs
108        .as_ref()
109        .and_then(|v| serde_json::from_value::<CatalogsConfig>(v.clone()).ok());
110    let lineage = manifest
111        .lineage
112        .as_ref()
113        .and_then(|v| serde_json::from_value::<LineageConfig>(v.clone()).ok());
114
115    let entities = manifest
116        .entities
117        .iter()
118        .map(entity_from_manifest)
119        .collect::<FloeResult<Vec<_>>>()?;
120
121    let config = crate::config::RootConfig {
122        version: manifest.spec_version,
123        metadata: None,
124        storages,
125        catalogs,
126        env: None,
127        domains: Vec::new(),
128        report: None,
129        lineage,
130        entities,
131    };
132
133    Ok((config, manifest.report_base_uri))
134}
135
136fn entity_from_manifest(m: &ManifestEntityForRun) -> FloeResult<EntityConfig> {
137    let policy_severity = parse_policy_severity(m.policy_severity.as_deref().unwrap_or("warn"));
138    let write_mode = parse_write_mode(m.write_mode.as_deref().unwrap_or("overwrite"));
139    let incremental_mode = parse_incremental_mode(m.incremental_mode.as_deref().unwrap_or("none"));
140
141    let source_options: Option<SourceOptions> = m
142        .source
143        .options
144        .as_ref()
145        .and_then(|v| serde_json::from_value(v.clone()).ok());
146
147    let source = SourceConfig {
148        format: m.source.format.clone(),
149        path: m.source.path.clone(),
150        storage: if m.source.storage == "local" {
151            None
152        } else {
153            Some(m.source.storage.clone())
154        },
155        options: source_options,
156        cast_mode: m.source.cast_mode.clone(),
157    };
158
159    let accepted = sink_target_from_manifest(&m.sinks.accepted, write_mode);
160    let rejected = m
161        .sinks
162        .rejected
163        .as_ref()
164        .map(|t| sink_target_from_manifest(t, write_mode));
165    let archive = m.sinks.archive.as_ref().map(|a| ArchiveTarget {
166        path: a.path.clone(),
167        storage: if a.storage == "local" {
168            None
169        } else {
170            Some(a.storage.clone())
171        },
172    });
173
174    let schema = schema_from_manifest(&m.schema)?;
175    let pii: Option<PiiConfig> = m
176        .pii
177        .as_ref()
178        .and_then(|v| serde_json::from_value(v.clone()).ok());
179
180    let state = m.state_path.as_ref().map(|p| EntityStateConfig {
181        path: Some(p.clone()),
182    });
183
184    Ok(EntityConfig {
185        name: m.name.clone(),
186        metadata: None,
187        domain: m.domain.clone(),
188        incremental_mode,
189        state,
190        source,
191        sink: SinkConfig {
192            write_mode,
193            accepted,
194            rejected,
195            archive,
196        },
197        policy: PolicyConfig {
198            severity: policy_severity,
199        },
200        schema,
201        pii,
202    })
203}
204
205fn sink_target_from_manifest(
206    m: &ManifestSinkTargetForRun,
207    default_write_mode: WriteMode,
208) -> SinkTarget {
209    let write_mode = m
210        .write_mode
211        .as_deref()
212        .map(parse_write_mode)
213        .unwrap_or(default_write_mode);
214    let options: Option<SinkOptions> = m
215        .options
216        .as_ref()
217        .and_then(|v| serde_json::from_value(v.clone()).ok());
218    let merge: Option<MergeOptionsConfig> = m
219        .merge
220        .as_ref()
221        .and_then(|v| serde_json::from_value(v.clone()).ok());
222    let iceberg = m
223        .iceberg
224        .as_ref()
225        .and_then(|v| serde_json::from_value(v.clone()).ok());
226    let delta = m
227        .delta
228        .as_ref()
229        .and_then(|v| serde_json::from_value(v.clone()).ok());
230
231    SinkTarget {
232        format: m.format.clone(),
233        path: m.path.clone(),
234        storage: if m.storage == "local" {
235            None
236        } else {
237            Some(m.storage.clone())
238        },
239        options,
240        merge,
241        iceberg,
242        delta,
243        partition_by: m.partition_by.clone(),
244        partition_spec: None,
245        write_mode,
246    }
247}
248
249fn schema_from_manifest(m: &ManifestEntitySchemaForRun) -> FloeResult<SchemaConfig> {
250    let columns = m
251        .columns
252        .iter()
253        .map(|c| ColumnConfig {
254            name: c.name.clone(),
255            source: c.source.clone(),
256            column_type: c.column_type.clone(),
257            nullable: c.nullable,
258            unique: c.unique,
259            width: c.width,
260            trim: c.trim,
261        })
262        .collect();
263
264    let normalize_columns: Option<crate::config::NormalizeColumnsConfig> = m
265        .normalize_columns
266        .as_ref()
267        .and_then(|v| serde_json::from_value(v.clone()).ok());
268    let mismatch: Option<SchemaMismatchConfig> = m
269        .mismatch
270        .as_ref()
271        .and_then(|v| serde_json::from_value(v.clone()).ok());
272    let schema_evolution = m
273        .schema_evolution
274        .as_ref()
275        .and_then(|v| serde_json::from_value(v.clone()).ok());
276
277    Ok(SchemaConfig {
278        columns,
279        normalize_columns,
280        mismatch,
281        schema_evolution,
282        primary_key: if m.primary_key.is_empty() {
283            None
284        } else {
285            Some(m.primary_key.clone())
286        },
287        unique_keys: if m.unique_keys.is_empty() {
288            None
289        } else {
290            Some(m.unique_keys.clone())
291        },
292    })
293}
294
295fn parse_policy_severity(s: &str) -> PolicySeverity {
296    match s {
297        "reject" => PolicySeverity::Reject,
298        "abort" => PolicySeverity::Abort,
299        _ => PolicySeverity::Warn,
300    }
301}
302
303fn parse_write_mode(s: &str) -> WriteMode {
304    match s {
305        "append" => WriteMode::Append,
306        "merge_scd1" => WriteMode::MergeScd1,
307        "merge_scd2" => WriteMode::MergeScd2,
308        _ => WriteMode::Overwrite,
309    }
310}
311
312fn parse_incremental_mode(s: &str) -> IncrementalMode {
313    match s {
314        "archive" => IncrementalMode::Archive,
315        "file" => IncrementalMode::File,
316        "row" => IncrementalMode::Row,
317        _ => IncrementalMode::None,
318    }
319}