Skip to main content

floe_core/run/
mod.rs

1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4use std::time::Instant;
5
6use crate::errors::IoError;
7use crate::report::build::project_metadata_json;
8use crate::report::output::write_summary_report;
9use crate::runtime::{DefaultRuntime, Runtime};
10use crate::{config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
11
12mod context;
13pub(crate) mod entity;
14pub mod events;
15mod file;
16mod output;
17mod perf;
18
19pub(crate) use context::RunContext;
20use entity::{run_entity, EntityRunResult, ResolvedEntityTargets};
21use events::{default_observer, event_time_ms, RunEvent};
22use serde_json::json;
23
/// Upper bound on resolved inputs, visible to the parent module (`pub(super)`).
///
/// NOTE(review): not referenced in this file; presumably caps how many input
/// files are kept or reported per entity — confirm at the use sites.
pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
25
/// Everything prepared for one entity before it is executed (or previewed in a dry run).
///
/// Built by `resolve_entity_plans`; consumed by `run_entity` for real runs and by
/// `create_dry_run_outcome` for dry runs.
pub(crate) struct EntityRunPlan<'a> {
    /// The entity's configuration, borrowed from the run context's root config.
    pub(crate) entity: &'a config::EntityConfig,
    /// Source, accepted, and (optional) rejected storage targets resolved for the entity.
    pub(crate) resolved_targets: ResolvedEntityTargets,
    /// Inputs resolved for the entity's source; a dry run reports its `listed` files.
    pub(crate) resolved_inputs: io::storage::inputs::ResolvedInputs,
    /// Scratch directory, only created in download mode when at least one target is
    /// remote. Kept on the plan so it lives as long as the plan does
    /// (`tempfile::TempDir` removes the directory when dropped).
    pub(crate) temp_dir: Option<tempfile::TempDir>,
}
32
/// Result of [`run`] and its variants.
#[derive(Debug, Clone)]
pub struct RunOutcome {
    /// Identifier of this run, taken from the run context.
    pub run_id: String,
    /// Base path for reports; `None` when reporting is disabled (the summary then
    /// echoes `"disabled"`).
    pub report_base_path: Option<String>,
    /// One outcome per executed entity, in execution order. Processing stops after an
    /// entity requests an abort (that entity's outcome is still included). Always empty
    /// for dry runs.
    pub entity_outcomes: Vec<EntityOutcome>,
    /// Aggregated run summary; for dry runs this is a zero-valued placeholder.
    pub summary: report::RunSummaryReport,
    /// Per-entity previews; `Some` only for dry runs.
    pub dry_run_previews: Option<Vec<DryRunEntityPreview>>,
}
41
/// What a real run would do for one entity, as reported by a dry run.
#[derive(Debug, Clone)]
pub struct DryRunEntityPreview {
    /// Entity name from the config.
    pub name: String,
    /// Configured source path.
    pub input_path: String,
    /// Configured source format.
    pub input_format: String,
    /// Configured accepted-sink path.
    pub accepted_path: String,
    /// Configured accepted-sink format.
    pub accepted_format: String,
    /// Rejected-sink path, if a rejected sink is configured.
    pub rejected_path: Option<String>,
    /// Rejected-sink format, if a rejected sink is configured.
    pub rejected_format: Option<String>,
    /// Archive path; an empty string when no archive is configured.
    pub archive_path: String,
    /// Archive storage name, if an archive is configured and names one.
    pub archive_storage: Option<String>,
    /// Per-entity report location a real run would write; `None` when reporting is disabled.
    pub report_file: Option<String>,
    /// Files listed for the entity's source during input resolution (list-only mode).
    pub scanned_files: Vec<String>,
}
56
/// Result of running a single entity.
#[derive(Debug, Clone)]
pub struct EntityOutcome {
    /// The entity's run report (per-file statuses and result totals).
    pub report: crate::report::RunReport,
    /// Per-file timings in milliseconds.
    /// NOTE(review): presumably one entry per processed file, `None` when no timing
    /// was captured — confirm in the `entity` module.
    pub file_timings_ms: Vec<Option<u64>>,
}
62
63pub(crate) fn validate_entities(
64    config: &config::RootConfig,
65    selected: &[String],
66) -> FloeResult<()> {
67    let missing: Vec<String> = selected
68        .iter()
69        .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
70        .cloned()
71        .collect();
72
73    if !missing.is_empty() {
74        return Err(Box::new(ConfigError(format!(
75            "entities not found: {}",
76            missing.join(", ")
77        ))));
78    }
79    Ok(())
80}
81
82pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
83    let config_base = config::ConfigBase::local_from_path(config_path);
84    run_with_base(config_path, config_base, options)
85}
86
87pub fn run_with_base(
88    config_path: &Path,
89    config_base: config::ConfigBase,
90    options: RunOptions,
91) -> FloeResult<RunOutcome> {
92    let mut runtime = DefaultRuntime::new();
93    run_with_runtime(config_path, config_base, options, &mut runtime)
94}
95
96pub fn run_with_manifest_path(manifest_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
97    let mut runtime = DefaultRuntime::new();
98    run_with_manifest_runtime(manifest_path, options, &mut runtime)
99}
100
101pub(crate) fn run_with_manifest_runtime(
102    manifest_path: &Path,
103    options: RunOptions,
104    runtime: &mut dyn Runtime,
105) -> FloeResult<RunOutcome> {
106    init_thread_pool();
107    let json = std::fs::read_to_string(manifest_path)?;
108    let (config, report_base_uri) = crate::manifest::config_from_manifest_json(&json)?;
109    let config_base = config::ConfigBase::local_from_path(manifest_path);
110    if !options.entities.is_empty() {
111        validate_entities(&config, &options.entities)?;
112    }
113    let context = RunContext::from_config(
114        config,
115        config_base,
116        manifest_path,
117        &report_base_uri,
118        &options,
119    )?;
120    run_from_context(context, options, runtime)
121}
122
123pub fn run_with_runtime(
124    config_path: &Path,
125    config_base: config::ConfigBase,
126    options: RunOptions,
127    runtime: &mut dyn Runtime,
128) -> FloeResult<RunOutcome> {
129    init_thread_pool();
130    let raw_config_env_vars = config::extract_raw_env_vars(config_path).unwrap_or_default();
131    let profile_vars = options
132        .profile
133        .as_ref()
134        .map(|p| {
135            crate::resolve_vars(crate::VarSources {
136                profile: &p.variables,
137                cli: &std::collections::HashMap::new(),
138                config: &raw_config_env_vars,
139            })
140        })
141        .transpose()?
142        .unwrap_or_default();
143    let validate_options = ValidateOptions {
144        entities: options.entities.clone(),
145        profile_vars: profile_vars.clone(),
146        profile_catalogs: options
147            .profile
148            .as_ref()
149            .and_then(|profile| profile.catalogs.clone()),
150        profile_storages: options
151            .profile
152            .as_ref()
153            .and_then(|profile| profile.storages.clone()),
154        profile_lineage: options
155            .profile
156            .as_ref()
157            .and_then(|profile| profile.lineage.clone()),
158    };
159    crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
160    let context = RunContext::new(config_path, config_base, &options, profile_vars)?;
161    if !options.entities.is_empty() {
162        validate_entities(&context.config, &options.entities)?;
163    }
164
165    run_from_context(context, options, runtime)
166}
167
168fn run_from_context(
169    context: RunContext,
170    options: RunOptions,
171    runtime: &mut dyn Runtime,
172) -> FloeResult<RunOutcome> {
173    let observer = default_observer();
174    let perf_enabled = perf::phase_timing_enabled();
175    let selected_entities = select_entities(&context, &options);
176    let resolution_mode = if options.dry_run {
177        io::storage::inputs::ResolveInputsMode::ListOnly
178    } else {
179        io::storage::inputs::ResolveInputsMode::Download
180    };
181    if !options.dry_run {
182        observer.on_event(RunEvent::RunStarted {
183            run_id: context.run_id.clone(),
184            config: context.config_path.display().to_string(),
185            report_base: context.report_base_path.clone(),
186            ts_ms: event_time_ms(),
187        });
188        crate::run::events::mark_run_started();
189    }
190    let resolve_start = perf_enabled.then(Instant::now);
191    let plans = resolve_entity_plans(&context, runtime, &selected_entities, resolution_mode)?;
192    if let Some(start) = resolve_start {
193        perf::emit_perf_log(
194            observer,
195            &context.run_id,
196            None,
197            "perf_run_phase_timings",
198            json!({
199                "phase": "resolve_inputs",
200                "elapsed_ms": start.elapsed().as_millis() as u64,
201                "entity_count": selected_entities.len(),
202                "mode": match resolution_mode {
203                    io::storage::inputs::ResolveInputsMode::ListOnly => "list_only",
204                    io::storage::inputs::ResolveInputsMode::Download => "download",
205                },
206            }),
207        );
208    }
209    if options.dry_run {
210        return create_dry_run_outcome(&context, plans);
211    }
212
213    let mut entity_outcomes = Vec::new();
214    let mut abort_run = false;
215
216    for plan in plans {
217        let entity_name = plan.entity.name.clone();
218        observer.on_event(RunEvent::EntityStarted {
219            run_id: context.run_id.clone(),
220            name: entity_name.clone(),
221            ts_ms: event_time_ms(),
222        });
223        let entity_result = run_entity(&context, runtime, observer, plan);
224        if let Err(err) = entity_result {
225            observer.on_event(RunEvent::EntityFinished {
226                run_id: context.run_id.clone(),
227                name: entity_name,
228                status: "failed".to_string(),
229                files: 0,
230                files_skipped: 0,
231                rows: 0,
232                accepted: 0,
233                rejected: 0,
234                warnings: 0,
235                errors: 0,
236                ts_ms: event_time_ms(),
237            });
238            return Err(err);
239        }
240        let EntityRunResult {
241            outcome,
242            abort_run: aborted,
243        } = entity_result.unwrap();
244        let report = &outcome.report;
245        let (mut status, _) = report::compute_run_outcome(
246            &report
247                .files
248                .iter()
249                .map(|file| file.status)
250                .collect::<Vec<_>>(),
251        );
252        status = upgrade_status_for_warnings(status, report.results.warnings_total);
253        observer.on_event(RunEvent::EntityFinished {
254            run_id: context.run_id.clone(),
255            name: report.entity.name.clone(),
256            status: run_status_str(status).to_string(),
257            files: report.results.files_total,
258            files_skipped: report.results.files_skipped,
259            rows: report.results.rows_total,
260            accepted: report.results.accepted_total,
261            rejected: report.results.rejected_total,
262            warnings: report.results.warnings_total,
263            errors: report.results.errors_total,
264            ts_ms: event_time_ms(),
265        });
266        entity_outcomes.push(outcome);
267        abort_run = abort_run || aborted;
268        if abort_run {
269            break;
270        }
271    }
272    let summary = build_run_summary(&context, &entity_outcomes);
273    if let Some(report_target) = &context.report_target {
274        let summary_write_start = perf_enabled.then(Instant::now);
275        write_summary_report(
276            report_target,
277            &context.run_id,
278            &summary,
279            runtime.storage(),
280            &context.storage_resolver,
281        )?;
282        if let Some(start) = summary_write_start {
283            perf::emit_perf_log(
284                observer,
285                &context.run_id,
286                None,
287                "perf_run_phase_timings",
288                json!({
289                    "phase": "write_summary_report",
290                    "elapsed_ms": start.elapsed().as_millis() as u64,
291                    "entity_count": entity_outcomes.len(),
292                }),
293            );
294        }
295    }
296    observer.on_event(RunEvent::RunFinished {
297        run_id: context.run_id.clone(),
298        status: run_status_str(summary.run.status).to_string(),
299        exit_code: summary.run.exit_code,
300        files: summary.results.files_total,
301        files_skipped: summary.results.files_skipped,
302        rows: summary.results.rows_total,
303        accepted: summary.results.accepted_total,
304        rejected: summary.results.rejected_total,
305        warnings: summary.results.warnings_total,
306        errors: summary.results.errors_total,
307        summary_uri: context.report_target.as_ref().map(|target| {
308            target.join_relative(&report::ReportWriter::summary_relative_path(
309                &context.run_id,
310            ))
311        }),
312        ts_ms: event_time_ms(),
313    });
314
315    Ok(RunOutcome {
316        run_id: context.run_id.clone(),
317        report_base_path: context.report_base_path.clone(),
318        entity_outcomes,
319        summary,
320        dry_run_previews: None,
321    })
322}
323
324fn init_thread_pool() {
325    static INIT: Once = Once::new();
326    INIT.call_once(|| {
327        if std::env::var("RAYON_NUM_THREADS").is_ok() {
328            return;
329        }
330        let cap = std::env::var("FLOE_MAX_THREADS")
331            .ok()
332            .and_then(|value| value.parse::<usize>().ok())
333            .unwrap_or(4);
334        let available = std::thread::available_parallelism()
335            .map(|value| value.get())
336            .unwrap_or(1);
337        let threads = available.min(cap).max(1);
338        let _ = rayon::ThreadPoolBuilder::new()
339            .num_threads(threads)
340            .build_global();
341    });
342}
343
344fn select_entities<'a>(
345    context: &'a RunContext,
346    options: &RunOptions,
347) -> Vec<&'a config::EntityConfig> {
348    if options.entities.is_empty() {
349        context.config.entities.iter().collect()
350    } else {
351        let selected: HashSet<&str> = options.entities.iter().map(String::as_str).collect();
352        context
353            .config
354            .entities
355            .iter()
356            .filter(|entity| selected.contains(entity.name.as_str()))
357            .collect()
358    }
359}
360
361fn resolve_entity_plans<'a>(
362    context: &'a RunContext,
363    runtime: &mut dyn Runtime,
364    entities: &[&'a config::EntityConfig],
365    resolution_mode: io::storage::inputs::ResolveInputsMode,
366) -> FloeResult<Vec<EntityRunPlan<'a>>> {
367    let mut plans = Vec::with_capacity(entities.len());
368    for entity in entities {
369        let input_adapter = runtime.input_adapter(entity.source.format.as_str())?;
370        let resolved_targets = entity::resolve_entity_targets(&context.storage_resolver, entity)?;
371        let needs_temp = matches!(
372            resolution_mode,
373            io::storage::inputs::ResolveInputsMode::Download
374        ) && (resolved_targets.source.is_remote()
375            || resolved_targets.accepted.is_remote()
376            || resolved_targets
377                .rejected
378                .as_ref()
379                .is_some_and(io::storage::Target::is_remote));
380        let temp_dir = if needs_temp {
381            Some(
382                tempfile::TempDir::new()
383                    .map_err(|err| Box::new(IoError(format!("tempdir failed: {err}"))))?,
384            )
385        } else {
386            None
387        };
388        let storage_client = Some(runtime.storage().client_for(
389            &context.storage_resolver,
390            resolved_targets.source.storage(),
391            entity,
392        )? as &dyn io::storage::StorageClient);
393        let resolved_inputs = io::storage::ops::resolve_inputs(
394            &context.config_dir,
395            entity,
396            input_adapter,
397            &resolved_targets.source,
398            resolution_mode,
399            temp_dir.as_ref().map(|dir| dir.path()),
400            storage_client,
401        )?;
402        plans.push(EntityRunPlan {
403            entity,
404            resolved_targets,
405            resolved_inputs,
406            temp_dir,
407        });
408    }
409    Ok(plans)
410}
411
412fn build_run_summary(
413    context: &RunContext,
414    entity_outcomes: &[EntityOutcome],
415) -> report::RunSummaryReport {
416    let mut totals = report::ResultsTotals {
417        files_total: 0,
418        files_skipped: 0,
419        rows_total: 0,
420        accepted_total: 0,
421        rejected_total: 0,
422        warnings_total: 0,
423        errors_total: 0,
424    };
425    let mut statuses = Vec::new();
426    let mut entities = Vec::with_capacity(entity_outcomes.len());
427
428    for outcome in entity_outcomes {
429        let report = &outcome.report;
430        totals.files_total += report.results.files_total;
431        totals.files_skipped += report.results.files_skipped;
432        totals.rows_total += report.results.rows_total;
433        totals.accepted_total += report.results.accepted_total;
434        totals.rejected_total += report.results.rejected_total;
435        totals.warnings_total += report.results.warnings_total;
436        totals.errors_total += report.results.errors_total;
437
438        let file_statuses = report
439            .files
440            .iter()
441            .map(|file| file.status)
442            .collect::<Vec<_>>();
443        let (mut status, _) = report::compute_run_outcome(&file_statuses);
444        status = upgrade_status_for_warnings(status, report.results.warnings_total);
445        statuses.extend(file_statuses);
446
447        let report_file = context
448            .report_target
449            .as_ref()
450            .map(|target| {
451                target.join_relative(&report::ReportWriter::report_relative_path(
452                    &context.run_id,
453                    &report.entity.name,
454                ))
455            })
456            .unwrap_or_else(|| "disabled".to_string());
457        entities.push(report::EntitySummary {
458            name: report.entity.name.clone(),
459            status,
460            results: report.results.clone(),
461            report_file,
462        });
463    }
464
465    let (mut status, exit_code) = report::compute_run_outcome(&statuses);
466    status = upgrade_status_for_warnings(status, totals.warnings_total);
467
468    let finished_at = report::now_rfc3339();
469    let duration_ms = context.run_timer.elapsed().as_millis() as u64;
470    let report_base_path = context
471        .report_base_path
472        .clone()
473        .unwrap_or_else(|| "disabled".to_string());
474    let report_file = context
475        .report_target
476        .as_ref()
477        .map(|target| {
478            target.join_relative(&report::ReportWriter::summary_relative_path(
479                &context.run_id,
480            ))
481        })
482        .unwrap_or_else(|| "disabled".to_string());
483
484    report::RunSummaryReport {
485        spec_version: context.config.version.clone(),
486        tool: build_tool_info(),
487        run: report::RunInfo {
488            run_id: context.run_id.clone(),
489            started_at: context.started_at.clone(),
490            finished_at,
491            duration_ms,
492            status,
493            exit_code,
494        },
495        config: build_config_echo(context),
496        report: build_report_echo(report_base_path, report_file),
497        results: totals,
498        entities,
499    }
500}
501
502fn create_dry_run_outcome(
503    context: &RunContext,
504    plans: Vec<EntityRunPlan<'_>>,
505) -> FloeResult<RunOutcome> {
506    let mut previews: Vec<DryRunEntityPreview> = Vec::new();
507
508    for plan in plans {
509        let entity = plan.entity;
510        let rejected_path = entity.sink.rejected.as_ref().map(|r| r.path.clone());
511        let rejected_format = entity.sink.rejected.as_ref().map(|r| r.format.clone());
512        let (archive_path, archive_storage) = entity
513            .sink
514            .archive
515            .as_ref()
516            .map(|a| (a.path.clone(), a.storage.clone()))
517            .unwrap_or_else(|| (String::new(), None));
518
519        let report_file = context.report_target.as_ref().map(|target| {
520            target.join_relative(&report::ReportWriter::report_relative_path(
521                &context.run_id,
522                &entity.name,
523            ))
524        });
525
526        previews.push(DryRunEntityPreview {
527            name: entity.name.clone(),
528            input_path: entity.source.path.clone(),
529            input_format: entity.source.format.clone(),
530            accepted_path: entity.sink.accepted.path.clone(),
531            accepted_format: entity.sink.accepted.format.clone(),
532            rejected_path,
533            rejected_format,
534            archive_path,
535            archive_storage,
536            report_file,
537            scanned_files: plan.resolved_inputs.listed,
538        });
539    }
540
541    Ok(RunOutcome {
542        run_id: context.run_id.clone(),
543        report_base_path: context.report_base_path.clone(),
544        entity_outcomes: Vec::new(),
545        summary: report::RunSummaryReport {
546            spec_version: context.config.version.clone(),
547            tool: build_tool_info(),
548            run: report::RunInfo {
549                run_id: context.run_id.clone(),
550                started_at: context.started_at.clone(),
551                finished_at: report::now_rfc3339(),
552                duration_ms: 0,
553                status: report::RunStatus::Success,
554                exit_code: 0,
555            },
556            config: build_config_echo(context),
557            report: build_report_echo(
558                context
559                    .report_base_path
560                    .clone()
561                    .unwrap_or_else(|| "disabled".to_string()),
562                "disabled (dry-run)".to_string(),
563            ),
564            results: report::ResultsTotals {
565                files_total: 0,
566                files_skipped: 0,
567                rows_total: 0,
568                accepted_total: 0,
569                rejected_total: 0,
570                warnings_total: 0,
571                errors_total: 0,
572            },
573            entities: Vec::new(),
574        },
575        dry_run_previews: Some(previews),
576    })
577}
578
579fn upgrade_status_for_warnings(status: report::RunStatus, warnings: u64) -> report::RunStatus {
580    if status == report::RunStatus::Success && warnings > 0 {
581        report::RunStatus::SuccessWithWarnings
582    } else {
583        status
584    }
585}
586
587fn build_tool_info() -> report::ToolInfo {
588    report::ToolInfo {
589        name: "floe".to_string(),
590        version: env!("CARGO_PKG_VERSION").to_string(),
591        git: None,
592    }
593}
594
595fn build_config_echo(context: &RunContext) -> report::ConfigEcho {
596    report::ConfigEcho {
597        path: context.config_path.display().to_string(),
598        version: context.config.version.clone(),
599        metadata: context.config.metadata.as_ref().map(project_metadata_json),
600    }
601}
602
603fn build_report_echo(path: String, report_file: String) -> report::ReportEcho {
604    report::ReportEcho { path, report_file }
605}
606
607fn run_status_str(status: report::RunStatus) -> &'static str {
608    match status {
609        report::RunStatus::Success => "success",
610        report::RunStatus::SuccessWithWarnings => "success_with_warnings",
611        report::RunStatus::Rejected => "rejected",
612        report::RunStatus::Aborted => "aborted",
613        report::RunStatus::Failed => "failed",
614    }
615}