// floe_core/run/mod.rs

1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4
5use crate::io::storage::CloudClient;
6use crate::report::build::project_metadata_json;
7use crate::report::output::write_summary_report;
8use crate::{config, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
9
10mod context;
11pub(crate) mod entity;
12pub mod events;
13mod file;
14pub mod normalize;
15mod output;
16
17pub(crate) use context::RunContext;
18use entity::{run_entity, EntityRunResult};
19use events::{default_observer, event_time_ms, RunEvent};
20
/// Fixed limit of 50, exposed to the parent module and its descendants.
///
/// NOTE(review): the name suggests a cap on how many resolved input paths are
/// handled or listed per entity; it is not referenced in this file, so confirm
/// the exact meaning against its users.
pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
22
23#[derive(Debug, Clone)]
24pub struct RunOutcome {
25    pub run_id: String,
26    pub report_base_path: Option<String>,
27    pub entity_outcomes: Vec<EntityOutcome>,
28    pub summary: report::RunSummaryReport,
29}
30
31#[derive(Debug, Clone)]
32pub struct EntityOutcome {
33    pub report: crate::report::RunReport,
34    pub file_timings_ms: Vec<Option<u64>>,
35}
36
37pub(crate) fn validate_entities(
38    config: &config::RootConfig,
39    selected: &[String],
40) -> FloeResult<()> {
41    let missing: Vec<String> = selected
42        .iter()
43        .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
44        .cloned()
45        .collect();
46
47    if !missing.is_empty() {
48        return Err(Box::new(ConfigError(format!(
49            "entities not found: {}",
50            missing.join(", ")
51        ))));
52    }
53    Ok(())
54}
55
56pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
57    let config_base = config::ConfigBase::local_from_path(config_path);
58    run_with_base(config_path, config_base, options)
59}
60
61pub fn run_with_base(
62    config_path: &Path,
63    config_base: config::ConfigBase,
64    options: RunOptions,
65) -> FloeResult<RunOutcome> {
66    init_thread_pool();
67    let validate_options = ValidateOptions {
68        entities: options.entities.clone(),
69    };
70    crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
71
72    let context = RunContext::new(config_path, config_base, &options)?;
73    if !options.entities.is_empty() {
74        validate_entities(&context.config, &options.entities)?;
75    }
76
77    let mut entity_outcomes = Vec::new();
78    let mut abort_run = false;
79    let mut cloud = CloudClient::new();
80    let observer = default_observer();
81    observer.on_event(RunEvent::RunStarted {
82        run_id: context.run_id.clone(),
83        config: context.config_path.display().to_string(),
84        report_base: context.report_base_path.clone(),
85        ts_ms: event_time_ms(),
86    });
87
88    let selected_entities: Vec<&config::EntityConfig> = if options.entities.is_empty() {
89        context.config.entities.iter().collect()
90    } else {
91        let selected: HashSet<&str> = options.entities.iter().map(|s| s.as_str()).collect();
92        context
93            .config
94            .entities
95            .iter()
96            .filter(|entity| selected.contains(entity.name.as_str()))
97            .collect()
98    };
99
100    for entity in selected_entities {
101        observer.on_event(RunEvent::EntityStarted {
102            run_id: context.run_id.clone(),
103            name: entity.name.clone(),
104            ts_ms: event_time_ms(),
105        });
106        let EntityRunResult {
107            outcome,
108            abort_run: aborted,
109        } = run_entity(&context, &mut cloud, observer, entity)?;
110        let report = &outcome.report;
111        let (mut status, _) = report::compute_run_outcome(
112            &report
113                .files
114                .iter()
115                .map(|file| file.status)
116                .collect::<Vec<_>>(),
117        );
118        if status == report::RunStatus::Success && report.results.warnings_total > 0 {
119            status = report::RunStatus::SuccessWithWarnings;
120        }
121        observer.on_event(RunEvent::EntityFinished {
122            run_id: context.run_id.clone(),
123            name: entity.name.clone(),
124            status: run_status_str(status).to_string(),
125            files: report.results.files_total,
126            rows: report.results.rows_total,
127            accepted: report.results.accepted_total,
128            rejected: report.results.rejected_total,
129            warnings: report.results.warnings_total,
130            errors: report.results.errors_total,
131            ts_ms: event_time_ms(),
132        });
133        entity_outcomes.push(outcome);
134        abort_run = abort_run || aborted;
135        if abort_run {
136            break;
137        }
138    }
139    let summary = build_run_summary(&context, &entity_outcomes);
140    if let Some(report_target) = &context.report_target {
141        write_summary_report(
142            report_target,
143            &context.run_id,
144            &summary,
145            &mut cloud,
146            &context.storage_resolver,
147        )?;
148    }
149    observer.on_event(RunEvent::RunFinished {
150        run_id: context.run_id.clone(),
151        status: run_status_str(summary.run.status).to_string(),
152        exit_code: summary.run.exit_code,
153        files: summary.results.files_total,
154        rows: summary.results.rows_total,
155        accepted: summary.results.accepted_total,
156        rejected: summary.results.rejected_total,
157        warnings: summary.results.warnings_total,
158        errors: summary.results.errors_total,
159        summary_uri: context.report_target.as_ref().map(|target| {
160            target.join_relative(&report::ReportWriter::summary_relative_path(
161                &context.run_id,
162            ))
163        }),
164        ts_ms: event_time_ms(),
165    });
166
167    Ok(RunOutcome {
168        run_id: context.run_id.clone(),
169        report_base_path: context.report_base_path.clone(),
170        entity_outcomes,
171        summary,
172    })
173}
174
175fn init_thread_pool() {
176    static INIT: Once = Once::new();
177    INIT.call_once(|| {
178        if std::env::var("RAYON_NUM_THREADS").is_ok() {
179            return;
180        }
181        let cap = std::env::var("FLOE_MAX_THREADS")
182            .ok()
183            .and_then(|value| value.parse::<usize>().ok())
184            .unwrap_or(4);
185        let available = std::thread::available_parallelism()
186            .map(|value| value.get())
187            .unwrap_or(1);
188        let threads = available.min(cap).max(1);
189        let _ = rayon::ThreadPoolBuilder::new()
190            .num_threads(threads)
191            .build_global();
192    });
193}
194
195fn build_run_summary(
196    context: &RunContext,
197    entity_outcomes: &[EntityOutcome],
198) -> report::RunSummaryReport {
199    let mut totals = report::ResultsTotals {
200        files_total: 0,
201        rows_total: 0,
202        accepted_total: 0,
203        rejected_total: 0,
204        warnings_total: 0,
205        errors_total: 0,
206    };
207    let mut statuses = Vec::new();
208    let mut entities = Vec::with_capacity(entity_outcomes.len());
209
210    for outcome in entity_outcomes {
211        let report = &outcome.report;
212        totals.files_total += report.results.files_total;
213        totals.rows_total += report.results.rows_total;
214        totals.accepted_total += report.results.accepted_total;
215        totals.rejected_total += report.results.rejected_total;
216        totals.warnings_total += report.results.warnings_total;
217        totals.errors_total += report.results.errors_total;
218
219        let file_statuses = report
220            .files
221            .iter()
222            .map(|file| file.status)
223            .collect::<Vec<_>>();
224        let (mut status, _) = report::compute_run_outcome(&file_statuses);
225        if status == report::RunStatus::Success && report.results.warnings_total > 0 {
226            status = report::RunStatus::SuccessWithWarnings;
227        }
228        statuses.extend(file_statuses);
229
230        let report_file = context
231            .report_target
232            .as_ref()
233            .map(|target| {
234                target.join_relative(&report::ReportWriter::report_relative_path(
235                    &context.run_id,
236                    &report.entity.name,
237                ))
238            })
239            .unwrap_or_else(|| "disabled".to_string());
240        entities.push(report::EntitySummary {
241            name: report.entity.name.clone(),
242            status,
243            results: report.results.clone(),
244            report_file,
245        });
246    }
247
248    let (mut status, exit_code) = report::compute_run_outcome(&statuses);
249    if status == report::RunStatus::Success && totals.warnings_total > 0 {
250        status = report::RunStatus::SuccessWithWarnings;
251    }
252
253    let finished_at = report::now_rfc3339();
254    let duration_ms = context.run_timer.elapsed().as_millis() as u64;
255    let report_base_path = context
256        .report_base_path
257        .clone()
258        .unwrap_or_else(|| "disabled".to_string());
259    let report_file = context
260        .report_target
261        .as_ref()
262        .map(|target| {
263            target.join_relative(&report::ReportWriter::summary_relative_path(
264                &context.run_id,
265            ))
266        })
267        .unwrap_or_else(|| "disabled".to_string());
268
269    report::RunSummaryReport {
270        spec_version: context.config.version.clone(),
271        tool: report::ToolInfo {
272            name: "floe".to_string(),
273            version: env!("CARGO_PKG_VERSION").to_string(),
274            git: None,
275        },
276        run: report::RunInfo {
277            run_id: context.run_id.clone(),
278            started_at: context.started_at.clone(),
279            finished_at,
280            duration_ms,
281            status,
282            exit_code,
283        },
284        config: report::ConfigEcho {
285            path: context.config_path.display().to_string(),
286            version: context.config.version.clone(),
287            metadata: context.config.metadata.as_ref().map(project_metadata_json),
288        },
289        report: report::ReportEcho {
290            path: report_base_path,
291            report_file,
292        },
293        results: totals,
294        entities,
295    }
296}
297
298fn run_status_str(status: report::RunStatus) -> &'static str {
299    match status {
300        report::RunStatus::Success => "success",
301        report::RunStatus::SuccessWithWarnings => "success_with_warnings",
302        report::RunStatus::Rejected => "rejected",
303        report::RunStatus::Aborted => "aborted",
304        report::RunStatus::Failed => "failed",
305    }
306}