Skip to main content

floe_core/run/
mod.rs

1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4use std::time::Instant;
5
6use crate::errors::IoError;
7use crate::report::build::project_metadata_json;
8use crate::report::output::write_summary_report;
9use crate::runtime::{DefaultRuntime, Runtime};
10use crate::{config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
11
12mod context;
13pub(crate) mod entity;
14pub mod events;
15mod file;
16mod output;
17mod perf;
18
19pub(crate) use context::RunContext;
20use entity::{run_entity, EntityRunResult, ResolvedEntityTargets};
21use events::{default_observer, event_time_ms, RunEvent};
22use serde_json::json;
23
/// Upper bound of 50 associated with resolved entity inputs.
///
/// NOTE(review): not referenced in this file; presumably used by the `run`
/// submodules (visible to them through `pub(super)`) to cap how many resolved
/// inputs are processed or reported — confirm at the use sites.
pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
25
/// Everything resolved for one entity before it is run (or previewed in a dry run).
///
/// Built by `resolve_entity_plans`; consumed by `run_entity` for real runs and by
/// `create_dry_run_outcome` for dry runs.
pub(crate) struct EntityRunPlan<'a> {
    /// The entity's configuration, borrowed from the run context's config.
    pub(crate) entity: &'a config::EntityConfig,
    /// Source / accepted / (optional) rejected storage targets for the entity.
    pub(crate) resolved_targets: ResolvedEntityTargets,
    /// Inputs resolved for the entity (only listed, not downloaded, in dry runs).
    pub(crate) resolved_inputs: io::storage::inputs::ResolvedInputs,
    /// Scratch directory handed to input resolution; `Some` only in download mode
    /// when the source, accepted or rejected target is remote. Kept in the plan so
    /// the directory stays alive while the plan is in use (`TempDir` removes it on drop).
    pub(crate) temp_dir: Option<tempfile::TempDir>,
}
32
/// Result of [`run`], [`run_with_base`] or [`run_with_runtime`].
#[derive(Debug, Clone)]
pub struct RunOutcome {
    /// Identifier of this run, taken from the run context.
    pub run_id: String,
    /// Report base path from the run context; the summary echoes `None` as `"disabled"`.
    pub report_base_path: Option<String>,
    /// One outcome per executed entity, in execution order. Empty for dry runs and
    /// truncated after an entity that requested the run to abort.
    pub entity_outcomes: Vec<EntityOutcome>,
    /// Run-level summary; a zero-count placeholder for dry runs.
    pub summary: report::RunSummaryReport,
    /// `Some` only for dry runs: one preview per selected entity.
    pub dry_run_previews: Option<Vec<DryRunEntityPreview>>,
}
41
/// What a dry run would do for one entity, built from its configuration and the
/// inputs listed during resolution.
#[derive(Debug, Clone)]
pub struct DryRunEntityPreview {
    /// Entity name.
    pub name: String,
    /// Configured source path.
    pub input_path: String,
    /// Configured source format.
    pub input_format: String,
    /// Configured accepted-sink path.
    pub accepted_path: String,
    /// Configured accepted-sink format.
    pub accepted_format: String,
    /// Rejected-sink path, `None` when no rejected sink is configured.
    pub rejected_path: Option<String>,
    /// Rejected-sink format, `None` when no rejected sink is configured.
    pub rejected_format: Option<String>,
    /// Archive-sink path; an empty string when no archive sink is configured.
    pub archive_path: String,
    /// Archive-sink storage name, `None` when unset or when there is no archive sink.
    pub archive_storage: Option<String>,
    /// Location the entity report would be written to; `None` when reporting is disabled.
    pub report_file: Option<String>,
    /// Input files listed during (list-only) input resolution.
    pub scanned_files: Vec<String>,
}
56
/// Outcome of running a single entity.
#[derive(Debug, Clone)]
pub struct EntityOutcome {
    /// The entity's run report (per-file results plus totals).
    pub report: crate::report::RunReport,
    /// Per-file timings in milliseconds.
    /// NOTE(review): presumably index-aligned with `report.files`, with `None`
    /// where no timing was recorded — confirm in the `entity` module.
    pub file_timings_ms: Vec<Option<u64>>,
}
62
63pub(crate) fn validate_entities(
64    config: &config::RootConfig,
65    selected: &[String],
66) -> FloeResult<()> {
67    let missing: Vec<String> = selected
68        .iter()
69        .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
70        .cloned()
71        .collect();
72
73    if !missing.is_empty() {
74        return Err(Box::new(ConfigError(format!(
75            "entities not found: {}",
76            missing.join(", ")
77        ))));
78    }
79    Ok(())
80}
81
82pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
83    let config_base = config::ConfigBase::local_from_path(config_path);
84    run_with_base(config_path, config_base, options)
85}
86
87pub fn run_with_base(
88    config_path: &Path,
89    config_base: config::ConfigBase,
90    options: RunOptions,
91) -> FloeResult<RunOutcome> {
92    let mut runtime = DefaultRuntime::new();
93    run_with_runtime(config_path, config_base, options, &mut runtime)
94}
95
96pub fn run_with_runtime(
97    config_path: &Path,
98    config_base: config::ConfigBase,
99    options: RunOptions,
100    runtime: &mut dyn Runtime,
101) -> FloeResult<RunOutcome> {
102    init_thread_pool();
103    let raw_config_env_vars = config::extract_raw_env_vars(config_path).unwrap_or_default();
104    let profile_vars = options
105        .profile
106        .as_ref()
107        .map(|p| {
108            crate::resolve_vars(crate::VarSources {
109                profile: &p.variables,
110                cli: &std::collections::HashMap::new(),
111                config: &raw_config_env_vars,
112            })
113        })
114        .transpose()?
115        .unwrap_or_default();
116    let validate_options = ValidateOptions {
117        entities: options.entities.clone(),
118        profile_vars: profile_vars.clone(),
119        profile_catalogs: options
120            .profile
121            .as_ref()
122            .and_then(|profile| profile.catalogs.clone()),
123    };
124    crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
125    let context = RunContext::new(config_path, config_base, &options, profile_vars)?;
126    if !options.entities.is_empty() {
127        validate_entities(&context.config, &options.entities)?;
128    }
129
130    let observer = default_observer();
131    let perf_enabled = perf::phase_timing_enabled();
132    let selected_entities = select_entities(&context, &options);
133    let resolution_mode = if options.dry_run {
134        io::storage::inputs::ResolveInputsMode::ListOnly
135    } else {
136        io::storage::inputs::ResolveInputsMode::Download
137    };
138    if !options.dry_run {
139        observer.on_event(RunEvent::RunStarted {
140            run_id: context.run_id.clone(),
141            config: context.config_path.display().to_string(),
142            report_base: context.report_base_path.clone(),
143            ts_ms: event_time_ms(),
144        });
145        crate::run::events::mark_run_started();
146    }
147    let resolve_start = perf_enabled.then(Instant::now);
148    let plans = resolve_entity_plans(&context, runtime, &selected_entities, resolution_mode)?;
149    if let Some(start) = resolve_start {
150        perf::emit_perf_log(
151            observer,
152            &context.run_id,
153            None,
154            "perf_run_phase_timings",
155            json!({
156                "phase": "resolve_inputs",
157                "elapsed_ms": start.elapsed().as_millis() as u64,
158                "entity_count": selected_entities.len(),
159                "mode": match resolution_mode {
160                    io::storage::inputs::ResolveInputsMode::ListOnly => "list_only",
161                    io::storage::inputs::ResolveInputsMode::Download => "download",
162                },
163            }),
164        );
165    }
166    if options.dry_run {
167        return create_dry_run_outcome(&context, plans);
168    }
169
170    let mut entity_outcomes = Vec::new();
171    let mut abort_run = false;
172
173    for plan in plans {
174        let entity_name = plan.entity.name.clone();
175        observer.on_event(RunEvent::EntityStarted {
176            run_id: context.run_id.clone(),
177            name: entity_name.clone(),
178            ts_ms: event_time_ms(),
179        });
180        let entity_result = run_entity(&context, runtime, observer, plan);
181        if let Err(err) = entity_result {
182            observer.on_event(RunEvent::EntityFinished {
183                run_id: context.run_id.clone(),
184                name: entity_name,
185                status: "failed".to_string(),
186                files: 0,
187                rows: 0,
188                accepted: 0,
189                rejected: 0,
190                warnings: 0,
191                errors: 0,
192                ts_ms: event_time_ms(),
193            });
194            return Err(err);
195        }
196        let EntityRunResult {
197            outcome,
198            abort_run: aborted,
199        } = entity_result.unwrap();
200        let report = &outcome.report;
201        let (mut status, _) = report::compute_run_outcome(
202            &report
203                .files
204                .iter()
205                .map(|file| file.status)
206                .collect::<Vec<_>>(),
207        );
208        if status == report::RunStatus::Success && report.results.warnings_total > 0 {
209            status = report::RunStatus::SuccessWithWarnings;
210        }
211        observer.on_event(RunEvent::EntityFinished {
212            run_id: context.run_id.clone(),
213            name: report.entity.name.clone(),
214            status: run_status_str(status).to_string(),
215            files: report.results.files_total,
216            rows: report.results.rows_total,
217            accepted: report.results.accepted_total,
218            rejected: report.results.rejected_total,
219            warnings: report.results.warnings_total,
220            errors: report.results.errors_total,
221            ts_ms: event_time_ms(),
222        });
223        entity_outcomes.push(outcome);
224        abort_run = abort_run || aborted;
225        if abort_run {
226            break;
227        }
228    }
229    let summary = build_run_summary(&context, &entity_outcomes);
230    if let Some(report_target) = &context.report_target {
231        let summary_write_start = perf_enabled.then(Instant::now);
232        write_summary_report(
233            report_target,
234            &context.run_id,
235            &summary,
236            runtime.storage(),
237            &context.storage_resolver,
238        )?;
239        if let Some(start) = summary_write_start {
240            perf::emit_perf_log(
241                observer,
242                &context.run_id,
243                None,
244                "perf_run_phase_timings",
245                json!({
246                    "phase": "write_summary_report",
247                    "elapsed_ms": start.elapsed().as_millis() as u64,
248                    "entity_count": entity_outcomes.len(),
249                }),
250            );
251        }
252    }
253    observer.on_event(RunEvent::RunFinished {
254        run_id: context.run_id.clone(),
255        status: run_status_str(summary.run.status).to_string(),
256        exit_code: summary.run.exit_code,
257        files: summary.results.files_total,
258        rows: summary.results.rows_total,
259        accepted: summary.results.accepted_total,
260        rejected: summary.results.rejected_total,
261        warnings: summary.results.warnings_total,
262        errors: summary.results.errors_total,
263        summary_uri: context.report_target.as_ref().map(|target| {
264            target.join_relative(&report::ReportWriter::summary_relative_path(
265                &context.run_id,
266            ))
267        }),
268        ts_ms: event_time_ms(),
269    });
270
271    Ok(RunOutcome {
272        run_id: context.run_id.clone(),
273        report_base_path: context.report_base_path.clone(),
274        entity_outcomes,
275        summary,
276        dry_run_previews: None,
277    })
278}
279
280fn init_thread_pool() {
281    static INIT: Once = Once::new();
282    INIT.call_once(|| {
283        if std::env::var("RAYON_NUM_THREADS").is_ok() {
284            return;
285        }
286        let cap = std::env::var("FLOE_MAX_THREADS")
287            .ok()
288            .and_then(|value| value.parse::<usize>().ok())
289            .unwrap_or(4);
290        let available = std::thread::available_parallelism()
291            .map(|value| value.get())
292            .unwrap_or(1);
293        let threads = available.min(cap).max(1);
294        let _ = rayon::ThreadPoolBuilder::new()
295            .num_threads(threads)
296            .build_global();
297    });
298}
299
300fn select_entities<'a>(
301    context: &'a RunContext,
302    options: &RunOptions,
303) -> Vec<&'a config::EntityConfig> {
304    if options.entities.is_empty() {
305        context.config.entities.iter().collect()
306    } else {
307        let selected: HashSet<&str> = options.entities.iter().map(String::as_str).collect();
308        context
309            .config
310            .entities
311            .iter()
312            .filter(|entity| selected.contains(entity.name.as_str()))
313            .collect()
314    }
315}
316
317fn resolve_entity_plans<'a>(
318    context: &'a RunContext,
319    runtime: &mut dyn Runtime,
320    entities: &[&'a config::EntityConfig],
321    resolution_mode: io::storage::inputs::ResolveInputsMode,
322) -> FloeResult<Vec<EntityRunPlan<'a>>> {
323    let mut plans = Vec::with_capacity(entities.len());
324    for entity in entities {
325        let input_adapter = runtime.input_adapter(entity.source.format.as_str())?;
326        let resolved_targets = entity::resolve_entity_targets(&context.storage_resolver, entity)?;
327        let needs_temp = matches!(
328            resolution_mode,
329            io::storage::inputs::ResolveInputsMode::Download
330        ) && (resolved_targets.source.is_remote()
331            || resolved_targets.accepted.is_remote()
332            || resolved_targets
333                .rejected
334                .as_ref()
335                .is_some_and(io::storage::Target::is_remote));
336        let temp_dir = if needs_temp {
337            Some(
338                tempfile::TempDir::new()
339                    .map_err(|err| Box::new(IoError(format!("tempdir failed: {err}"))))?,
340            )
341        } else {
342            None
343        };
344        let storage_client = Some(runtime.storage().client_for(
345            &context.storage_resolver,
346            resolved_targets.source.storage(),
347            entity,
348        )? as &dyn io::storage::StorageClient);
349        let resolved_inputs = io::storage::ops::resolve_inputs(
350            &context.config_dir,
351            entity,
352            input_adapter,
353            &resolved_targets.source,
354            resolution_mode,
355            temp_dir.as_ref().map(|dir| dir.path()),
356            storage_client,
357        )?;
358        plans.push(EntityRunPlan {
359            entity,
360            resolved_targets,
361            resolved_inputs,
362            temp_dir,
363        });
364    }
365    Ok(plans)
366}
367
368fn build_run_summary(
369    context: &RunContext,
370    entity_outcomes: &[EntityOutcome],
371) -> report::RunSummaryReport {
372    let mut totals = report::ResultsTotals {
373        files_total: 0,
374        rows_total: 0,
375        accepted_total: 0,
376        rejected_total: 0,
377        warnings_total: 0,
378        errors_total: 0,
379    };
380    let mut statuses = Vec::new();
381    let mut entities = Vec::with_capacity(entity_outcomes.len());
382
383    for outcome in entity_outcomes {
384        let report = &outcome.report;
385        totals.files_total += report.results.files_total;
386        totals.rows_total += report.results.rows_total;
387        totals.accepted_total += report.results.accepted_total;
388        totals.rejected_total += report.results.rejected_total;
389        totals.warnings_total += report.results.warnings_total;
390        totals.errors_total += report.results.errors_total;
391
392        let file_statuses = report
393            .files
394            .iter()
395            .map(|file| file.status)
396            .collect::<Vec<_>>();
397        let (mut status, _) = report::compute_run_outcome(&file_statuses);
398        if status == report::RunStatus::Success && report.results.warnings_total > 0 {
399            status = report::RunStatus::SuccessWithWarnings;
400        }
401        statuses.extend(file_statuses);
402
403        let report_file = context
404            .report_target
405            .as_ref()
406            .map(|target| {
407                target.join_relative(&report::ReportWriter::report_relative_path(
408                    &context.run_id,
409                    &report.entity.name,
410                ))
411            })
412            .unwrap_or_else(|| "disabled".to_string());
413        entities.push(report::EntitySummary {
414            name: report.entity.name.clone(),
415            status,
416            results: report.results.clone(),
417            report_file,
418        });
419    }
420
421    let (mut status, exit_code) = report::compute_run_outcome(&statuses);
422    if status == report::RunStatus::Success && totals.warnings_total > 0 {
423        status = report::RunStatus::SuccessWithWarnings;
424    }
425
426    let finished_at = report::now_rfc3339();
427    let duration_ms = context.run_timer.elapsed().as_millis() as u64;
428    let report_base_path = context
429        .report_base_path
430        .clone()
431        .unwrap_or_else(|| "disabled".to_string());
432    let report_file = context
433        .report_target
434        .as_ref()
435        .map(|target| {
436            target.join_relative(&report::ReportWriter::summary_relative_path(
437                &context.run_id,
438            ))
439        })
440        .unwrap_or_else(|| "disabled".to_string());
441
442    report::RunSummaryReport {
443        spec_version: context.config.version.clone(),
444        tool: report::ToolInfo {
445            name: "floe".to_string(),
446            version: env!("CARGO_PKG_VERSION").to_string(),
447            git: None,
448        },
449        run: report::RunInfo {
450            run_id: context.run_id.clone(),
451            started_at: context.started_at.clone(),
452            finished_at,
453            duration_ms,
454            status,
455            exit_code,
456        },
457        config: report::ConfigEcho {
458            path: context.config_path.display().to_string(),
459            version: context.config.version.clone(),
460            metadata: context.config.metadata.as_ref().map(project_metadata_json),
461        },
462        report: report::ReportEcho {
463            path: report_base_path,
464            report_file,
465        },
466        results: totals,
467        entities,
468    }
469}
470
471fn create_dry_run_outcome(
472    context: &RunContext,
473    plans: Vec<EntityRunPlan<'_>>,
474) -> FloeResult<RunOutcome> {
475    let mut previews: Vec<DryRunEntityPreview> = Vec::new();
476
477    for plan in plans {
478        let entity = plan.entity;
479        let rejected_path = entity.sink.rejected.as_ref().map(|r| r.path.clone());
480        let rejected_format = entity.sink.rejected.as_ref().map(|r| r.format.clone());
481        let (archive_path, archive_storage) = entity
482            .sink
483            .archive
484            .as_ref()
485            .map(|a| (a.path.clone(), a.storage.clone()))
486            .unwrap_or_else(|| (String::new(), None));
487
488        let report_file = context.report_target.as_ref().map(|target| {
489            target.join_relative(&report::ReportWriter::report_relative_path(
490                &context.run_id,
491                &entity.name,
492            ))
493        });
494
495        previews.push(DryRunEntityPreview {
496            name: entity.name.clone(),
497            input_path: entity.source.path.clone(),
498            input_format: entity.source.format.clone(),
499            accepted_path: entity.sink.accepted.path.clone(),
500            accepted_format: entity.sink.accepted.format.clone(),
501            rejected_path,
502            rejected_format,
503            archive_path,
504            archive_storage,
505            report_file,
506            scanned_files: plan.resolved_inputs.listed,
507        });
508    }
509
510    Ok(RunOutcome {
511        run_id: context.run_id.clone(),
512        report_base_path: context.report_base_path.clone(),
513        entity_outcomes: Vec::new(),
514        summary: report::RunSummaryReport {
515            spec_version: context.config.version.clone(),
516            tool: report::ToolInfo {
517                name: "floe".to_string(),
518                version: env!("CARGO_PKG_VERSION").to_string(),
519                git: None,
520            },
521            run: report::RunInfo {
522                run_id: context.run_id.clone(),
523                started_at: context.started_at.clone(),
524                finished_at: report::now_rfc3339(),
525                duration_ms: 0,
526                status: report::RunStatus::Success,
527                exit_code: 0,
528            },
529            config: report::ConfigEcho {
530                path: context.config_path.display().to_string(),
531                version: context.config.version.clone(),
532                metadata: context.config.metadata.as_ref().map(project_metadata_json),
533            },
534            report: report::ReportEcho {
535                path: context
536                    .report_base_path
537                    .clone()
538                    .unwrap_or_else(|| "disabled".to_string()),
539                report_file: "disabled (dry-run)".to_string(),
540            },
541            results: report::ResultsTotals {
542                files_total: 0,
543                rows_total: 0,
544                accepted_total: 0,
545                rejected_total: 0,
546                warnings_total: 0,
547                errors_total: 0,
548            },
549            entities: Vec::new(),
550        },
551        dry_run_previews: Some(previews),
552    })
553}
554
555fn run_status_str(status: report::RunStatus) -> &'static str {
556    match status {
557        report::RunStatus::Success => "success",
558        report::RunStatus::SuccessWithWarnings => "success_with_warnings",
559        report::RunStatus::Rejected => "rejected",
560        report::RunStatus::Aborted => "aborted",
561        report::RunStatus::Failed => "failed",
562    }
563}