1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4
5use crate::io::storage::CloudClient;
6use crate::report::build::project_metadata_json;
7use crate::report::output::write_summary_report;
8use crate::{config, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
9
10mod context;
11pub(crate) mod entity;
12pub mod events;
13mod file;
14pub mod normalize;
15mod output;
16
17pub(crate) use context::RunContext;
18use entity::{run_entity, EntityRunResult};
19use events::{default_observer, event_time_ms, RunEvent};
20
/// Limit on the number of resolved inputs, visible to the parent module (`pub(super)`).
/// NOTE(review): not referenced in this file — confirm at the use site whether
/// exceeding it is an error or a truncation.
pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
22
/// Result of a complete run: its identifier, where reports were written (if
/// anywhere), one outcome per executed entity, and the aggregated summary.
#[derive(Debug, Clone)]
pub struct RunOutcome {
    /// Identifier of this run, copied from the run context.
    pub run_id: String,
    /// Base path for report output; `None` when reporting is disabled
    /// (the summary then records the path as `"disabled"`).
    pub report_base_path: Option<String>,
    /// Per-entity outcomes in execution order. Entities after one that
    /// requested an abort are not executed and therefore not present.
    pub entity_outcomes: Vec<EntityOutcome>,
    /// Aggregated summary across all executed entities.
    pub summary: report::RunSummaryReport,
}
30
/// Outcome of running a single entity.
#[derive(Debug, Clone)]
pub struct EntityOutcome {
    /// The entity's run report (per-file statuses, result totals, entity info).
    pub report: crate::report::RunReport,
    /// Per-file timings in milliseconds.
    /// NOTE(review): presumably index-aligned with `report.files`, with `None`
    /// meaning no timing was captured — confirm in the `entity` module.
    pub file_timings_ms: Vec<Option<u64>>,
}
36
37pub(crate) fn validate_entities(
38 config: &config::RootConfig,
39 selected: &[String],
40) -> FloeResult<()> {
41 let missing: Vec<String> = selected
42 .iter()
43 .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
44 .cloned()
45 .collect();
46
47 if !missing.is_empty() {
48 return Err(Box::new(ConfigError(format!(
49 "entities not found: {}",
50 missing.join(", ")
51 ))));
52 }
53 Ok(())
54}
55
56pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
57 let config_base = config::ConfigBase::local_from_path(config_path);
58 run_with_base(config_path, config_base, options)
59}
60
61pub fn run_with_base(
62 config_path: &Path,
63 config_base: config::ConfigBase,
64 options: RunOptions,
65) -> FloeResult<RunOutcome> {
66 init_thread_pool();
67 let validate_options = ValidateOptions {
68 entities: options.entities.clone(),
69 };
70 crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
71
72 let context = RunContext::new(config_path, config_base, &options)?;
73 if !options.entities.is_empty() {
74 validate_entities(&context.config, &options.entities)?;
75 }
76
77 let mut entity_outcomes = Vec::new();
78 let mut abort_run = false;
79 let mut cloud = CloudClient::new();
80 let observer = default_observer();
81 observer.on_event(RunEvent::RunStarted {
82 run_id: context.run_id.clone(),
83 config: context.config_path.display().to_string(),
84 report_base: context.report_base_path.clone(),
85 ts_ms: event_time_ms(),
86 });
87
88 let selected_entities: Vec<&config::EntityConfig> = if options.entities.is_empty() {
89 context.config.entities.iter().collect()
90 } else {
91 let selected: HashSet<&str> = options.entities.iter().map(|s| s.as_str()).collect();
92 context
93 .config
94 .entities
95 .iter()
96 .filter(|entity| selected.contains(entity.name.as_str()))
97 .collect()
98 };
99
100 for entity in selected_entities {
101 observer.on_event(RunEvent::EntityStarted {
102 run_id: context.run_id.clone(),
103 name: entity.name.clone(),
104 ts_ms: event_time_ms(),
105 });
106 let EntityRunResult {
107 outcome,
108 abort_run: aborted,
109 } = run_entity(&context, &mut cloud, observer, entity)?;
110 let report = &outcome.report;
111 let (mut status, _) = report::compute_run_outcome(
112 &report
113 .files
114 .iter()
115 .map(|file| file.status)
116 .collect::<Vec<_>>(),
117 );
118 if status == report::RunStatus::Success && report.results.warnings_total > 0 {
119 status = report::RunStatus::SuccessWithWarnings;
120 }
121 observer.on_event(RunEvent::EntityFinished {
122 run_id: context.run_id.clone(),
123 name: entity.name.clone(),
124 status: run_status_str(status).to_string(),
125 files: report.results.files_total,
126 rows: report.results.rows_total,
127 accepted: report.results.accepted_total,
128 rejected: report.results.rejected_total,
129 warnings: report.results.warnings_total,
130 errors: report.results.errors_total,
131 ts_ms: event_time_ms(),
132 });
133 entity_outcomes.push(outcome);
134 abort_run = abort_run || aborted;
135 if abort_run {
136 break;
137 }
138 }
139 let summary = build_run_summary(&context, &entity_outcomes);
140 if let Some(report_target) = &context.report_target {
141 write_summary_report(
142 report_target,
143 &context.run_id,
144 &summary,
145 &mut cloud,
146 &context.storage_resolver,
147 )?;
148 }
149 observer.on_event(RunEvent::RunFinished {
150 run_id: context.run_id.clone(),
151 status: run_status_str(summary.run.status).to_string(),
152 exit_code: summary.run.exit_code,
153 files: summary.results.files_total,
154 rows: summary.results.rows_total,
155 accepted: summary.results.accepted_total,
156 rejected: summary.results.rejected_total,
157 warnings: summary.results.warnings_total,
158 errors: summary.results.errors_total,
159 summary_uri: context.report_target.as_ref().map(|target| {
160 target.join_relative(&report::ReportWriter::summary_relative_path(
161 &context.run_id,
162 ))
163 }),
164 ts_ms: event_time_ms(),
165 });
166
167 Ok(RunOutcome {
168 run_id: context.run_id.clone(),
169 report_base_path: context.report_base_path.clone(),
170 entity_outcomes,
171 summary,
172 })
173}
174
175fn init_thread_pool() {
176 static INIT: Once = Once::new();
177 INIT.call_once(|| {
178 if std::env::var("RAYON_NUM_THREADS").is_ok() {
179 return;
180 }
181 let cap = std::env::var("FLOE_MAX_THREADS")
182 .ok()
183 .and_then(|value| value.parse::<usize>().ok())
184 .unwrap_or(4);
185 let available = std::thread::available_parallelism()
186 .map(|value| value.get())
187 .unwrap_or(1);
188 let threads = available.min(cap).max(1);
189 let _ = rayon::ThreadPoolBuilder::new()
190 .num_threads(threads)
191 .build_global();
192 });
193}
194
195fn build_run_summary(
196 context: &RunContext,
197 entity_outcomes: &[EntityOutcome],
198) -> report::RunSummaryReport {
199 let mut totals = report::ResultsTotals {
200 files_total: 0,
201 rows_total: 0,
202 accepted_total: 0,
203 rejected_total: 0,
204 warnings_total: 0,
205 errors_total: 0,
206 };
207 let mut statuses = Vec::new();
208 let mut entities = Vec::with_capacity(entity_outcomes.len());
209
210 for outcome in entity_outcomes {
211 let report = &outcome.report;
212 totals.files_total += report.results.files_total;
213 totals.rows_total += report.results.rows_total;
214 totals.accepted_total += report.results.accepted_total;
215 totals.rejected_total += report.results.rejected_total;
216 totals.warnings_total += report.results.warnings_total;
217 totals.errors_total += report.results.errors_total;
218
219 let file_statuses = report
220 .files
221 .iter()
222 .map(|file| file.status)
223 .collect::<Vec<_>>();
224 let (mut status, _) = report::compute_run_outcome(&file_statuses);
225 if status == report::RunStatus::Success && report.results.warnings_total > 0 {
226 status = report::RunStatus::SuccessWithWarnings;
227 }
228 statuses.extend(file_statuses);
229
230 let report_file = context
231 .report_target
232 .as_ref()
233 .map(|target| {
234 target.join_relative(&report::ReportWriter::report_relative_path(
235 &context.run_id,
236 &report.entity.name,
237 ))
238 })
239 .unwrap_or_else(|| "disabled".to_string());
240 entities.push(report::EntitySummary {
241 name: report.entity.name.clone(),
242 status,
243 results: report.results.clone(),
244 report_file,
245 });
246 }
247
248 let (mut status, exit_code) = report::compute_run_outcome(&statuses);
249 if status == report::RunStatus::Success && totals.warnings_total > 0 {
250 status = report::RunStatus::SuccessWithWarnings;
251 }
252
253 let finished_at = report::now_rfc3339();
254 let duration_ms = context.run_timer.elapsed().as_millis() as u64;
255 let report_base_path = context
256 .report_base_path
257 .clone()
258 .unwrap_or_else(|| "disabled".to_string());
259 let report_file = context
260 .report_target
261 .as_ref()
262 .map(|target| {
263 target.join_relative(&report::ReportWriter::summary_relative_path(
264 &context.run_id,
265 ))
266 })
267 .unwrap_or_else(|| "disabled".to_string());
268
269 report::RunSummaryReport {
270 spec_version: context.config.version.clone(),
271 tool: report::ToolInfo {
272 name: "floe".to_string(),
273 version: env!("CARGO_PKG_VERSION").to_string(),
274 git: None,
275 },
276 run: report::RunInfo {
277 run_id: context.run_id.clone(),
278 started_at: context.started_at.clone(),
279 finished_at,
280 duration_ms,
281 status,
282 exit_code,
283 },
284 config: report::ConfigEcho {
285 path: context.config_path.display().to_string(),
286 version: context.config.version.clone(),
287 metadata: context.config.metadata.as_ref().map(project_metadata_json),
288 },
289 report: report::ReportEcho {
290 path: report_base_path,
291 report_file,
292 },
293 results: totals,
294 entities,
295 }
296}
297
298fn run_status_str(status: report::RunStatus) -> &'static str {
299 match status {
300 report::RunStatus::Success => "success",
301 report::RunStatus::SuccessWithWarnings => "success_with_warnings",
302 report::RunStatus::Rejected => "rejected",
303 report::RunStatus::Aborted => "aborted",
304 report::RunStatus::Failed => "failed",
305 }
306}