1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4use std::time::Instant;
5
6use crate::errors::IoError;
7use crate::report::build::project_metadata_json;
8use crate::report::output::write_summary_report;
9use crate::runtime::{DefaultRuntime, Runtime};
10use crate::{config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
11
12mod context;
13pub(crate) mod entity;
14pub mod events;
15mod file;
16mod output;
17mod perf;
18
19pub(crate) use context::RunContext;
20use entity::{run_entity, EntityRunResult, ResolvedEntityTargets};
21use events::{default_observer, event_time_ms, RunEvent};
22use serde_json::json;
23
/// Limit on resolved inputs, visible to the parent module (`pub(super)`).
///
/// NOTE(review): it is not used in this file; presumably it caps how many resolved
/// input files are listed or logged per entity. Confirm against its users.
pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
25
/// Everything resolved up front for one selected entity before it is executed
/// (or previewed, in dry-run mode).
pub(crate) struct EntityRunPlan<'a> {
    /// The entity's configuration, borrowed from the run context's config.
    pub(crate) entity: &'a config::EntityConfig,
    /// Storage targets resolved for the entity's source and sinks.
    pub(crate) resolved_targets: ResolvedEntityTargets,
    /// Inputs found for the source: listed only in dry-run mode, downloaded otherwise.
    pub(crate) resolved_inputs: io::storage::inputs::ResolvedInputs,
    /// Scratch directory, created only when inputs are downloaded and the source,
    /// accepted, or rejected target is remote. It is removed when the plan is dropped.
    pub(crate) temp_dir: Option<tempfile::TempDir>,
}
32
/// Result of a pipeline run, returned by [`run`] and its variants.
#[derive(Debug, Clone)]
pub struct RunOutcome {
    /// Identifier of this run, taken from the run context.
    pub run_id: String,
    /// Base path for reports. When `None`, the summary records it as `"disabled"`.
    pub report_base_path: Option<String>,
    /// Per-entity outcomes in execution order.
    ///
    /// Empty for dry runs. Shorter than the selection when an entity requested an abort.
    pub entity_outcomes: Vec<EntityOutcome>,
    /// Run-level summary. For dry runs it is a zero-valued placeholder.
    pub summary: report::RunSummaryReport,
    /// Per-entity previews: `Some` for dry runs, `None` for real runs.
    pub dry_run_previews: Option<Vec<DryRunEntityPreview>>,
}
41
/// What a real run would do for one entity, as reported by a dry run.
#[derive(Debug, Clone)]
pub struct DryRunEntityPreview {
    /// Entity name from the config.
    pub name: String,
    /// Configured source path.
    pub input_path: String,
    /// Configured source format.
    pub input_format: String,
    /// Configured accepted-sink path.
    pub accepted_path: String,
    /// Configured accepted-sink format.
    pub accepted_format: String,
    /// Rejected-sink path, or `None` when no rejected sink is configured.
    pub rejected_path: Option<String>,
    /// Rejected-sink format, or `None` when no rejected sink is configured.
    pub rejected_format: Option<String>,
    /// Archive path. It is an empty string when no archive sink is configured.
    pub archive_path: String,
    /// Archive storage name, if one is configured.
    pub archive_storage: Option<String>,
    /// Where the entity report would be written, or `None` when reporting is disabled.
    pub report_file: Option<String>,
    /// Input files found by listing the source (nothing is downloaded).
    pub scanned_files: Vec<String>,
}
56
/// Outcome of executing a single entity.
#[derive(Debug, Clone)]
pub struct EntityOutcome {
    /// The entity's run report (files, totals, and per-file statuses).
    pub report: crate::report::RunReport,
    /// Per-file timings in milliseconds.
    ///
    /// NOTE(review): presumably aligned with `report.files`, with `None` when a file
    /// was not timed. Confirm in the entity module.
    pub file_timings_ms: Vec<Option<u64>>,
}
62
63pub(crate) fn validate_entities(
64 config: &config::RootConfig,
65 selected: &[String],
66) -> FloeResult<()> {
67 let missing: Vec<String> = selected
68 .iter()
69 .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
70 .cloned()
71 .collect();
72
73 if !missing.is_empty() {
74 return Err(Box::new(ConfigError(format!(
75 "entities not found: {}",
76 missing.join(", ")
77 ))));
78 }
79 Ok(())
80}
81
82pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
83 let config_base = config::ConfigBase::local_from_path(config_path);
84 run_with_base(config_path, config_base, options)
85}
86
87pub fn run_with_base(
88 config_path: &Path,
89 config_base: config::ConfigBase,
90 options: RunOptions,
91) -> FloeResult<RunOutcome> {
92 let mut runtime = DefaultRuntime::new();
93 run_with_runtime(config_path, config_base, options, &mut runtime)
94}
95
96pub fn run_with_runtime(
97 config_path: &Path,
98 config_base: config::ConfigBase,
99 options: RunOptions,
100 runtime: &mut dyn Runtime,
101) -> FloeResult<RunOutcome> {
102 init_thread_pool();
103 let raw_config_env_vars = config::extract_raw_env_vars(config_path).unwrap_or_default();
104 let profile_vars = options
105 .profile
106 .as_ref()
107 .map(|p| {
108 crate::resolve_vars(crate::VarSources {
109 profile: &p.variables,
110 cli: &std::collections::HashMap::new(),
111 config: &raw_config_env_vars,
112 })
113 })
114 .transpose()?
115 .unwrap_or_default();
116 let validate_options = ValidateOptions {
117 entities: options.entities.clone(),
118 profile_vars: profile_vars.clone(),
119 profile_catalogs: options
120 .profile
121 .as_ref()
122 .and_then(|profile| profile.catalogs.clone()),
123 profile_storages: options
124 .profile
125 .as_ref()
126 .and_then(|profile| profile.storages.clone()),
127 profile_lineage: options
128 .profile
129 .as_ref()
130 .and_then(|profile| profile.lineage.clone()),
131 };
132 crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
133 let context = RunContext::new(config_path, config_base, &options, profile_vars)?;
134 if !options.entities.is_empty() {
135 validate_entities(&context.config, &options.entities)?;
136 }
137
138 let observer = default_observer();
139 let perf_enabled = perf::phase_timing_enabled();
140 let selected_entities = select_entities(&context, &options);
141 let resolution_mode = if options.dry_run {
142 io::storage::inputs::ResolveInputsMode::ListOnly
143 } else {
144 io::storage::inputs::ResolveInputsMode::Download
145 };
146 if !options.dry_run {
147 observer.on_event(RunEvent::RunStarted {
148 run_id: context.run_id.clone(),
149 config: context.config_path.display().to_string(),
150 report_base: context.report_base_path.clone(),
151 ts_ms: event_time_ms(),
152 });
153 crate::run::events::mark_run_started();
154 }
155 let resolve_start = perf_enabled.then(Instant::now);
156 let plans = resolve_entity_plans(&context, runtime, &selected_entities, resolution_mode)?;
157 if let Some(start) = resolve_start {
158 perf::emit_perf_log(
159 observer,
160 &context.run_id,
161 None,
162 "perf_run_phase_timings",
163 json!({
164 "phase": "resolve_inputs",
165 "elapsed_ms": start.elapsed().as_millis() as u64,
166 "entity_count": selected_entities.len(),
167 "mode": match resolution_mode {
168 io::storage::inputs::ResolveInputsMode::ListOnly => "list_only",
169 io::storage::inputs::ResolveInputsMode::Download => "download",
170 },
171 }),
172 );
173 }
174 if options.dry_run {
175 return create_dry_run_outcome(&context, plans);
176 }
177
178 let mut entity_outcomes = Vec::new();
179 let mut abort_run = false;
180
181 for plan in plans {
182 let entity_name = plan.entity.name.clone();
183 observer.on_event(RunEvent::EntityStarted {
184 run_id: context.run_id.clone(),
185 name: entity_name.clone(),
186 ts_ms: event_time_ms(),
187 });
188 let entity_result = run_entity(&context, runtime, observer, plan);
189 if let Err(err) = entity_result {
190 observer.on_event(RunEvent::EntityFinished {
191 run_id: context.run_id.clone(),
192 name: entity_name,
193 status: "failed".to_string(),
194 files: 0,
195 rows: 0,
196 accepted: 0,
197 rejected: 0,
198 warnings: 0,
199 errors: 0,
200 ts_ms: event_time_ms(),
201 });
202 return Err(err);
203 }
204 let EntityRunResult {
205 outcome,
206 abort_run: aborted,
207 } = entity_result.unwrap();
208 let report = &outcome.report;
209 let (mut status, _) = report::compute_run_outcome(
210 &report
211 .files
212 .iter()
213 .map(|file| file.status)
214 .collect::<Vec<_>>(),
215 );
216 status = upgrade_status_for_warnings(status, report.results.warnings_total);
217 observer.on_event(RunEvent::EntityFinished {
218 run_id: context.run_id.clone(),
219 name: report.entity.name.clone(),
220 status: run_status_str(status).to_string(),
221 files: report.results.files_total,
222 rows: report.results.rows_total,
223 accepted: report.results.accepted_total,
224 rejected: report.results.rejected_total,
225 warnings: report.results.warnings_total,
226 errors: report.results.errors_total,
227 ts_ms: event_time_ms(),
228 });
229 entity_outcomes.push(outcome);
230 abort_run = abort_run || aborted;
231 if abort_run {
232 break;
233 }
234 }
235 let summary = build_run_summary(&context, &entity_outcomes);
236 if let Some(report_target) = &context.report_target {
237 let summary_write_start = perf_enabled.then(Instant::now);
238 write_summary_report(
239 report_target,
240 &context.run_id,
241 &summary,
242 runtime.storage(),
243 &context.storage_resolver,
244 )?;
245 if let Some(start) = summary_write_start {
246 perf::emit_perf_log(
247 observer,
248 &context.run_id,
249 None,
250 "perf_run_phase_timings",
251 json!({
252 "phase": "write_summary_report",
253 "elapsed_ms": start.elapsed().as_millis() as u64,
254 "entity_count": entity_outcomes.len(),
255 }),
256 );
257 }
258 }
259 observer.on_event(RunEvent::RunFinished {
260 run_id: context.run_id.clone(),
261 status: run_status_str(summary.run.status).to_string(),
262 exit_code: summary.run.exit_code,
263 files: summary.results.files_total,
264 rows: summary.results.rows_total,
265 accepted: summary.results.accepted_total,
266 rejected: summary.results.rejected_total,
267 warnings: summary.results.warnings_total,
268 errors: summary.results.errors_total,
269 summary_uri: context.report_target.as_ref().map(|target| {
270 target.join_relative(&report::ReportWriter::summary_relative_path(
271 &context.run_id,
272 ))
273 }),
274 ts_ms: event_time_ms(),
275 });
276
277 Ok(RunOutcome {
278 run_id: context.run_id.clone(),
279 report_base_path: context.report_base_path.clone(),
280 entity_outcomes,
281 summary,
282 dry_run_previews: None,
283 })
284}
285
286fn init_thread_pool() {
287 static INIT: Once = Once::new();
288 INIT.call_once(|| {
289 if std::env::var("RAYON_NUM_THREADS").is_ok() {
290 return;
291 }
292 let cap = std::env::var("FLOE_MAX_THREADS")
293 .ok()
294 .and_then(|value| value.parse::<usize>().ok())
295 .unwrap_or(4);
296 let available = std::thread::available_parallelism()
297 .map(|value| value.get())
298 .unwrap_or(1);
299 let threads = available.min(cap).max(1);
300 let _ = rayon::ThreadPoolBuilder::new()
301 .num_threads(threads)
302 .build_global();
303 });
304}
305
306fn select_entities<'a>(
307 context: &'a RunContext,
308 options: &RunOptions,
309) -> Vec<&'a config::EntityConfig> {
310 if options.entities.is_empty() {
311 context.config.entities.iter().collect()
312 } else {
313 let selected: HashSet<&str> = options.entities.iter().map(String::as_str).collect();
314 context
315 .config
316 .entities
317 .iter()
318 .filter(|entity| selected.contains(entity.name.as_str()))
319 .collect()
320 }
321}
322
323fn resolve_entity_plans<'a>(
324 context: &'a RunContext,
325 runtime: &mut dyn Runtime,
326 entities: &[&'a config::EntityConfig],
327 resolution_mode: io::storage::inputs::ResolveInputsMode,
328) -> FloeResult<Vec<EntityRunPlan<'a>>> {
329 let mut plans = Vec::with_capacity(entities.len());
330 for entity in entities {
331 let input_adapter = runtime.input_adapter(entity.source.format.as_str())?;
332 let resolved_targets = entity::resolve_entity_targets(&context.storage_resolver, entity)?;
333 let needs_temp = matches!(
334 resolution_mode,
335 io::storage::inputs::ResolveInputsMode::Download
336 ) && (resolved_targets.source.is_remote()
337 || resolved_targets.accepted.is_remote()
338 || resolved_targets
339 .rejected
340 .as_ref()
341 .is_some_and(io::storage::Target::is_remote));
342 let temp_dir = if needs_temp {
343 Some(
344 tempfile::TempDir::new()
345 .map_err(|err| Box::new(IoError(format!("tempdir failed: {err}"))))?,
346 )
347 } else {
348 None
349 };
350 let storage_client = Some(runtime.storage().client_for(
351 &context.storage_resolver,
352 resolved_targets.source.storage(),
353 entity,
354 )? as &dyn io::storage::StorageClient);
355 let resolved_inputs = io::storage::ops::resolve_inputs(
356 &context.config_dir,
357 entity,
358 input_adapter,
359 &resolved_targets.source,
360 resolution_mode,
361 temp_dir.as_ref().map(|dir| dir.path()),
362 storage_client,
363 )?;
364 plans.push(EntityRunPlan {
365 entity,
366 resolved_targets,
367 resolved_inputs,
368 temp_dir,
369 });
370 }
371 Ok(plans)
372}
373
374fn build_run_summary(
375 context: &RunContext,
376 entity_outcomes: &[EntityOutcome],
377) -> report::RunSummaryReport {
378 let mut totals = report::ResultsTotals {
379 files_total: 0,
380 rows_total: 0,
381 accepted_total: 0,
382 rejected_total: 0,
383 warnings_total: 0,
384 errors_total: 0,
385 };
386 let mut statuses = Vec::new();
387 let mut entities = Vec::with_capacity(entity_outcomes.len());
388
389 for outcome in entity_outcomes {
390 let report = &outcome.report;
391 totals.files_total += report.results.files_total;
392 totals.rows_total += report.results.rows_total;
393 totals.accepted_total += report.results.accepted_total;
394 totals.rejected_total += report.results.rejected_total;
395 totals.warnings_total += report.results.warnings_total;
396 totals.errors_total += report.results.errors_total;
397
398 let file_statuses = report
399 .files
400 .iter()
401 .map(|file| file.status)
402 .collect::<Vec<_>>();
403 let (mut status, _) = report::compute_run_outcome(&file_statuses);
404 status = upgrade_status_for_warnings(status, report.results.warnings_total);
405 statuses.extend(file_statuses);
406
407 let report_file = context
408 .report_target
409 .as_ref()
410 .map(|target| {
411 target.join_relative(&report::ReportWriter::report_relative_path(
412 &context.run_id,
413 &report.entity.name,
414 ))
415 })
416 .unwrap_or_else(|| "disabled".to_string());
417 entities.push(report::EntitySummary {
418 name: report.entity.name.clone(),
419 status,
420 results: report.results.clone(),
421 report_file,
422 });
423 }
424
425 let (mut status, exit_code) = report::compute_run_outcome(&statuses);
426 status = upgrade_status_for_warnings(status, totals.warnings_total);
427
428 let finished_at = report::now_rfc3339();
429 let duration_ms = context.run_timer.elapsed().as_millis() as u64;
430 let report_base_path = context
431 .report_base_path
432 .clone()
433 .unwrap_or_else(|| "disabled".to_string());
434 let report_file = context
435 .report_target
436 .as_ref()
437 .map(|target| {
438 target.join_relative(&report::ReportWriter::summary_relative_path(
439 &context.run_id,
440 ))
441 })
442 .unwrap_or_else(|| "disabled".to_string());
443
444 report::RunSummaryReport {
445 spec_version: context.config.version.clone(),
446 tool: build_tool_info(),
447 run: report::RunInfo {
448 run_id: context.run_id.clone(),
449 started_at: context.started_at.clone(),
450 finished_at,
451 duration_ms,
452 status,
453 exit_code,
454 },
455 config: build_config_echo(context),
456 report: build_report_echo(report_base_path, report_file),
457 results: totals,
458 entities,
459 }
460}
461
462fn create_dry_run_outcome(
463 context: &RunContext,
464 plans: Vec<EntityRunPlan<'_>>,
465) -> FloeResult<RunOutcome> {
466 let mut previews: Vec<DryRunEntityPreview> = Vec::new();
467
468 for plan in plans {
469 let entity = plan.entity;
470 let rejected_path = entity.sink.rejected.as_ref().map(|r| r.path.clone());
471 let rejected_format = entity.sink.rejected.as_ref().map(|r| r.format.clone());
472 let (archive_path, archive_storage) = entity
473 .sink
474 .archive
475 .as_ref()
476 .map(|a| (a.path.clone(), a.storage.clone()))
477 .unwrap_or_else(|| (String::new(), None));
478
479 let report_file = context.report_target.as_ref().map(|target| {
480 target.join_relative(&report::ReportWriter::report_relative_path(
481 &context.run_id,
482 &entity.name,
483 ))
484 });
485
486 previews.push(DryRunEntityPreview {
487 name: entity.name.clone(),
488 input_path: entity.source.path.clone(),
489 input_format: entity.source.format.clone(),
490 accepted_path: entity.sink.accepted.path.clone(),
491 accepted_format: entity.sink.accepted.format.clone(),
492 rejected_path,
493 rejected_format,
494 archive_path,
495 archive_storage,
496 report_file,
497 scanned_files: plan.resolved_inputs.listed,
498 });
499 }
500
501 Ok(RunOutcome {
502 run_id: context.run_id.clone(),
503 report_base_path: context.report_base_path.clone(),
504 entity_outcomes: Vec::new(),
505 summary: report::RunSummaryReport {
506 spec_version: context.config.version.clone(),
507 tool: build_tool_info(),
508 run: report::RunInfo {
509 run_id: context.run_id.clone(),
510 started_at: context.started_at.clone(),
511 finished_at: report::now_rfc3339(),
512 duration_ms: 0,
513 status: report::RunStatus::Success,
514 exit_code: 0,
515 },
516 config: build_config_echo(context),
517 report: build_report_echo(
518 context
519 .report_base_path
520 .clone()
521 .unwrap_or_else(|| "disabled".to_string()),
522 "disabled (dry-run)".to_string(),
523 ),
524 results: report::ResultsTotals {
525 files_total: 0,
526 rows_total: 0,
527 accepted_total: 0,
528 rejected_total: 0,
529 warnings_total: 0,
530 errors_total: 0,
531 },
532 entities: Vec::new(),
533 },
534 dry_run_previews: Some(previews),
535 })
536}
537
538fn upgrade_status_for_warnings(status: report::RunStatus, warnings: u64) -> report::RunStatus {
539 if status == report::RunStatus::Success && warnings > 0 {
540 report::RunStatus::SuccessWithWarnings
541 } else {
542 status
543 }
544}
545
546fn build_tool_info() -> report::ToolInfo {
547 report::ToolInfo {
548 name: "floe".to_string(),
549 version: env!("CARGO_PKG_VERSION").to_string(),
550 git: None,
551 }
552}
553
554fn build_config_echo(context: &RunContext) -> report::ConfigEcho {
555 report::ConfigEcho {
556 path: context.config_path.display().to_string(),
557 version: context.config.version.clone(),
558 metadata: context.config.metadata.as_ref().map(project_metadata_json),
559 }
560}
561
562fn build_report_echo(path: String, report_file: String) -> report::ReportEcho {
563 report::ReportEcho { path, report_file }
564}
565
566fn run_status_str(status: report::RunStatus) -> &'static str {
567 match status {
568 report::RunStatus::Success => "success",
569 report::RunStatus::SuccessWithWarnings => "success_with_warnings",
570 report::RunStatus::Rejected => "rejected",
571 report::RunStatus::Aborted => "aborted",
572 report::RunStatus::Failed => "failed",
573 }
574}