1use std::path::Path;
2use std::sync::Once;
3
4use crate::io::storage::CloudClient;
5use crate::report::build::project_metadata_json;
6use crate::report::output::write_summary_report;
7use crate::{config, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
8
9mod context;
10pub(crate) mod entity;
11mod events;
12mod file;
13pub mod normalize;
14mod output;
15
16pub(crate) use context::RunContext;
17use entity::{run_entity, EntityRunResult};
18use events::{default_observer, RunEvent};
19
/// Numeric limit of 50, exposed to the parent module via `pub(super)`.
///
/// NOTE(review): not referenced in this part of the file; judging by the name
/// it presumably caps how many resolved inputs are handled or reported —
/// confirm against its users.
pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
21
/// Result of a complete run, as returned by [`run`] and [`run_with_base`].
#[derive(Debug, Clone)]
pub struct RunOutcome {
    /// Identifier of the run, copied from the run context.
    pub run_id: String,
    /// Base path for reports, copied from the run context; `None` when the
    /// context carries no report base path.
    pub report_base_path: Option<String>,
    /// One entry per entity that was executed, in execution order. Entities
    /// after one that requested an abort are not executed and have no entry.
    pub entity_outcomes: Vec<EntityOutcome>,
    /// Aggregated summary built from `entity_outcomes`.
    pub summary: report::RunSummaryReport,
}
29
/// Result of running a single entity.
#[derive(Debug, Clone)]
pub struct EntityOutcome {
    /// Report produced for the entity; its results and per-file statuses feed
    /// the run summary.
    pub report: crate::report::RunReport,
    /// NOTE(review): populated outside this file; presumably one timing in
    /// milliseconds per processed file, `None` where no timing was recorded —
    /// confirm in the `entity` module.
    pub file_timings_ms: Vec<Option<u64>>,
}
35
36pub(crate) fn validate_entities(
37 config: &config::RootConfig,
38 selected: &[String],
39) -> FloeResult<()> {
40 let missing: Vec<String> = selected
41 .iter()
42 .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
43 .cloned()
44 .collect();
45
46 if !missing.is_empty() {
47 return Err(Box::new(ConfigError(format!(
48 "entities not found: {}",
49 missing.join(", ")
50 ))));
51 }
52 Ok(())
53}
54
55pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
56 let config_base = config::ConfigBase::local_from_path(config_path);
57 run_with_base(config_path, config_base, options)
58}
59
60pub fn run_with_base(
61 config_path: &Path,
62 config_base: config::ConfigBase,
63 options: RunOptions,
64) -> FloeResult<RunOutcome> {
65 init_thread_pool();
66 let validate_options = ValidateOptions {
67 entities: options.entities.clone(),
68 };
69 crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
70
71 let context = RunContext::new(config_path, config_base, &options)?;
72 if !options.entities.is_empty() {
73 validate_entities(&context.config, &options.entities)?;
74 }
75
76 let mut entity_outcomes = Vec::new();
77 let mut abort_run = false;
78 let mut cloud = CloudClient::new();
79 let observer = default_observer();
80 observer.on_event(RunEvent::RunStarted {
81 run_id: context.run_id.clone(),
82 });
83 for entity in &context.config.entities {
84 observer.on_event(RunEvent::EntityStarted {
85 name: entity.name.clone(),
86 });
87 let EntityRunResult {
88 outcome,
89 abort_run: aborted,
90 } = run_entity(&context, &mut cloud, entity)?;
91 observer.on_event(RunEvent::EntityFinished {
92 name: entity.name.clone(),
93 });
94 entity_outcomes.push(outcome);
95 abort_run = abort_run || aborted;
96 if abort_run {
97 break;
98 }
99 }
100 observer.on_event(RunEvent::RunFinished {
101 run_id: context.run_id.clone(),
102 });
103
104 let summary = build_run_summary(&context, &entity_outcomes);
105 if let Some(report_target) = &context.report_target {
106 write_summary_report(
107 report_target,
108 &context.run_id,
109 &summary,
110 &mut cloud,
111 &context.storage_resolver,
112 )?;
113 }
114
115 Ok(RunOutcome {
116 run_id: context.run_id.clone(),
117 report_base_path: context.report_base_path.clone(),
118 entity_outcomes,
119 summary,
120 })
121}
122
123fn init_thread_pool() {
124 static INIT: Once = Once::new();
125 INIT.call_once(|| {
126 if std::env::var("RAYON_NUM_THREADS").is_ok() {
127 return;
128 }
129 let cap = std::env::var("FLOE_MAX_THREADS")
130 .ok()
131 .and_then(|value| value.parse::<usize>().ok())
132 .unwrap_or(4);
133 let available = std::thread::available_parallelism()
134 .map(|value| value.get())
135 .unwrap_or(1);
136 let threads = available.min(cap).max(1);
137 let _ = rayon::ThreadPoolBuilder::new()
138 .num_threads(threads)
139 .build_global();
140 });
141}
142
143fn build_run_summary(
144 context: &RunContext,
145 entity_outcomes: &[EntityOutcome],
146) -> report::RunSummaryReport {
147 let mut totals = report::ResultsTotals {
148 files_total: 0,
149 rows_total: 0,
150 accepted_total: 0,
151 rejected_total: 0,
152 warnings_total: 0,
153 errors_total: 0,
154 };
155 let mut statuses = Vec::new();
156 let mut entities = Vec::with_capacity(entity_outcomes.len());
157
158 for outcome in entity_outcomes {
159 let report = &outcome.report;
160 totals.files_total += report.results.files_total;
161 totals.rows_total += report.results.rows_total;
162 totals.accepted_total += report.results.accepted_total;
163 totals.rejected_total += report.results.rejected_total;
164 totals.warnings_total += report.results.warnings_total;
165 totals.errors_total += report.results.errors_total;
166
167 let file_statuses = report
168 .files
169 .iter()
170 .map(|file| file.status)
171 .collect::<Vec<_>>();
172 let (mut status, _) = report::compute_run_outcome(&file_statuses);
173 if status == report::RunStatus::Success && report.results.warnings_total > 0 {
174 status = report::RunStatus::SuccessWithWarnings;
175 }
176 statuses.extend(file_statuses);
177
178 let report_file = context
179 .report_target
180 .as_ref()
181 .map(|target| {
182 target.join_relative(&report::ReportWriter::report_relative_path(
183 &context.run_id,
184 &report.entity.name,
185 ))
186 })
187 .unwrap_or_else(|| "disabled".to_string());
188 entities.push(report::EntitySummary {
189 name: report.entity.name.clone(),
190 status,
191 results: report.results.clone(),
192 report_file,
193 });
194 }
195
196 let (mut status, exit_code) = report::compute_run_outcome(&statuses);
197 if status == report::RunStatus::Success && totals.warnings_total > 0 {
198 status = report::RunStatus::SuccessWithWarnings;
199 }
200
201 let finished_at = report::now_rfc3339();
202 let duration_ms = context.run_timer.elapsed().as_millis() as u64;
203 let report_base_path = context
204 .report_base_path
205 .clone()
206 .unwrap_or_else(|| "disabled".to_string());
207 let report_file = context
208 .report_target
209 .as_ref()
210 .map(|target| {
211 target.join_relative(&report::ReportWriter::summary_relative_path(
212 &context.run_id,
213 ))
214 })
215 .unwrap_or_else(|| "disabled".to_string());
216
217 report::RunSummaryReport {
218 spec_version: context.config.version.clone(),
219 tool: report::ToolInfo {
220 name: "floe".to_string(),
221 version: env!("CARGO_PKG_VERSION").to_string(),
222 git: None,
223 },
224 run: report::RunInfo {
225 run_id: context.run_id.clone(),
226 started_at: context.started_at.clone(),
227 finished_at,
228 duration_ms,
229 status,
230 exit_code,
231 },
232 config: report::ConfigEcho {
233 path: context.config_path.display().to_string(),
234 version: context.config.version.clone(),
235 metadata: context.config.metadata.as_ref().map(project_metadata_json),
236 },
237 report: report::ReportEcho {
238 path: report_base_path,
239 report_file,
240 },
241 results: totals,
242 entities,
243 }
244}