// floe_core/run/mod.rs

1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4use std::time::Instant;
5
6use crate::errors::IoError;
7use crate::report::build::project_metadata_json;
8use crate::report::output::write_summary_report;
9use crate::runtime::{DefaultRuntime, Runtime};
10use crate::{config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
11
12mod context;
13pub(crate) mod entity;
14pub mod events;
15mod file;
16mod output;
17mod perf;
18
19pub(crate) use context::RunContext;
20use entity::{run_entity, EntityRunResult, ResolvedEntityTargets};
21use events::{default_observer, event_time_ms, RunEvent};
22use serde_json::json;
23
24pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
25
26pub(crate) struct EntityRunPlan<'a> {
27    pub(crate) entity: &'a config::EntityConfig,
28    pub(crate) resolved_targets: ResolvedEntityTargets,
29    pub(crate) resolved_inputs: io::storage::inputs::ResolvedInputs,
30    pub(crate) temp_dir: Option<tempfile::TempDir>,
31}
32
33#[derive(Debug, Clone)]
34pub struct RunOutcome {
35    pub run_id: String,
36    pub report_base_path: Option<String>,
37    pub entity_outcomes: Vec<EntityOutcome>,
38    pub summary: report::RunSummaryReport,
39    pub dry_run_previews: Option<Vec<DryRunEntityPreview>>,
40}
41
/// What a dry run reports for one entity: where it would read from and write to, and which input
/// files were listed. Built from the entity config by `create_dry_run_outcome`.
#[derive(Clone, Debug)]
pub struct DryRunEntityPreview {
    /// Entity name.
    pub name: String,
    /// Configured source path.
    pub input_path: String,
    /// Configured source format.
    pub input_format: String,
    /// Configured accepted-sink path.
    pub accepted_path: String,
    /// Configured accepted-sink format.
    pub accepted_format: String,
    /// Rejected-sink path, when a rejected sink is configured.
    pub rejected_path: Option<String>,
    /// Rejected-sink format, when a rejected sink is configured.
    pub rejected_format: Option<String>,
    /// Archive path; an empty string when no archive sink is configured.
    pub archive_path: String,
    /// Archive storage name, when an archive sink with a storage is configured.
    pub archive_storage: Option<String>,
    /// Location the entity report would be written to; `None` when reporting is disabled.
    pub report_file: Option<String>,
    /// Input files listed (not downloaded) during the dry run.
    pub scanned_files: Vec<String>,
}
56
57#[derive(Debug, Clone)]
58pub struct EntityOutcome {
59    pub report: crate::report::RunReport,
60    pub file_timings_ms: Vec<Option<u64>>,
61}
62
63pub(crate) fn validate_entities(
64    config: &config::RootConfig,
65    selected: &[String],
66) -> FloeResult<()> {
67    let missing: Vec<String> = selected
68        .iter()
69        .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
70        .cloned()
71        .collect();
72
73    if !missing.is_empty() {
74        return Err(Box::new(ConfigError(format!(
75            "entities not found: {}",
76            missing.join(", ")
77        ))));
78    }
79    Ok(())
80}
81
82pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
83    let config_base = config::ConfigBase::local_from_path(config_path);
84    run_with_base(config_path, config_base, options)
85}
86
87pub fn run_with_base(
88    config_path: &Path,
89    config_base: config::ConfigBase,
90    options: RunOptions,
91) -> FloeResult<RunOutcome> {
92    let mut runtime = DefaultRuntime::new();
93    run_with_runtime(config_path, config_base, options, &mut runtime)
94}
95
96pub fn run_with_runtime(
97    config_path: &Path,
98    config_base: config::ConfigBase,
99    options: RunOptions,
100    runtime: &mut dyn Runtime,
101) -> FloeResult<RunOutcome> {
102    init_thread_pool();
103    let raw_config_env_vars = config::extract_raw_env_vars(config_path).unwrap_or_default();
104    let profile_vars = options
105        .profile
106        .as_ref()
107        .map(|p| {
108            crate::resolve_vars(crate::VarSources {
109                profile: &p.variables,
110                cli: &std::collections::HashMap::new(),
111                config: &raw_config_env_vars,
112            })
113        })
114        .transpose()?
115        .unwrap_or_default();
116    let validate_options = ValidateOptions {
117        entities: options.entities.clone(),
118        profile_vars: profile_vars.clone(),
119        profile_catalogs: options
120            .profile
121            .as_ref()
122            .and_then(|profile| profile.catalogs.clone()),
123        profile_storages: options
124            .profile
125            .as_ref()
126            .and_then(|profile| profile.storages.clone()),
127        profile_lineage: options
128            .profile
129            .as_ref()
130            .and_then(|profile| profile.lineage.clone()),
131    };
132    crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
133    let context = RunContext::new(config_path, config_base, &options, profile_vars)?;
134    if !options.entities.is_empty() {
135        validate_entities(&context.config, &options.entities)?;
136    }
137
138    let observer = default_observer();
139    let perf_enabled = perf::phase_timing_enabled();
140    let selected_entities = select_entities(&context, &options);
141    let resolution_mode = if options.dry_run {
142        io::storage::inputs::ResolveInputsMode::ListOnly
143    } else {
144        io::storage::inputs::ResolveInputsMode::Download
145    };
146    if !options.dry_run {
147        observer.on_event(RunEvent::RunStarted {
148            run_id: context.run_id.clone(),
149            config: context.config_path.display().to_string(),
150            report_base: context.report_base_path.clone(),
151            ts_ms: event_time_ms(),
152        });
153        crate::run::events::mark_run_started();
154    }
155    let resolve_start = perf_enabled.then(Instant::now);
156    let plans = resolve_entity_plans(&context, runtime, &selected_entities, resolution_mode)?;
157    if let Some(start) = resolve_start {
158        perf::emit_perf_log(
159            observer,
160            &context.run_id,
161            None,
162            "perf_run_phase_timings",
163            json!({
164                "phase": "resolve_inputs",
165                "elapsed_ms": start.elapsed().as_millis() as u64,
166                "entity_count": selected_entities.len(),
167                "mode": match resolution_mode {
168                    io::storage::inputs::ResolveInputsMode::ListOnly => "list_only",
169                    io::storage::inputs::ResolveInputsMode::Download => "download",
170                },
171            }),
172        );
173    }
174    if options.dry_run {
175        return create_dry_run_outcome(&context, plans);
176    }
177
178    let mut entity_outcomes = Vec::new();
179    let mut abort_run = false;
180
181    for plan in plans {
182        let entity_name = plan.entity.name.clone();
183        observer.on_event(RunEvent::EntityStarted {
184            run_id: context.run_id.clone(),
185            name: entity_name.clone(),
186            ts_ms: event_time_ms(),
187        });
188        let entity_result = run_entity(&context, runtime, observer, plan);
189        if let Err(err) = entity_result {
190            observer.on_event(RunEvent::EntityFinished {
191                run_id: context.run_id.clone(),
192                name: entity_name,
193                status: "failed".to_string(),
194                files: 0,
195                rows: 0,
196                accepted: 0,
197                rejected: 0,
198                warnings: 0,
199                errors: 0,
200                ts_ms: event_time_ms(),
201            });
202            return Err(err);
203        }
204        let EntityRunResult {
205            outcome,
206            abort_run: aborted,
207        } = entity_result.unwrap();
208        let report = &outcome.report;
209        let (mut status, _) = report::compute_run_outcome(
210            &report
211                .files
212                .iter()
213                .map(|file| file.status)
214                .collect::<Vec<_>>(),
215        );
216        status = upgrade_status_for_warnings(status, report.results.warnings_total);
217        observer.on_event(RunEvent::EntityFinished {
218            run_id: context.run_id.clone(),
219            name: report.entity.name.clone(),
220            status: run_status_str(status).to_string(),
221            files: report.results.files_total,
222            rows: report.results.rows_total,
223            accepted: report.results.accepted_total,
224            rejected: report.results.rejected_total,
225            warnings: report.results.warnings_total,
226            errors: report.results.errors_total,
227            ts_ms: event_time_ms(),
228        });
229        entity_outcomes.push(outcome);
230        abort_run = abort_run || aborted;
231        if abort_run {
232            break;
233        }
234    }
235    let summary = build_run_summary(&context, &entity_outcomes);
236    if let Some(report_target) = &context.report_target {
237        let summary_write_start = perf_enabled.then(Instant::now);
238        write_summary_report(
239            report_target,
240            &context.run_id,
241            &summary,
242            runtime.storage(),
243            &context.storage_resolver,
244        )?;
245        if let Some(start) = summary_write_start {
246            perf::emit_perf_log(
247                observer,
248                &context.run_id,
249                None,
250                "perf_run_phase_timings",
251                json!({
252                    "phase": "write_summary_report",
253                    "elapsed_ms": start.elapsed().as_millis() as u64,
254                    "entity_count": entity_outcomes.len(),
255                }),
256            );
257        }
258    }
259    observer.on_event(RunEvent::RunFinished {
260        run_id: context.run_id.clone(),
261        status: run_status_str(summary.run.status).to_string(),
262        exit_code: summary.run.exit_code,
263        files: summary.results.files_total,
264        rows: summary.results.rows_total,
265        accepted: summary.results.accepted_total,
266        rejected: summary.results.rejected_total,
267        warnings: summary.results.warnings_total,
268        errors: summary.results.errors_total,
269        summary_uri: context.report_target.as_ref().map(|target| {
270            target.join_relative(&report::ReportWriter::summary_relative_path(
271                &context.run_id,
272            ))
273        }),
274        ts_ms: event_time_ms(),
275    });
276
277    Ok(RunOutcome {
278        run_id: context.run_id.clone(),
279        report_base_path: context.report_base_path.clone(),
280        entity_outcomes,
281        summary,
282        dry_run_previews: None,
283    })
284}
285
286fn init_thread_pool() {
287    static INIT: Once = Once::new();
288    INIT.call_once(|| {
289        if std::env::var("RAYON_NUM_THREADS").is_ok() {
290            return;
291        }
292        let cap = std::env::var("FLOE_MAX_THREADS")
293            .ok()
294            .and_then(|value| value.parse::<usize>().ok())
295            .unwrap_or(4);
296        let available = std::thread::available_parallelism()
297            .map(|value| value.get())
298            .unwrap_or(1);
299        let threads = available.min(cap).max(1);
300        let _ = rayon::ThreadPoolBuilder::new()
301            .num_threads(threads)
302            .build_global();
303    });
304}
305
306fn select_entities<'a>(
307    context: &'a RunContext,
308    options: &RunOptions,
309) -> Vec<&'a config::EntityConfig> {
310    if options.entities.is_empty() {
311        context.config.entities.iter().collect()
312    } else {
313        let selected: HashSet<&str> = options.entities.iter().map(String::as_str).collect();
314        context
315            .config
316            .entities
317            .iter()
318            .filter(|entity| selected.contains(entity.name.as_str()))
319            .collect()
320    }
321}
322
323fn resolve_entity_plans<'a>(
324    context: &'a RunContext,
325    runtime: &mut dyn Runtime,
326    entities: &[&'a config::EntityConfig],
327    resolution_mode: io::storage::inputs::ResolveInputsMode,
328) -> FloeResult<Vec<EntityRunPlan<'a>>> {
329    let mut plans = Vec::with_capacity(entities.len());
330    for entity in entities {
331        let input_adapter = runtime.input_adapter(entity.source.format.as_str())?;
332        let resolved_targets = entity::resolve_entity_targets(&context.storage_resolver, entity)?;
333        let needs_temp = matches!(
334            resolution_mode,
335            io::storage::inputs::ResolveInputsMode::Download
336        ) && (resolved_targets.source.is_remote()
337            || resolved_targets.accepted.is_remote()
338            || resolved_targets
339                .rejected
340                .as_ref()
341                .is_some_and(io::storage::Target::is_remote));
342        let temp_dir = if needs_temp {
343            Some(
344                tempfile::TempDir::new()
345                    .map_err(|err| Box::new(IoError(format!("tempdir failed: {err}"))))?,
346            )
347        } else {
348            None
349        };
350        let storage_client = Some(runtime.storage().client_for(
351            &context.storage_resolver,
352            resolved_targets.source.storage(),
353            entity,
354        )? as &dyn io::storage::StorageClient);
355        let resolved_inputs = io::storage::ops::resolve_inputs(
356            &context.config_dir,
357            entity,
358            input_adapter,
359            &resolved_targets.source,
360            resolution_mode,
361            temp_dir.as_ref().map(|dir| dir.path()),
362            storage_client,
363        )?;
364        plans.push(EntityRunPlan {
365            entity,
366            resolved_targets,
367            resolved_inputs,
368            temp_dir,
369        });
370    }
371    Ok(plans)
372}
373
374fn build_run_summary(
375    context: &RunContext,
376    entity_outcomes: &[EntityOutcome],
377) -> report::RunSummaryReport {
378    let mut totals = report::ResultsTotals {
379        files_total: 0,
380        rows_total: 0,
381        accepted_total: 0,
382        rejected_total: 0,
383        warnings_total: 0,
384        errors_total: 0,
385    };
386    let mut statuses = Vec::new();
387    let mut entities = Vec::with_capacity(entity_outcomes.len());
388
389    for outcome in entity_outcomes {
390        let report = &outcome.report;
391        totals.files_total += report.results.files_total;
392        totals.rows_total += report.results.rows_total;
393        totals.accepted_total += report.results.accepted_total;
394        totals.rejected_total += report.results.rejected_total;
395        totals.warnings_total += report.results.warnings_total;
396        totals.errors_total += report.results.errors_total;
397
398        let file_statuses = report
399            .files
400            .iter()
401            .map(|file| file.status)
402            .collect::<Vec<_>>();
403        let (mut status, _) = report::compute_run_outcome(&file_statuses);
404        status = upgrade_status_for_warnings(status, report.results.warnings_total);
405        statuses.extend(file_statuses);
406
407        let report_file = context
408            .report_target
409            .as_ref()
410            .map(|target| {
411                target.join_relative(&report::ReportWriter::report_relative_path(
412                    &context.run_id,
413                    &report.entity.name,
414                ))
415            })
416            .unwrap_or_else(|| "disabled".to_string());
417        entities.push(report::EntitySummary {
418            name: report.entity.name.clone(),
419            status,
420            results: report.results.clone(),
421            report_file,
422        });
423    }
424
425    let (mut status, exit_code) = report::compute_run_outcome(&statuses);
426    status = upgrade_status_for_warnings(status, totals.warnings_total);
427
428    let finished_at = report::now_rfc3339();
429    let duration_ms = context.run_timer.elapsed().as_millis() as u64;
430    let report_base_path = context
431        .report_base_path
432        .clone()
433        .unwrap_or_else(|| "disabled".to_string());
434    let report_file = context
435        .report_target
436        .as_ref()
437        .map(|target| {
438            target.join_relative(&report::ReportWriter::summary_relative_path(
439                &context.run_id,
440            ))
441        })
442        .unwrap_or_else(|| "disabled".to_string());
443
444    report::RunSummaryReport {
445        spec_version: context.config.version.clone(),
446        tool: build_tool_info(),
447        run: report::RunInfo {
448            run_id: context.run_id.clone(),
449            started_at: context.started_at.clone(),
450            finished_at,
451            duration_ms,
452            status,
453            exit_code,
454        },
455        config: build_config_echo(context),
456        report: build_report_echo(report_base_path, report_file),
457        results: totals,
458        entities,
459    }
460}
461
462fn create_dry_run_outcome(
463    context: &RunContext,
464    plans: Vec<EntityRunPlan<'_>>,
465) -> FloeResult<RunOutcome> {
466    let mut previews: Vec<DryRunEntityPreview> = Vec::new();
467
468    for plan in plans {
469        let entity = plan.entity;
470        let rejected_path = entity.sink.rejected.as_ref().map(|r| r.path.clone());
471        let rejected_format = entity.sink.rejected.as_ref().map(|r| r.format.clone());
472        let (archive_path, archive_storage) = entity
473            .sink
474            .archive
475            .as_ref()
476            .map(|a| (a.path.clone(), a.storage.clone()))
477            .unwrap_or_else(|| (String::new(), None));
478
479        let report_file = context.report_target.as_ref().map(|target| {
480            target.join_relative(&report::ReportWriter::report_relative_path(
481                &context.run_id,
482                &entity.name,
483            ))
484        });
485
486        previews.push(DryRunEntityPreview {
487            name: entity.name.clone(),
488            input_path: entity.source.path.clone(),
489            input_format: entity.source.format.clone(),
490            accepted_path: entity.sink.accepted.path.clone(),
491            accepted_format: entity.sink.accepted.format.clone(),
492            rejected_path,
493            rejected_format,
494            archive_path,
495            archive_storage,
496            report_file,
497            scanned_files: plan.resolved_inputs.listed,
498        });
499    }
500
501    Ok(RunOutcome {
502        run_id: context.run_id.clone(),
503        report_base_path: context.report_base_path.clone(),
504        entity_outcomes: Vec::new(),
505        summary: report::RunSummaryReport {
506            spec_version: context.config.version.clone(),
507            tool: build_tool_info(),
508            run: report::RunInfo {
509                run_id: context.run_id.clone(),
510                started_at: context.started_at.clone(),
511                finished_at: report::now_rfc3339(),
512                duration_ms: 0,
513                status: report::RunStatus::Success,
514                exit_code: 0,
515            },
516            config: build_config_echo(context),
517            report: build_report_echo(
518                context
519                    .report_base_path
520                    .clone()
521                    .unwrap_or_else(|| "disabled".to_string()),
522                "disabled (dry-run)".to_string(),
523            ),
524            results: report::ResultsTotals {
525                files_total: 0,
526                rows_total: 0,
527                accepted_total: 0,
528                rejected_total: 0,
529                warnings_total: 0,
530                errors_total: 0,
531            },
532            entities: Vec::new(),
533        },
534        dry_run_previews: Some(previews),
535    })
536}
537
538fn upgrade_status_for_warnings(status: report::RunStatus, warnings: u64) -> report::RunStatus {
539    if status == report::RunStatus::Success && warnings > 0 {
540        report::RunStatus::SuccessWithWarnings
541    } else {
542        status
543    }
544}
545
546fn build_tool_info() -> report::ToolInfo {
547    report::ToolInfo {
548        name: "floe".to_string(),
549        version: env!("CARGO_PKG_VERSION").to_string(),
550        git: None,
551    }
552}
553
554fn build_config_echo(context: &RunContext) -> report::ConfigEcho {
555    report::ConfigEcho {
556        path: context.config_path.display().to_string(),
557        version: context.config.version.clone(),
558        metadata: context.config.metadata.as_ref().map(project_metadata_json),
559    }
560}
561
562fn build_report_echo(path: String, report_file: String) -> report::ReportEcho {
563    report::ReportEcho { path, report_file }
564}
565
566fn run_status_str(status: report::RunStatus) -> &'static str {
567    match status {
568        report::RunStatus::Success => "success",
569        report::RunStatus::SuccessWithWarnings => "success_with_warnings",
570        report::RunStatus::Rejected => "rejected",
571        report::RunStatus::Aborted => "aborted",
572        report::RunStatus::Failed => "failed",
573    }
574}