1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4use std::time::Instant;
5
6use crate::errors::IoError;
7use crate::report::build::project_metadata_json;
8use crate::report::output::write_summary_report;
9use crate::runtime::{DefaultRuntime, Runtime};
10use crate::{config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
11
12mod context;
13pub(crate) mod entity;
14pub mod events;
15mod file;
16mod output;
17mod perf;
18
19pub(crate) use context::RunContext;
20use entity::{run_entity, EntityRunResult, ResolvedEntityTargets};
21use events::{default_observer, event_time_ms, RunEvent};
22use serde_json::json;
23
24pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
25
26pub(crate) struct EntityRunPlan<'a> {
27 pub(crate) entity: &'a config::EntityConfig,
28 pub(crate) resolved_targets: ResolvedEntityTargets,
29 pub(crate) resolved_inputs: io::storage::inputs::ResolvedInputs,
30 pub(crate) temp_dir: Option<tempfile::TempDir>,
31}
32
33#[derive(Debug, Clone)]
34pub struct RunOutcome {
35 pub run_id: String,
36 pub report_base_path: Option<String>,
37 pub entity_outcomes: Vec<EntityOutcome>,
38 pub summary: report::RunSummaryReport,
39 pub dry_run_previews: Option<Vec<DryRunEntityPreview>>,
40}
41
/// What a dry run reports for a single entity: where data would be read from
/// and written to, without executing the entity.
#[derive(Debug, Clone)]
pub struct DryRunEntityPreview {
    /// Entity name from the configuration.
    pub name: String,
    /// Configured source path.
    pub input_path: String,
    /// Configured source format.
    pub input_format: String,
    /// Configured path of the accepted sink.
    pub accepted_path: String,
    /// Configured format of the accepted sink.
    pub accepted_format: String,
    /// Path of the rejected sink; `None` when no rejected sink is configured.
    pub rejected_path: Option<String>,
    /// Format of the rejected sink; `None` when no rejected sink is configured.
    pub rejected_format: Option<String>,
    /// Path of the archive sink; an empty string when no archive is configured.
    pub archive_path: String,
    /// Storage setting of the archive sink, if any.
    pub archive_storage: Option<String>,
    /// Location the entity report would be written to; `None` when the run
    /// has no report target.
    pub report_file: Option<String>,
    /// Inputs listed for the entity during input resolution.
    pub scanned_files: Vec<String>,
}
56
57#[derive(Debug, Clone)]
58pub struct EntityOutcome {
59 pub report: crate::report::RunReport,
60 pub file_timings_ms: Vec<Option<u64>>,
61}
62
63pub(crate) fn validate_entities(
64 config: &config::RootConfig,
65 selected: &[String],
66) -> FloeResult<()> {
67 let missing: Vec<String> = selected
68 .iter()
69 .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
70 .cloned()
71 .collect();
72
73 if !missing.is_empty() {
74 return Err(Box::new(ConfigError(format!(
75 "entities not found: {}",
76 missing.join(", ")
77 ))));
78 }
79 Ok(())
80}
81
82pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
83 let config_base = config::ConfigBase::local_from_path(config_path);
84 run_with_base(config_path, config_base, options)
85}
86
87pub fn run_with_base(
88 config_path: &Path,
89 config_base: config::ConfigBase,
90 options: RunOptions,
91) -> FloeResult<RunOutcome> {
92 let mut runtime = DefaultRuntime::new();
93 run_with_runtime(config_path, config_base, options, &mut runtime)
94}
95
96pub fn run_with_runtime(
97 config_path: &Path,
98 config_base: config::ConfigBase,
99 options: RunOptions,
100 runtime: &mut dyn Runtime,
101) -> FloeResult<RunOutcome> {
102 init_thread_pool();
103 let validate_options = ValidateOptions {
104 entities: options.entities.clone(),
105 };
106 crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
107 let context = RunContext::new(config_path, config_base, &options)?;
108 if !options.entities.is_empty() {
109 validate_entities(&context.config, &options.entities)?;
110 }
111
112 let observer = default_observer();
113 let perf_enabled = perf::phase_timing_enabled();
114 let selected_entities = select_entities(&context, &options);
115 let resolution_mode = if options.dry_run {
116 io::storage::inputs::ResolveInputsMode::ListOnly
117 } else {
118 io::storage::inputs::ResolveInputsMode::Download
119 };
120 if !options.dry_run {
121 observer.on_event(RunEvent::RunStarted {
122 run_id: context.run_id.clone(),
123 config: context.config_path.display().to_string(),
124 report_base: context.report_base_path.clone(),
125 ts_ms: event_time_ms(),
126 });
127 }
128 let resolve_start = perf_enabled.then(Instant::now);
129 let plans = resolve_entity_plans(&context, runtime, &selected_entities, resolution_mode)?;
130 if let Some(start) = resolve_start {
131 perf::emit_perf_log(
132 observer,
133 &context.run_id,
134 None,
135 "perf_run_phase_timings",
136 json!({
137 "phase": "resolve_inputs",
138 "elapsed_ms": start.elapsed().as_millis() as u64,
139 "entity_count": selected_entities.len(),
140 "mode": match resolution_mode {
141 io::storage::inputs::ResolveInputsMode::ListOnly => "list_only",
142 io::storage::inputs::ResolveInputsMode::Download => "download",
143 },
144 }),
145 );
146 }
147 if options.dry_run {
148 return create_dry_run_outcome(&context, plans);
149 }
150
151 let mut entity_outcomes = Vec::new();
152 let mut abort_run = false;
153
154 for plan in plans {
155 observer.on_event(RunEvent::EntityStarted {
156 run_id: context.run_id.clone(),
157 name: plan.entity.name.clone(),
158 ts_ms: event_time_ms(),
159 });
160 let EntityRunResult {
161 outcome,
162 abort_run: aborted,
163 } = run_entity(&context, runtime, observer, plan)?;
164 let report = &outcome.report;
165 let (mut status, _) = report::compute_run_outcome(
166 &report
167 .files
168 .iter()
169 .map(|file| file.status)
170 .collect::<Vec<_>>(),
171 );
172 if status == report::RunStatus::Success && report.results.warnings_total > 0 {
173 status = report::RunStatus::SuccessWithWarnings;
174 }
175 observer.on_event(RunEvent::EntityFinished {
176 run_id: context.run_id.clone(),
177 name: report.entity.name.clone(),
178 status: run_status_str(status).to_string(),
179 files: report.results.files_total,
180 rows: report.results.rows_total,
181 accepted: report.results.accepted_total,
182 rejected: report.results.rejected_total,
183 warnings: report.results.warnings_total,
184 errors: report.results.errors_total,
185 ts_ms: event_time_ms(),
186 });
187 entity_outcomes.push(outcome);
188 abort_run = abort_run || aborted;
189 if abort_run {
190 break;
191 }
192 }
193 let summary = build_run_summary(&context, &entity_outcomes);
194 if let Some(report_target) = &context.report_target {
195 let summary_write_start = perf_enabled.then(Instant::now);
196 write_summary_report(
197 report_target,
198 &context.run_id,
199 &summary,
200 runtime.storage(),
201 &context.storage_resolver,
202 )?;
203 if let Some(start) = summary_write_start {
204 perf::emit_perf_log(
205 observer,
206 &context.run_id,
207 None,
208 "perf_run_phase_timings",
209 json!({
210 "phase": "write_summary_report",
211 "elapsed_ms": start.elapsed().as_millis() as u64,
212 "entity_count": entity_outcomes.len(),
213 }),
214 );
215 }
216 }
217 observer.on_event(RunEvent::RunFinished {
218 run_id: context.run_id.clone(),
219 status: run_status_str(summary.run.status).to_string(),
220 exit_code: summary.run.exit_code,
221 files: summary.results.files_total,
222 rows: summary.results.rows_total,
223 accepted: summary.results.accepted_total,
224 rejected: summary.results.rejected_total,
225 warnings: summary.results.warnings_total,
226 errors: summary.results.errors_total,
227 summary_uri: context.report_target.as_ref().map(|target| {
228 target.join_relative(&report::ReportWriter::summary_relative_path(
229 &context.run_id,
230 ))
231 }),
232 ts_ms: event_time_ms(),
233 });
234
235 Ok(RunOutcome {
236 run_id: context.run_id.clone(),
237 report_base_path: context.report_base_path.clone(),
238 entity_outcomes,
239 summary,
240 dry_run_previews: None,
241 })
242}
243
244fn init_thread_pool() {
245 static INIT: Once = Once::new();
246 INIT.call_once(|| {
247 if std::env::var("RAYON_NUM_THREADS").is_ok() {
248 return;
249 }
250 let cap = std::env::var("FLOE_MAX_THREADS")
251 .ok()
252 .and_then(|value| value.parse::<usize>().ok())
253 .unwrap_or(4);
254 let available = std::thread::available_parallelism()
255 .map(|value| value.get())
256 .unwrap_or(1);
257 let threads = available.min(cap).max(1);
258 let _ = rayon::ThreadPoolBuilder::new()
259 .num_threads(threads)
260 .build_global();
261 });
262}
263
264fn select_entities<'a>(
265 context: &'a RunContext,
266 options: &RunOptions,
267) -> Vec<&'a config::EntityConfig> {
268 if options.entities.is_empty() {
269 context.config.entities.iter().collect()
270 } else {
271 let selected: HashSet<&str> = options.entities.iter().map(String::as_str).collect();
272 context
273 .config
274 .entities
275 .iter()
276 .filter(|entity| selected.contains(entity.name.as_str()))
277 .collect()
278 }
279}
280
281fn resolve_entity_plans<'a>(
282 context: &'a RunContext,
283 runtime: &mut dyn Runtime,
284 entities: &[&'a config::EntityConfig],
285 resolution_mode: io::storage::inputs::ResolveInputsMode,
286) -> FloeResult<Vec<EntityRunPlan<'a>>> {
287 let mut plans = Vec::with_capacity(entities.len());
288 for entity in entities {
289 let input_adapter = runtime.input_adapter(entity.source.format.as_str())?;
290 let resolved_targets = entity::resolve_entity_targets(&context.storage_resolver, entity)?;
291 let needs_temp = matches!(
292 resolution_mode,
293 io::storage::inputs::ResolveInputsMode::Download
294 ) && (resolved_targets.source.is_remote()
295 || resolved_targets.accepted.is_remote()
296 || resolved_targets
297 .rejected
298 .as_ref()
299 .is_some_and(io::storage::Target::is_remote));
300 let temp_dir = if needs_temp {
301 Some(
302 tempfile::TempDir::new()
303 .map_err(|err| Box::new(IoError(format!("tempdir failed: {err}"))))?,
304 )
305 } else {
306 None
307 };
308 let storage_client = Some(runtime.storage().client_for(
309 &context.storage_resolver,
310 resolved_targets.source.storage(),
311 entity,
312 )? as &dyn io::storage::StorageClient);
313 let resolved_inputs = io::storage::ops::resolve_inputs(
314 &context.config_dir,
315 entity,
316 input_adapter,
317 &resolved_targets.source,
318 resolution_mode,
319 temp_dir.as_ref().map(|dir| dir.path()),
320 storage_client,
321 )?;
322 plans.push(EntityRunPlan {
323 entity,
324 resolved_targets,
325 resolved_inputs,
326 temp_dir,
327 });
328 }
329 Ok(plans)
330}
331
332fn build_run_summary(
333 context: &RunContext,
334 entity_outcomes: &[EntityOutcome],
335) -> report::RunSummaryReport {
336 let mut totals = report::ResultsTotals {
337 files_total: 0,
338 rows_total: 0,
339 accepted_total: 0,
340 rejected_total: 0,
341 warnings_total: 0,
342 errors_total: 0,
343 };
344 let mut statuses = Vec::new();
345 let mut entities = Vec::with_capacity(entity_outcomes.len());
346
347 for outcome in entity_outcomes {
348 let report = &outcome.report;
349 totals.files_total += report.results.files_total;
350 totals.rows_total += report.results.rows_total;
351 totals.accepted_total += report.results.accepted_total;
352 totals.rejected_total += report.results.rejected_total;
353 totals.warnings_total += report.results.warnings_total;
354 totals.errors_total += report.results.errors_total;
355
356 let file_statuses = report
357 .files
358 .iter()
359 .map(|file| file.status)
360 .collect::<Vec<_>>();
361 let (mut status, _) = report::compute_run_outcome(&file_statuses);
362 if status == report::RunStatus::Success && report.results.warnings_total > 0 {
363 status = report::RunStatus::SuccessWithWarnings;
364 }
365 statuses.extend(file_statuses);
366
367 let report_file = context
368 .report_target
369 .as_ref()
370 .map(|target| {
371 target.join_relative(&report::ReportWriter::report_relative_path(
372 &context.run_id,
373 &report.entity.name,
374 ))
375 })
376 .unwrap_or_else(|| "disabled".to_string());
377 entities.push(report::EntitySummary {
378 name: report.entity.name.clone(),
379 status,
380 results: report.results.clone(),
381 report_file,
382 });
383 }
384
385 let (mut status, exit_code) = report::compute_run_outcome(&statuses);
386 if status == report::RunStatus::Success && totals.warnings_total > 0 {
387 status = report::RunStatus::SuccessWithWarnings;
388 }
389
390 let finished_at = report::now_rfc3339();
391 let duration_ms = context.run_timer.elapsed().as_millis() as u64;
392 let report_base_path = context
393 .report_base_path
394 .clone()
395 .unwrap_or_else(|| "disabled".to_string());
396 let report_file = context
397 .report_target
398 .as_ref()
399 .map(|target| {
400 target.join_relative(&report::ReportWriter::summary_relative_path(
401 &context.run_id,
402 ))
403 })
404 .unwrap_or_else(|| "disabled".to_string());
405
406 report::RunSummaryReport {
407 spec_version: context.config.version.clone(),
408 tool: report::ToolInfo {
409 name: "floe".to_string(),
410 version: env!("CARGO_PKG_VERSION").to_string(),
411 git: None,
412 },
413 run: report::RunInfo {
414 run_id: context.run_id.clone(),
415 started_at: context.started_at.clone(),
416 finished_at,
417 duration_ms,
418 status,
419 exit_code,
420 },
421 config: report::ConfigEcho {
422 path: context.config_path.display().to_string(),
423 version: context.config.version.clone(),
424 metadata: context.config.metadata.as_ref().map(project_metadata_json),
425 },
426 report: report::ReportEcho {
427 path: report_base_path,
428 report_file,
429 },
430 results: totals,
431 entities,
432 }
433}
434
435fn create_dry_run_outcome(
436 context: &RunContext,
437 plans: Vec<EntityRunPlan<'_>>,
438) -> FloeResult<RunOutcome> {
439 let mut previews: Vec<DryRunEntityPreview> = Vec::new();
440
441 for plan in plans {
442 let entity = plan.entity;
443 let rejected_path = entity.sink.rejected.as_ref().map(|r| r.path.clone());
444 let rejected_format = entity.sink.rejected.as_ref().map(|r| r.format.clone());
445 let (archive_path, archive_storage) = entity
446 .sink
447 .archive
448 .as_ref()
449 .map(|a| (a.path.clone(), a.storage.clone()))
450 .unwrap_or_else(|| (String::new(), None));
451
452 let report_file = context.report_target.as_ref().map(|target| {
453 target.join_relative(&report::ReportWriter::report_relative_path(
454 &context.run_id,
455 &entity.name,
456 ))
457 });
458
459 previews.push(DryRunEntityPreview {
460 name: entity.name.clone(),
461 input_path: entity.source.path.clone(),
462 input_format: entity.source.format.clone(),
463 accepted_path: entity.sink.accepted.path.clone(),
464 accepted_format: entity.sink.accepted.format.clone(),
465 rejected_path,
466 rejected_format,
467 archive_path,
468 archive_storage,
469 report_file,
470 scanned_files: plan.resolved_inputs.listed,
471 });
472 }
473
474 Ok(RunOutcome {
475 run_id: context.run_id.clone(),
476 report_base_path: context.report_base_path.clone(),
477 entity_outcomes: Vec::new(),
478 summary: report::RunSummaryReport {
479 spec_version: context.config.version.clone(),
480 tool: report::ToolInfo {
481 name: "floe".to_string(),
482 version: env!("CARGO_PKG_VERSION").to_string(),
483 git: None,
484 },
485 run: report::RunInfo {
486 run_id: context.run_id.clone(),
487 started_at: context.started_at.clone(),
488 finished_at: report::now_rfc3339(),
489 duration_ms: 0,
490 status: report::RunStatus::Success,
491 exit_code: 0,
492 },
493 config: report::ConfigEcho {
494 path: context.config_path.display().to_string(),
495 version: context.config.version.clone(),
496 metadata: context.config.metadata.as_ref().map(project_metadata_json),
497 },
498 report: report::ReportEcho {
499 path: context
500 .report_base_path
501 .clone()
502 .unwrap_or_else(|| "disabled".to_string()),
503 report_file: "disabled (dry-run)".to_string(),
504 },
505 results: report::ResultsTotals {
506 files_total: 0,
507 rows_total: 0,
508 accepted_total: 0,
509 rejected_total: 0,
510 warnings_total: 0,
511 errors_total: 0,
512 },
513 entities: Vec::new(),
514 },
515 dry_run_previews: Some(previews),
516 })
517}
518
519fn run_status_str(status: report::RunStatus) -> &'static str {
520 match status {
521 report::RunStatus::Success => "success",
522 report::RunStatus::SuccessWithWarnings => "success_with_warnings",
523 report::RunStatus::Rejected => "rejected",
524 report::RunStatus::Aborted => "aborted",
525 report::RunStatus::Failed => "failed",
526 }
527}