Skip to main content

floe_core/run/
mod.rs

1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4use std::time::Instant;
5
6use crate::errors::IoError;
7use crate::report::build::project_metadata_json;
8use crate::report::output::write_summary_report;
9use crate::runtime::{DefaultRuntime, Runtime};
10use crate::{config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
11
12mod context;
13pub(crate) mod entity;
14pub mod events;
15mod file;
16mod output;
17mod perf;
18
19pub(crate) use context::RunContext;
20use entity::{run_entity, EntityRunResult, ResolvedEntityTargets};
21use events::{default_observer, event_time_ms, RunEvent};
22use serde_json::json;
23
/// Limit associated with resolved entity inputs.
///
/// NOTE(review): not referenced anywhere in this file. It is `pub(super)`, so it is
/// presumably consumed by a sibling/parent module or by the `entity`/`file`
/// submodules — likely to cap how many resolved inputs are listed or reported.
/// Confirm the exact meaning at the use site.
pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
25
/// Everything needed to execute one entity, prepared by `resolve_entity_plans`
/// for every selected entity before any entity is run.
///
/// A plan is moved into `run_entity` (or consumed by `create_dry_run_outcome`),
/// so everything it owns — including the optional scratch directory — lives
/// exactly as long as that entity's processing.
pub(crate) struct EntityRunPlan<'a> {
    /// Entity definition borrowed from the loaded run configuration.
    pub(crate) entity: &'a config::EntityConfig,
    /// Storage targets (source, accepted, optional rejected) produced by
    /// `entity::resolve_entity_targets`.
    pub(crate) resolved_targets: ResolvedEntityTargets,
    /// Result of `io::storage::ops::resolve_inputs` for the entity's source; its
    /// `listed` files are what dry runs report. In download mode this presumably
    /// also describes local copies inside `temp_dir` — confirm in `resolve_inputs`.
    pub(crate) resolved_inputs: io::storage::inputs::ResolvedInputs,
    /// Scratch directory; only created in download mode when at least one of the
    /// entity's targets is remote. `tempfile::TempDir` removes the directory when
    /// dropped, so it is kept here to outlive the resolved inputs' use.
    pub(crate) temp_dir: Option<tempfile::TempDir>,
}
32
/// Result of a run (or dry run) returned by the `run*` entry points.
#[derive(Debug, Clone)]
pub struct RunOutcome {
    /// Identifier of this run, taken from the run context.
    pub run_id: String,
    /// Report base path from the run context; `None` when none is configured.
    pub report_base_path: Option<String>,
    /// One entry per entity that was executed, in execution order. Empty for dry
    /// runs, and shorter than the selection when an entity requested an abort.
    pub entity_outcomes: Vec<EntityOutcome>,
    /// Aggregated run summary. For dry runs this carries zero totals and no
    /// per-entity entries.
    pub summary: report::RunSummaryReport,
    /// Per-entity previews; `Some` only for dry runs, `None` otherwise.
    pub dry_run_previews: Option<Vec<DryRunEntityPreview>>,
}
41
/// What a dry run would do for one entity: its configured inputs/outputs plus
/// the files found by listing its source (nothing is read or written).
#[derive(Debug, Clone)]
pub struct DryRunEntityPreview {
    /// Entity name.
    pub name: String,
    /// Configured source path (as written in the config, not the listed files).
    pub input_path: String,
    /// Configured source format.
    pub input_format: String,
    /// Configured accepted-sink path.
    pub accepted_path: String,
    /// Configured accepted-sink format.
    pub accepted_format: String,
    /// Rejected-sink path; `None` when the entity has no rejected sink.
    pub rejected_path: Option<String>,
    /// Rejected-sink format; `None` when the entity has no rejected sink.
    pub rejected_format: Option<String>,
    /// Archive-sink path; an empty string when no archive sink is configured.
    pub archive_path: String,
    /// Archive-sink storage name; `None` when there is no archive sink or it sets
    /// no storage.
    pub archive_storage: Option<String>,
    /// Location the entity report would be written to; `None` when reporting is
    /// disabled (no report target in the run context).
    pub report_file: Option<String>,
    /// Files discovered by the list-only input resolution.
    pub scanned_files: Vec<String>,
}
56
/// Outcome of executing a single entity, as produced by `run_entity`.
#[derive(Debug, Clone)]
pub struct EntityOutcome {
    /// Per-entity run report (file statuses and result totals).
    pub report: crate::report::RunReport,
    /// Per-file timings in milliseconds. NOTE(review): presumably one entry per
    /// processed file, `None` when no timing was recorded — confirm in `run_entity`.
    pub file_timings_ms: Vec<Option<u64>>,
}
62
63pub(crate) fn validate_entities(
64    config: &config::RootConfig,
65    selected: &[String],
66) -> FloeResult<()> {
67    let missing: Vec<String> = selected
68        .iter()
69        .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
70        .cloned()
71        .collect();
72
73    if !missing.is_empty() {
74        return Err(Box::new(ConfigError(format!(
75            "entities not found: {}",
76            missing.join(", ")
77        ))));
78    }
79    Ok(())
80}
81
82pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
83    let config_base = config::ConfigBase::local_from_path(config_path);
84    run_with_base(config_path, config_base, options)
85}
86
87pub fn run_with_base(
88    config_path: &Path,
89    config_base: config::ConfigBase,
90    options: RunOptions,
91) -> FloeResult<RunOutcome> {
92    let mut runtime = DefaultRuntime::new();
93    run_with_runtime(config_path, config_base, options, &mut runtime)
94}
95
96pub fn run_with_runtime(
97    config_path: &Path,
98    config_base: config::ConfigBase,
99    options: RunOptions,
100    runtime: &mut dyn Runtime,
101) -> FloeResult<RunOutcome> {
102    init_thread_pool();
103    let raw_config_env_vars = config::extract_raw_env_vars(config_path).unwrap_or_default();
104    let profile_vars = options
105        .profile
106        .as_ref()
107        .map(|p| {
108            crate::resolve_vars(crate::VarSources {
109                profile: &p.variables,
110                cli: &std::collections::HashMap::new(),
111                config: &raw_config_env_vars,
112            })
113        })
114        .transpose()?
115        .unwrap_or_default();
116    let validate_options = ValidateOptions {
117        entities: options.entities.clone(),
118        profile_vars: profile_vars.clone(),
119    };
120    crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
121    let context = RunContext::new(config_path, config_base, &options, profile_vars)?;
122    if !options.entities.is_empty() {
123        validate_entities(&context.config, &options.entities)?;
124    }
125
126    let observer = default_observer();
127    let perf_enabled = perf::phase_timing_enabled();
128    let selected_entities = select_entities(&context, &options);
129    let resolution_mode = if options.dry_run {
130        io::storage::inputs::ResolveInputsMode::ListOnly
131    } else {
132        io::storage::inputs::ResolveInputsMode::Download
133    };
134    if !options.dry_run {
135        observer.on_event(RunEvent::RunStarted {
136            run_id: context.run_id.clone(),
137            config: context.config_path.display().to_string(),
138            report_base: context.report_base_path.clone(),
139            ts_ms: event_time_ms(),
140        });
141    }
142    let resolve_start = perf_enabled.then(Instant::now);
143    let plans = resolve_entity_plans(&context, runtime, &selected_entities, resolution_mode)?;
144    if let Some(start) = resolve_start {
145        perf::emit_perf_log(
146            observer,
147            &context.run_id,
148            None,
149            "perf_run_phase_timings",
150            json!({
151                "phase": "resolve_inputs",
152                "elapsed_ms": start.elapsed().as_millis() as u64,
153                "entity_count": selected_entities.len(),
154                "mode": match resolution_mode {
155                    io::storage::inputs::ResolveInputsMode::ListOnly => "list_only",
156                    io::storage::inputs::ResolveInputsMode::Download => "download",
157                },
158            }),
159        );
160    }
161    if options.dry_run {
162        return create_dry_run_outcome(&context, plans);
163    }
164
165    let mut entity_outcomes = Vec::new();
166    let mut abort_run = false;
167
168    for plan in plans {
169        observer.on_event(RunEvent::EntityStarted {
170            run_id: context.run_id.clone(),
171            name: plan.entity.name.clone(),
172            ts_ms: event_time_ms(),
173        });
174        let EntityRunResult {
175            outcome,
176            abort_run: aborted,
177        } = run_entity(&context, runtime, observer, plan)?;
178        let report = &outcome.report;
179        let (mut status, _) = report::compute_run_outcome(
180            &report
181                .files
182                .iter()
183                .map(|file| file.status)
184                .collect::<Vec<_>>(),
185        );
186        if status == report::RunStatus::Success && report.results.warnings_total > 0 {
187            status = report::RunStatus::SuccessWithWarnings;
188        }
189        observer.on_event(RunEvent::EntityFinished {
190            run_id: context.run_id.clone(),
191            name: report.entity.name.clone(),
192            status: run_status_str(status).to_string(),
193            files: report.results.files_total,
194            rows: report.results.rows_total,
195            accepted: report.results.accepted_total,
196            rejected: report.results.rejected_total,
197            warnings: report.results.warnings_total,
198            errors: report.results.errors_total,
199            ts_ms: event_time_ms(),
200        });
201        entity_outcomes.push(outcome);
202        abort_run = abort_run || aborted;
203        if abort_run {
204            break;
205        }
206    }
207    let summary = build_run_summary(&context, &entity_outcomes);
208    if let Some(report_target) = &context.report_target {
209        let summary_write_start = perf_enabled.then(Instant::now);
210        write_summary_report(
211            report_target,
212            &context.run_id,
213            &summary,
214            runtime.storage(),
215            &context.storage_resolver,
216        )?;
217        if let Some(start) = summary_write_start {
218            perf::emit_perf_log(
219                observer,
220                &context.run_id,
221                None,
222                "perf_run_phase_timings",
223                json!({
224                    "phase": "write_summary_report",
225                    "elapsed_ms": start.elapsed().as_millis() as u64,
226                    "entity_count": entity_outcomes.len(),
227                }),
228            );
229        }
230    }
231    observer.on_event(RunEvent::RunFinished {
232        run_id: context.run_id.clone(),
233        status: run_status_str(summary.run.status).to_string(),
234        exit_code: summary.run.exit_code,
235        files: summary.results.files_total,
236        rows: summary.results.rows_total,
237        accepted: summary.results.accepted_total,
238        rejected: summary.results.rejected_total,
239        warnings: summary.results.warnings_total,
240        errors: summary.results.errors_total,
241        summary_uri: context.report_target.as_ref().map(|target| {
242            target.join_relative(&report::ReportWriter::summary_relative_path(
243                &context.run_id,
244            ))
245        }),
246        ts_ms: event_time_ms(),
247    });
248
249    Ok(RunOutcome {
250        run_id: context.run_id.clone(),
251        report_base_path: context.report_base_path.clone(),
252        entity_outcomes,
253        summary,
254        dry_run_previews: None,
255    })
256}
257
258fn init_thread_pool() {
259    static INIT: Once = Once::new();
260    INIT.call_once(|| {
261        if std::env::var("RAYON_NUM_THREADS").is_ok() {
262            return;
263        }
264        let cap = std::env::var("FLOE_MAX_THREADS")
265            .ok()
266            .and_then(|value| value.parse::<usize>().ok())
267            .unwrap_or(4);
268        let available = std::thread::available_parallelism()
269            .map(|value| value.get())
270            .unwrap_or(1);
271        let threads = available.min(cap).max(1);
272        let _ = rayon::ThreadPoolBuilder::new()
273            .num_threads(threads)
274            .build_global();
275    });
276}
277
278fn select_entities<'a>(
279    context: &'a RunContext,
280    options: &RunOptions,
281) -> Vec<&'a config::EntityConfig> {
282    if options.entities.is_empty() {
283        context.config.entities.iter().collect()
284    } else {
285        let selected: HashSet<&str> = options.entities.iter().map(String::as_str).collect();
286        context
287            .config
288            .entities
289            .iter()
290            .filter(|entity| selected.contains(entity.name.as_str()))
291            .collect()
292    }
293}
294
295fn resolve_entity_plans<'a>(
296    context: &'a RunContext,
297    runtime: &mut dyn Runtime,
298    entities: &[&'a config::EntityConfig],
299    resolution_mode: io::storage::inputs::ResolveInputsMode,
300) -> FloeResult<Vec<EntityRunPlan<'a>>> {
301    let mut plans = Vec::with_capacity(entities.len());
302    for entity in entities {
303        let input_adapter = runtime.input_adapter(entity.source.format.as_str())?;
304        let resolved_targets = entity::resolve_entity_targets(&context.storage_resolver, entity)?;
305        let needs_temp = matches!(
306            resolution_mode,
307            io::storage::inputs::ResolveInputsMode::Download
308        ) && (resolved_targets.source.is_remote()
309            || resolved_targets.accepted.is_remote()
310            || resolved_targets
311                .rejected
312                .as_ref()
313                .is_some_and(io::storage::Target::is_remote));
314        let temp_dir = if needs_temp {
315            Some(
316                tempfile::TempDir::new()
317                    .map_err(|err| Box::new(IoError(format!("tempdir failed: {err}"))))?,
318            )
319        } else {
320            None
321        };
322        let storage_client = Some(runtime.storage().client_for(
323            &context.storage_resolver,
324            resolved_targets.source.storage(),
325            entity,
326        )? as &dyn io::storage::StorageClient);
327        let resolved_inputs = io::storage::ops::resolve_inputs(
328            &context.config_dir,
329            entity,
330            input_adapter,
331            &resolved_targets.source,
332            resolution_mode,
333            temp_dir.as_ref().map(|dir| dir.path()),
334            storage_client,
335        )?;
336        plans.push(EntityRunPlan {
337            entity,
338            resolved_targets,
339            resolved_inputs,
340            temp_dir,
341        });
342    }
343    Ok(plans)
344}
345
346fn build_run_summary(
347    context: &RunContext,
348    entity_outcomes: &[EntityOutcome],
349) -> report::RunSummaryReport {
350    let mut totals = report::ResultsTotals {
351        files_total: 0,
352        rows_total: 0,
353        accepted_total: 0,
354        rejected_total: 0,
355        warnings_total: 0,
356        errors_total: 0,
357    };
358    let mut statuses = Vec::new();
359    let mut entities = Vec::with_capacity(entity_outcomes.len());
360
361    for outcome in entity_outcomes {
362        let report = &outcome.report;
363        totals.files_total += report.results.files_total;
364        totals.rows_total += report.results.rows_total;
365        totals.accepted_total += report.results.accepted_total;
366        totals.rejected_total += report.results.rejected_total;
367        totals.warnings_total += report.results.warnings_total;
368        totals.errors_total += report.results.errors_total;
369
370        let file_statuses = report
371            .files
372            .iter()
373            .map(|file| file.status)
374            .collect::<Vec<_>>();
375        let (mut status, _) = report::compute_run_outcome(&file_statuses);
376        if status == report::RunStatus::Success && report.results.warnings_total > 0 {
377            status = report::RunStatus::SuccessWithWarnings;
378        }
379        statuses.extend(file_statuses);
380
381        let report_file = context
382            .report_target
383            .as_ref()
384            .map(|target| {
385                target.join_relative(&report::ReportWriter::report_relative_path(
386                    &context.run_id,
387                    &report.entity.name,
388                ))
389            })
390            .unwrap_or_else(|| "disabled".to_string());
391        entities.push(report::EntitySummary {
392            name: report.entity.name.clone(),
393            status,
394            results: report.results.clone(),
395            report_file,
396        });
397    }
398
399    let (mut status, exit_code) = report::compute_run_outcome(&statuses);
400    if status == report::RunStatus::Success && totals.warnings_total > 0 {
401        status = report::RunStatus::SuccessWithWarnings;
402    }
403
404    let finished_at = report::now_rfc3339();
405    let duration_ms = context.run_timer.elapsed().as_millis() as u64;
406    let report_base_path = context
407        .report_base_path
408        .clone()
409        .unwrap_or_else(|| "disabled".to_string());
410    let report_file = context
411        .report_target
412        .as_ref()
413        .map(|target| {
414            target.join_relative(&report::ReportWriter::summary_relative_path(
415                &context.run_id,
416            ))
417        })
418        .unwrap_or_else(|| "disabled".to_string());
419
420    report::RunSummaryReport {
421        spec_version: context.config.version.clone(),
422        tool: report::ToolInfo {
423            name: "floe".to_string(),
424            version: env!("CARGO_PKG_VERSION").to_string(),
425            git: None,
426        },
427        run: report::RunInfo {
428            run_id: context.run_id.clone(),
429            started_at: context.started_at.clone(),
430            finished_at,
431            duration_ms,
432            status,
433            exit_code,
434        },
435        config: report::ConfigEcho {
436            path: context.config_path.display().to_string(),
437            version: context.config.version.clone(),
438            metadata: context.config.metadata.as_ref().map(project_metadata_json),
439        },
440        report: report::ReportEcho {
441            path: report_base_path,
442            report_file,
443        },
444        results: totals,
445        entities,
446    }
447}
448
449fn create_dry_run_outcome(
450    context: &RunContext,
451    plans: Vec<EntityRunPlan<'_>>,
452) -> FloeResult<RunOutcome> {
453    let mut previews: Vec<DryRunEntityPreview> = Vec::new();
454
455    for plan in plans {
456        let entity = plan.entity;
457        let rejected_path = entity.sink.rejected.as_ref().map(|r| r.path.clone());
458        let rejected_format = entity.sink.rejected.as_ref().map(|r| r.format.clone());
459        let (archive_path, archive_storage) = entity
460            .sink
461            .archive
462            .as_ref()
463            .map(|a| (a.path.clone(), a.storage.clone()))
464            .unwrap_or_else(|| (String::new(), None));
465
466        let report_file = context.report_target.as_ref().map(|target| {
467            target.join_relative(&report::ReportWriter::report_relative_path(
468                &context.run_id,
469                &entity.name,
470            ))
471        });
472
473        previews.push(DryRunEntityPreview {
474            name: entity.name.clone(),
475            input_path: entity.source.path.clone(),
476            input_format: entity.source.format.clone(),
477            accepted_path: entity.sink.accepted.path.clone(),
478            accepted_format: entity.sink.accepted.format.clone(),
479            rejected_path,
480            rejected_format,
481            archive_path,
482            archive_storage,
483            report_file,
484            scanned_files: plan.resolved_inputs.listed,
485        });
486    }
487
488    Ok(RunOutcome {
489        run_id: context.run_id.clone(),
490        report_base_path: context.report_base_path.clone(),
491        entity_outcomes: Vec::new(),
492        summary: report::RunSummaryReport {
493            spec_version: context.config.version.clone(),
494            tool: report::ToolInfo {
495                name: "floe".to_string(),
496                version: env!("CARGO_PKG_VERSION").to_string(),
497                git: None,
498            },
499            run: report::RunInfo {
500                run_id: context.run_id.clone(),
501                started_at: context.started_at.clone(),
502                finished_at: report::now_rfc3339(),
503                duration_ms: 0,
504                status: report::RunStatus::Success,
505                exit_code: 0,
506            },
507            config: report::ConfigEcho {
508                path: context.config_path.display().to_string(),
509                version: context.config.version.clone(),
510                metadata: context.config.metadata.as_ref().map(project_metadata_json),
511            },
512            report: report::ReportEcho {
513                path: context
514                    .report_base_path
515                    .clone()
516                    .unwrap_or_else(|| "disabled".to_string()),
517                report_file: "disabled (dry-run)".to_string(),
518            },
519            results: report::ResultsTotals {
520                files_total: 0,
521                rows_total: 0,
522                accepted_total: 0,
523                rejected_total: 0,
524                warnings_total: 0,
525                errors_total: 0,
526            },
527            entities: Vec::new(),
528        },
529        dry_run_previews: Some(previews),
530    })
531}
532
533fn run_status_str(status: report::RunStatus) -> &'static str {
534    match status {
535        report::RunStatus::Success => "success",
536        report::RunStatus::SuccessWithWarnings => "success_with_warnings",
537        report::RunStatus::Rejected => "rejected",
538        report::RunStatus::Aborted => "aborted",
539        report::RunStatus::Failed => "failed",
540    }
541}