1use std::collections::BTreeMap;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use sha2::{Digest, Sha256};
5
6use crate::config::{ConfigLocation, RootConfig, SourceOptions, StorageResolver};
7use crate::manifest::model::{
8 CommonManifest, ManifestArchiveTarget, ManifestColumnDef, ManifestDomain, ManifestEntity,
9 ManifestEntitySchema, ManifestExecution, ManifestExecutionDefaults, ManifestResultContract,
10 ManifestRunnerAuth, ManifestRunnerDefinition, ManifestRunnerResources, ManifestRunnerSecret,
11 ManifestRunners, ManifestSinkTarget, ManifestSinks, ManifestSource,
12};
13use crate::profile::ProfileConfig;
14use crate::FloeResult;
15
/// Selects how the `path` field of manifest sources and sinks is populated.
///
/// The enum is fieldless, so it is `Copy` and fully comparable (`Eq`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum PathMode {
    /// Emit the path exactly as it is written in the configuration.
    #[default]
    Default,
    /// Emit a path derived from the resolved storage URI when resolution
    /// succeeded; unresolved targets still fall back to the configured path.
    ResolvedUri,
}
27
/// Caller-supplied settings that influence how the manifest is generated.
#[derive(Debug, Default)]
pub struct ManifestOptions {
    /// When `true`, `generated_at_ts_ms` is emitted as `0` instead of the
    /// current wall-clock time.
    pub deterministic: bool,
    /// Copied verbatim into the manifest's `manifest_name`.
    pub manifest_name: Option<String>,
    /// Copied verbatim into the manifest's `profile_uri`.
    pub profile_uri: Option<String>,
    /// File whose bytes are hashed into `profile_checksum`; the checksum is
    /// omitted when this is `None` or the file cannot be read.
    pub profile_path: Option<std::path::PathBuf>,
    /// Replaces the `{manifest_uri}` placeholder in the execution base
    /// arguments; the placeholder is kept when this is `None`.
    pub manifest_uri: Option<String>,
    /// Domain applied to entities that do not declare one themselves.
    pub default_domain: Option<String>,
    /// Controls whether emitted `path` fields hold the configured path or a
    /// path derived from the resolved URI.
    pub path_mode: PathMode,
}
55
/// Outcome of trying to resolve a configured path against its storage.
#[derive(Debug)]
struct ResolvedOrRaw {
    /// Storage name reported by the resolver, or the configured storage name
    /// (falling back to `"local"`) when resolution failed.
    storage: String,
    /// Resolved URI, or the raw configured path when resolution failed.
    uri: String,
    /// `true` when the resolver succeeded, `false` when the raw path is used.
    resolved: bool,
}
62
63pub fn build_common_manifest_json(
64 config_location: &ConfigLocation,
65 config: &RootConfig,
66 selected_entities: &[String],
67 profile: Option<&ProfileConfig>,
68 options: &ManifestOptions,
69) -> FloeResult<String> {
70 let resolver = StorageResolver::new(config, config_location.base.clone())?;
71 let mut manifest = build_common_manifest(
72 config_location,
73 config,
74 selected_entities,
75 &resolver,
76 profile,
77 options,
78 );
79
80 let revision = compute_manifest_revision(&manifest)?;
83 manifest.manifest_revision = Some(revision);
84
85 Ok(serde_json::to_string_pretty(&manifest)?)
86}
87
88fn compute_manifest_revision(manifest: &CommonManifest) -> FloeResult<String> {
91 let mut value: serde_json::Value = serde_json::to_value(manifest)?;
92 if let Some(obj) = value.as_object_mut() {
93 obj.remove("generated_at_ts_ms");
94 obj.remove("manifest_revision");
95 }
96 let canonical = serde_json::to_string(&value)?;
97 Ok(sha256_hex(canonical.as_bytes()))
98}
99
100fn sha256_hex(bytes: &[u8]) -> String {
101 format!("sha256:{:x}", Sha256::digest(bytes))
102}
103
/// Assembles the [`CommonManifest`] for the selected entities of `config`.
///
/// * `config_location` - provides the display string turned into `config_uri`
///   and the path whose bytes are hashed into `config_checksum`.
/// * `config` - root configuration supplying entities, domains, the report
///   target and the spec version.
/// * `selected_entities` - names of the entities to include; an empty slice
///   selects every entity in `config`.
/// * `resolver` - used to turn configured paths into storage URIs.
/// * `profile` - optional profile supplying storages, catalogs, lineage and
///   the runner definition.
/// * `options` - generation settings (determinism, naming, path mode, ...).
///
/// The returned manifest has `manifest_revision` set to `None`; entities are
/// emitted sorted by name.
fn build_common_manifest(
    config_location: &ConfigLocation,
    config: &RootConfig,
    selected_entities: &[String],
    resolver: &StorageResolver,
    profile: Option<&ProfileConfig>,
    options: &ManifestOptions,
) -> CommonManifest {
    // An empty selection means "all entities"; otherwise keep only the
    // entities whose name appears in the selection.
    let mut entities: Vec<_> = if selected_entities.is_empty() {
        config.entities.iter().collect()
    } else {
        config
            .entities
            .iter()
            .filter(|entity| selected_entities.iter().any(|name| name == &entity.name))
            .collect()
    };
    // Sort by name so the output order does not depend on configuration order.
    entities.sort_by(|left, right| left.name.cmp(&right.name));

    // The report location falls back to the literal path "report" when the
    // configuration has no report section.
    let report_path = config
        .report
        .as_ref()
        .map(|report| report.path.as_str())
        .unwrap_or("report");
    let report_storage = config
        .report
        .as_ref()
        .and_then(|report| report.storage.as_deref());
    // "__manifest__" is passed where an entity name is expected, since the
    // report path does not belong to a single entity.
    let report_base = resolve_or_raw(
        resolver,
        "__manifest__",
        "report.path",
        report_storage,
        report_path,
    );

    let mut manifest_entities = Vec::with_capacity(entities.len());
    for entity in entities {
        // Resolve every configured location; failures are kept as raw paths
        // and flagged through `resolved: false`.
        let source = resolve_or_raw(
            resolver,
            &entity.name,
            "source.path",
            entity.source.storage.as_deref(),
            &entity.source.path,
        );
        let accepted = resolve_or_raw(
            resolver,
            &entity.name,
            "sink.accepted.path",
            entity.sink.accepted.storage.as_deref(),
            &entity.sink.accepted.path,
        );
        let rejected = entity.sink.rejected.as_ref().map(|target| {
            resolve_or_raw(
                resolver,
                &entity.name,
                "sink.rejected.path",
                target.storage.as_deref(),
                &target.path,
            )
        });
        let archive = entity.sink.archive.as_ref().map(|target| {
            resolve_or_raw(
                resolver,
                &entity.name,
                "sink.archive.path",
                target.storage.as_deref(),
                &target.path,
            )
        });

        // The entity's own domain wins over the default domain from `options`.
        // Without either, the asset key and group use "default" while the
        // emitted `domain` stays `None`.
        let effective_domain = entity.domain.as_ref().or(options.default_domain.as_ref());
        let (asset_key, group_name, entity_domain) = if let Some(domain) = effective_domain {
            (
                vec![domain.clone(), entity.name.clone()],
                domain.clone(),
                Some(domain.clone()),
            )
        } else {
            (
                vec!["default".to_string(), entity.name.clone()],
                "default".to_string(),
                None,
            )
        };

        // Tags are taken from the entity metadata; an empty tag set is
        // emitted as `None` rather than as an empty map.
        let mut tags = BTreeMap::new();
        if let Some(metadata) = &entity.metadata {
            if let Some(owner) = &metadata.owner {
                tags.insert("owner".to_string(), owner.clone());
            }
            if let Some(product) = &metadata.data_product {
                tags.insert("data_product".to_string(), product.clone());
            }
            if let Some(domain_tag) = &metadata.domain {
                tags.insert("domain".to_string(), domain_tag.clone());
            }
        }
        let tags = if tags.is_empty() { None } else { Some(tags) };

        // Optional schema sections are converted to JSON values; a section
        // that fails to serialise is dropped (`.ok()`) instead of failing.
        let schema = ManifestEntitySchema {
            columns: entity
                .schema
                .columns
                .iter()
                .map(|c| ManifestColumnDef {
                    name: c.name.clone(),
                    column_type: c.column_type.clone(),
                    source: c.source.clone(),
                    nullable: c.nullable,
                    unique: c.unique,
                    width: c.width,
                    trim: c.trim,
                })
                .collect(),
            primary_key: entity.schema.primary_key.clone().unwrap_or_default(),
            unique_keys: entity.schema.unique_keys.clone().unwrap_or_default(),
            normalize_columns: entity
                .schema
                .normalize_columns
                .as_ref()
                .and_then(|v| serde_json::to_value(v).ok()),
            mismatch: entity
                .schema
                .mismatch
                .as_ref()
                .and_then(|v| serde_json::to_value(v).ok()),
            schema_evolution: entity
                .schema
                .schema_evolution
                .as_ref()
                .and_then(|v| serde_json::to_value(v).ok()),
        };

        let pii = entity
            .pii
            .as_ref()
            .and_then(|v| serde_json::to_value(v).ok());

        // In `ResolvedUri` mode a successfully resolved target reports a path
        // derived from its URI; otherwise the configured path is kept.
        let source_path = if options.path_mode == PathMode::ResolvedUri && source.resolved {
            resolved_uri_to_path(&source.uri)
        } else {
            entity.source.path.clone()
        };
        let accepted_path = if options.path_mode == PathMode::ResolvedUri && accepted.resolved {
            resolved_uri_to_path(&accepted.uri)
        } else {
            entity.sink.accepted.path.clone()
        };

        manifest_entities.push(ManifestEntity {
            name: entity.name.clone(),
            domain: entity_domain,
            group_name,
            asset_key,
            source_format: entity.source.format.clone(),
            // These URIs are cloned because `accepted` and `rejected` are
            // moved into the `sinks` section further down.
            accepted_sink_uri: accepted.uri.clone(),
            rejected_sink_uri: rejected.as_ref().map(|value| value.uri.clone()),
            tags,
            source: ManifestSource {
                format: entity.source.format.clone(),
                storage: source.storage,
                uri: source.uri,
                path: source_path,
                resolved: source.resolved,
                cast_mode: entity.source.cast_mode.clone(),
                options: map_source_options(entity.source.options.as_ref()),
            },
            sinks: ManifestSinks {
                accepted: ManifestSinkTarget {
                    format: entity.sink.accepted.format.clone(),
                    storage: accepted.storage,
                    uri: accepted.uri,
                    path: accepted_path,
                    resolved: accepted.resolved,
                    options: entity
                        .sink
                        .accepted
                        .options
                        .as_ref()
                        .and_then(|v| serde_json::to_value(v).ok()),
                    partition_by: entity.sink.accepted.partition_by.clone(),
                    merge: entity
                        .sink
                        .accepted
                        .merge
                        .as_ref()
                        .and_then(|v| serde_json::to_value(v).ok()),
                    iceberg: entity
                        .sink
                        .accepted
                        .iceberg
                        .as_ref()
                        .and_then(|v| serde_json::to_value(v).ok()),
                    delta: entity
                        .sink
                        .accepted
                        .delta
                        .as_ref()
                        .and_then(|v| serde_json::to_value(v).ok()),
                },
                rejected: rejected.map(|value| {
                    // `rej` is the configured rejected target; `value` is its
                    // resolution result.
                    let rej = entity.sink.rejected.as_ref();
                    let rej_raw_path = rej.map(|t| t.path.clone()).unwrap_or_default();
                    let rej_path = if options.path_mode == PathMode::ResolvedUri && value.resolved {
                        resolved_uri_to_path(&value.uri)
                    } else {
                        rej_raw_path
                    };
                    ManifestSinkTarget {
                        // Falls back to "csv" if no configured target is present.
                        format: rej
                            .map(|t| t.format.clone())
                            .unwrap_or_else(|| "csv".to_string()),
                        storage: value.storage,
                        uri: value.uri,
                        path: rej_path,
                        resolved: value.resolved,
                        options: rej
                            .and_then(|t| t.options.as_ref())
                            .and_then(|v| serde_json::to_value(v).ok()),
                        partition_by: rej.and_then(|t| t.partition_by.clone()),
                        merge: rej
                            .and_then(|t| t.merge.as_ref())
                            .and_then(|v| serde_json::to_value(v).ok()),
                        iceberg: rej
                            .and_then(|t| t.iceberg.as_ref())
                            .and_then(|v| serde_json::to_value(v).ok()),
                        delta: rej
                            .and_then(|t| t.delta.as_ref())
                            .and_then(|v| serde_json::to_value(v).ok()),
                    }
                }),
                archive: archive.map(|value| {
                    let arc_raw_path = entity
                        .sink
                        .archive
                        .as_ref()
                        .map(|target| target.path.clone())
                        .unwrap_or_default();
                    let arc_path = if options.path_mode == PathMode::ResolvedUri && value.resolved {
                        resolved_uri_to_path(&value.uri)
                    } else {
                        arc_raw_path
                    };
                    ManifestArchiveTarget {
                        storage: value.storage,
                        uri: value.uri,
                        path: arc_path,
                        resolved: value.resolved,
                    }
                }),
            },
            // No per-entity runner is assigned here.
            runner: None,
            policy_severity: entity.policy.severity.as_str().to_string(),
            write_mode: entity.sink.write_mode.as_str().to_string(),
            incremental_mode: entity.incremental_mode.as_str().to_string(),
            schema,
            pii,
            state_path: entity.state.as_ref().and_then(|s| s.path.clone()),
        });
    }

    let config_uri = canonical_config_uri(&config_location.display);
    // Checksums are best-effort: an unreadable file yields `None`.
    let config_checksum = std::fs::read(&config_location.path)
        .ok()
        .map(|b| sha256_hex(&b));

    let profile_checksum = options
        .profile_path
        .as_ref()
        .and_then(|p| std::fs::read(p).ok())
        .map(|b| sha256_hex(&b));

    // Deterministic mode pins the timestamp to 0 instead of reading the clock.
    let generated_at_ts_ms = if options.deterministic {
        0
    } else {
        now_ts_ms()
    };

    // Profile sections are embedded as JSON values; missing sections, or
    // sections that fail to serialise, become `None`.
    let storages = profile
        .and_then(|p| p.storages.as_ref())
        .and_then(|v| serde_json::to_value(v).ok());
    let catalogs = profile
        .and_then(|p| p.catalogs.as_ref())
        .and_then(|v| serde_json::to_value(v).ok());
    let lineage = profile
        .and_then(|p| p.lineage.as_ref())
        .and_then(|v| serde_json::to_value(v).ok());

    CommonManifest {
        schema: "floe.manifest.v1",
        generated_at_ts_ms,
        floe_version: env!("CARGO_PKG_VERSION"),
        spec_version: config.version.clone(),
        manifest_name: options.manifest_name.clone(),
        // The id is derived from the config URI and the config checksum.
        manifest_id: build_manifest_id(&config_uri, config_checksum.as_deref()),
        // Left unset here; this function does not compute the revision.
        manifest_revision: None,
        config_uri,
        config_checksum,
        profile_uri: options.profile_uri.clone(),
        profile_checksum,
        report_base_uri: report_base.uri,
        domains: config
            .domains
            .iter()
            .map(|domain| ManifestDomain {
                name: domain.name.clone(),
                // Prefer the resolved incoming directory when one is recorded.
                incoming_dir: domain
                    .resolved_incoming_dir
                    .clone()
                    .unwrap_or_else(|| domain.incoming_dir.clone()),
            })
            .collect(),
        execution: default_execution_contract(options),
        runners: runners_contract(profile),
        entities: manifest_entities,
        storages,
        catalogs,
        lineage,
    }
}
426
427fn resolve_or_raw(
428 resolver: &StorageResolver,
429 entity_name: &str,
430 field: &str,
431 storage_name: Option<&str>,
432 raw_path: &str,
433) -> ResolvedOrRaw {
434 match resolver.resolve_path(entity_name, field, storage_name, raw_path) {
435 Ok(resolved) => ResolvedOrRaw {
436 storage: resolved.storage,
437 uri: resolved.uri,
438 resolved: true,
439 },
440 Err(_) => ResolvedOrRaw {
441 storage: storage_name.unwrap_or("local").to_string(),
442 uri: raw_path.to_string(),
443 resolved: false,
444 },
445 }
446}
447
/// Turns a resolved URI into the value emitted as a `path`.
///
/// A leading `local://` is removed; any other URI is returned unchanged.
fn resolved_uri_to_path(uri: &str) -> String {
    uri.strip_prefix("local://").unwrap_or(uri).to_owned()
}
459
/// Normalises a config display string into a URI.
///
/// A string that already contains `://` is returned as is; anything else is
/// prefixed with `local://`.
fn canonical_config_uri(display: &str) -> String {
    let already_has_scheme = display.contains("://");
    if already_has_scheme {
        return display.to_owned();
    }
    ["local://", display].concat()
}
467
468fn build_manifest_id(config_uri: &str, config_checksum: Option<&str>) -> String {
469 const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
470 const FNV_PRIME: u64 = 0x100000001b3;
471
472 let mut hash = FNV_OFFSET_BASIS;
473 hash = fnv1a_update(hash, config_uri.as_bytes(), FNV_PRIME);
474 hash = fnv1a_update(hash, &[0], FNV_PRIME);
475 hash = fnv1a_update(hash, config_checksum.unwrap_or("").as_bytes(), FNV_PRIME);
476
477 format!("mfv1-{hash:016x}")
478}
479
/// Folds `bytes` into `hash`: each byte is XOR-ed into the state, which is
/// then multiplied by `prime` with wrapping arithmetic.
///
/// Returns `hash` unchanged when `bytes` is empty.
fn fnv1a_update(hash: u64, bytes: &[u8], prime: u64) -> u64 {
    bytes
        .iter()
        .fold(hash, |state, &byte| (state ^ u64::from(byte)).wrapping_mul(prime))
}
487
488fn map_source_options(options: Option<&SourceOptions>) -> Option<serde_json::Value> {
489 let options = options?;
490 let mut map = serde_json::Map::new();
491 map.insert("header".to_string(), serde_json::json!(options.header));
492 map.insert(
493 "separator".to_string(),
494 serde_json::json!(options.separator),
495 );
496 map.insert("encoding".to_string(), serde_json::json!(options.encoding));
497 map.insert(
498 "null_values".to_string(),
499 serde_json::json!(options.null_values),
500 );
501 map.insert(
502 "recursive".to_string(),
503 serde_json::json!(options.recursive),
504 );
505 map.insert("glob".to_string(), serde_json::json!(options.glob));
506 map.insert(
507 "json_mode".to_string(),
508 serde_json::json!(options.json_mode),
509 );
510 map.insert("sheet".to_string(), serde_json::json!(options.sheet));
511 map.insert(
512 "header_row".to_string(),
513 serde_json::json!(options.header_row),
514 );
515 map.insert("data_row".to_string(), serde_json::json!(options.data_row));
516 map.insert("row_tag".to_string(), serde_json::json!(options.row_tag));
517 map.insert(
518 "namespace".to_string(),
519 serde_json::json!(options.namespace),
520 );
521 map.insert(
522 "value_tag".to_string(),
523 serde_json::json!(options.value_tag),
524 );
525 Some(serde_json::Value::Object(map))
526}
527
528fn default_execution_contract(options: &ManifestOptions) -> ManifestExecution {
529 let mut exit_codes = BTreeMap::new();
530 exit_codes.insert("0", "success_or_rejected");
531 exit_codes.insert("1", "technical_failure");
532 exit_codes.insert("2", "aborted");
533
534 const PLACEHOLDER: &str = "{manifest_uri}";
535 let base_args = [
536 "run",
537 "--manifest",
538 PLACEHOLDER,
539 "--log-format",
540 "json",
541 "--quiet",
542 ]
543 .iter()
544 .map(|&a| {
545 if a == PLACEHOLDER {
546 options
547 .manifest_uri
548 .as_deref()
549 .unwrap_or(PLACEHOLDER)
550 .to_string()
551 } else {
552 a.to_string()
553 }
554 })
555 .collect();
556
557 ManifestExecution {
558 entrypoint: "floe",
559 base_args,
560 per_entity_args: vec!["--entities".to_string(), "{entity_name}".to_string()],
561 log_format: "json",
562 result_contract: ManifestResultContract {
563 run_finished_event: true,
564 summary_uri_field: "summary_uri",
565 exit_codes,
566 },
567 defaults: ManifestExecutionDefaults {
568 env: BTreeMap::new(),
569 workdir: None,
570 },
571 }
572}
573
574fn runners_contract(profile: Option<&ProfileConfig>) -> ManifestRunners {
575 let profile_runner_type = profile
576 .and_then(|p| p.execution.as_ref())
577 .map(|e| e.runner.runner_type.as_str());
578
579 match profile_runner_type {
580 Some("kubernetes_job") => {
581 let profile_runner = profile
582 .and_then(|p| p.execution.as_ref())
583 .map(|e| &e.runner);
584 let mut definitions = BTreeMap::new();
585 definitions.insert(
586 "default",
587 ManifestRunnerDefinition {
588 runner_type: "kubernetes_job",
589 command: profile_runner.and_then(|r| r.command.clone()),
590 args: profile_runner.and_then(|r| r.args.clone()),
591 timeout_seconds: profile_runner.and_then(|r| r.timeout_seconds),
592 ttl_seconds_after_finished: profile_runner
593 .and_then(|r| r.ttl_seconds_after_finished),
594 poll_interval_seconds: profile_runner.and_then(|r| r.poll_interval_seconds),
595 secrets: profile_runner.and_then(|r| {
596 r.secrets.as_ref().map(|secrets| {
597 secrets
598 .iter()
599 .map(|s| ManifestRunnerSecret {
600 name: s.name.clone(),
601 secret_name: s.secret_name.clone(),
602 key: s.key.clone(),
603 })
604 .collect()
605 })
606 }),
607 image: profile_runner.and_then(|r| r.image.clone()),
608 namespace: profile_runner.and_then(|r| r.namespace.clone()),
609 service_account: profile_runner.and_then(|r| r.service_account.clone()),
610 resources: profile_runner.and_then(|r| {
611 r.resources.as_ref().map(|res| ManifestRunnerResources {
612 cpu: res.cpu.clone(),
613 memory_mb: res.memory_mb,
614 })
615 }),
616 env: profile_runner.and_then(|r| {
617 r.env
618 .as_ref()
619 .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
620 }),
621 workspace_url: None,
622 existing_cluster_id: None,
623 config_uri: None,
624 python_file_uri: None,
625 job_name: None,
626 auth: None,
627 env_parameters: None,
628 },
629 );
630 ManifestRunners {
631 default: "default",
632 definitions,
633 }
634 }
635 Some("databricks_job") => {
636 let profile_runner = profile
637 .and_then(|p| p.execution.as_ref())
638 .map(|e| &e.runner);
639 let mut definitions = BTreeMap::new();
640 definitions.insert(
641 "default",
642 ManifestRunnerDefinition {
643 runner_type: "databricks_job",
644 command: profile_runner.and_then(|r| r.command.clone()),
645 args: profile_runner.and_then(|r| r.args.clone()),
646 timeout_seconds: profile_runner.and_then(|r| r.timeout_seconds),
647 ttl_seconds_after_finished: None,
648 poll_interval_seconds: profile_runner.and_then(|r| r.poll_interval_seconds),
649 secrets: None,
650 image: None,
651 namespace: None,
652 service_account: None,
653 resources: None,
654 env: None,
655 workspace_url: profile_runner.and_then(|r| r.workspace_url.clone()),
656 existing_cluster_id: profile_runner.and_then(|r| r.existing_cluster_id.clone()),
657 config_uri: profile_runner.and_then(|r| r.config_uri.clone()),
658 python_file_uri: profile_runner.and_then(|r| r.python_file_uri.clone()),
659 job_name: profile_runner
660 .and_then(|r| r.job_name.clone())
661 .or_else(|| Some("floe-{domain}-{env}".to_string())),
662 auth: profile_runner.and_then(|r| {
663 r.auth.as_ref().map(|auth| ManifestRunnerAuth {
664 service_principal_oauth_ref: auth.service_principal_oauth_ref.clone(),
665 })
666 }),
667 env_parameters: profile_runner.and_then(|r| {
668 r.env_parameters
669 .as_ref()
670 .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
671 }),
672 },
673 );
674 ManifestRunners {
675 default: "default",
676 definitions,
677 }
678 }
679 _ => {
681 let mut definitions = BTreeMap::new();
682 definitions.insert(
683 "local",
684 ManifestRunnerDefinition {
685 runner_type: "local_process",
686 command: None,
687 args: None,
688 timeout_seconds: None,
689 ttl_seconds_after_finished: None,
690 poll_interval_seconds: None,
691 secrets: None,
692 image: None,
693 namespace: None,
694 service_account: None,
695 resources: None,
696 env: None,
697 workspace_url: None,
698 existing_cluster_id: None,
699 config_uri: None,
700 python_file_uri: None,
701 job_name: None,
702 auth: None,
703 env_parameters: None,
704 },
705 );
706 ManifestRunners {
707 default: "local",
708 definitions,
709 }
710 }
711 }
712}
713
/// Returns the current time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_ts_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}