1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4
5use crate::errors::IoError;
6use crate::report::build::project_metadata_json;
7use crate::report::output::write_summary_report;
8use crate::runtime::{DefaultRuntime, Runtime};
9use crate::{config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
10
11mod context;
12pub(crate) mod entity;
13pub mod events;
14mod file;
15mod output;
16
17pub(crate) use context::RunContext;
18use entity::{run_entity, EntityRunResult, ResolvedEntityTargets};
19use events::{default_observer, event_time_ms, RunEvent};
20
/// Limit on resolved inputs, visible to the parent module (`pub(super)`) and to
/// this module's submodules.
///
/// NOTE(review): presumably caps how many resolved input paths are kept or
/// listed per entity — confirm at the use sites, which are not in this section.
pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
22
/// Per-entity work item built by `resolve_entity_plans` before any entity runs.
///
/// Plans are consumed either by the execution loop in `run_with_runtime` or,
/// for dry runs, by `create_dry_run_outcome`.
pub(crate) struct EntityRunPlan<'a> {
    /// The entity's configuration, borrowed from the run context's config.
    pub(crate) entity: &'a config::EntityConfig,
    /// Source, accepted and (optional) rejected storage targets of the entity.
    pub(crate) resolved_targets: ResolvedEntityTargets,
    /// Inputs found for the entity; whether they were only listed or also
    /// downloaded depends on the `ResolveInputsMode` used to build the plan.
    pub(crate) resolved_inputs: io::storage::inputs::ResolvedInputs,
    /// Scratch directory, created only in download mode when at least one
    /// target is remote. It is held here so it lives as long as the plan
    /// (`tempfile::TempDir` removes the directory when dropped).
    pub(crate) temp_dir: Option<tempfile::TempDir>,
}
29
/// Result of a pipeline run (or dry run) as returned by `run_with_runtime`.
#[derive(Debug, Clone)]
pub struct RunOutcome {
    /// Identifier of this run, taken from the run context.
    pub run_id: String,
    /// Base location for reports; `None` when reporting is disabled (the
    /// summary then records the path as `"disabled"`).
    pub report_base_path: Option<String>,
    /// One outcome per executed entity, in execution order. Empty for dry
    /// runs, and shorter than the selection when an entity aborted the run.
    pub entity_outcomes: Vec<EntityOutcome>,
    /// Run-level summary; for real runs it is also written to the report
    /// target when one is configured.
    pub summary: report::RunSummaryReport,
    /// `Some` only for dry runs, with one preview per planned entity.
    pub dry_run_previews: Option<Vec<DryRunEntityPreview>>,
}
38
/// What a real run would do for one entity, as reported by a dry run.
#[derive(Debug, Clone)]
pub struct DryRunEntityPreview {
    /// Entity name from the configuration.
    pub name: String,
    /// Configured source path.
    pub input_path: String,
    /// Configured source format.
    pub input_format: String,
    /// Configured path of the accepted sink.
    pub accepted_path: String,
    /// Configured format of the accepted sink.
    pub accepted_format: String,
    /// Path of the rejected sink; `None` when no rejected sink is configured.
    pub rejected_path: Option<String>,
    /// Format of the rejected sink; `None` when no rejected sink is configured.
    pub rejected_format: Option<String>,
    /// Archive sink path; an empty string when no archive sink is configured.
    pub archive_path: String,
    /// Archive sink storage as configured; `None` when unset or no archive sink.
    pub archive_storage: Option<String>,
    /// Where the entity report would be written; `None` when reporting is disabled.
    pub report_file: Option<String>,
    /// Input files listed while resolving the entity's inputs.
    pub scanned_files: Vec<String>,
}
53
/// Result of executing a single entity.
#[derive(Debug, Clone)]
pub struct EntityOutcome {
    /// The entity's run report (file statuses and result totals).
    pub report: crate::report::RunReport,
    /// Per-file timings in milliseconds.
    /// NOTE(review): presumably one entry per processed file, `None` when no
    /// timing was recorded — confirm against the `entity` module.
    pub file_timings_ms: Vec<Option<u64>>,
}
59
60pub(crate) fn validate_entities(
61 config: &config::RootConfig,
62 selected: &[String],
63) -> FloeResult<()> {
64 let missing: Vec<String> = selected
65 .iter()
66 .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
67 .cloned()
68 .collect();
69
70 if !missing.is_empty() {
71 return Err(Box::new(ConfigError(format!(
72 "entities not found: {}",
73 missing.join(", ")
74 ))));
75 }
76 Ok(())
77}
78
79pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
80 let config_base = config::ConfigBase::local_from_path(config_path);
81 run_with_base(config_path, config_base, options)
82}
83
84pub fn run_with_base(
85 config_path: &Path,
86 config_base: config::ConfigBase,
87 options: RunOptions,
88) -> FloeResult<RunOutcome> {
89 let mut runtime = DefaultRuntime::new();
90 run_with_runtime(config_path, config_base, options, &mut runtime)
91}
92
93pub fn run_with_runtime(
94 config_path: &Path,
95 config_base: config::ConfigBase,
96 options: RunOptions,
97 runtime: &mut dyn Runtime,
98) -> FloeResult<RunOutcome> {
99 init_thread_pool();
100 let validate_options = ValidateOptions {
101 entities: options.entities.clone(),
102 };
103 crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
104
105 let context = RunContext::new(config_path, config_base, &options)?;
106 if !options.entities.is_empty() {
107 validate_entities(&context.config, &options.entities)?;
108 }
109
110 let selected_entities = select_entities(&context, &options);
111 let resolution_mode = if options.dry_run {
112 io::storage::inputs::ResolveInputsMode::ListOnly
113 } else {
114 io::storage::inputs::ResolveInputsMode::Download
115 };
116 let plans = resolve_entity_plans(&context, runtime, &selected_entities, resolution_mode)?;
117 if options.dry_run {
118 return create_dry_run_outcome(&context, plans);
119 }
120
121 let mut entity_outcomes = Vec::new();
122 let mut abort_run = false;
123 let observer = default_observer();
124 observer.on_event(RunEvent::RunStarted {
125 run_id: context.run_id.clone(),
126 config: context.config_path.display().to_string(),
127 report_base: context.report_base_path.clone(),
128 ts_ms: event_time_ms(),
129 });
130
131 for plan in plans {
132 observer.on_event(RunEvent::EntityStarted {
133 run_id: context.run_id.clone(),
134 name: plan.entity.name.clone(),
135 ts_ms: event_time_ms(),
136 });
137 let EntityRunResult {
138 outcome,
139 abort_run: aborted,
140 } = run_entity(&context, runtime, observer, plan)?;
141 let report = &outcome.report;
142 let (mut status, _) = report::compute_run_outcome(
143 &report
144 .files
145 .iter()
146 .map(|file| file.status)
147 .collect::<Vec<_>>(),
148 );
149 if status == report::RunStatus::Success && report.results.warnings_total > 0 {
150 status = report::RunStatus::SuccessWithWarnings;
151 }
152 observer.on_event(RunEvent::EntityFinished {
153 run_id: context.run_id.clone(),
154 name: report.entity.name.clone(),
155 status: run_status_str(status).to_string(),
156 files: report.results.files_total,
157 rows: report.results.rows_total,
158 accepted: report.results.accepted_total,
159 rejected: report.results.rejected_total,
160 warnings: report.results.warnings_total,
161 errors: report.results.errors_total,
162 ts_ms: event_time_ms(),
163 });
164 entity_outcomes.push(outcome);
165 abort_run = abort_run || aborted;
166 if abort_run {
167 break;
168 }
169 }
170 let summary = build_run_summary(&context, &entity_outcomes);
171 if let Some(report_target) = &context.report_target {
172 write_summary_report(
173 report_target,
174 &context.run_id,
175 &summary,
176 runtime.storage(),
177 &context.storage_resolver,
178 )?;
179 }
180 observer.on_event(RunEvent::RunFinished {
181 run_id: context.run_id.clone(),
182 status: run_status_str(summary.run.status).to_string(),
183 exit_code: summary.run.exit_code,
184 files: summary.results.files_total,
185 rows: summary.results.rows_total,
186 accepted: summary.results.accepted_total,
187 rejected: summary.results.rejected_total,
188 warnings: summary.results.warnings_total,
189 errors: summary.results.errors_total,
190 summary_uri: context.report_target.as_ref().map(|target| {
191 target.join_relative(&report::ReportWriter::summary_relative_path(
192 &context.run_id,
193 ))
194 }),
195 ts_ms: event_time_ms(),
196 });
197
198 Ok(RunOutcome {
199 run_id: context.run_id.clone(),
200 report_base_path: context.report_base_path.clone(),
201 entity_outcomes,
202 summary,
203 dry_run_previews: None,
204 })
205}
206
207fn init_thread_pool() {
208 static INIT: Once = Once::new();
209 INIT.call_once(|| {
210 if std::env::var("RAYON_NUM_THREADS").is_ok() {
211 return;
212 }
213 let cap = std::env::var("FLOE_MAX_THREADS")
214 .ok()
215 .and_then(|value| value.parse::<usize>().ok())
216 .unwrap_or(4);
217 let available = std::thread::available_parallelism()
218 .map(|value| value.get())
219 .unwrap_or(1);
220 let threads = available.min(cap).max(1);
221 let _ = rayon::ThreadPoolBuilder::new()
222 .num_threads(threads)
223 .build_global();
224 });
225}
226
227fn select_entities<'a>(
228 context: &'a RunContext,
229 options: &RunOptions,
230) -> Vec<&'a config::EntityConfig> {
231 if options.entities.is_empty() {
232 context.config.entities.iter().collect()
233 } else {
234 let selected: HashSet<&str> = options.entities.iter().map(String::as_str).collect();
235 context
236 .config
237 .entities
238 .iter()
239 .filter(|entity| selected.contains(entity.name.as_str()))
240 .collect()
241 }
242}
243
244fn resolve_entity_plans<'a>(
245 context: &'a RunContext,
246 runtime: &mut dyn Runtime,
247 entities: &[&'a config::EntityConfig],
248 resolution_mode: io::storage::inputs::ResolveInputsMode,
249) -> FloeResult<Vec<EntityRunPlan<'a>>> {
250 let mut plans = Vec::with_capacity(entities.len());
251 for entity in entities {
252 let input_adapter = runtime.input_adapter(entity.source.format.as_str())?;
253 let resolved_targets = entity::resolve_entity_targets(&context.storage_resolver, entity)?;
254 let needs_temp = matches!(
255 resolution_mode,
256 io::storage::inputs::ResolveInputsMode::Download
257 ) && (resolved_targets.source.is_remote()
258 || resolved_targets.accepted.is_remote()
259 || resolved_targets
260 .rejected
261 .as_ref()
262 .is_some_and(io::storage::Target::is_remote));
263 let temp_dir = if needs_temp {
264 Some(
265 tempfile::TempDir::new()
266 .map_err(|err| Box::new(IoError(format!("tempdir failed: {err}"))))?,
267 )
268 } else {
269 None
270 };
271 let storage_client = Some(runtime.storage().client_for(
272 &context.storage_resolver,
273 resolved_targets.source.storage(),
274 entity,
275 )? as &dyn io::storage::StorageClient);
276 let resolved_inputs = io::storage::ops::resolve_inputs(
277 &context.config_dir,
278 entity,
279 input_adapter,
280 &resolved_targets.source,
281 resolution_mode,
282 temp_dir.as_ref().map(|dir| dir.path()),
283 storage_client,
284 )?;
285 plans.push(EntityRunPlan {
286 entity,
287 resolved_targets,
288 resolved_inputs,
289 temp_dir,
290 });
291 }
292 Ok(plans)
293}
294
295fn build_run_summary(
296 context: &RunContext,
297 entity_outcomes: &[EntityOutcome],
298) -> report::RunSummaryReport {
299 let mut totals = report::ResultsTotals {
300 files_total: 0,
301 rows_total: 0,
302 accepted_total: 0,
303 rejected_total: 0,
304 warnings_total: 0,
305 errors_total: 0,
306 };
307 let mut statuses = Vec::new();
308 let mut entities = Vec::with_capacity(entity_outcomes.len());
309
310 for outcome in entity_outcomes {
311 let report = &outcome.report;
312 totals.files_total += report.results.files_total;
313 totals.rows_total += report.results.rows_total;
314 totals.accepted_total += report.results.accepted_total;
315 totals.rejected_total += report.results.rejected_total;
316 totals.warnings_total += report.results.warnings_total;
317 totals.errors_total += report.results.errors_total;
318
319 let file_statuses = report
320 .files
321 .iter()
322 .map(|file| file.status)
323 .collect::<Vec<_>>();
324 let (mut status, _) = report::compute_run_outcome(&file_statuses);
325 if status == report::RunStatus::Success && report.results.warnings_total > 0 {
326 status = report::RunStatus::SuccessWithWarnings;
327 }
328 statuses.extend(file_statuses);
329
330 let report_file = context
331 .report_target
332 .as_ref()
333 .map(|target| {
334 target.join_relative(&report::ReportWriter::report_relative_path(
335 &context.run_id,
336 &report.entity.name,
337 ))
338 })
339 .unwrap_or_else(|| "disabled".to_string());
340 entities.push(report::EntitySummary {
341 name: report.entity.name.clone(),
342 status,
343 results: report.results.clone(),
344 report_file,
345 });
346 }
347
348 let (mut status, exit_code) = report::compute_run_outcome(&statuses);
349 if status == report::RunStatus::Success && totals.warnings_total > 0 {
350 status = report::RunStatus::SuccessWithWarnings;
351 }
352
353 let finished_at = report::now_rfc3339();
354 let duration_ms = context.run_timer.elapsed().as_millis() as u64;
355 let report_base_path = context
356 .report_base_path
357 .clone()
358 .unwrap_or_else(|| "disabled".to_string());
359 let report_file = context
360 .report_target
361 .as_ref()
362 .map(|target| {
363 target.join_relative(&report::ReportWriter::summary_relative_path(
364 &context.run_id,
365 ))
366 })
367 .unwrap_or_else(|| "disabled".to_string());
368
369 report::RunSummaryReport {
370 spec_version: context.config.version.clone(),
371 tool: report::ToolInfo {
372 name: "floe".to_string(),
373 version: env!("CARGO_PKG_VERSION").to_string(),
374 git: None,
375 },
376 run: report::RunInfo {
377 run_id: context.run_id.clone(),
378 started_at: context.started_at.clone(),
379 finished_at,
380 duration_ms,
381 status,
382 exit_code,
383 },
384 config: report::ConfigEcho {
385 path: context.config_path.display().to_string(),
386 version: context.config.version.clone(),
387 metadata: context.config.metadata.as_ref().map(project_metadata_json),
388 },
389 report: report::ReportEcho {
390 path: report_base_path,
391 report_file,
392 },
393 results: totals,
394 entities,
395 }
396}
397
398fn create_dry_run_outcome(
399 context: &RunContext,
400 plans: Vec<EntityRunPlan<'_>>,
401) -> FloeResult<RunOutcome> {
402 let mut previews: Vec<DryRunEntityPreview> = Vec::new();
403
404 for plan in plans {
405 let entity = plan.entity;
406 let rejected_path = entity.sink.rejected.as_ref().map(|r| r.path.clone());
407 let rejected_format = entity.sink.rejected.as_ref().map(|r| r.format.clone());
408 let (archive_path, archive_storage) = entity
409 .sink
410 .archive
411 .as_ref()
412 .map(|a| (a.path.clone(), a.storage.clone()))
413 .unwrap_or_else(|| (String::new(), None));
414
415 let report_file = context.report_target.as_ref().map(|target| {
416 target.join_relative(&report::ReportWriter::report_relative_path(
417 &context.run_id,
418 &entity.name,
419 ))
420 });
421
422 previews.push(DryRunEntityPreview {
423 name: entity.name.clone(),
424 input_path: entity.source.path.clone(),
425 input_format: entity.source.format.clone(),
426 accepted_path: entity.sink.accepted.path.clone(),
427 accepted_format: entity.sink.accepted.format.clone(),
428 rejected_path,
429 rejected_format,
430 archive_path,
431 archive_storage,
432 report_file,
433 scanned_files: plan.resolved_inputs.listed,
434 });
435 }
436
437 Ok(RunOutcome {
438 run_id: context.run_id.clone(),
439 report_base_path: context.report_base_path.clone(),
440 entity_outcomes: Vec::new(),
441 summary: report::RunSummaryReport {
442 spec_version: context.config.version.clone(),
443 tool: report::ToolInfo {
444 name: "floe".to_string(),
445 version: env!("CARGO_PKG_VERSION").to_string(),
446 git: None,
447 },
448 run: report::RunInfo {
449 run_id: context.run_id.clone(),
450 started_at: context.started_at.clone(),
451 finished_at: report::now_rfc3339(),
452 duration_ms: 0,
453 status: report::RunStatus::Success,
454 exit_code: 0,
455 },
456 config: report::ConfigEcho {
457 path: context.config_path.display().to_string(),
458 version: context.config.version.clone(),
459 metadata: context.config.metadata.as_ref().map(project_metadata_json),
460 },
461 report: report::ReportEcho {
462 path: context
463 .report_base_path
464 .clone()
465 .unwrap_or_else(|| "disabled".to_string()),
466 report_file: "disabled (dry-run)".to_string(),
467 },
468 results: report::ResultsTotals {
469 files_total: 0,
470 rows_total: 0,
471 accepted_total: 0,
472 rejected_total: 0,
473 warnings_total: 0,
474 errors_total: 0,
475 },
476 entities: Vec::new(),
477 },
478 dry_run_previews: Some(previews),
479 })
480}
481
482fn run_status_str(status: report::RunStatus) -> &'static str {
483 match status {
484 report::RunStatus::Success => "success",
485 report::RunStatus::SuccessWithWarnings => "success_with_warnings",
486 report::RunStatus::Rejected => "rejected",
487 report::RunStatus::Aborted => "aborted",
488 report::RunStatus::Failed => "failed",
489 }
490}