// floe_core/manifest/builder.rs

1use std::collections::BTreeMap;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use sha2::{Digest, Sha256};
5
6use crate::config::{ConfigLocation, RootConfig, SourceOptions, StorageResolver};
7use crate::manifest::model::{
8    CommonManifest, ManifestArchiveTarget, ManifestColumnDef, ManifestDomain, ManifestEntity,
9    ManifestEntitySchema, ManifestExecution, ManifestExecutionDefaults, ManifestResultContract,
10    ManifestRunnerAuth, ManifestRunnerDefinition, ManifestRunnerResources, ManifestRunnerSecret,
11    ManifestRunners, ManifestSinkTarget, ManifestSinks, ManifestSource,
12};
13use crate::profile::ProfileConfig;
14use crate::FloeResult;
15
/// Controls how the `path` field is populated in source and sink entries.
///
/// The enum carries no data, so it is cheap to copy and fully comparable.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum PathMode {
    /// Keep the path exactly as written in the config file (default).
    #[default]
    Default,
    /// When a path has been resolved to a full URI, derive `path` from that
    /// URI instead of the configured value. This makes the manifest
    /// self-contained for remote replay without needing the original config
    /// base directory.
    ResolvedUri,
}
27
28/// Options that control manifest generation behaviour.
29#[derive(Debug, Default)]
30pub struct ManifestOptions {
31    /// When true, `generated_at_ts_ms` is set to `0` so that the same inputs
32    /// always produce byte-identical output. Use this for committed deployment
33    /// artifacts that are diffed in CI.
34    pub deterministic: bool,
35    /// Optional stable logical name for this manifest (e.g. `"sales.prod"`).
36    /// Stored as `manifest_name` in the output JSON.
37    pub manifest_name: Option<String>,
38    /// Display URI of the profile file (e.g. `local:///path/to/prod.yml`).
39    /// Stored as `profile_uri` in the output JSON.
40    pub profile_uri: Option<String>,
41    /// Local filesystem path to the profile file used during generation.
42    /// When supplied, a SHA-256 checksum of the file is computed and stored
43    /// as `profile_checksum`.
44    pub profile_path: Option<std::path::PathBuf>,
45    /// When set, replaces the `{manifest_uri}` placeholder in
46    /// `execution.base_args` with this URI. Use this to bake the deployed
47    /// manifest location into a self-contained deployment contract.
48    pub manifest_uri: Option<String>,
49    /// Default domain applied to entities that have no explicit `domain`
50    /// field. Drives `domain`, `group_name`, and `asset_key` generation.
51    pub default_domain: Option<String>,
52    /// Controls how the `path` field is set in source and sink entries.
53    pub path_mode: PathMode,
54}
55
/// Result of trying to resolve a configured path through the storage
/// resolver: either the resolved location or the raw configured values.
#[derive(Debug)]
struct ResolvedOrRaw {
    /// Storage name of the location.
    storage: String,
    /// Resolved URI, or the raw configured path when resolution failed.
    uri: String,
    /// Whether the resolver succeeded for this path.
    resolved: bool,
}
62
63pub fn build_common_manifest_json(
64    config_location: &ConfigLocation,
65    config: &RootConfig,
66    selected_entities: &[String],
67    profile: Option<&ProfileConfig>,
68    options: &ManifestOptions,
69) -> FloeResult<String> {
70    let resolver = StorageResolver::new(config, config_location.base.clone())?;
71    let mut manifest = build_common_manifest(
72        config_location,
73        config,
74        selected_entities,
75        &resolver,
76        profile,
77        options,
78    );
79
80    // Compute manifest_revision: SHA-256 of the canonical content, which
81    // excludes volatile fields (generated_at_ts_ms, manifest_revision itself).
82    let revision = compute_manifest_revision(&manifest)?;
83    manifest.manifest_revision = Some(revision);
84
85    Ok(serde_json::to_string_pretty(&manifest)?)
86}
87
88/// Compute a stable content hash over the manifest, excluding fields that
89/// change on every generation (timestamp) or are themselves the hash output.
90fn compute_manifest_revision(manifest: &CommonManifest) -> FloeResult<String> {
91    let mut value: serde_json::Value = serde_json::to_value(manifest)?;
92    if let Some(obj) = value.as_object_mut() {
93        obj.remove("generated_at_ts_ms");
94        obj.remove("manifest_revision");
95    }
96    let canonical = serde_json::to_string(&value)?;
97    Ok(sha256_hex(canonical.as_bytes()))
98}
99
100fn sha256_hex(bytes: &[u8]) -> String {
101    format!("sha256:{:x}", Sha256::digest(bytes))
102}
103
104fn build_common_manifest(
105    config_location: &ConfigLocation,
106    config: &RootConfig,
107    selected_entities: &[String],
108    resolver: &StorageResolver,
109    profile: Option<&ProfileConfig>,
110    options: &ManifestOptions,
111) -> CommonManifest {
112    let mut entities: Vec<_> = if selected_entities.is_empty() {
113        config.entities.iter().collect()
114    } else {
115        config
116            .entities
117            .iter()
118            .filter(|entity| selected_entities.iter().any(|name| name == &entity.name))
119            .collect()
120    };
121    entities.sort_by(|left, right| left.name.cmp(&right.name));
122
123    let report_path = config
124        .report
125        .as_ref()
126        .map(|report| report.path.as_str())
127        .unwrap_or("report");
128    let report_storage = config
129        .report
130        .as_ref()
131        .and_then(|report| report.storage.as_deref());
132    let report_base = resolve_or_raw(
133        resolver,
134        "__manifest__",
135        "report.path",
136        report_storage,
137        report_path,
138    );
139
140    let mut manifest_entities = Vec::with_capacity(entities.len());
141    for entity in entities {
142        let source = resolve_or_raw(
143            resolver,
144            &entity.name,
145            "source.path",
146            entity.source.storage.as_deref(),
147            &entity.source.path,
148        );
149        let accepted = resolve_or_raw(
150            resolver,
151            &entity.name,
152            "sink.accepted.path",
153            entity.sink.accepted.storage.as_deref(),
154            &entity.sink.accepted.path,
155        );
156        let rejected = entity.sink.rejected.as_ref().map(|target| {
157            resolve_or_raw(
158                resolver,
159                &entity.name,
160                "sink.rejected.path",
161                target.storage.as_deref(),
162                &target.path,
163            )
164        });
165        let archive = entity.sink.archive.as_ref().map(|target| {
166            resolve_or_raw(
167                resolver,
168                &entity.name,
169                "sink.archive.path",
170                target.storage.as_deref(),
171                &target.path,
172            )
173        });
174
175        let effective_domain = entity.domain.as_ref().or(options.default_domain.as_ref());
176        let (asset_key, group_name, entity_domain) = if let Some(domain) = effective_domain {
177            (
178                vec![domain.clone(), entity.name.clone()],
179                domain.clone(),
180                Some(domain.clone()),
181            )
182        } else {
183            (
184                vec!["default".to_string(), entity.name.clone()],
185                "default".to_string(),
186                None,
187            )
188        };
189
190        let mut tags = BTreeMap::new();
191        if let Some(metadata) = &entity.metadata {
192            if let Some(owner) = &metadata.owner {
193                tags.insert("owner".to_string(), owner.clone());
194            }
195            if let Some(product) = &metadata.data_product {
196                tags.insert("data_product".to_string(), product.clone());
197            }
198            if let Some(domain_tag) = &metadata.domain {
199                tags.insert("domain".to_string(), domain_tag.clone());
200            }
201        }
202        let tags = if tags.is_empty() { None } else { Some(tags) };
203
204        let schema = ManifestEntitySchema {
205            columns: entity
206                .schema
207                .columns
208                .iter()
209                .map(|c| ManifestColumnDef {
210                    name: c.name.clone(),
211                    column_type: c.column_type.clone(),
212                    source: c.source.clone(),
213                    nullable: c.nullable,
214                    unique: c.unique,
215                    width: c.width,
216                    trim: c.trim,
217                })
218                .collect(),
219            primary_key: entity.schema.primary_key.clone().unwrap_or_default(),
220            unique_keys: entity.schema.unique_keys.clone().unwrap_or_default(),
221            normalize_columns: entity
222                .schema
223                .normalize_columns
224                .as_ref()
225                .and_then(|v| serde_json::to_value(v).ok()),
226            mismatch: entity
227                .schema
228                .mismatch
229                .as_ref()
230                .and_then(|v| serde_json::to_value(v).ok()),
231            schema_evolution: entity
232                .schema
233                .schema_evolution
234                .as_ref()
235                .and_then(|v| serde_json::to_value(v).ok()),
236        };
237
238        let pii = entity
239            .pii
240            .as_ref()
241            .and_then(|v| serde_json::to_value(v).ok());
242
243        let source_path = if options.path_mode == PathMode::ResolvedUri && source.resolved {
244            resolved_uri_to_path(&source.uri)
245        } else {
246            entity.source.path.clone()
247        };
248        let accepted_path = if options.path_mode == PathMode::ResolvedUri && accepted.resolved {
249            resolved_uri_to_path(&accepted.uri)
250        } else {
251            entity.sink.accepted.path.clone()
252        };
253
254        manifest_entities.push(ManifestEntity {
255            name: entity.name.clone(),
256            domain: entity_domain,
257            group_name,
258            asset_key,
259            source_format: entity.source.format.clone(),
260            accepted_sink_uri: accepted.uri.clone(),
261            rejected_sink_uri: rejected.as_ref().map(|value| value.uri.clone()),
262            tags,
263            source: ManifestSource {
264                format: entity.source.format.clone(),
265                storage: source.storage,
266                uri: source.uri,
267                path: source_path,
268                resolved: source.resolved,
269                cast_mode: entity.source.cast_mode.clone(),
270                options: map_source_options(entity.source.options.as_ref()),
271            },
272            sinks: ManifestSinks {
273                accepted: ManifestSinkTarget {
274                    format: entity.sink.accepted.format.clone(),
275                    storage: accepted.storage,
276                    uri: accepted.uri,
277                    path: accepted_path,
278                    resolved: accepted.resolved,
279                    options: entity
280                        .sink
281                        .accepted
282                        .options
283                        .as_ref()
284                        .and_then(|v| serde_json::to_value(v).ok()),
285                    partition_by: entity.sink.accepted.partition_by.clone(),
286                    merge: entity
287                        .sink
288                        .accepted
289                        .merge
290                        .as_ref()
291                        .and_then(|v| serde_json::to_value(v).ok()),
292                    iceberg: entity
293                        .sink
294                        .accepted
295                        .iceberg
296                        .as_ref()
297                        .and_then(|v| serde_json::to_value(v).ok()),
298                    delta: entity
299                        .sink
300                        .accepted
301                        .delta
302                        .as_ref()
303                        .and_then(|v| serde_json::to_value(v).ok()),
304                },
305                rejected: rejected.map(|value| {
306                    let rej = entity.sink.rejected.as_ref();
307                    let rej_raw_path = rej.map(|t| t.path.clone()).unwrap_or_default();
308                    let rej_path = if options.path_mode == PathMode::ResolvedUri && value.resolved {
309                        resolved_uri_to_path(&value.uri)
310                    } else {
311                        rej_raw_path
312                    };
313                    ManifestSinkTarget {
314                        format: rej
315                            .map(|t| t.format.clone())
316                            .unwrap_or_else(|| "csv".to_string()),
317                        storage: value.storage,
318                        uri: value.uri,
319                        path: rej_path,
320                        resolved: value.resolved,
321                        options: rej
322                            .and_then(|t| t.options.as_ref())
323                            .and_then(|v| serde_json::to_value(v).ok()),
324                        partition_by: rej.and_then(|t| t.partition_by.clone()),
325                        merge: rej
326                            .and_then(|t| t.merge.as_ref())
327                            .and_then(|v| serde_json::to_value(v).ok()),
328                        iceberg: rej
329                            .and_then(|t| t.iceberg.as_ref())
330                            .and_then(|v| serde_json::to_value(v).ok()),
331                        delta: rej
332                            .and_then(|t| t.delta.as_ref())
333                            .and_then(|v| serde_json::to_value(v).ok()),
334                    }
335                }),
336                archive: archive.map(|value| {
337                    let arc_raw_path = entity
338                        .sink
339                        .archive
340                        .as_ref()
341                        .map(|target| target.path.clone())
342                        .unwrap_or_default();
343                    let arc_path = if options.path_mode == PathMode::ResolvedUri && value.resolved {
344                        resolved_uri_to_path(&value.uri)
345                    } else {
346                        arc_raw_path
347                    };
348                    ManifestArchiveTarget {
349                        storage: value.storage,
350                        uri: value.uri,
351                        path: arc_path,
352                        resolved: value.resolved,
353                    }
354                }),
355            },
356            runner: None,
357            policy_severity: entity.policy.severity.as_str().to_string(),
358            write_mode: entity.sink.write_mode.as_str().to_string(),
359            incremental_mode: entity.incremental_mode.as_str().to_string(),
360            schema,
361            pii,
362            state_path: entity.state.as_ref().and_then(|s| s.path.clone()),
363        });
364    }
365
366    let config_uri = canonical_config_uri(&config_location.display);
367    let config_checksum = std::fs::read(&config_location.path)
368        .ok()
369        .map(|b| sha256_hex(&b));
370
371    let profile_checksum = options
372        .profile_path
373        .as_ref()
374        .and_then(|p| std::fs::read(p).ok())
375        .map(|b| sha256_hex(&b));
376
377    let generated_at_ts_ms = if options.deterministic {
378        0
379    } else {
380        now_ts_ms()
381    };
382
383    // Serialize profile sections as opaque JSON values so they can be re-applied at run time.
384    let storages = profile
385        .and_then(|p| p.storages.as_ref())
386        .and_then(|v| serde_json::to_value(v).ok());
387    let catalogs = profile
388        .and_then(|p| p.catalogs.as_ref())
389        .and_then(|v| serde_json::to_value(v).ok());
390    let lineage = profile
391        .and_then(|p| p.lineage.as_ref())
392        .and_then(|v| serde_json::to_value(v).ok());
393
394    CommonManifest {
395        schema: "floe.manifest.v1",
396        generated_at_ts_ms,
397        floe_version: env!("CARGO_PKG_VERSION"),
398        spec_version: config.version.clone(),
399        manifest_name: options.manifest_name.clone(),
400        manifest_id: build_manifest_id(&config_uri, config_checksum.as_deref()),
401        manifest_revision: None,
402        config_uri,
403        config_checksum,
404        profile_uri: options.profile_uri.clone(),
405        profile_checksum,
406        report_base_uri: report_base.uri,
407        domains: config
408            .domains
409            .iter()
410            .map(|domain| ManifestDomain {
411                name: domain.name.clone(),
412                incoming_dir: domain
413                    .resolved_incoming_dir
414                    .clone()
415                    .unwrap_or_else(|| domain.incoming_dir.clone()),
416            })
417            .collect(),
418        execution: default_execution_contract(options),
419        runners: runners_contract(profile),
420        entities: manifest_entities,
421        storages,
422        catalogs,
423        lineage,
424    }
425}
426
427fn resolve_or_raw(
428    resolver: &StorageResolver,
429    entity_name: &str,
430    field: &str,
431    storage_name: Option<&str>,
432    raw_path: &str,
433) -> ResolvedOrRaw {
434    match resolver.resolve_path(entity_name, field, storage_name, raw_path) {
435        Ok(resolved) => ResolvedOrRaw {
436            storage: resolved.storage,
437            uri: resolved.uri,
438            resolved: true,
439        },
440        Err(_) => ResolvedOrRaw {
441            storage: storage_name.unwrap_or("local").to_string(),
442            uri: raw_path.to_string(),
443            resolved: false,
444        },
445    }
446}
447
/// Turns a resolved URI into the `path` value stored in manifest source/sink
/// entries.
///
/// A leading `local://` scheme is stripped (`local:///abs/path` becomes
/// `/abs/path`) so the StorageResolver receives a plain filesystem path
/// rather than an unrecognised `local://` prefix. Every other URI, such as
/// `s3://`, `gs://` or `abfs://`, is returned unchanged.
fn resolved_uri_to_path(uri: &str) -> String {
    uri.strip_prefix("local://").unwrap_or(uri).to_string()
}
459
/// Normalizes the config's display string into a URI.
///
/// A value that already contains `://` is returned unchanged; anything else
/// is treated as a local path and gets a `local://` prefix.
fn canonical_config_uri(display: &str) -> String {
    let already_has_scheme = display.contains("://");
    if already_has_scheme {
        return display.to_owned();
    }
    ["local://", display].concat()
}
467
468fn build_manifest_id(config_uri: &str, config_checksum: Option<&str>) -> String {
469    const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
470    const FNV_PRIME: u64 = 0x100000001b3;
471
472    let mut hash = FNV_OFFSET_BASIS;
473    hash = fnv1a_update(hash, config_uri.as_bytes(), FNV_PRIME);
474    hash = fnv1a_update(hash, &[0], FNV_PRIME);
475    hash = fnv1a_update(hash, config_checksum.unwrap_or("").as_bytes(), FNV_PRIME);
476
477    format!("mfv1-{hash:016x}")
478}
479
/// Feeds `bytes` into a running FNV-1a state and returns the updated state.
///
/// Each byte is XOR-ed into the state, which is then multiplied by `prime`
/// with wrapping arithmetic. An empty slice returns `hash` unchanged.
fn fnv1a_update(hash: u64, bytes: &[u8], prime: u64) -> u64 {
    bytes
        .iter()
        .fold(hash, |state, &byte| (state ^ u64::from(byte)).wrapping_mul(prime))
}
487
488fn map_source_options(options: Option<&SourceOptions>) -> Option<serde_json::Value> {
489    let options = options?;
490    let mut map = serde_json::Map::new();
491    map.insert("header".to_string(), serde_json::json!(options.header));
492    map.insert(
493        "separator".to_string(),
494        serde_json::json!(options.separator),
495    );
496    map.insert("encoding".to_string(), serde_json::json!(options.encoding));
497    map.insert(
498        "null_values".to_string(),
499        serde_json::json!(options.null_values),
500    );
501    map.insert(
502        "recursive".to_string(),
503        serde_json::json!(options.recursive),
504    );
505    map.insert("glob".to_string(), serde_json::json!(options.glob));
506    map.insert(
507        "json_mode".to_string(),
508        serde_json::json!(options.json_mode),
509    );
510    map.insert("sheet".to_string(), serde_json::json!(options.sheet));
511    map.insert(
512        "header_row".to_string(),
513        serde_json::json!(options.header_row),
514    );
515    map.insert("data_row".to_string(), serde_json::json!(options.data_row));
516    map.insert("row_tag".to_string(), serde_json::json!(options.row_tag));
517    map.insert(
518        "namespace".to_string(),
519        serde_json::json!(options.namespace),
520    );
521    map.insert(
522        "value_tag".to_string(),
523        serde_json::json!(options.value_tag),
524    );
525    Some(serde_json::Value::Object(map))
526}
527
528fn default_execution_contract(options: &ManifestOptions) -> ManifestExecution {
529    let mut exit_codes = BTreeMap::new();
530    exit_codes.insert("0", "success_or_rejected");
531    exit_codes.insert("1", "technical_failure");
532    exit_codes.insert("2", "aborted");
533
534    const PLACEHOLDER: &str = "{manifest_uri}";
535    let base_args = [
536        "run",
537        "--manifest",
538        PLACEHOLDER,
539        "--log-format",
540        "json",
541        "--quiet",
542    ]
543    .iter()
544    .map(|&a| {
545        if a == PLACEHOLDER {
546            options
547                .manifest_uri
548                .as_deref()
549                .unwrap_or(PLACEHOLDER)
550                .to_string()
551        } else {
552            a.to_string()
553        }
554    })
555    .collect();
556
557    ManifestExecution {
558        entrypoint: "floe",
559        base_args,
560        per_entity_args: vec!["--entities".to_string(), "{entity_name}".to_string()],
561        log_format: "json",
562        result_contract: ManifestResultContract {
563            run_finished_event: true,
564            summary_uri_field: "summary_uri",
565            exit_codes,
566        },
567        defaults: ManifestExecutionDefaults {
568            env: BTreeMap::new(),
569            workdir: None,
570        },
571    }
572}
573
574fn runners_contract(profile: Option<&ProfileConfig>) -> ManifestRunners {
575    let profile_runner_type = profile
576        .and_then(|p| p.execution.as_ref())
577        .map(|e| e.runner.runner_type.as_str());
578
579    match profile_runner_type {
580        Some("kubernetes_job") => {
581            let profile_runner = profile
582                .and_then(|p| p.execution.as_ref())
583                .map(|e| &e.runner);
584            let mut definitions = BTreeMap::new();
585            definitions.insert(
586                "default",
587                ManifestRunnerDefinition {
588                    runner_type: "kubernetes_job",
589                    command: profile_runner.and_then(|r| r.command.clone()),
590                    args: profile_runner.and_then(|r| r.args.clone()),
591                    timeout_seconds: profile_runner.and_then(|r| r.timeout_seconds),
592                    ttl_seconds_after_finished: profile_runner
593                        .and_then(|r| r.ttl_seconds_after_finished),
594                    poll_interval_seconds: profile_runner.and_then(|r| r.poll_interval_seconds),
595                    secrets: profile_runner.and_then(|r| {
596                        r.secrets.as_ref().map(|secrets| {
597                            secrets
598                                .iter()
599                                .map(|s| ManifestRunnerSecret {
600                                    name: s.name.clone(),
601                                    secret_name: s.secret_name.clone(),
602                                    key: s.key.clone(),
603                                })
604                                .collect()
605                        })
606                    }),
607                    image: profile_runner.and_then(|r| r.image.clone()),
608                    namespace: profile_runner.and_then(|r| r.namespace.clone()),
609                    service_account: profile_runner.and_then(|r| r.service_account.clone()),
610                    resources: profile_runner.and_then(|r| {
611                        r.resources.as_ref().map(|res| ManifestRunnerResources {
612                            cpu: res.cpu.clone(),
613                            memory_mb: res.memory_mb,
614                        })
615                    }),
616                    env: profile_runner.and_then(|r| {
617                        r.env
618                            .as_ref()
619                            .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
620                    }),
621                    workspace_url: None,
622                    existing_cluster_id: None,
623                    config_uri: None,
624                    python_file_uri: None,
625                    job_name: None,
626                    auth: None,
627                    env_parameters: None,
628                },
629            );
630            ManifestRunners {
631                default: "default",
632                definitions,
633            }
634        }
635        Some("databricks_job") => {
636            let profile_runner = profile
637                .and_then(|p| p.execution.as_ref())
638                .map(|e| &e.runner);
639            let mut definitions = BTreeMap::new();
640            definitions.insert(
641                "default",
642                ManifestRunnerDefinition {
643                    runner_type: "databricks_job",
644                    command: profile_runner.and_then(|r| r.command.clone()),
645                    args: profile_runner.and_then(|r| r.args.clone()),
646                    timeout_seconds: profile_runner.and_then(|r| r.timeout_seconds),
647                    ttl_seconds_after_finished: None,
648                    poll_interval_seconds: profile_runner.and_then(|r| r.poll_interval_seconds),
649                    secrets: None,
650                    image: None,
651                    namespace: None,
652                    service_account: None,
653                    resources: None,
654                    env: None,
655                    workspace_url: profile_runner.and_then(|r| r.workspace_url.clone()),
656                    existing_cluster_id: profile_runner.and_then(|r| r.existing_cluster_id.clone()),
657                    config_uri: profile_runner.and_then(|r| r.config_uri.clone()),
658                    python_file_uri: profile_runner.and_then(|r| r.python_file_uri.clone()),
659                    job_name: profile_runner
660                        .and_then(|r| r.job_name.clone())
661                        .or_else(|| Some("floe-{domain}-{env}".to_string())),
662                    auth: profile_runner.and_then(|r| {
663                        r.auth.as_ref().map(|auth| ManifestRunnerAuth {
664                            service_principal_oauth_ref: auth.service_principal_oauth_ref.clone(),
665                        })
666                    }),
667                    env_parameters: profile_runner.and_then(|r| {
668                        r.env_parameters
669                            .as_ref()
670                            .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
671                    }),
672                },
673            );
674            ManifestRunners {
675                default: "default",
676                definitions,
677            }
678        }
679        // "local" or absent → local_process (backward-compatible default)
680        _ => {
681            let mut definitions = BTreeMap::new();
682            definitions.insert(
683                "local",
684                ManifestRunnerDefinition {
685                    runner_type: "local_process",
686                    command: None,
687                    args: None,
688                    timeout_seconds: None,
689                    ttl_seconds_after_finished: None,
690                    poll_interval_seconds: None,
691                    secrets: None,
692                    image: None,
693                    namespace: None,
694                    service_account: None,
695                    resources: None,
696                    env: None,
697                    workspace_url: None,
698                    existing_cluster_id: None,
699                    config_uri: None,
700                    python_file_uri: None,
701                    job_name: None,
702                    auth: None,
703                    env_parameters: None,
704                },
705            );
706            ManifestRunners {
707                default: "local",
708                definitions,
709            }
710        }
711    }
712}
713
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, and
/// saturates at `u64::MAX` instead of wrapping if the millisecond count ever
/// exceeds 64 bits.
fn now_ts_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            // `as_millis` is a u128; clamp first so the narrowing cast below
            // can never truncate.
            elapsed.as_millis().min(u128::from(u64::MAX)) as u64
        })
}