Skip to main content

floe_core/run/
mod.rs

1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4use std::time::Instant;
5
6use crate::errors::IoError;
7use crate::report::build::project_metadata_json;
8use crate::report::output::write_summary_report;
9use crate::runtime::{DefaultRuntime, Runtime};
10use crate::{config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
11
12mod context;
13pub(crate) mod entity;
14pub mod events;
15mod file;
16mod output;
17mod perf;
18
19pub(crate) use context::RunContext;
20use entity::{run_entity, EntityRunResult, ResolvedEntityTargets};
21use events::{default_observer, event_time_ms, RunEvent};
22use serde_json::json;
23
24pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
25
26pub(crate) struct EntityRunPlan<'a> {
27    pub(crate) entity: &'a config::EntityConfig,
28    pub(crate) resolved_targets: ResolvedEntityTargets,
29    pub(crate) resolved_inputs: io::storage::inputs::ResolvedInputs,
30    pub(crate) temp_dir: Option<tempfile::TempDir>,
31}
32
33#[derive(Debug, Clone)]
34pub struct RunOutcome {
35    pub run_id: String,
36    pub report_base_path: Option<String>,
37    pub entity_outcomes: Vec<EntityOutcome>,
38    pub summary: report::RunSummaryReport,
39    pub dry_run_previews: Option<Vec<DryRunEntityPreview>>,
40}
41
/// Per-entity description of what a dry run would read and write.
#[derive(Debug, Clone)]
pub struct DryRunEntityPreview {
    /// Entity name from the config.
    pub name: String,
    /// Configured source path.
    pub input_path: String,
    /// Configured source format.
    pub input_format: String,
    /// Configured accepted-sink path.
    pub accepted_path: String,
    /// Configured accepted-sink format.
    pub accepted_format: String,
    /// Rejected-sink path, if a rejected sink is configured.
    pub rejected_path: Option<String>,
    /// Rejected-sink format, if a rejected sink is configured.
    pub rejected_format: Option<String>,
    /// Archive path; an empty string when no archive is configured.
    pub archive_path: String,
    /// Archive `storage` setting from the config, if any.
    pub archive_storage: Option<String>,
    /// Where the entity report would be written; `None` when no report target
    /// is configured.
    pub report_file: Option<String>,
    /// Input files found by listing the source.
    pub scanned_files: Vec<String>,
}
56
57#[derive(Debug, Clone)]
58pub struct EntityOutcome {
59    pub report: crate::report::RunReport,
60    pub file_timings_ms: Vec<Option<u64>>,
61}
62
63pub(crate) fn validate_entities(
64    config: &config::RootConfig,
65    selected: &[String],
66) -> FloeResult<()> {
67    let missing: Vec<String> = selected
68        .iter()
69        .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
70        .cloned()
71        .collect();
72
73    if !missing.is_empty() {
74        return Err(Box::new(ConfigError(format!(
75            "entities not found: {}",
76            missing.join(", ")
77        ))));
78    }
79    Ok(())
80}
81
82pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
83    let config_base = config::ConfigBase::local_from_path(config_path);
84    run_with_base(config_path, config_base, options)
85}
86
87pub fn run_with_base(
88    config_path: &Path,
89    config_base: config::ConfigBase,
90    options: RunOptions,
91) -> FloeResult<RunOutcome> {
92    let mut runtime = DefaultRuntime::new();
93    run_with_runtime(config_path, config_base, options, &mut runtime)
94}
95
96pub fn run_with_runtime(
97    config_path: &Path,
98    config_base: config::ConfigBase,
99    options: RunOptions,
100    runtime: &mut dyn Runtime,
101) -> FloeResult<RunOutcome> {
102    init_thread_pool();
103    let validate_options = ValidateOptions {
104        entities: options.entities.clone(),
105    };
106    crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
107    let context = RunContext::new(config_path, config_base, &options)?;
108    if !options.entities.is_empty() {
109        validate_entities(&context.config, &options.entities)?;
110    }
111
112    let observer = default_observer();
113    let perf_enabled = perf::phase_timing_enabled();
114    let selected_entities = select_entities(&context, &options);
115    let resolution_mode = if options.dry_run {
116        io::storage::inputs::ResolveInputsMode::ListOnly
117    } else {
118        io::storage::inputs::ResolveInputsMode::Download
119    };
120    if !options.dry_run {
121        observer.on_event(RunEvent::RunStarted {
122            run_id: context.run_id.clone(),
123            config: context.config_path.display().to_string(),
124            report_base: context.report_base_path.clone(),
125            ts_ms: event_time_ms(),
126        });
127    }
128    let resolve_start = perf_enabled.then(Instant::now);
129    let plans = resolve_entity_plans(&context, runtime, &selected_entities, resolution_mode)?;
130    if let Some(start) = resolve_start {
131        perf::emit_perf_log(
132            observer,
133            &context.run_id,
134            None,
135            "perf_run_phase_timings",
136            json!({
137                "phase": "resolve_inputs",
138                "elapsed_ms": start.elapsed().as_millis() as u64,
139                "entity_count": selected_entities.len(),
140                "mode": match resolution_mode {
141                    io::storage::inputs::ResolveInputsMode::ListOnly => "list_only",
142                    io::storage::inputs::ResolveInputsMode::Download => "download",
143                },
144            }),
145        );
146    }
147    if options.dry_run {
148        return create_dry_run_outcome(&context, plans);
149    }
150
151    let mut entity_outcomes = Vec::new();
152    let mut abort_run = false;
153
154    for plan in plans {
155        observer.on_event(RunEvent::EntityStarted {
156            run_id: context.run_id.clone(),
157            name: plan.entity.name.clone(),
158            ts_ms: event_time_ms(),
159        });
160        let EntityRunResult {
161            outcome,
162            abort_run: aborted,
163        } = run_entity(&context, runtime, observer, plan)?;
164        let report = &outcome.report;
165        let (mut status, _) = report::compute_run_outcome(
166            &report
167                .files
168                .iter()
169                .map(|file| file.status)
170                .collect::<Vec<_>>(),
171        );
172        if status == report::RunStatus::Success && report.results.warnings_total > 0 {
173            status = report::RunStatus::SuccessWithWarnings;
174        }
175        observer.on_event(RunEvent::EntityFinished {
176            run_id: context.run_id.clone(),
177            name: report.entity.name.clone(),
178            status: run_status_str(status).to_string(),
179            files: report.results.files_total,
180            rows: report.results.rows_total,
181            accepted: report.results.accepted_total,
182            rejected: report.results.rejected_total,
183            warnings: report.results.warnings_total,
184            errors: report.results.errors_total,
185            ts_ms: event_time_ms(),
186        });
187        entity_outcomes.push(outcome);
188        abort_run = abort_run || aborted;
189        if abort_run {
190            break;
191        }
192    }
193    let summary = build_run_summary(&context, &entity_outcomes);
194    if let Some(report_target) = &context.report_target {
195        let summary_write_start = perf_enabled.then(Instant::now);
196        write_summary_report(
197            report_target,
198            &context.run_id,
199            &summary,
200            runtime.storage(),
201            &context.storage_resolver,
202        )?;
203        if let Some(start) = summary_write_start {
204            perf::emit_perf_log(
205                observer,
206                &context.run_id,
207                None,
208                "perf_run_phase_timings",
209                json!({
210                    "phase": "write_summary_report",
211                    "elapsed_ms": start.elapsed().as_millis() as u64,
212                    "entity_count": entity_outcomes.len(),
213                }),
214            );
215        }
216    }
217    observer.on_event(RunEvent::RunFinished {
218        run_id: context.run_id.clone(),
219        status: run_status_str(summary.run.status).to_string(),
220        exit_code: summary.run.exit_code,
221        files: summary.results.files_total,
222        rows: summary.results.rows_total,
223        accepted: summary.results.accepted_total,
224        rejected: summary.results.rejected_total,
225        warnings: summary.results.warnings_total,
226        errors: summary.results.errors_total,
227        summary_uri: context.report_target.as_ref().map(|target| {
228            target.join_relative(&report::ReportWriter::summary_relative_path(
229                &context.run_id,
230            ))
231        }),
232        ts_ms: event_time_ms(),
233    });
234
235    Ok(RunOutcome {
236        run_id: context.run_id.clone(),
237        report_base_path: context.report_base_path.clone(),
238        entity_outcomes,
239        summary,
240        dry_run_previews: None,
241    })
242}
243
244fn init_thread_pool() {
245    static INIT: Once = Once::new();
246    INIT.call_once(|| {
247        if std::env::var("RAYON_NUM_THREADS").is_ok() {
248            return;
249        }
250        let cap = std::env::var("FLOE_MAX_THREADS")
251            .ok()
252            .and_then(|value| value.parse::<usize>().ok())
253            .unwrap_or(4);
254        let available = std::thread::available_parallelism()
255            .map(|value| value.get())
256            .unwrap_or(1);
257        let threads = available.min(cap).max(1);
258        let _ = rayon::ThreadPoolBuilder::new()
259            .num_threads(threads)
260            .build_global();
261    });
262}
263
264fn select_entities<'a>(
265    context: &'a RunContext,
266    options: &RunOptions,
267) -> Vec<&'a config::EntityConfig> {
268    if options.entities.is_empty() {
269        context.config.entities.iter().collect()
270    } else {
271        let selected: HashSet<&str> = options.entities.iter().map(String::as_str).collect();
272        context
273            .config
274            .entities
275            .iter()
276            .filter(|entity| selected.contains(entity.name.as_str()))
277            .collect()
278    }
279}
280
281fn resolve_entity_plans<'a>(
282    context: &'a RunContext,
283    runtime: &mut dyn Runtime,
284    entities: &[&'a config::EntityConfig],
285    resolution_mode: io::storage::inputs::ResolveInputsMode,
286) -> FloeResult<Vec<EntityRunPlan<'a>>> {
287    let mut plans = Vec::with_capacity(entities.len());
288    for entity in entities {
289        let input_adapter = runtime.input_adapter(entity.source.format.as_str())?;
290        let resolved_targets = entity::resolve_entity_targets(&context.storage_resolver, entity)?;
291        let needs_temp = matches!(
292            resolution_mode,
293            io::storage::inputs::ResolveInputsMode::Download
294        ) && (resolved_targets.source.is_remote()
295            || resolved_targets.accepted.is_remote()
296            || resolved_targets
297                .rejected
298                .as_ref()
299                .is_some_and(io::storage::Target::is_remote));
300        let temp_dir = if needs_temp {
301            Some(
302                tempfile::TempDir::new()
303                    .map_err(|err| Box::new(IoError(format!("tempdir failed: {err}"))))?,
304            )
305        } else {
306            None
307        };
308        let storage_client = Some(runtime.storage().client_for(
309            &context.storage_resolver,
310            resolved_targets.source.storage(),
311            entity,
312        )? as &dyn io::storage::StorageClient);
313        let resolved_inputs = io::storage::ops::resolve_inputs(
314            &context.config_dir,
315            entity,
316            input_adapter,
317            &resolved_targets.source,
318            resolution_mode,
319            temp_dir.as_ref().map(|dir| dir.path()),
320            storage_client,
321        )?;
322        plans.push(EntityRunPlan {
323            entity,
324            resolved_targets,
325            resolved_inputs,
326            temp_dir,
327        });
328    }
329    Ok(plans)
330}
331
332fn build_run_summary(
333    context: &RunContext,
334    entity_outcomes: &[EntityOutcome],
335) -> report::RunSummaryReport {
336    let mut totals = report::ResultsTotals {
337        files_total: 0,
338        rows_total: 0,
339        accepted_total: 0,
340        rejected_total: 0,
341        warnings_total: 0,
342        errors_total: 0,
343    };
344    let mut statuses = Vec::new();
345    let mut entities = Vec::with_capacity(entity_outcomes.len());
346
347    for outcome in entity_outcomes {
348        let report = &outcome.report;
349        totals.files_total += report.results.files_total;
350        totals.rows_total += report.results.rows_total;
351        totals.accepted_total += report.results.accepted_total;
352        totals.rejected_total += report.results.rejected_total;
353        totals.warnings_total += report.results.warnings_total;
354        totals.errors_total += report.results.errors_total;
355
356        let file_statuses = report
357            .files
358            .iter()
359            .map(|file| file.status)
360            .collect::<Vec<_>>();
361        let (mut status, _) = report::compute_run_outcome(&file_statuses);
362        if status == report::RunStatus::Success && report.results.warnings_total > 0 {
363            status = report::RunStatus::SuccessWithWarnings;
364        }
365        statuses.extend(file_statuses);
366
367        let report_file = context
368            .report_target
369            .as_ref()
370            .map(|target| {
371                target.join_relative(&report::ReportWriter::report_relative_path(
372                    &context.run_id,
373                    &report.entity.name,
374                ))
375            })
376            .unwrap_or_else(|| "disabled".to_string());
377        entities.push(report::EntitySummary {
378            name: report.entity.name.clone(),
379            status,
380            results: report.results.clone(),
381            report_file,
382        });
383    }
384
385    let (mut status, exit_code) = report::compute_run_outcome(&statuses);
386    if status == report::RunStatus::Success && totals.warnings_total > 0 {
387        status = report::RunStatus::SuccessWithWarnings;
388    }
389
390    let finished_at = report::now_rfc3339();
391    let duration_ms = context.run_timer.elapsed().as_millis() as u64;
392    let report_base_path = context
393        .report_base_path
394        .clone()
395        .unwrap_or_else(|| "disabled".to_string());
396    let report_file = context
397        .report_target
398        .as_ref()
399        .map(|target| {
400            target.join_relative(&report::ReportWriter::summary_relative_path(
401                &context.run_id,
402            ))
403        })
404        .unwrap_or_else(|| "disabled".to_string());
405
406    report::RunSummaryReport {
407        spec_version: context.config.version.clone(),
408        tool: report::ToolInfo {
409            name: "floe".to_string(),
410            version: env!("CARGO_PKG_VERSION").to_string(),
411            git: None,
412        },
413        run: report::RunInfo {
414            run_id: context.run_id.clone(),
415            started_at: context.started_at.clone(),
416            finished_at,
417            duration_ms,
418            status,
419            exit_code,
420        },
421        config: report::ConfigEcho {
422            path: context.config_path.display().to_string(),
423            version: context.config.version.clone(),
424            metadata: context.config.metadata.as_ref().map(project_metadata_json),
425        },
426        report: report::ReportEcho {
427            path: report_base_path,
428            report_file,
429        },
430        results: totals,
431        entities,
432    }
433}
434
435fn create_dry_run_outcome(
436    context: &RunContext,
437    plans: Vec<EntityRunPlan<'_>>,
438) -> FloeResult<RunOutcome> {
439    let mut previews: Vec<DryRunEntityPreview> = Vec::new();
440
441    for plan in plans {
442        let entity = plan.entity;
443        let rejected_path = entity.sink.rejected.as_ref().map(|r| r.path.clone());
444        let rejected_format = entity.sink.rejected.as_ref().map(|r| r.format.clone());
445        let (archive_path, archive_storage) = entity
446            .sink
447            .archive
448            .as_ref()
449            .map(|a| (a.path.clone(), a.storage.clone()))
450            .unwrap_or_else(|| (String::new(), None));
451
452        let report_file = context.report_target.as_ref().map(|target| {
453            target.join_relative(&report::ReportWriter::report_relative_path(
454                &context.run_id,
455                &entity.name,
456            ))
457        });
458
459        previews.push(DryRunEntityPreview {
460            name: entity.name.clone(),
461            input_path: entity.source.path.clone(),
462            input_format: entity.source.format.clone(),
463            accepted_path: entity.sink.accepted.path.clone(),
464            accepted_format: entity.sink.accepted.format.clone(),
465            rejected_path,
466            rejected_format,
467            archive_path,
468            archive_storage,
469            report_file,
470            scanned_files: plan.resolved_inputs.listed,
471        });
472    }
473
474    Ok(RunOutcome {
475        run_id: context.run_id.clone(),
476        report_base_path: context.report_base_path.clone(),
477        entity_outcomes: Vec::new(),
478        summary: report::RunSummaryReport {
479            spec_version: context.config.version.clone(),
480            tool: report::ToolInfo {
481                name: "floe".to_string(),
482                version: env!("CARGO_PKG_VERSION").to_string(),
483                git: None,
484            },
485            run: report::RunInfo {
486                run_id: context.run_id.clone(),
487                started_at: context.started_at.clone(),
488                finished_at: report::now_rfc3339(),
489                duration_ms: 0,
490                status: report::RunStatus::Success,
491                exit_code: 0,
492            },
493            config: report::ConfigEcho {
494                path: context.config_path.display().to_string(),
495                version: context.config.version.clone(),
496                metadata: context.config.metadata.as_ref().map(project_metadata_json),
497            },
498            report: report::ReportEcho {
499                path: context
500                    .report_base_path
501                    .clone()
502                    .unwrap_or_else(|| "disabled".to_string()),
503                report_file: "disabled (dry-run)".to_string(),
504            },
505            results: report::ResultsTotals {
506                files_total: 0,
507                rows_total: 0,
508                accepted_total: 0,
509                rejected_total: 0,
510                warnings_total: 0,
511                errors_total: 0,
512            },
513            entities: Vec::new(),
514        },
515        dry_run_previews: Some(previews),
516    })
517}
518
519fn run_status_str(status: report::RunStatus) -> &'static str {
520    match status {
521        report::RunStatus::Success => "success",
522        report::RunStatus::SuccessWithWarnings => "success_with_warnings",
523        report::RunStatus::Rejected => "rejected",
524        report::RunStatus::Aborted => "aborted",
525        report::RunStatus::Failed => "failed",
526    }
527}