use serde::Deserialize;
use crate::config::{
ArchiveTarget, CatalogsConfig, ColumnConfig, EntityConfig, EntityStateConfig, IncrementalMode,
LineageConfig, MergeOptionsConfig, PiiConfig, PolicyConfig, PolicySeverity, SchemaConfig,
SchemaMismatchConfig, SinkConfig, SinkOptions, SinkTarget, SourceConfig, SourceOptions,
StoragesConfig, WriteMode,
};
use crate::{ConfigError, FloeResult};
#[derive(Debug, Deserialize)]
pub struct ManifestForRun {
pub spec_version: String,
pub report_base_uri: String,
pub entities: Vec<ManifestEntityForRun>,
pub storages: Option<serde_json::Value>,
pub catalogs: Option<serde_json::Value>,
pub lineage: Option<serde_json::Value>,
}
#[derive(Debug, Deserialize)]
pub struct ManifestEntityForRun {
pub name: String,
pub domain: Option<String>,
pub source: ManifestSourceForRun,
pub sinks: ManifestSinksForRun,
pub policy_severity: Option<String>,
pub write_mode: Option<String>,
pub incremental_mode: Option<String>,
pub schema: ManifestEntitySchemaForRun,
pub pii: Option<serde_json::Value>,
pub state_path: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct ManifestSourceForRun {
pub format: String,
pub storage: String,
pub uri: String,
pub path: String,
pub cast_mode: Option<String>,
pub options: Option<serde_json::Value>,
}
#[derive(Debug, Deserialize)]
pub struct ManifestSinksForRun {
pub accepted: ManifestSinkTargetForRun,
pub rejected: Option<ManifestSinkTargetForRun>,
pub archive: Option<ManifestArchiveTargetForRun>,
}
#[derive(Debug, Deserialize)]
pub struct ManifestSinkTargetForRun {
pub format: String,
pub storage: String,
pub path: String,
pub options: Option<serde_json::Value>,
pub partition_by: Option<Vec<String>>,
pub merge: Option<serde_json::Value>,
pub iceberg: Option<serde_json::Value>,
pub delta: Option<serde_json::Value>,
pub write_mode: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct ManifestArchiveTargetForRun {
pub storage: String,
pub path: String,
}
#[derive(Debug, Deserialize)]
pub struct ManifestEntitySchemaForRun {
pub columns: Vec<ManifestColumnDefForRun>,
pub primary_key: Vec<String>,
pub unique_keys: Vec<Vec<String>>,
pub normalize_columns: Option<serde_json::Value>,
pub mismatch: Option<serde_json::Value>,
pub schema_evolution: Option<serde_json::Value>,
}
#[derive(Debug, Deserialize)]
pub struct ManifestColumnDefForRun {
pub name: String,
pub column_type: String,
pub source: Option<String>,
pub nullable: Option<bool>,
pub unique: Option<bool>,
pub width: Option<u64>,
pub trim: Option<bool>,
}
pub fn config_from_manifest_json(json: &str) -> FloeResult<(crate::config::RootConfig, String)> {
let manifest: ManifestForRun =
serde_json::from_str(json).map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
Box::new(ConfigError(format!("manifest parse error: {err}")))
})?;
let storages = manifest
.storages
.as_ref()
.and_then(|v| serde_json::from_value::<StoragesConfig>(v.clone()).ok());
let catalogs = manifest
.catalogs
.as_ref()
.and_then(|v| serde_json::from_value::<CatalogsConfig>(v.clone()).ok());
let lineage = manifest
.lineage
.as_ref()
.and_then(|v| serde_json::from_value::<LineageConfig>(v.clone()).ok());
let entities = manifest
.entities
.iter()
.map(entity_from_manifest)
.collect::<FloeResult<Vec<_>>>()?;
let config = crate::config::RootConfig {
version: manifest.spec_version,
metadata: None,
storages,
catalogs,
env: None,
domains: Vec::new(),
report: None,
lineage,
entities,
};
Ok((config, manifest.report_base_uri))
}
fn entity_from_manifest(m: &ManifestEntityForRun) -> FloeResult<EntityConfig> {
let policy_severity = parse_policy_severity(m.policy_severity.as_deref().unwrap_or("warn"));
let write_mode = parse_write_mode(m.write_mode.as_deref().unwrap_or("overwrite"));
let incremental_mode = parse_incremental_mode(m.incremental_mode.as_deref().unwrap_or("none"));
let source_options: Option<SourceOptions> = m
.source
.options
.as_ref()
.and_then(|v| serde_json::from_value(v.clone()).ok());
let source = SourceConfig {
format: m.source.format.clone(),
path: m.source.path.clone(),
storage: if m.source.storage == "local" {
None
} else {
Some(m.source.storage.clone())
},
options: source_options,
cast_mode: m.source.cast_mode.clone(),
};
let accepted = sink_target_from_manifest(&m.sinks.accepted, write_mode);
let rejected = m
.sinks
.rejected
.as_ref()
.map(|t| sink_target_from_manifest(t, write_mode));
let archive = m.sinks.archive.as_ref().map(|a| ArchiveTarget {
path: a.path.clone(),
storage: if a.storage == "local" {
None
} else {
Some(a.storage.clone())
},
});
let schema = schema_from_manifest(&m.schema)?;
let pii: Option<PiiConfig> = m
.pii
.as_ref()
.and_then(|v| serde_json::from_value(v.clone()).ok());
let state = m.state_path.as_ref().map(|p| EntityStateConfig {
path: Some(p.clone()),
});
Ok(EntityConfig {
name: m.name.clone(),
metadata: None,
domain: m.domain.clone(),
incremental_mode,
state,
source,
sink: SinkConfig {
write_mode,
accepted,
rejected,
archive,
},
policy: PolicyConfig {
severity: policy_severity,
},
schema,
pii,
})
}
fn sink_target_from_manifest(
m: &ManifestSinkTargetForRun,
default_write_mode: WriteMode,
) -> SinkTarget {
let write_mode = m
.write_mode
.as_deref()
.map(parse_write_mode)
.unwrap_or(default_write_mode);
let options: Option<SinkOptions> = m
.options
.as_ref()
.and_then(|v| serde_json::from_value(v.clone()).ok());
let merge: Option<MergeOptionsConfig> = m
.merge
.as_ref()
.and_then(|v| serde_json::from_value(v.clone()).ok());
let iceberg = m
.iceberg
.as_ref()
.and_then(|v| serde_json::from_value(v.clone()).ok());
let delta = m
.delta
.as_ref()
.and_then(|v| serde_json::from_value(v.clone()).ok());
SinkTarget {
format: m.format.clone(),
path: m.path.clone(),
storage: if m.storage == "local" {
None
} else {
Some(m.storage.clone())
},
options,
merge,
iceberg,
delta,
partition_by: m.partition_by.clone(),
partition_spec: None,
write_mode,
}
}
fn schema_from_manifest(m: &ManifestEntitySchemaForRun) -> FloeResult<SchemaConfig> {
let columns = m
.columns
.iter()
.map(|c| ColumnConfig {
name: c.name.clone(),
source: c.source.clone(),
column_type: c.column_type.clone(),
nullable: c.nullable,
unique: c.unique,
width: c.width,
trim: c.trim,
})
.collect();
let normalize_columns: Option<crate::config::NormalizeColumnsConfig> = m
.normalize_columns
.as_ref()
.and_then(|v| serde_json::from_value(v.clone()).ok());
let mismatch: Option<SchemaMismatchConfig> = m
.mismatch
.as_ref()
.and_then(|v| serde_json::from_value(v.clone()).ok());
let schema_evolution = m
.schema_evolution
.as_ref()
.and_then(|v| serde_json::from_value(v.clone()).ok());
Ok(SchemaConfig {
columns,
normalize_columns,
mismatch,
schema_evolution,
primary_key: if m.primary_key.is_empty() {
None
} else {
Some(m.primary_key.clone())
},
unique_keys: if m.unique_keys.is_empty() {
None
} else {
Some(m.unique_keys.clone())
},
})
}
fn parse_policy_severity(s: &str) -> PolicySeverity {
match s {
"reject" => PolicySeverity::Reject,
"abort" => PolicySeverity::Abort,
_ => PolicySeverity::Warn,
}
}
fn parse_write_mode(s: &str) -> WriteMode {
match s {
"append" => WriteMode::Append,
"merge_scd1" => WriteMode::MergeScd1,
"merge_scd2" => WriteMode::MergeScd2,
_ => WriteMode::Overwrite,
}
}
fn parse_incremental_mode(s: &str) -> IncrementalMode {
match s {
"archive" => IncrementalMode::Archive,
"file" => IncrementalMode::File,
"row" => IncrementalMode::Row,
_ => IncrementalMode::None,
}
}