1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4
5use crate::io::storage::CloudClient;
6use crate::report::build::project_metadata_json;
7use crate::report::output::write_summary_report;
8use crate::{config, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
9
10mod context;
11pub(crate) mod entity;
12pub mod events;
13mod file;
14mod output;
15
16pub(crate) use context::RunContext;
17use entity::{run_entity, EntityRunResult};
18use events::{default_observer, event_time_ms, RunEvent};
19
/// Upper bound (50) on resolved inputs, exposed to the parent module.
///
/// NOTE(review): not referenced in the visible part of this file — presumably
/// it caps how many resolved input entries are kept or reported; confirm at
/// the use sites.
pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
21
22#[derive(Debug, Clone)]
23pub struct RunOutcome {
24 pub run_id: String,
25 pub report_base_path: Option<String>,
26 pub entity_outcomes: Vec<EntityOutcome>,
27 pub summary: report::RunSummaryReport,
28}
29
30#[derive(Debug, Clone)]
31pub struct EntityOutcome {
32 pub report: crate::report::RunReport,
33 pub file_timings_ms: Vec<Option<u64>>,
34}
35
36pub(crate) fn validate_entities(
37 config: &config::RootConfig,
38 selected: &[String],
39) -> FloeResult<()> {
40 let missing: Vec<String> = selected
41 .iter()
42 .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
43 .cloned()
44 .collect();
45
46 if !missing.is_empty() {
47 return Err(Box::new(ConfigError(format!(
48 "entities not found: {}",
49 missing.join(", ")
50 ))));
51 }
52 Ok(())
53}
54
55pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
56 let config_base = config::ConfigBase::local_from_path(config_path);
57 run_with_base(config_path, config_base, options)
58}
59
60pub fn run_with_base(
61 config_path: &Path,
62 config_base: config::ConfigBase,
63 options: RunOptions,
64) -> FloeResult<RunOutcome> {
65 init_thread_pool();
66 let validate_options = ValidateOptions {
67 entities: options.entities.clone(),
68 };
69 crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
70
71 let context = RunContext::new(config_path, config_base, &options)?;
72 if !options.entities.is_empty() {
73 validate_entities(&context.config, &options.entities)?;
74 }
75
76 let mut entity_outcomes = Vec::new();
77 let mut abort_run = false;
78 let mut cloud = CloudClient::new();
79 let observer = default_observer();
80 observer.on_event(RunEvent::RunStarted {
81 run_id: context.run_id.clone(),
82 config: context.config_path.display().to_string(),
83 report_base: context.report_base_path.clone(),
84 ts_ms: event_time_ms(),
85 });
86
87 let selected_entities: Vec<&config::EntityConfig> = if options.entities.is_empty() {
88 context.config.entities.iter().collect()
89 } else {
90 let selected: HashSet<&str> = options.entities.iter().map(|s| s.as_str()).collect();
91 context
92 .config
93 .entities
94 .iter()
95 .filter(|entity| selected.contains(entity.name.as_str()))
96 .collect()
97 };
98
99 for entity in selected_entities {
100 observer.on_event(RunEvent::EntityStarted {
101 run_id: context.run_id.clone(),
102 name: entity.name.clone(),
103 ts_ms: event_time_ms(),
104 });
105 let EntityRunResult {
106 outcome,
107 abort_run: aborted,
108 } = run_entity(&context, &mut cloud, observer, entity)?;
109 let report = &outcome.report;
110 let (mut status, _) = report::compute_run_outcome(
111 &report
112 .files
113 .iter()
114 .map(|file| file.status)
115 .collect::<Vec<_>>(),
116 );
117 if status == report::RunStatus::Success && report.results.warnings_total > 0 {
118 status = report::RunStatus::SuccessWithWarnings;
119 }
120 observer.on_event(RunEvent::EntityFinished {
121 run_id: context.run_id.clone(),
122 name: entity.name.clone(),
123 status: run_status_str(status).to_string(),
124 files: report.results.files_total,
125 rows: report.results.rows_total,
126 accepted: report.results.accepted_total,
127 rejected: report.results.rejected_total,
128 warnings: report.results.warnings_total,
129 errors: report.results.errors_total,
130 ts_ms: event_time_ms(),
131 });
132 entity_outcomes.push(outcome);
133 abort_run = abort_run || aborted;
134 if abort_run {
135 break;
136 }
137 }
138 let summary = build_run_summary(&context, &entity_outcomes);
139 if let Some(report_target) = &context.report_target {
140 write_summary_report(
141 report_target,
142 &context.run_id,
143 &summary,
144 &mut cloud,
145 &context.storage_resolver,
146 )?;
147 }
148 observer.on_event(RunEvent::RunFinished {
149 run_id: context.run_id.clone(),
150 status: run_status_str(summary.run.status).to_string(),
151 exit_code: summary.run.exit_code,
152 files: summary.results.files_total,
153 rows: summary.results.rows_total,
154 accepted: summary.results.accepted_total,
155 rejected: summary.results.rejected_total,
156 warnings: summary.results.warnings_total,
157 errors: summary.results.errors_total,
158 summary_uri: context.report_target.as_ref().map(|target| {
159 target.join_relative(&report::ReportWriter::summary_relative_path(
160 &context.run_id,
161 ))
162 }),
163 ts_ms: event_time_ms(),
164 });
165
166 Ok(RunOutcome {
167 run_id: context.run_id.clone(),
168 report_base_path: context.report_base_path.clone(),
169 entity_outcomes,
170 summary,
171 })
172}
173
174fn init_thread_pool() {
175 static INIT: Once = Once::new();
176 INIT.call_once(|| {
177 if std::env::var("RAYON_NUM_THREADS").is_ok() {
178 return;
179 }
180 let cap = std::env::var("FLOE_MAX_THREADS")
181 .ok()
182 .and_then(|value| value.parse::<usize>().ok())
183 .unwrap_or(4);
184 let available = std::thread::available_parallelism()
185 .map(|value| value.get())
186 .unwrap_or(1);
187 let threads = available.min(cap).max(1);
188 let _ = rayon::ThreadPoolBuilder::new()
189 .num_threads(threads)
190 .build_global();
191 });
192}
193
194fn build_run_summary(
195 context: &RunContext,
196 entity_outcomes: &[EntityOutcome],
197) -> report::RunSummaryReport {
198 let mut totals = report::ResultsTotals {
199 files_total: 0,
200 rows_total: 0,
201 accepted_total: 0,
202 rejected_total: 0,
203 warnings_total: 0,
204 errors_total: 0,
205 };
206 let mut statuses = Vec::new();
207 let mut entities = Vec::with_capacity(entity_outcomes.len());
208
209 for outcome in entity_outcomes {
210 let report = &outcome.report;
211 totals.files_total += report.results.files_total;
212 totals.rows_total += report.results.rows_total;
213 totals.accepted_total += report.results.accepted_total;
214 totals.rejected_total += report.results.rejected_total;
215 totals.warnings_total += report.results.warnings_total;
216 totals.errors_total += report.results.errors_total;
217
218 let file_statuses = report
219 .files
220 .iter()
221 .map(|file| file.status)
222 .collect::<Vec<_>>();
223 let (mut status, _) = report::compute_run_outcome(&file_statuses);
224 if status == report::RunStatus::Success && report.results.warnings_total > 0 {
225 status = report::RunStatus::SuccessWithWarnings;
226 }
227 statuses.extend(file_statuses);
228
229 let report_file = context
230 .report_target
231 .as_ref()
232 .map(|target| {
233 target.join_relative(&report::ReportWriter::report_relative_path(
234 &context.run_id,
235 &report.entity.name,
236 ))
237 })
238 .unwrap_or_else(|| "disabled".to_string());
239 entities.push(report::EntitySummary {
240 name: report.entity.name.clone(),
241 status,
242 results: report.results.clone(),
243 report_file,
244 });
245 }
246
247 let (mut status, exit_code) = report::compute_run_outcome(&statuses);
248 if status == report::RunStatus::Success && totals.warnings_total > 0 {
249 status = report::RunStatus::SuccessWithWarnings;
250 }
251
252 let finished_at = report::now_rfc3339();
253 let duration_ms = context.run_timer.elapsed().as_millis() as u64;
254 let report_base_path = context
255 .report_base_path
256 .clone()
257 .unwrap_or_else(|| "disabled".to_string());
258 let report_file = context
259 .report_target
260 .as_ref()
261 .map(|target| {
262 target.join_relative(&report::ReportWriter::summary_relative_path(
263 &context.run_id,
264 ))
265 })
266 .unwrap_or_else(|| "disabled".to_string());
267
268 report::RunSummaryReport {
269 spec_version: context.config.version.clone(),
270 tool: report::ToolInfo {
271 name: "floe".to_string(),
272 version: env!("CARGO_PKG_VERSION").to_string(),
273 git: None,
274 },
275 run: report::RunInfo {
276 run_id: context.run_id.clone(),
277 started_at: context.started_at.clone(),
278 finished_at,
279 duration_ms,
280 status,
281 exit_code,
282 },
283 config: report::ConfigEcho {
284 path: context.config_path.display().to_string(),
285 version: context.config.version.clone(),
286 metadata: context.config.metadata.as_ref().map(project_metadata_json),
287 },
288 report: report::ReportEcho {
289 path: report_base_path,
290 report_file,
291 },
292 results: totals,
293 entities,
294 }
295}
296
297fn run_status_str(status: report::RunStatus) -> &'static str {
298 match status {
299 report::RunStatus::Success => "success",
300 report::RunStatus::SuccessWithWarnings => "success_with_warnings",
301 report::RunStatus::Rejected => "rejected",
302 report::RunStatus::Aborted => "aborted",
303 report::RunStatus::Failed => "failed",
304 }
305}