1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4use std::time::Instant;
5
6use crate::errors::IoError;
7use crate::report::build::project_metadata_json;
8use crate::report::output::write_summary_report;
9use crate::runtime::{DefaultRuntime, Runtime};
10use crate::{config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
11
12mod context;
13pub(crate) mod entity;
14pub mod events;
15mod file;
16mod output;
17mod perf;
18
19pub(crate) use context::RunContext;
20use entity::{run_entity, EntityRunResult, ResolvedEntityTargets};
21use events::{default_observer, event_time_ms, RunEvent};
22use serde_json::json;
23
/// Upper bound of 50 exposed to the parent module.
///
/// NOTE(review): not referenced in this part of the file; presumably caps how
/// many resolved input files are handled or reported per entity — confirm at
/// the use sites.
pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
25
26pub(crate) struct EntityRunPlan<'a> {
27 pub(crate) entity: &'a config::EntityConfig,
28 pub(crate) resolved_targets: ResolvedEntityTargets,
29 pub(crate) resolved_inputs: io::storage::inputs::ResolvedInputs,
30 pub(crate) temp_dir: Option<tempfile::TempDir>,
31}
32
33#[derive(Debug, Clone)]
34pub struct RunOutcome {
35 pub run_id: String,
36 pub report_base_path: Option<String>,
37 pub entity_outcomes: Vec<EntityOutcome>,
38 pub summary: report::RunSummaryReport,
39 pub dry_run_previews: Option<Vec<DryRunEntityPreview>>,
40}
41
/// Per-entity description of what a run would read and write, produced
/// instead of real results when the run is a dry run.
#[derive(Debug, Clone)]
pub struct DryRunEntityPreview {
    /// Entity name from the configuration.
    pub name: String,
    /// Configured source path.
    pub input_path: String,
    /// Configured source format.
    pub input_format: String,
    /// Configured path of the accepted sink.
    pub accepted_path: String,
    /// Configured format of the accepted sink.
    pub accepted_format: String,
    /// Configured path of the rejected sink, if one is configured.
    pub rejected_path: Option<String>,
    /// Configured format of the rejected sink, if one is configured.
    pub rejected_format: Option<String>,
    /// Configured archive path; an empty string when no archive is configured.
    pub archive_path: String,
    /// Storage named by the archive configuration, if any.
    pub archive_storage: Option<String>,
    /// Location the entity report would be written to; `None` when the run
    /// has no report target.
    pub report_file: Option<String>,
    /// Input files found while listing the source.
    pub scanned_files: Vec<String>,
}
56
57#[derive(Debug, Clone)]
58pub struct EntityOutcome {
59 pub report: crate::report::RunReport,
60 pub file_timings_ms: Vec<Option<u64>>,
61}
62
63pub(crate) fn validate_entities(
64 config: &config::RootConfig,
65 selected: &[String],
66) -> FloeResult<()> {
67 let missing: Vec<String> = selected
68 .iter()
69 .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
70 .cloned()
71 .collect();
72
73 if !missing.is_empty() {
74 return Err(Box::new(ConfigError(format!(
75 "entities not found: {}",
76 missing.join(", ")
77 ))));
78 }
79 Ok(())
80}
81
82pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
83 let config_base = config::ConfigBase::local_from_path(config_path);
84 run_with_base(config_path, config_base, options)
85}
86
87pub fn run_with_base(
88 config_path: &Path,
89 config_base: config::ConfigBase,
90 options: RunOptions,
91) -> FloeResult<RunOutcome> {
92 let mut runtime = DefaultRuntime::new();
93 run_with_runtime(config_path, config_base, options, &mut runtime)
94}
95
96pub fn run_with_runtime(
97 config_path: &Path,
98 config_base: config::ConfigBase,
99 options: RunOptions,
100 runtime: &mut dyn Runtime,
101) -> FloeResult<RunOutcome> {
102 init_thread_pool();
103 let raw_config_env_vars = config::extract_raw_env_vars(config_path).unwrap_or_default();
104 let profile_vars = options
105 .profile
106 .as_ref()
107 .map(|p| {
108 crate::resolve_vars(crate::VarSources {
109 profile: &p.variables,
110 cli: &std::collections::HashMap::new(),
111 config: &raw_config_env_vars,
112 })
113 })
114 .transpose()?
115 .unwrap_or_default();
116 let validate_options = ValidateOptions {
117 entities: options.entities.clone(),
118 profile_vars: profile_vars.clone(),
119 profile_catalogs: options
120 .profile
121 .as_ref()
122 .and_then(|profile| profile.catalogs.clone()),
123 };
124 crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
125 let context = RunContext::new(config_path, config_base, &options, profile_vars)?;
126 if !options.entities.is_empty() {
127 validate_entities(&context.config, &options.entities)?;
128 }
129
130 let observer = default_observer();
131 let perf_enabled = perf::phase_timing_enabled();
132 let selected_entities = select_entities(&context, &options);
133 let resolution_mode = if options.dry_run {
134 io::storage::inputs::ResolveInputsMode::ListOnly
135 } else {
136 io::storage::inputs::ResolveInputsMode::Download
137 };
138 if !options.dry_run {
139 observer.on_event(RunEvent::RunStarted {
140 run_id: context.run_id.clone(),
141 config: context.config_path.display().to_string(),
142 report_base: context.report_base_path.clone(),
143 ts_ms: event_time_ms(),
144 });
145 crate::run::events::mark_run_started();
146 }
147 let resolve_start = perf_enabled.then(Instant::now);
148 let plans = resolve_entity_plans(&context, runtime, &selected_entities, resolution_mode)?;
149 if let Some(start) = resolve_start {
150 perf::emit_perf_log(
151 observer,
152 &context.run_id,
153 None,
154 "perf_run_phase_timings",
155 json!({
156 "phase": "resolve_inputs",
157 "elapsed_ms": start.elapsed().as_millis() as u64,
158 "entity_count": selected_entities.len(),
159 "mode": match resolution_mode {
160 io::storage::inputs::ResolveInputsMode::ListOnly => "list_only",
161 io::storage::inputs::ResolveInputsMode::Download => "download",
162 },
163 }),
164 );
165 }
166 if options.dry_run {
167 return create_dry_run_outcome(&context, plans);
168 }
169
170 let mut entity_outcomes = Vec::new();
171 let mut abort_run = false;
172
173 for plan in plans {
174 let entity_name = plan.entity.name.clone();
175 observer.on_event(RunEvent::EntityStarted {
176 run_id: context.run_id.clone(),
177 name: entity_name.clone(),
178 ts_ms: event_time_ms(),
179 });
180 let entity_result = run_entity(&context, runtime, observer, plan);
181 if let Err(err) = entity_result {
182 observer.on_event(RunEvent::EntityFinished {
183 run_id: context.run_id.clone(),
184 name: entity_name,
185 status: "failed".to_string(),
186 files: 0,
187 rows: 0,
188 accepted: 0,
189 rejected: 0,
190 warnings: 0,
191 errors: 0,
192 ts_ms: event_time_ms(),
193 });
194 return Err(err);
195 }
196 let EntityRunResult {
197 outcome,
198 abort_run: aborted,
199 } = entity_result.unwrap();
200 let report = &outcome.report;
201 let (mut status, _) = report::compute_run_outcome(
202 &report
203 .files
204 .iter()
205 .map(|file| file.status)
206 .collect::<Vec<_>>(),
207 );
208 status = upgrade_status_for_warnings(status, report.results.warnings_total);
209 observer.on_event(RunEvent::EntityFinished {
210 run_id: context.run_id.clone(),
211 name: report.entity.name.clone(),
212 status: run_status_str(status).to_string(),
213 files: report.results.files_total,
214 rows: report.results.rows_total,
215 accepted: report.results.accepted_total,
216 rejected: report.results.rejected_total,
217 warnings: report.results.warnings_total,
218 errors: report.results.errors_total,
219 ts_ms: event_time_ms(),
220 });
221 entity_outcomes.push(outcome);
222 abort_run = abort_run || aborted;
223 if abort_run {
224 break;
225 }
226 }
227 let summary = build_run_summary(&context, &entity_outcomes);
228 if let Some(report_target) = &context.report_target {
229 let summary_write_start = perf_enabled.then(Instant::now);
230 write_summary_report(
231 report_target,
232 &context.run_id,
233 &summary,
234 runtime.storage(),
235 &context.storage_resolver,
236 )?;
237 if let Some(start) = summary_write_start {
238 perf::emit_perf_log(
239 observer,
240 &context.run_id,
241 None,
242 "perf_run_phase_timings",
243 json!({
244 "phase": "write_summary_report",
245 "elapsed_ms": start.elapsed().as_millis() as u64,
246 "entity_count": entity_outcomes.len(),
247 }),
248 );
249 }
250 }
251 observer.on_event(RunEvent::RunFinished {
252 run_id: context.run_id.clone(),
253 status: run_status_str(summary.run.status).to_string(),
254 exit_code: summary.run.exit_code,
255 files: summary.results.files_total,
256 rows: summary.results.rows_total,
257 accepted: summary.results.accepted_total,
258 rejected: summary.results.rejected_total,
259 warnings: summary.results.warnings_total,
260 errors: summary.results.errors_total,
261 summary_uri: context.report_target.as_ref().map(|target| {
262 target.join_relative(&report::ReportWriter::summary_relative_path(
263 &context.run_id,
264 ))
265 }),
266 ts_ms: event_time_ms(),
267 });
268
269 Ok(RunOutcome {
270 run_id: context.run_id.clone(),
271 report_base_path: context.report_base_path.clone(),
272 entity_outcomes,
273 summary,
274 dry_run_previews: None,
275 })
276}
277
278fn init_thread_pool() {
279 static INIT: Once = Once::new();
280 INIT.call_once(|| {
281 if std::env::var("RAYON_NUM_THREADS").is_ok() {
282 return;
283 }
284 let cap = std::env::var("FLOE_MAX_THREADS")
285 .ok()
286 .and_then(|value| value.parse::<usize>().ok())
287 .unwrap_or(4);
288 let available = std::thread::available_parallelism()
289 .map(|value| value.get())
290 .unwrap_or(1);
291 let threads = available.min(cap).max(1);
292 let _ = rayon::ThreadPoolBuilder::new()
293 .num_threads(threads)
294 .build_global();
295 });
296}
297
298fn select_entities<'a>(
299 context: &'a RunContext,
300 options: &RunOptions,
301) -> Vec<&'a config::EntityConfig> {
302 if options.entities.is_empty() {
303 context.config.entities.iter().collect()
304 } else {
305 let selected: HashSet<&str> = options.entities.iter().map(String::as_str).collect();
306 context
307 .config
308 .entities
309 .iter()
310 .filter(|entity| selected.contains(entity.name.as_str()))
311 .collect()
312 }
313}
314
315fn resolve_entity_plans<'a>(
316 context: &'a RunContext,
317 runtime: &mut dyn Runtime,
318 entities: &[&'a config::EntityConfig],
319 resolution_mode: io::storage::inputs::ResolveInputsMode,
320) -> FloeResult<Vec<EntityRunPlan<'a>>> {
321 let mut plans = Vec::with_capacity(entities.len());
322 for entity in entities {
323 let input_adapter = runtime.input_adapter(entity.source.format.as_str())?;
324 let resolved_targets = entity::resolve_entity_targets(&context.storage_resolver, entity)?;
325 let needs_temp = matches!(
326 resolution_mode,
327 io::storage::inputs::ResolveInputsMode::Download
328 ) && (resolved_targets.source.is_remote()
329 || resolved_targets.accepted.is_remote()
330 || resolved_targets
331 .rejected
332 .as_ref()
333 .is_some_and(io::storage::Target::is_remote));
334 let temp_dir = if needs_temp {
335 Some(
336 tempfile::TempDir::new()
337 .map_err(|err| Box::new(IoError(format!("tempdir failed: {err}"))))?,
338 )
339 } else {
340 None
341 };
342 let storage_client = Some(runtime.storage().client_for(
343 &context.storage_resolver,
344 resolved_targets.source.storage(),
345 entity,
346 )? as &dyn io::storage::StorageClient);
347 let resolved_inputs = io::storage::ops::resolve_inputs(
348 &context.config_dir,
349 entity,
350 input_adapter,
351 &resolved_targets.source,
352 resolution_mode,
353 temp_dir.as_ref().map(|dir| dir.path()),
354 storage_client,
355 )?;
356 plans.push(EntityRunPlan {
357 entity,
358 resolved_targets,
359 resolved_inputs,
360 temp_dir,
361 });
362 }
363 Ok(plans)
364}
365
366fn build_run_summary(
367 context: &RunContext,
368 entity_outcomes: &[EntityOutcome],
369) -> report::RunSummaryReport {
370 let mut totals = report::ResultsTotals {
371 files_total: 0,
372 rows_total: 0,
373 accepted_total: 0,
374 rejected_total: 0,
375 warnings_total: 0,
376 errors_total: 0,
377 };
378 let mut statuses = Vec::new();
379 let mut entities = Vec::with_capacity(entity_outcomes.len());
380
381 for outcome in entity_outcomes {
382 let report = &outcome.report;
383 totals.files_total += report.results.files_total;
384 totals.rows_total += report.results.rows_total;
385 totals.accepted_total += report.results.accepted_total;
386 totals.rejected_total += report.results.rejected_total;
387 totals.warnings_total += report.results.warnings_total;
388 totals.errors_total += report.results.errors_total;
389
390 let file_statuses = report
391 .files
392 .iter()
393 .map(|file| file.status)
394 .collect::<Vec<_>>();
395 let (mut status, _) = report::compute_run_outcome(&file_statuses);
396 status = upgrade_status_for_warnings(status, report.results.warnings_total);
397 statuses.extend(file_statuses);
398
399 let report_file = context
400 .report_target
401 .as_ref()
402 .map(|target| {
403 target.join_relative(&report::ReportWriter::report_relative_path(
404 &context.run_id,
405 &report.entity.name,
406 ))
407 })
408 .unwrap_or_else(|| "disabled".to_string());
409 entities.push(report::EntitySummary {
410 name: report.entity.name.clone(),
411 status,
412 results: report.results.clone(),
413 report_file,
414 });
415 }
416
417 let (mut status, exit_code) = report::compute_run_outcome(&statuses);
418 status = upgrade_status_for_warnings(status, totals.warnings_total);
419
420 let finished_at = report::now_rfc3339();
421 let duration_ms = context.run_timer.elapsed().as_millis() as u64;
422 let report_base_path = context
423 .report_base_path
424 .clone()
425 .unwrap_or_else(|| "disabled".to_string());
426 let report_file = context
427 .report_target
428 .as_ref()
429 .map(|target| {
430 target.join_relative(&report::ReportWriter::summary_relative_path(
431 &context.run_id,
432 ))
433 })
434 .unwrap_or_else(|| "disabled".to_string());
435
436 report::RunSummaryReport {
437 spec_version: context.config.version.clone(),
438 tool: build_tool_info(),
439 run: report::RunInfo {
440 run_id: context.run_id.clone(),
441 started_at: context.started_at.clone(),
442 finished_at,
443 duration_ms,
444 status,
445 exit_code,
446 },
447 config: build_config_echo(context),
448 report: build_report_echo(report_base_path, report_file),
449 results: totals,
450 entities,
451 }
452}
453
454fn create_dry_run_outcome(
455 context: &RunContext,
456 plans: Vec<EntityRunPlan<'_>>,
457) -> FloeResult<RunOutcome> {
458 let mut previews: Vec<DryRunEntityPreview> = Vec::new();
459
460 for plan in plans {
461 let entity = plan.entity;
462 let rejected_path = entity.sink.rejected.as_ref().map(|r| r.path.clone());
463 let rejected_format = entity.sink.rejected.as_ref().map(|r| r.format.clone());
464 let (archive_path, archive_storage) = entity
465 .sink
466 .archive
467 .as_ref()
468 .map(|a| (a.path.clone(), a.storage.clone()))
469 .unwrap_or_else(|| (String::new(), None));
470
471 let report_file = context.report_target.as_ref().map(|target| {
472 target.join_relative(&report::ReportWriter::report_relative_path(
473 &context.run_id,
474 &entity.name,
475 ))
476 });
477
478 previews.push(DryRunEntityPreview {
479 name: entity.name.clone(),
480 input_path: entity.source.path.clone(),
481 input_format: entity.source.format.clone(),
482 accepted_path: entity.sink.accepted.path.clone(),
483 accepted_format: entity.sink.accepted.format.clone(),
484 rejected_path,
485 rejected_format,
486 archive_path,
487 archive_storage,
488 report_file,
489 scanned_files: plan.resolved_inputs.listed,
490 });
491 }
492
493 Ok(RunOutcome {
494 run_id: context.run_id.clone(),
495 report_base_path: context.report_base_path.clone(),
496 entity_outcomes: Vec::new(),
497 summary: report::RunSummaryReport {
498 spec_version: context.config.version.clone(),
499 tool: build_tool_info(),
500 run: report::RunInfo {
501 run_id: context.run_id.clone(),
502 started_at: context.started_at.clone(),
503 finished_at: report::now_rfc3339(),
504 duration_ms: 0,
505 status: report::RunStatus::Success,
506 exit_code: 0,
507 },
508 config: build_config_echo(context),
509 report: build_report_echo(
510 context
511 .report_base_path
512 .clone()
513 .unwrap_or_else(|| "disabled".to_string()),
514 "disabled (dry-run)".to_string(),
515 ),
516 results: report::ResultsTotals {
517 files_total: 0,
518 rows_total: 0,
519 accepted_total: 0,
520 rejected_total: 0,
521 warnings_total: 0,
522 errors_total: 0,
523 },
524 entities: Vec::new(),
525 },
526 dry_run_previews: Some(previews),
527 })
528}
529
530fn upgrade_status_for_warnings(status: report::RunStatus, warnings: u64) -> report::RunStatus {
531 if status == report::RunStatus::Success && warnings > 0 {
532 report::RunStatus::SuccessWithWarnings
533 } else {
534 status
535 }
536}
537
538fn build_tool_info() -> report::ToolInfo {
539 report::ToolInfo {
540 name: "floe".to_string(),
541 version: env!("CARGO_PKG_VERSION").to_string(),
542 git: None,
543 }
544}
545
546fn build_config_echo(context: &RunContext) -> report::ConfigEcho {
547 report::ConfigEcho {
548 path: context.config_path.display().to_string(),
549 version: context.config.version.clone(),
550 metadata: context.config.metadata.as_ref().map(project_metadata_json),
551 }
552}
553
554fn build_report_echo(path: String, report_file: String) -> report::ReportEcho {
555 report::ReportEcho { path, report_file }
556}
557
558fn run_status_str(status: report::RunStatus) -> &'static str {
559 match status {
560 report::RunStatus::Success => "success",
561 report::RunStatus::SuccessWithWarnings => "success_with_warnings",
562 report::RunStatus::Rejected => "rejected",
563 report::RunStatus::Aborted => "aborted",
564 report::RunStatus::Failed => "failed",
565 }
566}