Skip to main content

floe_core/run/
mod.rs

1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4
5use crate::errors::IoError;
6use crate::report::build::project_metadata_json;
7use crate::report::output::write_summary_report;
8use crate::runtime::{DefaultRuntime, Runtime};
9use crate::{config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
10
11mod context;
12pub(crate) mod entity;
13pub mod events;
14mod file;
15mod output;
16
17pub(crate) use context::RunContext;
18use entity::{run_entity, EntityRunResult, ResolvedEntityTargets};
19use events::{default_observer, event_time_ms, RunEvent};
20
21pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
22
23pub(crate) struct EntityRunPlan<'a> {
24    pub(crate) entity: &'a config::EntityConfig,
25    pub(crate) resolved_targets: ResolvedEntityTargets,
26    pub(crate) resolved_inputs: io::storage::inputs::ResolvedInputs,
27    pub(crate) temp_dir: Option<tempfile::TempDir>,
28}
29
30#[derive(Debug, Clone)]
31pub struct RunOutcome {
32    pub run_id: String,
33    pub report_base_path: Option<String>,
34    pub entity_outcomes: Vec<EntityOutcome>,
35    pub summary: report::RunSummaryReport,
36    pub dry_run_previews: Option<Vec<DryRunEntityPreview>>,
37}
38
/// What a real run would do for one entity, as reported by a dry run.
#[derive(Clone, Debug)]
pub struct DryRunEntityPreview {
    /// Entity name from the configuration.
    pub name: String,
    /// Configured source path (as written in the config).
    pub input_path: String,
    /// Configured source format.
    pub input_format: String,
    /// Configured accepted-sink path.
    pub accepted_path: String,
    /// Configured accepted-sink format.
    pub accepted_format: String,
    /// Rejected-sink path; `None` when no rejected sink is configured.
    pub rejected_path: Option<String>,
    /// Rejected-sink format; `None` when no rejected sink is configured.
    pub rejected_format: Option<String>,
    /// Archive path; empty string when no archive is configured.
    pub archive_path: String,
    /// Archive storage name; `None` when no archive (or no archive storage) is configured.
    pub archive_storage: Option<String>,
    /// Location the per-entity report would be written to; `None` when reporting is disabled.
    pub report_file: Option<String>,
    /// Files listed while resolving inputs (nothing is downloaded during a dry run).
    pub scanned_files: Vec<String>,
}
53
54#[derive(Debug, Clone)]
55pub struct EntityOutcome {
56    pub report: crate::report::RunReport,
57    pub file_timings_ms: Vec<Option<u64>>,
58}
59
60pub(crate) fn validate_entities(
61    config: &config::RootConfig,
62    selected: &[String],
63) -> FloeResult<()> {
64    let missing: Vec<String> = selected
65        .iter()
66        .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
67        .cloned()
68        .collect();
69
70    if !missing.is_empty() {
71        return Err(Box::new(ConfigError(format!(
72            "entities not found: {}",
73            missing.join(", ")
74        ))));
75    }
76    Ok(())
77}
78
79pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
80    let config_base = config::ConfigBase::local_from_path(config_path);
81    run_with_base(config_path, config_base, options)
82}
83
84pub fn run_with_base(
85    config_path: &Path,
86    config_base: config::ConfigBase,
87    options: RunOptions,
88) -> FloeResult<RunOutcome> {
89    let mut runtime = DefaultRuntime::new();
90    run_with_runtime(config_path, config_base, options, &mut runtime)
91}
92
93pub fn run_with_runtime(
94    config_path: &Path,
95    config_base: config::ConfigBase,
96    options: RunOptions,
97    runtime: &mut dyn Runtime,
98) -> FloeResult<RunOutcome> {
99    init_thread_pool();
100    let validate_options = ValidateOptions {
101        entities: options.entities.clone(),
102    };
103    crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
104
105    let context = RunContext::new(config_path, config_base, &options)?;
106    if !options.entities.is_empty() {
107        validate_entities(&context.config, &options.entities)?;
108    }
109
110    let selected_entities = select_entities(&context, &options);
111    let resolution_mode = if options.dry_run {
112        io::storage::inputs::ResolveInputsMode::ListOnly
113    } else {
114        io::storage::inputs::ResolveInputsMode::Download
115    };
116    let plans = resolve_entity_plans(&context, runtime, &selected_entities, resolution_mode)?;
117    if options.dry_run {
118        return create_dry_run_outcome(&context, plans);
119    }
120
121    let mut entity_outcomes = Vec::new();
122    let mut abort_run = false;
123    let observer = default_observer();
124    observer.on_event(RunEvent::RunStarted {
125        run_id: context.run_id.clone(),
126        config: context.config_path.display().to_string(),
127        report_base: context.report_base_path.clone(),
128        ts_ms: event_time_ms(),
129    });
130
131    for plan in plans {
132        observer.on_event(RunEvent::EntityStarted {
133            run_id: context.run_id.clone(),
134            name: plan.entity.name.clone(),
135            ts_ms: event_time_ms(),
136        });
137        let EntityRunResult {
138            outcome,
139            abort_run: aborted,
140        } = run_entity(&context, runtime, observer, plan)?;
141        let report = &outcome.report;
142        let (mut status, _) = report::compute_run_outcome(
143            &report
144                .files
145                .iter()
146                .map(|file| file.status)
147                .collect::<Vec<_>>(),
148        );
149        if status == report::RunStatus::Success && report.results.warnings_total > 0 {
150            status = report::RunStatus::SuccessWithWarnings;
151        }
152        observer.on_event(RunEvent::EntityFinished {
153            run_id: context.run_id.clone(),
154            name: report.entity.name.clone(),
155            status: run_status_str(status).to_string(),
156            files: report.results.files_total,
157            rows: report.results.rows_total,
158            accepted: report.results.accepted_total,
159            rejected: report.results.rejected_total,
160            warnings: report.results.warnings_total,
161            errors: report.results.errors_total,
162            ts_ms: event_time_ms(),
163        });
164        entity_outcomes.push(outcome);
165        abort_run = abort_run || aborted;
166        if abort_run {
167            break;
168        }
169    }
170    let summary = build_run_summary(&context, &entity_outcomes);
171    if let Some(report_target) = &context.report_target {
172        write_summary_report(
173            report_target,
174            &context.run_id,
175            &summary,
176            runtime.storage(),
177            &context.storage_resolver,
178        )?;
179    }
180    observer.on_event(RunEvent::RunFinished {
181        run_id: context.run_id.clone(),
182        status: run_status_str(summary.run.status).to_string(),
183        exit_code: summary.run.exit_code,
184        files: summary.results.files_total,
185        rows: summary.results.rows_total,
186        accepted: summary.results.accepted_total,
187        rejected: summary.results.rejected_total,
188        warnings: summary.results.warnings_total,
189        errors: summary.results.errors_total,
190        summary_uri: context.report_target.as_ref().map(|target| {
191            target.join_relative(&report::ReportWriter::summary_relative_path(
192                &context.run_id,
193            ))
194        }),
195        ts_ms: event_time_ms(),
196    });
197
198    Ok(RunOutcome {
199        run_id: context.run_id.clone(),
200        report_base_path: context.report_base_path.clone(),
201        entity_outcomes,
202        summary,
203        dry_run_previews: None,
204    })
205}
206
207fn init_thread_pool() {
208    static INIT: Once = Once::new();
209    INIT.call_once(|| {
210        if std::env::var("RAYON_NUM_THREADS").is_ok() {
211            return;
212        }
213        let cap = std::env::var("FLOE_MAX_THREADS")
214            .ok()
215            .and_then(|value| value.parse::<usize>().ok())
216            .unwrap_or(4);
217        let available = std::thread::available_parallelism()
218            .map(|value| value.get())
219            .unwrap_or(1);
220        let threads = available.min(cap).max(1);
221        let _ = rayon::ThreadPoolBuilder::new()
222            .num_threads(threads)
223            .build_global();
224    });
225}
226
227fn select_entities<'a>(
228    context: &'a RunContext,
229    options: &RunOptions,
230) -> Vec<&'a config::EntityConfig> {
231    if options.entities.is_empty() {
232        context.config.entities.iter().collect()
233    } else {
234        let selected: HashSet<&str> = options.entities.iter().map(String::as_str).collect();
235        context
236            .config
237            .entities
238            .iter()
239            .filter(|entity| selected.contains(entity.name.as_str()))
240            .collect()
241    }
242}
243
244fn resolve_entity_plans<'a>(
245    context: &'a RunContext,
246    runtime: &mut dyn Runtime,
247    entities: &[&'a config::EntityConfig],
248    resolution_mode: io::storage::inputs::ResolveInputsMode,
249) -> FloeResult<Vec<EntityRunPlan<'a>>> {
250    let mut plans = Vec::with_capacity(entities.len());
251    for entity in entities {
252        let input_adapter = runtime.input_adapter(entity.source.format.as_str())?;
253        let resolved_targets = entity::resolve_entity_targets(&context.storage_resolver, entity)?;
254        let needs_temp = matches!(
255            resolution_mode,
256            io::storage::inputs::ResolveInputsMode::Download
257        ) && (resolved_targets.source.is_remote()
258            || resolved_targets.accepted.is_remote()
259            || resolved_targets
260                .rejected
261                .as_ref()
262                .is_some_and(io::storage::Target::is_remote));
263        let temp_dir = if needs_temp {
264            Some(
265                tempfile::TempDir::new()
266                    .map_err(|err| Box::new(IoError(format!("tempdir failed: {err}"))))?,
267            )
268        } else {
269            None
270        };
271        let storage_client = Some(runtime.storage().client_for(
272            &context.storage_resolver,
273            resolved_targets.source.storage(),
274            entity,
275        )? as &dyn io::storage::StorageClient);
276        let resolved_inputs = io::storage::ops::resolve_inputs(
277            &context.config_dir,
278            entity,
279            input_adapter,
280            &resolved_targets.source,
281            resolution_mode,
282            temp_dir.as_ref().map(|dir| dir.path()),
283            storage_client,
284        )?;
285        plans.push(EntityRunPlan {
286            entity,
287            resolved_targets,
288            resolved_inputs,
289            temp_dir,
290        });
291    }
292    Ok(plans)
293}
294
295fn build_run_summary(
296    context: &RunContext,
297    entity_outcomes: &[EntityOutcome],
298) -> report::RunSummaryReport {
299    let mut totals = report::ResultsTotals {
300        files_total: 0,
301        rows_total: 0,
302        accepted_total: 0,
303        rejected_total: 0,
304        warnings_total: 0,
305        errors_total: 0,
306    };
307    let mut statuses = Vec::new();
308    let mut entities = Vec::with_capacity(entity_outcomes.len());
309
310    for outcome in entity_outcomes {
311        let report = &outcome.report;
312        totals.files_total += report.results.files_total;
313        totals.rows_total += report.results.rows_total;
314        totals.accepted_total += report.results.accepted_total;
315        totals.rejected_total += report.results.rejected_total;
316        totals.warnings_total += report.results.warnings_total;
317        totals.errors_total += report.results.errors_total;
318
319        let file_statuses = report
320            .files
321            .iter()
322            .map(|file| file.status)
323            .collect::<Vec<_>>();
324        let (mut status, _) = report::compute_run_outcome(&file_statuses);
325        if status == report::RunStatus::Success && report.results.warnings_total > 0 {
326            status = report::RunStatus::SuccessWithWarnings;
327        }
328        statuses.extend(file_statuses);
329
330        let report_file = context
331            .report_target
332            .as_ref()
333            .map(|target| {
334                target.join_relative(&report::ReportWriter::report_relative_path(
335                    &context.run_id,
336                    &report.entity.name,
337                ))
338            })
339            .unwrap_or_else(|| "disabled".to_string());
340        entities.push(report::EntitySummary {
341            name: report.entity.name.clone(),
342            status,
343            results: report.results.clone(),
344            report_file,
345        });
346    }
347
348    let (mut status, exit_code) = report::compute_run_outcome(&statuses);
349    if status == report::RunStatus::Success && totals.warnings_total > 0 {
350        status = report::RunStatus::SuccessWithWarnings;
351    }
352
353    let finished_at = report::now_rfc3339();
354    let duration_ms = context.run_timer.elapsed().as_millis() as u64;
355    let report_base_path = context
356        .report_base_path
357        .clone()
358        .unwrap_or_else(|| "disabled".to_string());
359    let report_file = context
360        .report_target
361        .as_ref()
362        .map(|target| {
363            target.join_relative(&report::ReportWriter::summary_relative_path(
364                &context.run_id,
365            ))
366        })
367        .unwrap_or_else(|| "disabled".to_string());
368
369    report::RunSummaryReport {
370        spec_version: context.config.version.clone(),
371        tool: report::ToolInfo {
372            name: "floe".to_string(),
373            version: env!("CARGO_PKG_VERSION").to_string(),
374            git: None,
375        },
376        run: report::RunInfo {
377            run_id: context.run_id.clone(),
378            started_at: context.started_at.clone(),
379            finished_at,
380            duration_ms,
381            status,
382            exit_code,
383        },
384        config: report::ConfigEcho {
385            path: context.config_path.display().to_string(),
386            version: context.config.version.clone(),
387            metadata: context.config.metadata.as_ref().map(project_metadata_json),
388        },
389        report: report::ReportEcho {
390            path: report_base_path,
391            report_file,
392        },
393        results: totals,
394        entities,
395    }
396}
397
398fn create_dry_run_outcome(
399    context: &RunContext,
400    plans: Vec<EntityRunPlan<'_>>,
401) -> FloeResult<RunOutcome> {
402    let mut previews: Vec<DryRunEntityPreview> = Vec::new();
403
404    for plan in plans {
405        let entity = plan.entity;
406        let rejected_path = entity.sink.rejected.as_ref().map(|r| r.path.clone());
407        let rejected_format = entity.sink.rejected.as_ref().map(|r| r.format.clone());
408        let (archive_path, archive_storage) = entity
409            .sink
410            .archive
411            .as_ref()
412            .map(|a| (a.path.clone(), a.storage.clone()))
413            .unwrap_or_else(|| (String::new(), None));
414
415        let report_file = context.report_target.as_ref().map(|target| {
416            target.join_relative(&report::ReportWriter::report_relative_path(
417                &context.run_id,
418                &entity.name,
419            ))
420        });
421
422        previews.push(DryRunEntityPreview {
423            name: entity.name.clone(),
424            input_path: entity.source.path.clone(),
425            input_format: entity.source.format.clone(),
426            accepted_path: entity.sink.accepted.path.clone(),
427            accepted_format: entity.sink.accepted.format.clone(),
428            rejected_path,
429            rejected_format,
430            archive_path,
431            archive_storage,
432            report_file,
433            scanned_files: plan.resolved_inputs.listed,
434        });
435    }
436
437    Ok(RunOutcome {
438        run_id: context.run_id.clone(),
439        report_base_path: context.report_base_path.clone(),
440        entity_outcomes: Vec::new(),
441        summary: report::RunSummaryReport {
442            spec_version: context.config.version.clone(),
443            tool: report::ToolInfo {
444                name: "floe".to_string(),
445                version: env!("CARGO_PKG_VERSION").to_string(),
446                git: None,
447            },
448            run: report::RunInfo {
449                run_id: context.run_id.clone(),
450                started_at: context.started_at.clone(),
451                finished_at: report::now_rfc3339(),
452                duration_ms: 0,
453                status: report::RunStatus::Success,
454                exit_code: 0,
455            },
456            config: report::ConfigEcho {
457                path: context.config_path.display().to_string(),
458                version: context.config.version.clone(),
459                metadata: context.config.metadata.as_ref().map(project_metadata_json),
460            },
461            report: report::ReportEcho {
462                path: context
463                    .report_base_path
464                    .clone()
465                    .unwrap_or_else(|| "disabled".to_string()),
466                report_file: "disabled (dry-run)".to_string(),
467            },
468            results: report::ResultsTotals {
469                files_total: 0,
470                rows_total: 0,
471                accepted_total: 0,
472                rejected_total: 0,
473                warnings_total: 0,
474                errors_total: 0,
475            },
476            entities: Vec::new(),
477        },
478        dry_run_previews: Some(previews),
479    })
480}
481
482fn run_status_str(status: report::RunStatus) -> &'static str {
483    match status {
484        report::RunStatus::Success => "success",
485        report::RunStatus::SuccessWithWarnings => "success_with_warnings",
486        report::RunStatus::Rejected => "rejected",
487        report::RunStatus::Aborted => "aborted",
488        report::RunStatus::Failed => "failed",
489    }
490}