1use std::collections::HashSet;
2use std::path::Path;
3use std::sync::Once;
4use std::time::Instant;
5
6use crate::errors::IoError;
7use crate::report::build::project_metadata_json;
8use crate::report::output::write_summary_report;
9use crate::runtime::{DefaultRuntime, Runtime};
10use crate::{config, io, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
11
12mod context;
13pub(crate) mod entity;
14pub mod events;
15mod file;
16mod output;
17mod perf;
18
19pub(crate) use context::RunContext;
20use entity::{run_entity, EntityRunResult, ResolvedEntityTargets};
21use events::{default_observer, event_time_ms, RunEvent};
22use serde_json::json;
23
/// Resolved-input limit (50), exposed to the parent module via `pub(super)` and visible to this
/// module's submodules.
// NOTE(review): not referenced in the code shown here; presumably caps how many resolved input
// paths are retained or reported per entity — confirm at the use sites.
pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
25
/// Everything needed to execute one entity, produced by input resolution before any entity runs.
pub(crate) struct EntityRunPlan<'a> {
    /// The entity's configuration, borrowed from the run's root config.
    pub(crate) entity: &'a config::EntityConfig,
    /// Storage targets resolved for the entity (source, accepted, and optional rejected).
    pub(crate) resolved_targets: ResolvedEntityTargets,
    /// Inputs resolved from the source target, either only listed (dry run) or downloaded.
    pub(crate) resolved_inputs: io::storage::inputs::ResolvedInputs,
    /// Scratch directory, created only when inputs are downloaded and at least one of the
    /// source/accepted/rejected targets is remote. Owned by the plan, so it is dropped with it.
    pub(crate) temp_dir: Option<tempfile::TempDir>,
}
32
/// Result of a run: the per-entity outcomes plus the aggregated summary report.
#[derive(Debug, Clone)]
pub struct RunOutcome {
    /// Identifier of this run, copied from the run context.
    pub run_id: String,
    /// Base location for reports, or `None` when no report base is configured.
    pub report_base_path: Option<String>,
    /// One outcome per entity that actually ran; entities skipped after an abort are absent,
    /// and the list is empty for dry runs.
    pub entity_outcomes: Vec<EntityOutcome>,
    /// Aggregated run summary; for dry runs it carries zeroed totals and no entities.
    pub summary: report::RunSummaryReport,
    /// Per-entity previews, populated only for dry runs.
    pub dry_run_previews: Option<Vec<DryRunEntityPreview>>,
}
41
/// What a dry run would do for a single entity: configured inputs/outputs and the listed files.
#[derive(Debug, Clone)]
pub struct DryRunEntityPreview {
    /// Entity name.
    pub name: String,
    /// Configured source path.
    pub input_path: String,
    /// Configured source format.
    pub input_format: String,
    /// Configured accepted-sink path.
    pub accepted_path: String,
    /// Configured accepted-sink format.
    pub accepted_format: String,
    /// Rejected-sink path, or `None` when no rejected sink is configured.
    pub rejected_path: Option<String>,
    /// Rejected-sink format, or `None` when no rejected sink is configured.
    pub rejected_format: Option<String>,
    /// Archive-sink path; an empty string when no archive sink is configured.
    pub archive_path: String,
    /// The archive sink's configured `storage`, if any.
    pub archive_storage: Option<String>,
    /// Where the entity report would be written, or `None` when there is no report target.
    pub report_file: Option<String>,
    /// Input files listed during dry-run input resolution.
    pub scanned_files: Vec<String>,
}
56
/// Result of running a single entity.
#[derive(Debug, Clone)]
pub struct EntityOutcome {
    /// The entity's run report (per-file statuses and result totals).
    pub report: crate::report::RunReport,
    /// Per-file timings in milliseconds.
    // NOTE(review): presumably one entry per file in `report.files`, `None` where no timing was
    // captured — confirm in `entity::run_entity`.
    pub file_timings_ms: Vec<Option<u64>>,
}
62
63pub(crate) fn validate_entities(
64 config: &config::RootConfig,
65 selected: &[String],
66) -> FloeResult<()> {
67 let missing: Vec<String> = selected
68 .iter()
69 .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
70 .cloned()
71 .collect();
72
73 if !missing.is_empty() {
74 return Err(Box::new(ConfigError(format!(
75 "entities not found: {}",
76 missing.join(", ")
77 ))));
78 }
79 Ok(())
80}
81
82pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
83 let config_base = config::ConfigBase::local_from_path(config_path);
84 run_with_base(config_path, config_base, options)
85}
86
87pub fn run_with_base(
88 config_path: &Path,
89 config_base: config::ConfigBase,
90 options: RunOptions,
91) -> FloeResult<RunOutcome> {
92 let mut runtime = DefaultRuntime::new();
93 run_with_runtime(config_path, config_base, options, &mut runtime)
94}
95
96pub fn run_with_manifest_path(manifest_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
97 let mut runtime = DefaultRuntime::new();
98 run_with_manifest_runtime(manifest_path, options, &mut runtime)
99}
100
101pub(crate) fn run_with_manifest_runtime(
102 manifest_path: &Path,
103 options: RunOptions,
104 runtime: &mut dyn Runtime,
105) -> FloeResult<RunOutcome> {
106 init_thread_pool();
107 let manifest_str = manifest_path.to_string_lossy();
108 let location = config::resolve_config_location(&manifest_str)?;
109 let json = std::fs::read_to_string(&location.path)?;
110 let (config, report_base_uri) = crate::manifest::config_from_manifest_json(&json)?;
111 let config_base = location.base.clone();
112 if !options.entities.is_empty() {
113 validate_entities(&config, &options.entities)?;
114 }
115 let context = RunContext::from_config(
116 config,
117 config_base,
118 manifest_path,
119 &report_base_uri,
120 &options,
121 )?;
122 run_from_context(context, options, runtime)
123}
124
125pub fn run_with_runtime(
126 config_path: &Path,
127 config_base: config::ConfigBase,
128 options: RunOptions,
129 runtime: &mut dyn Runtime,
130) -> FloeResult<RunOutcome> {
131 init_thread_pool();
132 let raw_config_env_vars = config::extract_raw_env_vars(config_path).unwrap_or_default();
133 let profile_vars = options
134 .profile
135 .as_ref()
136 .map(|p| {
137 crate::resolve_vars(crate::VarSources {
138 profile: &p.variables,
139 cli: &std::collections::HashMap::new(),
140 config: &raw_config_env_vars,
141 })
142 })
143 .transpose()?
144 .unwrap_or_default();
145 let validate_options = ValidateOptions {
146 entities: options.entities.clone(),
147 profile_vars: profile_vars.clone(),
148 profile_catalogs: options
149 .profile
150 .as_ref()
151 .and_then(|profile| profile.catalogs.clone()),
152 profile_storages: options
153 .profile
154 .as_ref()
155 .and_then(|profile| profile.storages.clone()),
156 profile_lineage: options
157 .profile
158 .as_ref()
159 .and_then(|profile| profile.lineage.clone()),
160 };
161 crate::validate_with_base(config_path, config_base.clone(), validate_options)?;
162 let context = RunContext::new(config_path, config_base, &options, profile_vars)?;
163 if !options.entities.is_empty() {
164 validate_entities(&context.config, &options.entities)?;
165 }
166
167 run_from_context(context, options, runtime)
168}
169
170fn run_from_context(
171 context: RunContext,
172 options: RunOptions,
173 runtime: &mut dyn Runtime,
174) -> FloeResult<RunOutcome> {
175 let observer = default_observer();
176 let perf_enabled = perf::phase_timing_enabled();
177 let selected_entities = select_entities(&context, &options);
178 if options.full_refresh {
179 for entity in &selected_entities {
180 if matches!(
181 entity.sink.write_mode,
182 config::WriteMode::MergeScd1 | config::WriteMode::MergeScd2
183 ) {
184 return Err(Box::new(ConfigError(format!(
185 "entity '{}': --full-refresh is not supported with write_mode '{}'",
186 entity.name,
187 entity.sink.write_mode.as_str()
188 ))));
189 }
190 }
191 }
192 let resolution_mode = if options.dry_run {
193 io::storage::inputs::ResolveInputsMode::ListOnly
194 } else {
195 io::storage::inputs::ResolveInputsMode::Download
196 };
197 if !options.dry_run {
198 observer.on_event(RunEvent::RunStarted {
199 run_id: context.run_id.clone(),
200 config: context.config_path.display().to_string(),
201 report_base: context.report_base_path.clone(),
202 ts_ms: event_time_ms(),
203 });
204 crate::run::events::mark_run_started();
205 }
206 let resolve_start = perf_enabled.then(Instant::now);
207 let plans = resolve_entity_plans(&context, runtime, &selected_entities, resolution_mode)?;
208 if let Some(start) = resolve_start {
209 perf::emit_perf_log(
210 observer,
211 &context.run_id,
212 None,
213 "perf_run_phase_timings",
214 json!({
215 "phase": "resolve_inputs",
216 "elapsed_ms": start.elapsed().as_millis() as u64,
217 "entity_count": selected_entities.len(),
218 "mode": match resolution_mode {
219 io::storage::inputs::ResolveInputsMode::ListOnly => "list_only",
220 io::storage::inputs::ResolveInputsMode::Download => "download",
221 },
222 }),
223 );
224 }
225 if options.dry_run {
226 return create_dry_run_outcome(&context, plans);
227 }
228
229 let mut entity_outcomes = Vec::new();
230 let mut abort_run = false;
231
232 for plan in plans {
233 let entity_name = plan.entity.name.clone();
234 observer.on_event(RunEvent::EntityStarted {
235 run_id: context.run_id.clone(),
236 name: entity_name.clone(),
237 ts_ms: event_time_ms(),
238 });
239 let entity_result = run_entity(&context, runtime, observer, plan);
240 if let Err(err) = entity_result {
241 observer.on_event(RunEvent::EntityFinished {
242 run_id: context.run_id.clone(),
243 name: entity_name,
244 status: "failed".to_string(),
245 files: 0,
246 files_skipped: 0,
247 rows: 0,
248 accepted: 0,
249 rejected: 0,
250 warnings: 0,
251 errors: 0,
252 ts_ms: event_time_ms(),
253 });
254 return Err(err);
255 }
256 let EntityRunResult {
257 outcome,
258 abort_run: aborted,
259 } = entity_result.unwrap();
260 let report = &outcome.report;
261 let (mut status, _) = report::compute_run_outcome(
262 &report
263 .files
264 .iter()
265 .map(|file| file.status)
266 .collect::<Vec<_>>(),
267 );
268 status = upgrade_status_for_warnings(status, report.results.warnings_total);
269 observer.on_event(RunEvent::EntityFinished {
270 run_id: context.run_id.clone(),
271 name: report.entity.name.clone(),
272 status: run_status_str(status).to_string(),
273 files: report.results.files_total,
274 files_skipped: report.results.files_skipped,
275 rows: report.results.rows_total,
276 accepted: report.results.accepted_total,
277 rejected: report.results.rejected_total,
278 warnings: report.results.warnings_total,
279 errors: report.results.errors_total,
280 ts_ms: event_time_ms(),
281 });
282 entity_outcomes.push(outcome);
283 abort_run = abort_run || aborted;
284 if abort_run {
285 break;
286 }
287 }
288 let summary = build_run_summary(&context, &entity_outcomes);
289 let written_summary_uri = if let Some(report_target) = &context.report_target {
290 let summary_write_start = perf_enabled.then(Instant::now);
291 let uri = write_summary_report(
292 report_target,
293 &context.run_id,
294 &summary,
295 runtime.storage(),
296 &context.storage_resolver,
297 )?;
298 if let Some(start) = summary_write_start {
299 perf::emit_perf_log(
300 observer,
301 &context.run_id,
302 None,
303 "perf_run_phase_timings",
304 json!({
305 "phase": "write_summary_report",
306 "elapsed_ms": start.elapsed().as_millis() as u64,
307 "entity_count": entity_outcomes.len(),
308 }),
309 );
310 }
311 Some(uri)
312 } else {
313 None
314 };
315 let entity_report_uris: std::collections::HashMap<String, String> = entity_outcomes
316 .iter()
317 .filter_map(|outcome| {
318 let uri = context.report_target.as_ref().map(|t| {
319 t.join_relative(&report::ReportWriter::report_relative_path(
320 &context.run_id,
321 &outcome.report.entity.name,
322 ))
323 })?;
324 Some((outcome.report.entity.name.clone(), uri))
325 })
326 .collect();
327 observer.on_event(RunEvent::RunFinished {
328 run_id: context.run_id.clone(),
329 status: run_status_str(summary.run.status).to_string(),
330 exit_code: summary.run.exit_code,
331 files: summary.results.files_total,
332 files_skipped: summary.results.files_skipped,
333 rows: summary.results.rows_total,
334 accepted: summary.results.accepted_total,
335 rejected: summary.results.rejected_total,
336 warnings: summary.results.warnings_total,
337 errors: summary.results.errors_total,
338 summary_uri: written_summary_uri,
339 report_base: context.report_base_path.clone(),
340 entity_report_uris,
341 ts_ms: event_time_ms(),
342 });
343
344 Ok(RunOutcome {
345 run_id: context.run_id.clone(),
346 report_base_path: context.report_base_path.clone(),
347 entity_outcomes,
348 summary,
349 dry_run_previews: None,
350 })
351}
352
353fn init_thread_pool() {
354 static INIT: Once = Once::new();
355 INIT.call_once(|| {
356 if std::env::var("RAYON_NUM_THREADS").is_ok() {
357 return;
358 }
359 let cap = std::env::var("FLOE_MAX_THREADS")
360 .ok()
361 .and_then(|value| value.parse::<usize>().ok())
362 .unwrap_or(4);
363 let available = std::thread::available_parallelism()
364 .map(|value| value.get())
365 .unwrap_or(1);
366 let threads = available.min(cap).max(1);
367 let _ = rayon::ThreadPoolBuilder::new()
368 .num_threads(threads)
369 .build_global();
370 });
371}
372
373fn select_entities<'a>(
374 context: &'a RunContext,
375 options: &RunOptions,
376) -> Vec<&'a config::EntityConfig> {
377 if options.entities.is_empty() {
378 context.config.entities.iter().collect()
379 } else {
380 let selected: HashSet<&str> = options.entities.iter().map(String::as_str).collect();
381 context
382 .config
383 .entities
384 .iter()
385 .filter(|entity| selected.contains(entity.name.as_str()))
386 .collect()
387 }
388}
389
390fn resolve_entity_plans<'a>(
391 context: &'a RunContext,
392 runtime: &mut dyn Runtime,
393 entities: &[&'a config::EntityConfig],
394 resolution_mode: io::storage::inputs::ResolveInputsMode,
395) -> FloeResult<Vec<EntityRunPlan<'a>>> {
396 let mut plans = Vec::with_capacity(entities.len());
397 for entity in entities {
398 let input_adapter = runtime.input_adapter(entity.source.format.as_str())?;
399 let resolved_targets = entity::resolve_entity_targets(&context.storage_resolver, entity)?;
400 let needs_temp = matches!(
401 resolution_mode,
402 io::storage::inputs::ResolveInputsMode::Download
403 ) && (resolved_targets.source.is_remote()
404 || resolved_targets.accepted.is_remote()
405 || resolved_targets
406 .rejected
407 .as_ref()
408 .is_some_and(io::storage::Target::is_remote));
409 let temp_dir = if needs_temp {
410 Some(
411 tempfile::TempDir::new()
412 .map_err(|err| Box::new(IoError(format!("tempdir failed: {err}"))))?,
413 )
414 } else {
415 None
416 };
417 let storage_client = Some(runtime.storage().client_for(
418 &context.storage_resolver,
419 resolved_targets.source.storage(),
420 entity,
421 )? as &dyn io::storage::StorageClient);
422 let resolved_inputs = io::storage::ops::resolve_inputs(
423 &context.config_dir,
424 entity,
425 input_adapter,
426 &resolved_targets.source,
427 resolution_mode,
428 temp_dir.as_ref().map(|dir| dir.path()),
429 storage_client,
430 )?;
431 plans.push(EntityRunPlan {
432 entity,
433 resolved_targets,
434 resolved_inputs,
435 temp_dir,
436 });
437 }
438 Ok(plans)
439}
440
441fn build_run_summary(
442 context: &RunContext,
443 entity_outcomes: &[EntityOutcome],
444) -> report::RunSummaryReport {
445 let mut totals = report::ResultsTotals {
446 files_total: 0,
447 files_skipped: 0,
448 rows_total: 0,
449 accepted_total: 0,
450 rejected_total: 0,
451 warnings_total: 0,
452 errors_total: 0,
453 };
454 let mut statuses = Vec::new();
455 let mut entities = Vec::with_capacity(entity_outcomes.len());
456
457 for outcome in entity_outcomes {
458 let report = &outcome.report;
459 totals.files_total += report.results.files_total;
460 totals.files_skipped += report.results.files_skipped;
461 totals.rows_total += report.results.rows_total;
462 totals.accepted_total += report.results.accepted_total;
463 totals.rejected_total += report.results.rejected_total;
464 totals.warnings_total += report.results.warnings_total;
465 totals.errors_total += report.results.errors_total;
466
467 let file_statuses = report
468 .files
469 .iter()
470 .map(|file| file.status)
471 .collect::<Vec<_>>();
472 let (mut status, _) = report::compute_run_outcome(&file_statuses);
473 status = upgrade_status_for_warnings(status, report.results.warnings_total);
474 statuses.extend(file_statuses);
475
476 let report_file = context
477 .report_target
478 .as_ref()
479 .map(|target| {
480 target.join_relative(&report::ReportWriter::report_relative_path(
481 &context.run_id,
482 &report.entity.name,
483 ))
484 })
485 .unwrap_or_else(|| "disabled".to_string());
486 entities.push(report::EntitySummary {
487 name: report.entity.name.clone(),
488 status,
489 results: report.results.clone(),
490 report_file,
491 });
492 }
493
494 let (mut status, exit_code) = report::compute_run_outcome(&statuses);
495 status = upgrade_status_for_warnings(status, totals.warnings_total);
496
497 let finished_at = report::now_rfc3339();
498 let duration_ms = context.run_timer.elapsed().as_millis() as u64;
499 let report_base_path = context
500 .report_base_path
501 .clone()
502 .unwrap_or_else(|| "disabled".to_string());
503 let report_file = context
504 .report_target
505 .as_ref()
506 .map(|target| {
507 target.join_relative(&report::ReportWriter::summary_relative_path(
508 &context.run_id,
509 ))
510 })
511 .unwrap_or_else(|| "disabled".to_string());
512
513 report::RunSummaryReport {
514 spec_version: context.config.version.clone(),
515 tool: build_tool_info(),
516 run: report::RunInfo {
517 run_id: context.run_id.clone(),
518 started_at: context.started_at.clone(),
519 finished_at,
520 duration_ms,
521 status,
522 exit_code,
523 },
524 config: build_config_echo(context),
525 report: build_report_echo(report_base_path, report_file),
526 results: totals,
527 entities,
528 }
529}
530
531fn create_dry_run_outcome(
532 context: &RunContext,
533 plans: Vec<EntityRunPlan<'_>>,
534) -> FloeResult<RunOutcome> {
535 let mut previews: Vec<DryRunEntityPreview> = Vec::new();
536
537 for plan in plans {
538 let entity = plan.entity;
539 let rejected_path = entity.sink.rejected.as_ref().map(|r| r.path.clone());
540 let rejected_format = entity.sink.rejected.as_ref().map(|r| r.format.clone());
541 let (archive_path, archive_storage) = entity
542 .sink
543 .archive
544 .as_ref()
545 .map(|a| (a.path.clone(), a.storage.clone()))
546 .unwrap_or_else(|| (String::new(), None));
547
548 let report_file = context.report_target.as_ref().map(|target| {
549 target.join_relative(&report::ReportWriter::report_relative_path(
550 &context.run_id,
551 &entity.name,
552 ))
553 });
554
555 previews.push(DryRunEntityPreview {
556 name: entity.name.clone(),
557 input_path: entity.source.path.clone(),
558 input_format: entity.source.format.clone(),
559 accepted_path: entity.sink.accepted.path.clone(),
560 accepted_format: entity.sink.accepted.format.clone(),
561 rejected_path,
562 rejected_format,
563 archive_path,
564 archive_storage,
565 report_file,
566 scanned_files: plan.resolved_inputs.listed,
567 });
568 }
569
570 Ok(RunOutcome {
571 run_id: context.run_id.clone(),
572 report_base_path: context.report_base_path.clone(),
573 entity_outcomes: Vec::new(),
574 summary: report::RunSummaryReport {
575 spec_version: context.config.version.clone(),
576 tool: build_tool_info(),
577 run: report::RunInfo {
578 run_id: context.run_id.clone(),
579 started_at: context.started_at.clone(),
580 finished_at: report::now_rfc3339(),
581 duration_ms: 0,
582 status: report::RunStatus::Success,
583 exit_code: 0,
584 },
585 config: build_config_echo(context),
586 report: build_report_echo(
587 context
588 .report_base_path
589 .clone()
590 .unwrap_or_else(|| "disabled".to_string()),
591 "disabled (dry-run)".to_string(),
592 ),
593 results: report::ResultsTotals {
594 files_total: 0,
595 files_skipped: 0,
596 rows_total: 0,
597 accepted_total: 0,
598 rejected_total: 0,
599 warnings_total: 0,
600 errors_total: 0,
601 },
602 entities: Vec::new(),
603 },
604 dry_run_previews: Some(previews),
605 })
606}
607
608fn upgrade_status_for_warnings(status: report::RunStatus, warnings: u64) -> report::RunStatus {
609 if status == report::RunStatus::Success && warnings > 0 {
610 report::RunStatus::SuccessWithWarnings
611 } else {
612 status
613 }
614}
615
616fn build_tool_info() -> report::ToolInfo {
617 report::ToolInfo {
618 name: "floe".to_string(),
619 version: env!("CARGO_PKG_VERSION").to_string(),
620 git: None,
621 }
622}
623
624fn build_config_echo(context: &RunContext) -> report::ConfigEcho {
625 report::ConfigEcho {
626 path: context.config_path.display().to_string(),
627 version: context.config.version.clone(),
628 metadata: context.config.metadata.as_ref().map(project_metadata_json),
629 }
630}
631
632fn build_report_echo(path: String, report_file: String) -> report::ReportEcho {
633 report::ReportEcho { path, report_file }
634}
635
636fn run_status_str(status: report::RunStatus) -> &'static str {
637 match status {
638 report::RunStatus::Success => "success",
639 report::RunStatus::SuccessWithWarnings => "success_with_warnings",
640 report::RunStatus::Rejected => "rejected",
641 report::RunStatus::Aborted => "aborted",
642 report::RunStatus::Failed => "failed",
643 }
644}