Skip to main content

floe_core/run/
mod.rs

1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4use std::time::Instant;
5
6use crate::errors::IoError;
7use crate::report::build::project_metadata_json;
8use crate::report::output::write_summary_report;
9use crate::runtime::{DefaultRuntime, Runtime};
10use crate::{config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
11
12mod context;
13pub(crate) mod entity;
14pub mod events;
15mod file;
16mod output;
17mod perf;
18
19pub(crate) use context::RunContext;
20use entity::{run_entity, EntityRunResult, ResolvedEntityTargets};
21use events::{default_observer, event_time_ms, RunEvent};
22use serde_json::json;
23
/// Upper bound on resolved inputs; `pub(super)` exposes it to the parent module.
// NOTE(review): not referenced anywhere in this file; presumably caps how many resolved input
// paths are listed/recorded per entity — confirm at the use sites before relying on that.
pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
25
/// Work item for a single entity, produced by `resolve_entity_plans` before any entity runs.
///
/// In a real run each plan is handed to `run_entity`; in a dry run it is turned into a
/// [`DryRunEntityPreview`] instead.
pub(crate) struct EntityRunPlan<'a> {
    /// The entity's configuration, borrowed from the run context's root config.
    pub(crate) entity: &'a config::EntityConfig,
    /// Storage targets resolved for the entity's source, accepted sink and optional rejected sink.
    pub(crate) resolved_targets: ResolvedEntityTargets,
    /// Inputs resolved from the entity's source; its `listed` paths feed dry-run previews.
    pub(crate) resolved_inputs: io::storage::inputs::ResolvedInputs,
    /// Scratch directory, created only in download mode when the source or a sink target is
    /// remote. Holding the `TempDir` here keeps the directory alive as long as the plan (the
    /// `tempfile` crate removes it when the handle is dropped).
    pub(crate) temp_dir: Option<tempfile::TempDir>,
}
32
/// Result of a run (or dry run), returned by the public `run*` entry points.
#[derive(Debug, Clone)]
pub struct RunOutcome {
    /// Identifier of this run, taken from the run context.
    pub run_id: String,
    /// Base path for reports, if any; the run summary echoes `None` as `"disabled"`.
    pub report_base_path: Option<String>,
    /// One outcome per executed entity, in execution order. Empty for dry runs. Execution stops
    /// after the first entity whose result requests an abort, so later entities are absent.
    pub entity_outcomes: Vec<EntityOutcome>,
    /// Aggregated run summary; for dry runs this is a placeholder with zeroed totals.
    pub summary: report::RunSummaryReport,
    /// Per-entity previews; `Some` only for dry runs, `None` for real runs.
    pub dry_run_previews: Option<Vec<DryRunEntityPreview>>,
}
41
/// Description of what a dry run would do for one entity, built from its configuration and
/// the input files listed for it.
#[derive(Debug, Clone)]
pub struct DryRunEntityPreview {
    /// Entity name.
    pub name: String,
    /// Source path from the entity config.
    pub input_path: String,
    /// Source format from the entity config.
    pub input_format: String,
    /// Accepted-sink path from the entity config.
    pub accepted_path: String,
    /// Accepted-sink format from the entity config.
    pub accepted_format: String,
    /// Rejected-sink path; `None` when the entity has no rejected sink.
    pub rejected_path: Option<String>,
    /// Rejected-sink format; `None` when the entity has no rejected sink.
    pub rejected_format: Option<String>,
    /// Archive path, or an empty string when the entity has no archive configured.
    // NOTE(review): unlike the rejected fields this uses an empty-string sentinel rather than
    // `Option`; changing it would be a breaking change to this public struct.
    pub archive_path: String,
    /// Archive storage from the config; `None` when there is no archive or it names no storage.
    pub archive_storage: Option<String>,
    /// Where the entity report would be written; `None` when there is no report target.
    pub report_file: Option<String>,
    /// Input files listed for the entity during (list-only) input resolution.
    pub scanned_files: Vec<String>,
}
56
/// Result of running a single entity.
#[derive(Debug, Clone)]
pub struct EntityOutcome {
    /// The entity's run report: per-file statuses plus aggregated result counters.
    pub report: crate::report::RunReport,
    /// Per-file timings in milliseconds.
    // NOTE(review): filled in by the entity runner outside this file; presumably one entry per
    // input file, with `None` when a file was not timed — confirm.
    pub file_timings_ms: Vec<Option<u64>>,
}
62
63pub(crate) fn validate_entities(
64    config: &config::RootConfig,
65    selected: &[String],
66) -> FloeResult<()> {
67    let missing: Vec<String> = selected
68        .iter()
69        .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
70        .cloned()
71        .collect();
72
73    if !missing.is_empty() {
74        return Err(Box::new(ConfigError(format!(
75            "entities not found: {}",
76            missing.join(", ")
77        ))));
78    }
79    Ok(())
80}
81
82pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
83    let config_base = config::ConfigBase::local_from_path(config_path);
84    run_with_base(config_path, config_base, options)
85}
86
87pub fn run_with_base(
88    config_path: &Path,
89    config_base: config::ConfigBase,
90    options: RunOptions,
91) -> FloeResult<RunOutcome> {
92    let mut runtime = DefaultRuntime::new();
93    run_with_runtime(config_path, config_base, options, &mut runtime)
94}
95
96pub fn run_with_manifest_path(manifest_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
97    let mut runtime = DefaultRuntime::new();
98    run_with_manifest_runtime(manifest_path, options, &mut runtime)
99}
100
101pub(crate) fn run_with_manifest_runtime(
102    manifest_path: &Path,
103    options: RunOptions,
104    runtime: &mut dyn Runtime,
105) -> FloeResult<RunOutcome> {
106    init_thread_pool();
107    let manifest_str = manifest_path.to_string_lossy();
108    let location = config::resolve_config_location(&manifest_str)?;
109    let json = std::fs::read_to_string(&location.path)?;
110    let (config, report_base_uri) = crate::manifest::config_from_manifest_json(&json)?;
111    let config_base = location.base.clone();
112    if !options.entities.is_empty() {
113        validate_entities(&config, &options.entities)?;
114    }
115    let context = RunContext::from_config(
116        config,
117        config_base,
118        manifest_path,
119        &report_base_uri,
120        &options,
121    )?;
122    run_from_context(context, options, runtime)
123}
124
125pub fn run_with_runtime(
126    config_path: &Path,
127    config_base: config::ConfigBase,
128    options: RunOptions,
129    runtime: &mut dyn Runtime,
130) -> FloeResult<RunOutcome> {
131    init_thread_pool();
132    let raw_config_env_vars = config::extract_raw_env_vars(config_path).unwrap_or_default();
133    let profile_vars = options
134        .profile
135        .as_ref()
136        .map(|p| {
137            crate::resolve_vars(crate::VarSources {
138                profile: &p.variables,
139                cli: &std::collections::HashMap::new(),
140                config: &raw_config_env_vars,
141            })
142        })
143        .transpose()?
144        .unwrap_or_default();
145    let validate_options = ValidateOptions {
146        entities: options.entities.clone(),
147        profile_vars: profile_vars.clone(),
148        profile_catalogs: options
149            .profile
150            .as_ref()
151            .and_then(|profile| profile.catalogs.clone()),
152        profile_storages: options
153            .profile
154            .as_ref()
155            .and_then(|profile| profile.storages.clone()),
156        profile_lineage: options
157            .profile
158            .as_ref()
159            .and_then(|profile| profile.lineage.clone()),
160    };
161    crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
162    let context = RunContext::new(config_path, config_base, &options, profile_vars)?;
163    if !options.entities.is_empty() {
164        validate_entities(&context.config, &options.entities)?;
165    }
166
167    run_from_context(context, options, runtime)
168}
169
170fn run_from_context(
171    context: RunContext,
172    options: RunOptions,
173    runtime: &mut dyn Runtime,
174) -> FloeResult<RunOutcome> {
175    let observer = default_observer();
176    let perf_enabled = perf::phase_timing_enabled();
177    let selected_entities = select_entities(&context, &options);
178    if options.full_refresh {
179        for entity in &selected_entities {
180            if matches!(
181                entity.sink.write_mode,
182                config::WriteMode::MergeScd1 | config::WriteMode::MergeScd2
183            ) {
184                return Err(Box::new(ConfigError(format!(
185                    "entity '{}': --full-refresh is not supported with write_mode '{}'",
186                    entity.name,
187                    entity.sink.write_mode.as_str()
188                ))));
189            }
190        }
191    }
192    let resolution_mode = if options.dry_run {
193        io::storage::inputs::ResolveInputsMode::ListOnly
194    } else {
195        io::storage::inputs::ResolveInputsMode::Download
196    };
197    if !options.dry_run {
198        observer.on_event(RunEvent::RunStarted {
199            run_id: context.run_id.clone(),
200            config: context.config_path.display().to_string(),
201            report_base: context.report_base_path.clone(),
202            ts_ms: event_time_ms(),
203        });
204        crate::run::events::mark_run_started();
205    }
206    let resolve_start = perf_enabled.then(Instant::now);
207    let plans = resolve_entity_plans(&context, runtime, &selected_entities, resolution_mode)?;
208    if let Some(start) = resolve_start {
209        perf::emit_perf_log(
210            observer,
211            &context.run_id,
212            None,
213            "perf_run_phase_timings",
214            json!({
215                "phase": "resolve_inputs",
216                "elapsed_ms": start.elapsed().as_millis() as u64,
217                "entity_count": selected_entities.len(),
218                "mode": match resolution_mode {
219                    io::storage::inputs::ResolveInputsMode::ListOnly => "list_only",
220                    io::storage::inputs::ResolveInputsMode::Download => "download",
221                },
222            }),
223        );
224    }
225    if options.dry_run {
226        return create_dry_run_outcome(&context, plans);
227    }
228
229    let mut entity_outcomes = Vec::new();
230    let mut abort_run = false;
231
232    for plan in plans {
233        let entity_name = plan.entity.name.clone();
234        observer.on_event(RunEvent::EntityStarted {
235            run_id: context.run_id.clone(),
236            name: entity_name.clone(),
237            ts_ms: event_time_ms(),
238        });
239        let entity_result = run_entity(&context, runtime, observer, plan);
240        if let Err(err) = entity_result {
241            observer.on_event(RunEvent::EntityFinished {
242                run_id: context.run_id.clone(),
243                name: entity_name,
244                status: "failed".to_string(),
245                files: 0,
246                files_skipped: 0,
247                rows: 0,
248                accepted: 0,
249                rejected: 0,
250                warnings: 0,
251                errors: 0,
252                ts_ms: event_time_ms(),
253            });
254            return Err(err);
255        }
256        let EntityRunResult {
257            outcome,
258            abort_run: aborted,
259        } = entity_result.unwrap();
260        let report = &outcome.report;
261        let (mut status, _) = report::compute_run_outcome(
262            &report
263                .files
264                .iter()
265                .map(|file| file.status)
266                .collect::<Vec<_>>(),
267        );
268        status = upgrade_status_for_warnings(status, report.results.warnings_total);
269        observer.on_event(RunEvent::EntityFinished {
270            run_id: context.run_id.clone(),
271            name: report.entity.name.clone(),
272            status: run_status_str(status).to_string(),
273            files: report.results.files_total,
274            files_skipped: report.results.files_skipped,
275            rows: report.results.rows_total,
276            accepted: report.results.accepted_total,
277            rejected: report.results.rejected_total,
278            warnings: report.results.warnings_total,
279            errors: report.results.errors_total,
280            ts_ms: event_time_ms(),
281        });
282        entity_outcomes.push(outcome);
283        abort_run = abort_run || aborted;
284        if abort_run {
285            break;
286        }
287    }
288    let summary = build_run_summary(&context, &entity_outcomes);
289    let written_summary_uri = if let Some(report_target) = &context.report_target {
290        let summary_write_start = perf_enabled.then(Instant::now);
291        let uri = write_summary_report(
292            report_target,
293            &context.run_id,
294            &summary,
295            runtime.storage(),
296            &context.storage_resolver,
297        )?;
298        if let Some(start) = summary_write_start {
299            perf::emit_perf_log(
300                observer,
301                &context.run_id,
302                None,
303                "perf_run_phase_timings",
304                json!({
305                    "phase": "write_summary_report",
306                    "elapsed_ms": start.elapsed().as_millis() as u64,
307                    "entity_count": entity_outcomes.len(),
308                }),
309            );
310        }
311        Some(uri)
312    } else {
313        None
314    };
315    let entity_report_uris: std::collections::HashMap<String, String> = entity_outcomes
316        .iter()
317        .filter_map(|outcome| {
318            let uri = context.report_target.as_ref().map(|t| {
319                t.join_relative(&report::ReportWriter::report_relative_path(
320                    &context.run_id,
321                    &outcome.report.entity.name,
322                ))
323            })?;
324            Some((outcome.report.entity.name.clone(), uri))
325        })
326        .collect();
327    observer.on_event(RunEvent::RunFinished {
328        run_id: context.run_id.clone(),
329        status: run_status_str(summary.run.status).to_string(),
330        exit_code: summary.run.exit_code,
331        files: summary.results.files_total,
332        files_skipped: summary.results.files_skipped,
333        rows: summary.results.rows_total,
334        accepted: summary.results.accepted_total,
335        rejected: summary.results.rejected_total,
336        warnings: summary.results.warnings_total,
337        errors: summary.results.errors_total,
338        summary_uri: written_summary_uri,
339        report_base: context.report_base_path.clone(),
340        entity_report_uris,
341        ts_ms: event_time_ms(),
342    });
343
344    Ok(RunOutcome {
345        run_id: context.run_id.clone(),
346        report_base_path: context.report_base_path.clone(),
347        entity_outcomes,
348        summary,
349        dry_run_previews: None,
350    })
351}
352
353fn init_thread_pool() {
354    static INIT: Once = Once::new();
355    INIT.call_once(|| {
356        if std::env::var("RAYON_NUM_THREADS").is_ok() {
357            return;
358        }
359        let cap = std::env::var("FLOE_MAX_THREADS")
360            .ok()
361            .and_then(|value| value.parse::<usize>().ok())
362            .unwrap_or(4);
363        let available = std::thread::available_parallelism()
364            .map(|value| value.get())
365            .unwrap_or(1);
366        let threads = available.min(cap).max(1);
367        let _ = rayon::ThreadPoolBuilder::new()
368            .num_threads(threads)
369            .build_global();
370    });
371}
372
373fn select_entities<'a>(
374    context: &'a RunContext,
375    options: &RunOptions,
376) -> Vec<&'a config::EntityConfig> {
377    if options.entities.is_empty() {
378        context.config.entities.iter().collect()
379    } else {
380        let selected: HashSet<&str> = options.entities.iter().map(String::as_str).collect();
381        context
382            .config
383            .entities
384            .iter()
385            .filter(|entity| selected.contains(entity.name.as_str()))
386            .collect()
387    }
388}
389
390fn resolve_entity_plans<'a>(
391    context: &'a RunContext,
392    runtime: &mut dyn Runtime,
393    entities: &[&'a config::EntityConfig],
394    resolution_mode: io::storage::inputs::ResolveInputsMode,
395) -> FloeResult<Vec<EntityRunPlan<'a>>> {
396    let mut plans = Vec::with_capacity(entities.len());
397    for entity in entities {
398        let input_adapter = runtime.input_adapter(entity.source.format.as_str())?;
399        let resolved_targets = entity::resolve_entity_targets(&context.storage_resolver, entity)?;
400        let needs_temp = matches!(
401            resolution_mode,
402            io::storage::inputs::ResolveInputsMode::Download
403        ) && (resolved_targets.source.is_remote()
404            || resolved_targets.accepted.is_remote()
405            || resolved_targets
406                .rejected
407                .as_ref()
408                .is_some_and(io::storage::Target::is_remote));
409        let temp_dir = if needs_temp {
410            Some(
411                tempfile::TempDir::new()
412                    .map_err(|err| Box::new(IoError(format!("tempdir failed: {err}"))))?,
413            )
414        } else {
415            None
416        };
417        let storage_client = Some(runtime.storage().client_for(
418            &context.storage_resolver,
419            resolved_targets.source.storage(),
420            entity,
421        )? as &dyn io::storage::StorageClient);
422        let resolved_inputs = io::storage::ops::resolve_inputs(
423            &context.config_dir,
424            entity,
425            input_adapter,
426            &resolved_targets.source,
427            resolution_mode,
428            temp_dir.as_ref().map(|dir| dir.path()),
429            storage_client,
430        )?;
431        plans.push(EntityRunPlan {
432            entity,
433            resolved_targets,
434            resolved_inputs,
435            temp_dir,
436        });
437    }
438    Ok(plans)
439}
440
441fn build_run_summary(
442    context: &RunContext,
443    entity_outcomes: &[EntityOutcome],
444) -> report::RunSummaryReport {
445    let mut totals = report::ResultsTotals {
446        files_total: 0,
447        files_skipped: 0,
448        rows_total: 0,
449        accepted_total: 0,
450        rejected_total: 0,
451        warnings_total: 0,
452        errors_total: 0,
453    };
454    let mut statuses = Vec::new();
455    let mut entities = Vec::with_capacity(entity_outcomes.len());
456
457    for outcome in entity_outcomes {
458        let report = &outcome.report;
459        totals.files_total += report.results.files_total;
460        totals.files_skipped += report.results.files_skipped;
461        totals.rows_total += report.results.rows_total;
462        totals.accepted_total += report.results.accepted_total;
463        totals.rejected_total += report.results.rejected_total;
464        totals.warnings_total += report.results.warnings_total;
465        totals.errors_total += report.results.errors_total;
466
467        let file_statuses = report
468            .files
469            .iter()
470            .map(|file| file.status)
471            .collect::<Vec<_>>();
472        let (mut status, _) = report::compute_run_outcome(&file_statuses);
473        status = upgrade_status_for_warnings(status, report.results.warnings_total);
474        statuses.extend(file_statuses);
475
476        let report_file = context
477            .report_target
478            .as_ref()
479            .map(|target| {
480                target.join_relative(&report::ReportWriter::report_relative_path(
481                    &context.run_id,
482                    &report.entity.name,
483                ))
484            })
485            .unwrap_or_else(|| "disabled".to_string());
486        entities.push(report::EntitySummary {
487            name: report.entity.name.clone(),
488            status,
489            results: report.results.clone(),
490            report_file,
491        });
492    }
493
494    let (mut status, exit_code) = report::compute_run_outcome(&statuses);
495    status = upgrade_status_for_warnings(status, totals.warnings_total);
496
497    let finished_at = report::now_rfc3339();
498    let duration_ms = context.run_timer.elapsed().as_millis() as u64;
499    let report_base_path = context
500        .report_base_path
501        .clone()
502        .unwrap_or_else(|| "disabled".to_string());
503    let report_file = context
504        .report_target
505        .as_ref()
506        .map(|target| {
507            target.join_relative(&report::ReportWriter::summary_relative_path(
508                &context.run_id,
509            ))
510        })
511        .unwrap_or_else(|| "disabled".to_string());
512
513    report::RunSummaryReport {
514        spec_version: context.config.version.clone(),
515        tool: build_tool_info(),
516        run: report::RunInfo {
517            run_id: context.run_id.clone(),
518            started_at: context.started_at.clone(),
519            finished_at,
520            duration_ms,
521            status,
522            exit_code,
523        },
524        config: build_config_echo(context),
525        report: build_report_echo(report_base_path, report_file),
526        results: totals,
527        entities,
528    }
529}
530
531fn create_dry_run_outcome(
532    context: &RunContext,
533    plans: Vec<EntityRunPlan<'_>>,
534) -> FloeResult<RunOutcome> {
535    let mut previews: Vec<DryRunEntityPreview> = Vec::new();
536
537    for plan in plans {
538        let entity = plan.entity;
539        let rejected_path = entity.sink.rejected.as_ref().map(|r| r.path.clone());
540        let rejected_format = entity.sink.rejected.as_ref().map(|r| r.format.clone());
541        let (archive_path, archive_storage) = entity
542            .sink
543            .archive
544            .as_ref()
545            .map(|a| (a.path.clone(), a.storage.clone()))
546            .unwrap_or_else(|| (String::new(), None));
547
548        let report_file = context.report_target.as_ref().map(|target| {
549            target.join_relative(&report::ReportWriter::report_relative_path(
550                &context.run_id,
551                &entity.name,
552            ))
553        });
554
555        previews.push(DryRunEntityPreview {
556            name: entity.name.clone(),
557            input_path: entity.source.path.clone(),
558            input_format: entity.source.format.clone(),
559            accepted_path: entity.sink.accepted.path.clone(),
560            accepted_format: entity.sink.accepted.format.clone(),
561            rejected_path,
562            rejected_format,
563            archive_path,
564            archive_storage,
565            report_file,
566            scanned_files: plan.resolved_inputs.listed,
567        });
568    }
569
570    Ok(RunOutcome {
571        run_id: context.run_id.clone(),
572        report_base_path: context.report_base_path.clone(),
573        entity_outcomes: Vec::new(),
574        summary: report::RunSummaryReport {
575            spec_version: context.config.version.clone(),
576            tool: build_tool_info(),
577            run: report::RunInfo {
578                run_id: context.run_id.clone(),
579                started_at: context.started_at.clone(),
580                finished_at: report::now_rfc3339(),
581                duration_ms: 0,
582                status: report::RunStatus::Success,
583                exit_code: 0,
584            },
585            config: build_config_echo(context),
586            report: build_report_echo(
587                context
588                    .report_base_path
589                    .clone()
590                    .unwrap_or_else(|| "disabled".to_string()),
591                "disabled (dry-run)".to_string(),
592            ),
593            results: report::ResultsTotals {
594                files_total: 0,
595                files_skipped: 0,
596                rows_total: 0,
597                accepted_total: 0,
598                rejected_total: 0,
599                warnings_total: 0,
600                errors_total: 0,
601            },
602            entities: Vec::new(),
603        },
604        dry_run_previews: Some(previews),
605    })
606}
607
608fn upgrade_status_for_warnings(status: report::RunStatus, warnings: u64) -> report::RunStatus {
609    if status == report::RunStatus::Success && warnings > 0 {
610        report::RunStatus::SuccessWithWarnings
611    } else {
612        status
613    }
614}
615
616fn build_tool_info() -> report::ToolInfo {
617    report::ToolInfo {
618        name: "floe".to_string(),
619        version: env!("CARGO_PKG_VERSION").to_string(),
620        git: None,
621    }
622}
623
624fn build_config_echo(context: &RunContext) -> report::ConfigEcho {
625    report::ConfigEcho {
626        path: context.config_path.display().to_string(),
627        version: context.config.version.clone(),
628        metadata: context.config.metadata.as_ref().map(project_metadata_json),
629    }
630}
631
632fn build_report_echo(path: String, report_file: String) -> report::ReportEcho {
633    report::ReportEcho { path, report_file }
634}
635
636fn run_status_str(status: report::RunStatus) -> &'static str {
637    match status {
638        report::RunStatus::Success => "success",
639        report::RunStatus::SuccessWithWarnings => "success_with_warnings",
640        report::RunStatus::Rejected => "rejected",
641        report::RunStatus::Aborted => "aborted",
642        report::RunStatus::Failed => "failed",
643    }
644}