1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4use std::time::Instant;
5
6use crate::errors::IoError;
7use crate::report::build::project_metadata_json;
8use crate::report::output::write_summary_report;
9use crate::runtime::{DefaultRuntime, Runtime};
10use crate::{config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
11
12mod context;
13pub(crate) mod entity;
14pub mod events;
15mod file;
16mod output;
17mod perf;
18
19pub(crate) use context::RunContext;
20use entity::{run_entity, EntityRunResult, ResolvedEntityTargets};
21use events::{default_observer, event_time_ms, RunEvent};
22use serde_json::json;
23
/// Upper bound of 50 on resolved inputs; visible to the parent module and not used in this file.
///
/// NOTE(review): presumably caps how many resolved input paths are listed or reported per
/// entity by sibling modules — confirm at the use sites.
pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
25
/// Everything gathered for one entity before it is handed to `run_entity`.
///
/// Built by `resolve_entity_plans`, one plan per selected entity.
pub(crate) struct EntityRunPlan<'a> {
    /// Entity configuration this plan was built from.
    pub(crate) entity: &'a config::EntityConfig,
    /// Targets returned by `entity::resolve_entity_targets` for this entity.
    pub(crate) resolved_targets: ResolvedEntityTargets,
    /// Result of `io::storage::ops::resolve_inputs` for the entity's source.
    pub(crate) resolved_inputs: io::storage::inputs::ResolvedInputs,
    /// Scratch directory, created only in download mode when the source, accepted or
    /// rejected target is remote; `None` otherwise. Owned by the plan.
    pub(crate) temp_dir: Option<tempfile::TempDir>,
}
32
/// Result of a run (or dry run) as returned by the `run*` entry points.
#[derive(Debug, Clone)]
pub struct RunOutcome {
    /// Identifier of the run, copied from the run context.
    pub run_id: String,
    /// Report base path from the run context, if one is set.
    pub report_base_path: Option<String>,
    /// One outcome per entity that was executed; empty for dry runs.
    pub entity_outcomes: Vec<EntityOutcome>,
    /// Run-level summary (totals, status, exit code, per-entity summaries).
    pub summary: report::RunSummaryReport,
    /// Per-entity previews; `Some` only for dry runs, `None` for real runs.
    pub dry_run_previews: Option<Vec<DryRunEntityPreview>>,
}
41
/// What a dry run reports for one entity: configured locations plus the listed inputs.
#[derive(Debug, Clone)]
pub struct DryRunEntityPreview {
    /// Entity name from the configuration.
    pub name: String,
    /// Configured source path.
    pub input_path: String,
    /// Configured source format.
    pub input_format: String,
    /// Configured accepted-sink path.
    pub accepted_path: String,
    /// Configured accepted-sink format.
    pub accepted_format: String,
    /// Configured rejected-sink path, if a rejected sink is configured.
    pub rejected_path: Option<String>,
    /// Configured rejected-sink format, if a rejected sink is configured.
    pub rejected_format: Option<String>,
    /// Configured archive path; an empty string when no archive is configured.
    pub archive_path: String,
    /// Storage named by the archive configuration, if any.
    pub archive_storage: Option<String>,
    /// Location the entity report would have, when a report target is set.
    pub report_file: Option<String>,
    /// Inputs listed while resolving the entity's source (no download in dry-run mode).
    pub scanned_files: Vec<String>,
}
56
/// Outcome of running a single entity.
#[derive(Debug, Clone)]
pub struct EntityOutcome {
    /// Full run report for the entity (files, results, entity metadata).
    pub report: crate::report::RunReport,
    /// Optional per-file timings in milliseconds.
    /// NOTE(review): presumably one entry per file in `report.files` order — confirm in the
    /// entity module that fills it.
    pub file_timings_ms: Vec<Option<u64>>,
}
62
63pub(crate) fn validate_entities(
64 config: &config::RootConfig,
65 selected: &[String],
66) -> FloeResult<()> {
67 let missing: Vec<String> = selected
68 .iter()
69 .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
70 .cloned()
71 .collect();
72
73 if !missing.is_empty() {
74 return Err(Box::new(ConfigError(format!(
75 "entities not found: {}",
76 missing.join(", ")
77 ))));
78 }
79 Ok(())
80}
81
82pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
83 let config_base = config::ConfigBase::local_from_path(config_path);
84 run_with_base(config_path, config_base, options)
85}
86
87pub fn run_with_base(
88 config_path: &Path,
89 config_base: config::ConfigBase,
90 options: RunOptions,
91) -> FloeResult<RunOutcome> {
92 let mut runtime = DefaultRuntime::new();
93 run_with_runtime(config_path, config_base, options, &mut runtime)
94}
95
96pub fn run_with_manifest_path(manifest_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
97 let mut runtime = DefaultRuntime::new();
98 run_with_manifest_runtime(manifest_path, options, &mut runtime)
99}
100
101pub(crate) fn run_with_manifest_runtime(
102 manifest_path: &Path,
103 options: RunOptions,
104 runtime: &mut dyn Runtime,
105) -> FloeResult<RunOutcome> {
106 init_thread_pool();
107 let json = std::fs::read_to_string(manifest_path)?;
108 let (config, report_base_uri) = crate::manifest::config_from_manifest_json(&json)?;
109 let config_base = config::ConfigBase::local_from_path(manifest_path);
110 if !options.entities.is_empty() {
111 validate_entities(&config, &options.entities)?;
112 }
113 let context = RunContext::from_config(
114 config,
115 config_base,
116 manifest_path,
117 &report_base_uri,
118 &options,
119 )?;
120 run_from_context(context, options, runtime)
121}
122
123pub fn run_with_runtime(
124 config_path: &Path,
125 config_base: config::ConfigBase,
126 options: RunOptions,
127 runtime: &mut dyn Runtime,
128) -> FloeResult<RunOutcome> {
129 init_thread_pool();
130 let raw_config_env_vars = config::extract_raw_env_vars(config_path).unwrap_or_default();
131 let profile_vars = options
132 .profile
133 .as_ref()
134 .map(|p| {
135 crate::resolve_vars(crate::VarSources {
136 profile: &p.variables,
137 cli: &std::collections::HashMap::new(),
138 config: &raw_config_env_vars,
139 })
140 })
141 .transpose()?
142 .unwrap_or_default();
143 let validate_options = ValidateOptions {
144 entities: options.entities.clone(),
145 profile_vars: profile_vars.clone(),
146 profile_catalogs: options
147 .profile
148 .as_ref()
149 .and_then(|profile| profile.catalogs.clone()),
150 profile_storages: options
151 .profile
152 .as_ref()
153 .and_then(|profile| profile.storages.clone()),
154 profile_lineage: options
155 .profile
156 .as_ref()
157 .and_then(|profile| profile.lineage.clone()),
158 };
159 crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
160 let context = RunContext::new(config_path, config_base, &options, profile_vars)?;
161 if !options.entities.is_empty() {
162 validate_entities(&context.config, &options.entities)?;
163 }
164
165 run_from_context(context, options, runtime)
166}
167
168fn run_from_context(
169 context: RunContext,
170 options: RunOptions,
171 runtime: &mut dyn Runtime,
172) -> FloeResult<RunOutcome> {
173 let observer = default_observer();
174 let perf_enabled = perf::phase_timing_enabled();
175 let selected_entities = select_entities(&context, &options);
176 let resolution_mode = if options.dry_run {
177 io::storage::inputs::ResolveInputsMode::ListOnly
178 } else {
179 io::storage::inputs::ResolveInputsMode::Download
180 };
181 if !options.dry_run {
182 observer.on_event(RunEvent::RunStarted {
183 run_id: context.run_id.clone(),
184 config: context.config_path.display().to_string(),
185 report_base: context.report_base_path.clone(),
186 ts_ms: event_time_ms(),
187 });
188 crate::run::events::mark_run_started();
189 }
190 let resolve_start = perf_enabled.then(Instant::now);
191 let plans = resolve_entity_plans(&context, runtime, &selected_entities, resolution_mode)?;
192 if let Some(start) = resolve_start {
193 perf::emit_perf_log(
194 observer,
195 &context.run_id,
196 None,
197 "perf_run_phase_timings",
198 json!({
199 "phase": "resolve_inputs",
200 "elapsed_ms": start.elapsed().as_millis() as u64,
201 "entity_count": selected_entities.len(),
202 "mode": match resolution_mode {
203 io::storage::inputs::ResolveInputsMode::ListOnly => "list_only",
204 io::storage::inputs::ResolveInputsMode::Download => "download",
205 },
206 }),
207 );
208 }
209 if options.dry_run {
210 return create_dry_run_outcome(&context, plans);
211 }
212
213 let mut entity_outcomes = Vec::new();
214 let mut abort_run = false;
215
216 for plan in plans {
217 let entity_name = plan.entity.name.clone();
218 observer.on_event(RunEvent::EntityStarted {
219 run_id: context.run_id.clone(),
220 name: entity_name.clone(),
221 ts_ms: event_time_ms(),
222 });
223 let entity_result = run_entity(&context, runtime, observer, plan);
224 if let Err(err) = entity_result {
225 observer.on_event(RunEvent::EntityFinished {
226 run_id: context.run_id.clone(),
227 name: entity_name,
228 status: "failed".to_string(),
229 files: 0,
230 files_skipped: 0,
231 rows: 0,
232 accepted: 0,
233 rejected: 0,
234 warnings: 0,
235 errors: 0,
236 ts_ms: event_time_ms(),
237 });
238 return Err(err);
239 }
240 let EntityRunResult {
241 outcome,
242 abort_run: aborted,
243 } = entity_result.unwrap();
244 let report = &outcome.report;
245 let (mut status, _) = report::compute_run_outcome(
246 &report
247 .files
248 .iter()
249 .map(|file| file.status)
250 .collect::<Vec<_>>(),
251 );
252 status = upgrade_status_for_warnings(status, report.results.warnings_total);
253 observer.on_event(RunEvent::EntityFinished {
254 run_id: context.run_id.clone(),
255 name: report.entity.name.clone(),
256 status: run_status_str(status).to_string(),
257 files: report.results.files_total,
258 files_skipped: report.results.files_skipped,
259 rows: report.results.rows_total,
260 accepted: report.results.accepted_total,
261 rejected: report.results.rejected_total,
262 warnings: report.results.warnings_total,
263 errors: report.results.errors_total,
264 ts_ms: event_time_ms(),
265 });
266 entity_outcomes.push(outcome);
267 abort_run = abort_run || aborted;
268 if abort_run {
269 break;
270 }
271 }
272 let summary = build_run_summary(&context, &entity_outcomes);
273 if let Some(report_target) = &context.report_target {
274 let summary_write_start = perf_enabled.then(Instant::now);
275 write_summary_report(
276 report_target,
277 &context.run_id,
278 &summary,
279 runtime.storage(),
280 &context.storage_resolver,
281 )?;
282 if let Some(start) = summary_write_start {
283 perf::emit_perf_log(
284 observer,
285 &context.run_id,
286 None,
287 "perf_run_phase_timings",
288 json!({
289 "phase": "write_summary_report",
290 "elapsed_ms": start.elapsed().as_millis() as u64,
291 "entity_count": entity_outcomes.len(),
292 }),
293 );
294 }
295 }
296 observer.on_event(RunEvent::RunFinished {
297 run_id: context.run_id.clone(),
298 status: run_status_str(summary.run.status).to_string(),
299 exit_code: summary.run.exit_code,
300 files: summary.results.files_total,
301 files_skipped: summary.results.files_skipped,
302 rows: summary.results.rows_total,
303 accepted: summary.results.accepted_total,
304 rejected: summary.results.rejected_total,
305 warnings: summary.results.warnings_total,
306 errors: summary.results.errors_total,
307 summary_uri: context.report_target.as_ref().map(|target| {
308 target.join_relative(&report::ReportWriter::summary_relative_path(
309 &context.run_id,
310 ))
311 }),
312 ts_ms: event_time_ms(),
313 });
314
315 Ok(RunOutcome {
316 run_id: context.run_id.clone(),
317 report_base_path: context.report_base_path.clone(),
318 entity_outcomes,
319 summary,
320 dry_run_previews: None,
321 })
322}
323
324fn init_thread_pool() {
325 static INIT: Once = Once::new();
326 INIT.call_once(|| {
327 if std::env::var("RAYON_NUM_THREADS").is_ok() {
328 return;
329 }
330 let cap = std::env::var("FLOE_MAX_THREADS")
331 .ok()
332 .and_then(|value| value.parse::<usize>().ok())
333 .unwrap_or(4);
334 let available = std::thread::available_parallelism()
335 .map(|value| value.get())
336 .unwrap_or(1);
337 let threads = available.min(cap).max(1);
338 let _ = rayon::ThreadPoolBuilder::new()
339 .num_threads(threads)
340 .build_global();
341 });
342}
343
344fn select_entities<'a>(
345 context: &'a RunContext,
346 options: &RunOptions,
347) -> Vec<&'a config::EntityConfig> {
348 if options.entities.is_empty() {
349 context.config.entities.iter().collect()
350 } else {
351 let selected: HashSet<&str> = options.entities.iter().map(String::as_str).collect();
352 context
353 .config
354 .entities
355 .iter()
356 .filter(|entity| selected.contains(entity.name.as_str()))
357 .collect()
358 }
359}
360
361fn resolve_entity_plans<'a>(
362 context: &'a RunContext,
363 runtime: &mut dyn Runtime,
364 entities: &[&'a config::EntityConfig],
365 resolution_mode: io::storage::inputs::ResolveInputsMode,
366) -> FloeResult<Vec<EntityRunPlan<'a>>> {
367 let mut plans = Vec::with_capacity(entities.len());
368 for entity in entities {
369 let input_adapter = runtime.input_adapter(entity.source.format.as_str())?;
370 let resolved_targets = entity::resolve_entity_targets(&context.storage_resolver, entity)?;
371 let needs_temp = matches!(
372 resolution_mode,
373 io::storage::inputs::ResolveInputsMode::Download
374 ) && (resolved_targets.source.is_remote()
375 || resolved_targets.accepted.is_remote()
376 || resolved_targets
377 .rejected
378 .as_ref()
379 .is_some_and(io::storage::Target::is_remote));
380 let temp_dir = if needs_temp {
381 Some(
382 tempfile::TempDir::new()
383 .map_err(|err| Box::new(IoError(format!("tempdir failed: {err}"))))?,
384 )
385 } else {
386 None
387 };
388 let storage_client = Some(runtime.storage().client_for(
389 &context.storage_resolver,
390 resolved_targets.source.storage(),
391 entity,
392 )? as &dyn io::storage::StorageClient);
393 let resolved_inputs = io::storage::ops::resolve_inputs(
394 &context.config_dir,
395 entity,
396 input_adapter,
397 &resolved_targets.source,
398 resolution_mode,
399 temp_dir.as_ref().map(|dir| dir.path()),
400 storage_client,
401 )?;
402 plans.push(EntityRunPlan {
403 entity,
404 resolved_targets,
405 resolved_inputs,
406 temp_dir,
407 });
408 }
409 Ok(plans)
410}
411
412fn build_run_summary(
413 context: &RunContext,
414 entity_outcomes: &[EntityOutcome],
415) -> report::RunSummaryReport {
416 let mut totals = report::ResultsTotals {
417 files_total: 0,
418 files_skipped: 0,
419 rows_total: 0,
420 accepted_total: 0,
421 rejected_total: 0,
422 warnings_total: 0,
423 errors_total: 0,
424 };
425 let mut statuses = Vec::new();
426 let mut entities = Vec::with_capacity(entity_outcomes.len());
427
428 for outcome in entity_outcomes {
429 let report = &outcome.report;
430 totals.files_total += report.results.files_total;
431 totals.files_skipped += report.results.files_skipped;
432 totals.rows_total += report.results.rows_total;
433 totals.accepted_total += report.results.accepted_total;
434 totals.rejected_total += report.results.rejected_total;
435 totals.warnings_total += report.results.warnings_total;
436 totals.errors_total += report.results.errors_total;
437
438 let file_statuses = report
439 .files
440 .iter()
441 .map(|file| file.status)
442 .collect::<Vec<_>>();
443 let (mut status, _) = report::compute_run_outcome(&file_statuses);
444 status = upgrade_status_for_warnings(status, report.results.warnings_total);
445 statuses.extend(file_statuses);
446
447 let report_file = context
448 .report_target
449 .as_ref()
450 .map(|target| {
451 target.join_relative(&report::ReportWriter::report_relative_path(
452 &context.run_id,
453 &report.entity.name,
454 ))
455 })
456 .unwrap_or_else(|| "disabled".to_string());
457 entities.push(report::EntitySummary {
458 name: report.entity.name.clone(),
459 status,
460 results: report.results.clone(),
461 report_file,
462 });
463 }
464
465 let (mut status, exit_code) = report::compute_run_outcome(&statuses);
466 status = upgrade_status_for_warnings(status, totals.warnings_total);
467
468 let finished_at = report::now_rfc3339();
469 let duration_ms = context.run_timer.elapsed().as_millis() as u64;
470 let report_base_path = context
471 .report_base_path
472 .clone()
473 .unwrap_or_else(|| "disabled".to_string());
474 let report_file = context
475 .report_target
476 .as_ref()
477 .map(|target| {
478 target.join_relative(&report::ReportWriter::summary_relative_path(
479 &context.run_id,
480 ))
481 })
482 .unwrap_or_else(|| "disabled".to_string());
483
484 report::RunSummaryReport {
485 spec_version: context.config.version.clone(),
486 tool: build_tool_info(),
487 run: report::RunInfo {
488 run_id: context.run_id.clone(),
489 started_at: context.started_at.clone(),
490 finished_at,
491 duration_ms,
492 status,
493 exit_code,
494 },
495 config: build_config_echo(context),
496 report: build_report_echo(report_base_path, report_file),
497 results: totals,
498 entities,
499 }
500}
501
502fn create_dry_run_outcome(
503 context: &RunContext,
504 plans: Vec<EntityRunPlan<'_>>,
505) -> FloeResult<RunOutcome> {
506 let mut previews: Vec<DryRunEntityPreview> = Vec::new();
507
508 for plan in plans {
509 let entity = plan.entity;
510 let rejected_path = entity.sink.rejected.as_ref().map(|r| r.path.clone());
511 let rejected_format = entity.sink.rejected.as_ref().map(|r| r.format.clone());
512 let (archive_path, archive_storage) = entity
513 .sink
514 .archive
515 .as_ref()
516 .map(|a| (a.path.clone(), a.storage.clone()))
517 .unwrap_or_else(|| (String::new(), None));
518
519 let report_file = context.report_target.as_ref().map(|target| {
520 target.join_relative(&report::ReportWriter::report_relative_path(
521 &context.run_id,
522 &entity.name,
523 ))
524 });
525
526 previews.push(DryRunEntityPreview {
527 name: entity.name.clone(),
528 input_path: entity.source.path.clone(),
529 input_format: entity.source.format.clone(),
530 accepted_path: entity.sink.accepted.path.clone(),
531 accepted_format: entity.sink.accepted.format.clone(),
532 rejected_path,
533 rejected_format,
534 archive_path,
535 archive_storage,
536 report_file,
537 scanned_files: plan.resolved_inputs.listed,
538 });
539 }
540
541 Ok(RunOutcome {
542 run_id: context.run_id.clone(),
543 report_base_path: context.report_base_path.clone(),
544 entity_outcomes: Vec::new(),
545 summary: report::RunSummaryReport {
546 spec_version: context.config.version.clone(),
547 tool: build_tool_info(),
548 run: report::RunInfo {
549 run_id: context.run_id.clone(),
550 started_at: context.started_at.clone(),
551 finished_at: report::now_rfc3339(),
552 duration_ms: 0,
553 status: report::RunStatus::Success,
554 exit_code: 0,
555 },
556 config: build_config_echo(context),
557 report: build_report_echo(
558 context
559 .report_base_path
560 .clone()
561 .unwrap_or_else(|| "disabled".to_string()),
562 "disabled (dry-run)".to_string(),
563 ),
564 results: report::ResultsTotals {
565 files_total: 0,
566 files_skipped: 0,
567 rows_total: 0,
568 accepted_total: 0,
569 rejected_total: 0,
570 warnings_total: 0,
571 errors_total: 0,
572 },
573 entities: Vec::new(),
574 },
575 dry_run_previews: Some(previews),
576 })
577}
578
579fn upgrade_status_for_warnings(status: report::RunStatus, warnings: u64) -> report::RunStatus {
580 if status == report::RunStatus::Success && warnings > 0 {
581 report::RunStatus::SuccessWithWarnings
582 } else {
583 status
584 }
585}
586
587fn build_tool_info() -> report::ToolInfo {
588 report::ToolInfo {
589 name: "floe".to_string(),
590 version: env!("CARGO_PKG_VERSION").to_string(),
591 git: None,
592 }
593}
594
595fn build_config_echo(context: &RunContext) -> report::ConfigEcho {
596 report::ConfigEcho {
597 path: context.config_path.display().to_string(),
598 version: context.config.version.clone(),
599 metadata: context.config.metadata.as_ref().map(project_metadata_json),
600 }
601}
602
/// Packs the report base path (`path`) and the report file location (`report_file`) into a
/// `report::ReportEcho`. Both strings are stored as given.
fn build_report_echo(path: String, report_file: String) -> report::ReportEcho {
    report::ReportEcho { path, report_file }
}
606
607fn run_status_str(status: report::RunStatus) -> &'static str {
608 match status {
609 report::RunStatus::Success => "success",
610 report::RunStatus::SuccessWithWarnings => "success_with_warnings",
611 report::RunStatus::Rejected => "rejected",
612 report::RunStatus::Aborted => "aborted",
613 report::RunStatus::Failed => "failed",
614 }
615}