Skip to main content

floe_core/run/
mod.rs

1use std::path::Path;
2use std::sync::Once;
3
4use crate::io::storage::CloudClient;
5use crate::report::build::project_metadata_json;
6use crate::report::output::write_summary_report;
7use crate::{config, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
8
9mod context;
10pub(crate) mod entity;
11mod events;
12mod file;
13pub mod normalize;
14mod output;
15
16pub(crate) use context::RunContext;
17use entity::{run_entity, EntityRunResult};
18use events::{default_observer, RunEvent};
19
20pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
21
22#[derive(Debug, Clone)]
23pub struct RunOutcome {
24    pub run_id: String,
25    pub report_base_path: Option<String>,
26    pub entity_outcomes: Vec<EntityOutcome>,
27    pub summary: report::RunSummaryReport,
28}
29
30#[derive(Debug, Clone)]
31pub struct EntityOutcome {
32    pub report: crate::report::RunReport,
33    pub file_timings_ms: Vec<Option<u64>>,
34}
35
36pub(crate) fn validate_entities(
37    config: &config::RootConfig,
38    selected: &[String],
39) -> FloeResult<()> {
40    let missing: Vec<String> = selected
41        .iter()
42        .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
43        .cloned()
44        .collect();
45
46    if !missing.is_empty() {
47        return Err(Box::new(ConfigError(format!(
48            "entities not found: {}",
49            missing.join(", ")
50        ))));
51    }
52    Ok(())
53}
54
55pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
56    let config_base = config::ConfigBase::local_from_path(config_path);
57    run_with_base(config_path, config_base, options)
58}
59
60pub fn run_with_base(
61    config_path: &Path,
62    config_base: config::ConfigBase,
63    options: RunOptions,
64) -> FloeResult<RunOutcome> {
65    init_thread_pool();
66    let validate_options = ValidateOptions {
67        entities: options.entities.clone(),
68    };
69    crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
70
71    let context = RunContext::new(config_path, config_base, &options)?;
72    if !options.entities.is_empty() {
73        validate_entities(&context.config, &options.entities)?;
74    }
75
76    let mut entity_outcomes = Vec::new();
77    let mut abort_run = false;
78    let mut cloud = CloudClient::new();
79    let observer = default_observer();
80    observer.on_event(RunEvent::RunStarted {
81        run_id: context.run_id.clone(),
82    });
83    for entity in &context.config.entities {
84        observer.on_event(RunEvent::EntityStarted {
85            name: entity.name.clone(),
86        });
87        let EntityRunResult {
88            outcome,
89            abort_run: aborted,
90        } = run_entity(&context, &mut cloud, entity)?;
91        observer.on_event(RunEvent::EntityFinished {
92            name: entity.name.clone(),
93        });
94        entity_outcomes.push(outcome);
95        abort_run = abort_run || aborted;
96        if abort_run {
97            break;
98        }
99    }
100    observer.on_event(RunEvent::RunFinished {
101        run_id: context.run_id.clone(),
102    });
103
104    let summary = build_run_summary(&context, &entity_outcomes);
105    if let Some(report_target) = &context.report_target {
106        write_summary_report(
107            report_target,
108            &context.run_id,
109            &summary,
110            &mut cloud,
111            &context.storage_resolver,
112        )?;
113    }
114
115    Ok(RunOutcome {
116        run_id: context.run_id.clone(),
117        report_base_path: context.report_base_path.clone(),
118        entity_outcomes,
119        summary,
120    })
121}
122
123fn init_thread_pool() {
124    static INIT: Once = Once::new();
125    INIT.call_once(|| {
126        if std::env::var("RAYON_NUM_THREADS").is_ok() {
127            return;
128        }
129        let cap = std::env::var("FLOE_MAX_THREADS")
130            .ok()
131            .and_then(|value| value.parse::<usize>().ok())
132            .unwrap_or(4);
133        let available = std::thread::available_parallelism()
134            .map(|value| value.get())
135            .unwrap_or(1);
136        let threads = available.min(cap).max(1);
137        let _ = rayon::ThreadPoolBuilder::new()
138            .num_threads(threads)
139            .build_global();
140    });
141}
142
143fn build_run_summary(
144    context: &RunContext,
145    entity_outcomes: &[EntityOutcome],
146) -> report::RunSummaryReport {
147    let mut totals = report::ResultsTotals {
148        files_total: 0,
149        rows_total: 0,
150        accepted_total: 0,
151        rejected_total: 0,
152        warnings_total: 0,
153        errors_total: 0,
154    };
155    let mut statuses = Vec::new();
156    let mut entities = Vec::with_capacity(entity_outcomes.len());
157
158    for outcome in entity_outcomes {
159        let report = &outcome.report;
160        totals.files_total += report.results.files_total;
161        totals.rows_total += report.results.rows_total;
162        totals.accepted_total += report.results.accepted_total;
163        totals.rejected_total += report.results.rejected_total;
164        totals.warnings_total += report.results.warnings_total;
165        totals.errors_total += report.results.errors_total;
166
167        let file_statuses = report
168            .files
169            .iter()
170            .map(|file| file.status)
171            .collect::<Vec<_>>();
172        let (mut status, _) = report::compute_run_outcome(&file_statuses);
173        if status == report::RunStatus::Success && report.results.warnings_total > 0 {
174            status = report::RunStatus::SuccessWithWarnings;
175        }
176        statuses.extend(file_statuses);
177
178        let report_file = context
179            .report_target
180            .as_ref()
181            .map(|target| {
182                target.join_relative(&report::ReportWriter::report_relative_path(
183                    &context.run_id,
184                    &report.entity.name,
185                ))
186            })
187            .unwrap_or_else(|| "disabled".to_string());
188        entities.push(report::EntitySummary {
189            name: report.entity.name.clone(),
190            status,
191            results: report.results.clone(),
192            report_file,
193        });
194    }
195
196    let (mut status, exit_code) = report::compute_run_outcome(&statuses);
197    if status == report::RunStatus::Success && totals.warnings_total > 0 {
198        status = report::RunStatus::SuccessWithWarnings;
199    }
200
201    let finished_at = report::now_rfc3339();
202    let duration_ms = context.run_timer.elapsed().as_millis() as u64;
203    let report_base_path = context
204        .report_base_path
205        .clone()
206        .unwrap_or_else(|| "disabled".to_string());
207    let report_file = context
208        .report_target
209        .as_ref()
210        .map(|target| {
211            target.join_relative(&report::ReportWriter::summary_relative_path(
212                &context.run_id,
213            ))
214        })
215        .unwrap_or_else(|| "disabled".to_string());
216
217    report::RunSummaryReport {
218        spec_version: context.config.version.clone(),
219        tool: report::ToolInfo {
220            name: "floe".to_string(),
221            version: env!("CARGO_PKG_VERSION").to_string(),
222            git: None,
223        },
224        run: report::RunInfo {
225            run_id: context.run_id.clone(),
226            started_at: context.started_at.clone(),
227            finished_at,
228            duration_ms,
229            status,
230            exit_code,
231        },
232        config: report::ConfigEcho {
233            path: context.config_path.display().to_string(),
234            version: context.config.version.clone(),
235            metadata: context.config.metadata.as_ref().map(project_metadata_json),
236        },
237        report: report::ReportEcho {
238            path: report_base_path,
239            report_file,
240        },
241        results: totals,
242        entities,
243    }
244}