Skip to main content

floe_core/run/
mod.rs

1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4
5use crate::io::storage::CloudClient;
6use crate::report::build::project_metadata_json;
7use crate::report::output::write_summary_report;
8use crate::{config, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
9
10mod context;
11pub(crate) mod entity;
12pub mod events;
13mod file;
14mod output;
15
16pub(crate) use context::RunContext;
17use entity::{run_entity, EntityRunResult};
18use events::{default_observer, event_time_ms, RunEvent};
19
20pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
21
22#[derive(Debug, Clone)]
23pub struct RunOutcome {
24    pub run_id: String,
25    pub report_base_path: Option<String>,
26    pub entity_outcomes: Vec<EntityOutcome>,
27    pub summary: report::RunSummaryReport,
28}
29
30#[derive(Debug, Clone)]
31pub struct EntityOutcome {
32    pub report: crate::report::RunReport,
33    pub file_timings_ms: Vec<Option<u64>>,
34}
35
36pub(crate) fn validate_entities(
37    config: &config::RootConfig,
38    selected: &[String],
39) -> FloeResult<()> {
40    let missing: Vec<String> = selected
41        .iter()
42        .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
43        .cloned()
44        .collect();
45
46    if !missing.is_empty() {
47        return Err(Box::new(ConfigError(format!(
48            "entities not found: {}",
49            missing.join(", ")
50        ))));
51    }
52    Ok(())
53}
54
55pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
56    let config_base = config::ConfigBase::local_from_path(config_path);
57    run_with_base(config_path, config_base, options)
58}
59
60pub fn run_with_base(
61    config_path: &Path,
62    config_base: config::ConfigBase,
63    options: RunOptions,
64) -> FloeResult<RunOutcome> {
65    init_thread_pool();
66    let validate_options = ValidateOptions {
67        entities: options.entities.clone(),
68    };
69    crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
70
71    let context = RunContext::new(config_path, config_base, &options)?;
72    if !options.entities.is_empty() {
73        validate_entities(&context.config, &options.entities)?;
74    }
75
76    let mut entity_outcomes = Vec::new();
77    let mut abort_run = false;
78    let mut cloud = CloudClient::new();
79    let observer = default_observer();
80    observer.on_event(RunEvent::RunStarted {
81        run_id: context.run_id.clone(),
82        config: context.config_path.display().to_string(),
83        report_base: context.report_base_path.clone(),
84        ts_ms: event_time_ms(),
85    });
86
87    let selected_entities: Vec<&config::EntityConfig> = if options.entities.is_empty() {
88        context.config.entities.iter().collect()
89    } else {
90        let selected: HashSet<&str> = options.entities.iter().map(|s| s.as_str()).collect();
91        context
92            .config
93            .entities
94            .iter()
95            .filter(|entity| selected.contains(entity.name.as_str()))
96            .collect()
97    };
98
99    for entity in selected_entities {
100        observer.on_event(RunEvent::EntityStarted {
101            run_id: context.run_id.clone(),
102            name: entity.name.clone(),
103            ts_ms: event_time_ms(),
104        });
105        let EntityRunResult {
106            outcome,
107            abort_run: aborted,
108        } = run_entity(&context, &mut cloud, observer, entity)?;
109        let report = &outcome.report;
110        let (mut status, _) = report::compute_run_outcome(
111            &report
112                .files
113                .iter()
114                .map(|file| file.status)
115                .collect::<Vec<_>>(),
116        );
117        if status == report::RunStatus::Success && report.results.warnings_total > 0 {
118            status = report::RunStatus::SuccessWithWarnings;
119        }
120        observer.on_event(RunEvent::EntityFinished {
121            run_id: context.run_id.clone(),
122            name: entity.name.clone(),
123            status: run_status_str(status).to_string(),
124            files: report.results.files_total,
125            rows: report.results.rows_total,
126            accepted: report.results.accepted_total,
127            rejected: report.results.rejected_total,
128            warnings: report.results.warnings_total,
129            errors: report.results.errors_total,
130            ts_ms: event_time_ms(),
131        });
132        entity_outcomes.push(outcome);
133        abort_run = abort_run || aborted;
134        if abort_run {
135            break;
136        }
137    }
138    let summary = build_run_summary(&context, &entity_outcomes);
139    if let Some(report_target) = &context.report_target {
140        write_summary_report(
141            report_target,
142            &context.run_id,
143            &summary,
144            &mut cloud,
145            &context.storage_resolver,
146        )?;
147    }
148    observer.on_event(RunEvent::RunFinished {
149        run_id: context.run_id.clone(),
150        status: run_status_str(summary.run.status).to_string(),
151        exit_code: summary.run.exit_code,
152        files: summary.results.files_total,
153        rows: summary.results.rows_total,
154        accepted: summary.results.accepted_total,
155        rejected: summary.results.rejected_total,
156        warnings: summary.results.warnings_total,
157        errors: summary.results.errors_total,
158        summary_uri: context.report_target.as_ref().map(|target| {
159            target.join_relative(&report::ReportWriter::summary_relative_path(
160                &context.run_id,
161            ))
162        }),
163        ts_ms: event_time_ms(),
164    });
165
166    Ok(RunOutcome {
167        run_id: context.run_id.clone(),
168        report_base_path: context.report_base_path.clone(),
169        entity_outcomes,
170        summary,
171    })
172}
173
174fn init_thread_pool() {
175    static INIT: Once = Once::new();
176    INIT.call_once(|| {
177        if std::env::var("RAYON_NUM_THREADS").is_ok() {
178            return;
179        }
180        let cap = std::env::var("FLOE_MAX_THREADS")
181            .ok()
182            .and_then(|value| value.parse::<usize>().ok())
183            .unwrap_or(4);
184        let available = std::thread::available_parallelism()
185            .map(|value| value.get())
186            .unwrap_or(1);
187        let threads = available.min(cap).max(1);
188        let _ = rayon::ThreadPoolBuilder::new()
189            .num_threads(threads)
190            .build_global();
191    });
192}
193
194fn build_run_summary(
195    context: &RunContext,
196    entity_outcomes: &[EntityOutcome],
197) -> report::RunSummaryReport {
198    let mut totals = report::ResultsTotals {
199        files_total: 0,
200        rows_total: 0,
201        accepted_total: 0,
202        rejected_total: 0,
203        warnings_total: 0,
204        errors_total: 0,
205    };
206    let mut statuses = Vec::new();
207    let mut entities = Vec::with_capacity(entity_outcomes.len());
208
209    for outcome in entity_outcomes {
210        let report = &outcome.report;
211        totals.files_total += report.results.files_total;
212        totals.rows_total += report.results.rows_total;
213        totals.accepted_total += report.results.accepted_total;
214        totals.rejected_total += report.results.rejected_total;
215        totals.warnings_total += report.results.warnings_total;
216        totals.errors_total += report.results.errors_total;
217
218        let file_statuses = report
219            .files
220            .iter()
221            .map(|file| file.status)
222            .collect::<Vec<_>>();
223        let (mut status, _) = report::compute_run_outcome(&file_statuses);
224        if status == report::RunStatus::Success && report.results.warnings_total > 0 {
225            status = report::RunStatus::SuccessWithWarnings;
226        }
227        statuses.extend(file_statuses);
228
229        let report_file = context
230            .report_target
231            .as_ref()
232            .map(|target| {
233                target.join_relative(&report::ReportWriter::report_relative_path(
234                    &context.run_id,
235                    &report.entity.name,
236                ))
237            })
238            .unwrap_or_else(|| "disabled".to_string());
239        entities.push(report::EntitySummary {
240            name: report.entity.name.clone(),
241            status,
242            results: report.results.clone(),
243            report_file,
244        });
245    }
246
247    let (mut status, exit_code) = report::compute_run_outcome(&statuses);
248    if status == report::RunStatus::Success && totals.warnings_total > 0 {
249        status = report::RunStatus::SuccessWithWarnings;
250    }
251
252    let finished_at = report::now_rfc3339();
253    let duration_ms = context.run_timer.elapsed().as_millis() as u64;
254    let report_base_path = context
255        .report_base_path
256        .clone()
257        .unwrap_or_else(|| "disabled".to_string());
258    let report_file = context
259        .report_target
260        .as_ref()
261        .map(|target| {
262            target.join_relative(&report::ReportWriter::summary_relative_path(
263                &context.run_id,
264            ))
265        })
266        .unwrap_or_else(|| "disabled".to_string());
267
268    report::RunSummaryReport {
269        spec_version: context.config.version.clone(),
270        tool: report::ToolInfo {
271            name: "floe".to_string(),
272            version: env!("CARGO_PKG_VERSION").to_string(),
273            git: None,
274        },
275        run: report::RunInfo {
276            run_id: context.run_id.clone(),
277            started_at: context.started_at.clone(),
278            finished_at,
279            duration_ms,
280            status,
281            exit_code,
282        },
283        config: report::ConfigEcho {
284            path: context.config_path.display().to_string(),
285            version: context.config.version.clone(),
286            metadata: context.config.metadata.as_ref().map(project_metadata_json),
287        },
288        report: report::ReportEcho {
289            path: report_base_path,
290            report_file,
291        },
292        results: totals,
293        entities,
294    }
295}
296
297fn run_status_str(status: report::RunStatus) -> &'static str {
298    match status {
299        report::RunStatus::Success => "success",
300        report::RunStatus::SuccessWithWarnings => "success_with_warnings",
301        report::RunStatus::Rejected => "rejected",
302        report::RunStatus::Aborted => "aborted",
303        report::RunStatus::Failed => "failed",
304    }
305}