1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4use std::time::Instant;
5
6use crate::errors::IoError;
7use crate::report::build::project_metadata_json;
8use crate::report::output::write_summary_report;
9use crate::runtime::{DefaultRuntime, Runtime};
10use crate::{config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
11
12mod context;
13pub(crate) mod entity;
14pub mod events;
15mod file;
16mod output;
17mod perf;
18
19pub(crate) use context::RunContext;
20use entity::{run_entity, EntityRunResult, ResolvedEntityTargets};
21use events::{default_observer, event_time_ms, RunEvent};
22use serde_json::json;
23
/// Numeric limit (50) associated with resolved inputs.
///
/// NOTE(review): nothing in the visible part of this file reads this constant;
/// `pub(super)` exposes it to the parent module, so it is presumably enforced
/// elsewhere (e.g. when listing or reporting resolved inputs) — confirm what
/// exactly it caps before changing the value.
pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
25
/// Everything resolved up-front for one entity before it is executed.
///
/// Plans are produced by `resolve_entity_plans` and then either handed to
/// `run_entity` (normal run) or turned into a preview by
/// `create_dry_run_outcome` (dry run).
pub(crate) struct EntityRunPlan<'a> {
    /// Configuration entry of the entity this plan belongs to.
    pub(crate) entity: &'a config::EntityConfig,
    /// Targets returned by `entity::resolve_entity_targets` for this entity
    /// (exposes `source`, `accepted` and an optional `rejected` target).
    pub(crate) resolved_targets: ResolvedEntityTargets,
    /// Result of `io::storage::ops::resolve_inputs` for the entity's source.
    pub(crate) resolved_inputs: io::storage::inputs::ResolvedInputs,
    /// Scratch directory owned by the plan. It is only created in `Download`
    /// mode when the source, accepted or rejected target is remote; otherwise
    /// it is `None`.
    // NOTE(review): `tempfile::TempDir` removes its directory when dropped, so
    // this handle presumably has to outlive any use of the downloaded files —
    // confirm against `run_entity`.
    pub(crate) temp_dir: Option<tempfile::TempDir>,
}
32
/// Result returned to callers of [`run`], [`run_with_base`] and
/// [`run_with_runtime`].
#[derive(Debug, Clone)]
pub struct RunOutcome {
    /// Identifier of this run, copied from the run context.
    pub run_id: String,
    /// Base path for reports, copied from the run context; `None` when the
    /// context has no report base path.
    pub report_base_path: Option<String>,
    /// One outcome per entity that was executed, in execution order.
    /// Always empty for a dry run.
    pub entity_outcomes: Vec<EntityOutcome>,
    /// Run-level summary. For a dry run this is a placeholder with zeroed
    /// totals, `Success` status and exit code 0.
    pub summary: report::RunSummaryReport,
    /// `Some` (one preview per planned entity) for a dry run, `None` for a
    /// real run.
    pub dry_run_previews: Option<Vec<DryRunEntityPreview>>,
}
41
/// What a dry run would do for a single entity, as assembled by
/// `create_dry_run_outcome` from the entity configuration and its plan.
#[derive(Debug, Clone)]
pub struct DryRunEntityPreview {
    /// Entity name from the configuration.
    pub name: String,
    /// Configured source path (`source.path`).
    pub input_path: String,
    /// Configured source format (`source.format`).
    pub input_format: String,
    /// Configured accepted-sink path (`sink.accepted.path`).
    pub accepted_path: String,
    /// Configured accepted-sink format (`sink.accepted.format`).
    pub accepted_format: String,
    /// Configured rejected-sink path; `None` when no rejected sink is set.
    pub rejected_path: Option<String>,
    /// Configured rejected-sink format; `None` when no rejected sink is set.
    pub rejected_format: Option<String>,
    /// Configured archive path; an empty string when no archive is
    /// configured.
    pub archive_path: String,
    /// Storage named by the archive configuration; `None` when the archive
    /// is absent or does not name one.
    pub archive_storage: Option<String>,
    /// Location where the entity report would be written; `None` when the
    /// run has no report target.
    pub report_file: Option<String>,
    /// Inputs listed for the entity during input resolution
    /// (`resolved_inputs.listed`).
    pub scanned_files: Vec<String>,
}
56
/// Outcome of executing one entity.
#[derive(Debug, Clone)]
pub struct EntityOutcome {
    /// Report for the entity; its `files`, `results` and `entity.name` feed
    /// the per-entity events and the run summary.
    pub report: crate::report::RunReport,
    /// Timings in milliseconds.
    // NOTE(review): presumably one slot per processed file, `None` when no
    // timing was recorded — this file never reads the field, so confirm
    // against the code that fills it.
    pub file_timings_ms: Vec<Option<u64>>,
}
62
63pub(crate) fn validate_entities(
64 config: &config::RootConfig,
65 selected: &[String],
66) -> FloeResult<()> {
67 let missing: Vec<String> = selected
68 .iter()
69 .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
70 .cloned()
71 .collect();
72
73 if !missing.is_empty() {
74 return Err(Box::new(ConfigError(format!(
75 "entities not found: {}",
76 missing.join(", ")
77 ))));
78 }
79 Ok(())
80}
81
82pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
83 let config_base = config::ConfigBase::local_from_path(config_path);
84 run_with_base(config_path, config_base, options)
85}
86
87pub fn run_with_base(
88 config_path: &Path,
89 config_base: config::ConfigBase,
90 options: RunOptions,
91) -> FloeResult<RunOutcome> {
92 let mut runtime = DefaultRuntime::new();
93 run_with_runtime(config_path, config_base, options, &mut runtime)
94}
95
96pub fn run_with_runtime(
97 config_path: &Path,
98 config_base: config::ConfigBase,
99 options: RunOptions,
100 runtime: &mut dyn Runtime,
101) -> FloeResult<RunOutcome> {
102 init_thread_pool();
103 let raw_config_env_vars = config::extract_raw_env_vars(config_path).unwrap_or_default();
104 let profile_vars = options
105 .profile
106 .as_ref()
107 .map(|p| {
108 crate::resolve_vars(crate::VarSources {
109 profile: &p.variables,
110 cli: &std::collections::HashMap::new(),
111 config: &raw_config_env_vars,
112 })
113 })
114 .transpose()?
115 .unwrap_or_default();
116 let validate_options = ValidateOptions {
117 entities: options.entities.clone(),
118 profile_vars: profile_vars.clone(),
119 };
120 crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
121 let context = RunContext::new(config_path, config_base, &options, profile_vars)?;
122 if !options.entities.is_empty() {
123 validate_entities(&context.config, &options.entities)?;
124 }
125
126 let observer = default_observer();
127 let perf_enabled = perf::phase_timing_enabled();
128 let selected_entities = select_entities(&context, &options);
129 let resolution_mode = if options.dry_run {
130 io::storage::inputs::ResolveInputsMode::ListOnly
131 } else {
132 io::storage::inputs::ResolveInputsMode::Download
133 };
134 if !options.dry_run {
135 observer.on_event(RunEvent::RunStarted {
136 run_id: context.run_id.clone(),
137 config: context.config_path.display().to_string(),
138 report_base: context.report_base_path.clone(),
139 ts_ms: event_time_ms(),
140 });
141 }
142 let resolve_start = perf_enabled.then(Instant::now);
143 let plans = resolve_entity_plans(&context, runtime, &selected_entities, resolution_mode)?;
144 if let Some(start) = resolve_start {
145 perf::emit_perf_log(
146 observer,
147 &context.run_id,
148 None,
149 "perf_run_phase_timings",
150 json!({
151 "phase": "resolve_inputs",
152 "elapsed_ms": start.elapsed().as_millis() as u64,
153 "entity_count": selected_entities.len(),
154 "mode": match resolution_mode {
155 io::storage::inputs::ResolveInputsMode::ListOnly => "list_only",
156 io::storage::inputs::ResolveInputsMode::Download => "download",
157 },
158 }),
159 );
160 }
161 if options.dry_run {
162 return create_dry_run_outcome(&context, plans);
163 }
164
165 let mut entity_outcomes = Vec::new();
166 let mut abort_run = false;
167
168 for plan in plans {
169 observer.on_event(RunEvent::EntityStarted {
170 run_id: context.run_id.clone(),
171 name: plan.entity.name.clone(),
172 ts_ms: event_time_ms(),
173 });
174 let EntityRunResult {
175 outcome,
176 abort_run: aborted,
177 } = run_entity(&context, runtime, observer, plan)?;
178 let report = &outcome.report;
179 let (mut status, _) = report::compute_run_outcome(
180 &report
181 .files
182 .iter()
183 .map(|file| file.status)
184 .collect::<Vec<_>>(),
185 );
186 if status == report::RunStatus::Success && report.results.warnings_total > 0 {
187 status = report::RunStatus::SuccessWithWarnings;
188 }
189 observer.on_event(RunEvent::EntityFinished {
190 run_id: context.run_id.clone(),
191 name: report.entity.name.clone(),
192 status: run_status_str(status).to_string(),
193 files: report.results.files_total,
194 rows: report.results.rows_total,
195 accepted: report.results.accepted_total,
196 rejected: report.results.rejected_total,
197 warnings: report.results.warnings_total,
198 errors: report.results.errors_total,
199 ts_ms: event_time_ms(),
200 });
201 entity_outcomes.push(outcome);
202 abort_run = abort_run || aborted;
203 if abort_run {
204 break;
205 }
206 }
207 let summary = build_run_summary(&context, &entity_outcomes);
208 if let Some(report_target) = &context.report_target {
209 let summary_write_start = perf_enabled.then(Instant::now);
210 write_summary_report(
211 report_target,
212 &context.run_id,
213 &summary,
214 runtime.storage(),
215 &context.storage_resolver,
216 )?;
217 if let Some(start) = summary_write_start {
218 perf::emit_perf_log(
219 observer,
220 &context.run_id,
221 None,
222 "perf_run_phase_timings",
223 json!({
224 "phase": "write_summary_report",
225 "elapsed_ms": start.elapsed().as_millis() as u64,
226 "entity_count": entity_outcomes.len(),
227 }),
228 );
229 }
230 }
231 observer.on_event(RunEvent::RunFinished {
232 run_id: context.run_id.clone(),
233 status: run_status_str(summary.run.status).to_string(),
234 exit_code: summary.run.exit_code,
235 files: summary.results.files_total,
236 rows: summary.results.rows_total,
237 accepted: summary.results.accepted_total,
238 rejected: summary.results.rejected_total,
239 warnings: summary.results.warnings_total,
240 errors: summary.results.errors_total,
241 summary_uri: context.report_target.as_ref().map(|target| {
242 target.join_relative(&report::ReportWriter::summary_relative_path(
243 &context.run_id,
244 ))
245 }),
246 ts_ms: event_time_ms(),
247 });
248
249 Ok(RunOutcome {
250 run_id: context.run_id.clone(),
251 report_base_path: context.report_base_path.clone(),
252 entity_outcomes,
253 summary,
254 dry_run_previews: None,
255 })
256}
257
258fn init_thread_pool() {
259 static INIT: Once = Once::new();
260 INIT.call_once(|| {
261 if std::env::var("RAYON_NUM_THREADS").is_ok() {
262 return;
263 }
264 let cap = std::env::var("FLOE_MAX_THREADS")
265 .ok()
266 .and_then(|value| value.parse::<usize>().ok())
267 .unwrap_or(4);
268 let available = std::thread::available_parallelism()
269 .map(|value| value.get())
270 .unwrap_or(1);
271 let threads = available.min(cap).max(1);
272 let _ = rayon::ThreadPoolBuilder::new()
273 .num_threads(threads)
274 .build_global();
275 });
276}
277
278fn select_entities<'a>(
279 context: &'a RunContext,
280 options: &RunOptions,
281) -> Vec<&'a config::EntityConfig> {
282 if options.entities.is_empty() {
283 context.config.entities.iter().collect()
284 } else {
285 let selected: HashSet<&str> = options.entities.iter().map(String::as_str).collect();
286 context
287 .config
288 .entities
289 .iter()
290 .filter(|entity| selected.contains(entity.name.as_str()))
291 .collect()
292 }
293}
294
295fn resolve_entity_plans<'a>(
296 context: &'a RunContext,
297 runtime: &mut dyn Runtime,
298 entities: &[&'a config::EntityConfig],
299 resolution_mode: io::storage::inputs::ResolveInputsMode,
300) -> FloeResult<Vec<EntityRunPlan<'a>>> {
301 let mut plans = Vec::with_capacity(entities.len());
302 for entity in entities {
303 let input_adapter = runtime.input_adapter(entity.source.format.as_str())?;
304 let resolved_targets = entity::resolve_entity_targets(&context.storage_resolver, entity)?;
305 let needs_temp = matches!(
306 resolution_mode,
307 io::storage::inputs::ResolveInputsMode::Download
308 ) && (resolved_targets.source.is_remote()
309 || resolved_targets.accepted.is_remote()
310 || resolved_targets
311 .rejected
312 .as_ref()
313 .is_some_and(io::storage::Target::is_remote));
314 let temp_dir = if needs_temp {
315 Some(
316 tempfile::TempDir::new()
317 .map_err(|err| Box::new(IoError(format!("tempdir failed: {err}"))))?,
318 )
319 } else {
320 None
321 };
322 let storage_client = Some(runtime.storage().client_for(
323 &context.storage_resolver,
324 resolved_targets.source.storage(),
325 entity,
326 )? as &dyn io::storage::StorageClient);
327 let resolved_inputs = io::storage::ops::resolve_inputs(
328 &context.config_dir,
329 entity,
330 input_adapter,
331 &resolved_targets.source,
332 resolution_mode,
333 temp_dir.as_ref().map(|dir| dir.path()),
334 storage_client,
335 )?;
336 plans.push(EntityRunPlan {
337 entity,
338 resolved_targets,
339 resolved_inputs,
340 temp_dir,
341 });
342 }
343 Ok(plans)
344}
345
346fn build_run_summary(
347 context: &RunContext,
348 entity_outcomes: &[EntityOutcome],
349) -> report::RunSummaryReport {
350 let mut totals = report::ResultsTotals {
351 files_total: 0,
352 rows_total: 0,
353 accepted_total: 0,
354 rejected_total: 0,
355 warnings_total: 0,
356 errors_total: 0,
357 };
358 let mut statuses = Vec::new();
359 let mut entities = Vec::with_capacity(entity_outcomes.len());
360
361 for outcome in entity_outcomes {
362 let report = &outcome.report;
363 totals.files_total += report.results.files_total;
364 totals.rows_total += report.results.rows_total;
365 totals.accepted_total += report.results.accepted_total;
366 totals.rejected_total += report.results.rejected_total;
367 totals.warnings_total += report.results.warnings_total;
368 totals.errors_total += report.results.errors_total;
369
370 let file_statuses = report
371 .files
372 .iter()
373 .map(|file| file.status)
374 .collect::<Vec<_>>();
375 let (mut status, _) = report::compute_run_outcome(&file_statuses);
376 if status == report::RunStatus::Success && report.results.warnings_total > 0 {
377 status = report::RunStatus::SuccessWithWarnings;
378 }
379 statuses.extend(file_statuses);
380
381 let report_file = context
382 .report_target
383 .as_ref()
384 .map(|target| {
385 target.join_relative(&report::ReportWriter::report_relative_path(
386 &context.run_id,
387 &report.entity.name,
388 ))
389 })
390 .unwrap_or_else(|| "disabled".to_string());
391 entities.push(report::EntitySummary {
392 name: report.entity.name.clone(),
393 status,
394 results: report.results.clone(),
395 report_file,
396 });
397 }
398
399 let (mut status, exit_code) = report::compute_run_outcome(&statuses);
400 if status == report::RunStatus::Success && totals.warnings_total > 0 {
401 status = report::RunStatus::SuccessWithWarnings;
402 }
403
404 let finished_at = report::now_rfc3339();
405 let duration_ms = context.run_timer.elapsed().as_millis() as u64;
406 let report_base_path = context
407 .report_base_path
408 .clone()
409 .unwrap_or_else(|| "disabled".to_string());
410 let report_file = context
411 .report_target
412 .as_ref()
413 .map(|target| {
414 target.join_relative(&report::ReportWriter::summary_relative_path(
415 &context.run_id,
416 ))
417 })
418 .unwrap_or_else(|| "disabled".to_string());
419
420 report::RunSummaryReport {
421 spec_version: context.config.version.clone(),
422 tool: report::ToolInfo {
423 name: "floe".to_string(),
424 version: env!("CARGO_PKG_VERSION").to_string(),
425 git: None,
426 },
427 run: report::RunInfo {
428 run_id: context.run_id.clone(),
429 started_at: context.started_at.clone(),
430 finished_at,
431 duration_ms,
432 status,
433 exit_code,
434 },
435 config: report::ConfigEcho {
436 path: context.config_path.display().to_string(),
437 version: context.config.version.clone(),
438 metadata: context.config.metadata.as_ref().map(project_metadata_json),
439 },
440 report: report::ReportEcho {
441 path: report_base_path,
442 report_file,
443 },
444 results: totals,
445 entities,
446 }
447}
448
449fn create_dry_run_outcome(
450 context: &RunContext,
451 plans: Vec<EntityRunPlan<'_>>,
452) -> FloeResult<RunOutcome> {
453 let mut previews: Vec<DryRunEntityPreview> = Vec::new();
454
455 for plan in plans {
456 let entity = plan.entity;
457 let rejected_path = entity.sink.rejected.as_ref().map(|r| r.path.clone());
458 let rejected_format = entity.sink.rejected.as_ref().map(|r| r.format.clone());
459 let (archive_path, archive_storage) = entity
460 .sink
461 .archive
462 .as_ref()
463 .map(|a| (a.path.clone(), a.storage.clone()))
464 .unwrap_or_else(|| (String::new(), None));
465
466 let report_file = context.report_target.as_ref().map(|target| {
467 target.join_relative(&report::ReportWriter::report_relative_path(
468 &context.run_id,
469 &entity.name,
470 ))
471 });
472
473 previews.push(DryRunEntityPreview {
474 name: entity.name.clone(),
475 input_path: entity.source.path.clone(),
476 input_format: entity.source.format.clone(),
477 accepted_path: entity.sink.accepted.path.clone(),
478 accepted_format: entity.sink.accepted.format.clone(),
479 rejected_path,
480 rejected_format,
481 archive_path,
482 archive_storage,
483 report_file,
484 scanned_files: plan.resolved_inputs.listed,
485 });
486 }
487
488 Ok(RunOutcome {
489 run_id: context.run_id.clone(),
490 report_base_path: context.report_base_path.clone(),
491 entity_outcomes: Vec::new(),
492 summary: report::RunSummaryReport {
493 spec_version: context.config.version.clone(),
494 tool: report::ToolInfo {
495 name: "floe".to_string(),
496 version: env!("CARGO_PKG_VERSION").to_string(),
497 git: None,
498 },
499 run: report::RunInfo {
500 run_id: context.run_id.clone(),
501 started_at: context.started_at.clone(),
502 finished_at: report::now_rfc3339(),
503 duration_ms: 0,
504 status: report::RunStatus::Success,
505 exit_code: 0,
506 },
507 config: report::ConfigEcho {
508 path: context.config_path.display().to_string(),
509 version: context.config.version.clone(),
510 metadata: context.config.metadata.as_ref().map(project_metadata_json),
511 },
512 report: report::ReportEcho {
513 path: context
514 .report_base_path
515 .clone()
516 .unwrap_or_else(|| "disabled".to_string()),
517 report_file: "disabled (dry-run)".to_string(),
518 },
519 results: report::ResultsTotals {
520 files_total: 0,
521 rows_total: 0,
522 accepted_total: 0,
523 rejected_total: 0,
524 warnings_total: 0,
525 errors_total: 0,
526 },
527 entities: Vec::new(),
528 },
529 dry_run_previews: Some(previews),
530 })
531}
532
533fn run_status_str(status: report::RunStatus) -> &'static str {
534 match status {
535 report::RunStatus::Success => "success",
536 report::RunStatus::SuccessWithWarnings => "success_with_warnings",
537 report::RunStatus::Rejected => "rejected",
538 report::RunStatus::Aborted => "aborted",
539 report::RunStatus::Failed => "failed",
540 }
541}