1use std::path::Path;
2
3use crate::io::storage::CloudClient;
4use crate::run::reporting::project_metadata_json;
5use crate::{config, report, ConfigError, FloeResult, RunOptions, ValidateOptions};
6
7mod context;
8pub(crate) mod entity;
9mod file;
10pub(crate) mod normalize;
11mod output;
12mod reporting;
13
14use context::RunContext;
15use entity::{run_entity, EntityRunResult};
16
17pub(super) const MAX_RESOLVED_INPUTS: usize = 50;
18
19#[derive(Debug, Clone)]
20pub struct RunOutcome {
21 pub run_id: String,
22 pub report_base_path: Option<String>,
23 pub entity_outcomes: Vec<EntityOutcome>,
24 pub summary: report::RunSummaryReport,
25}
26
27#[derive(Debug, Clone)]
28pub struct EntityOutcome {
29 pub report: crate::report::RunReport,
30 pub file_timings_ms: Vec<Option<u64>>,
31}
32
33pub(crate) fn validate_entities(
34 config: &config::RootConfig,
35 selected: &[String],
36) -> FloeResult<()> {
37 let missing: Vec<String> = selected
38 .iter()
39 .filter(|name| !config.entities.iter().any(|entity| &entity.name == *name))
40 .cloned()
41 .collect();
42
43 if !missing.is_empty() {
44 return Err(Box::new(ConfigError(format!(
45 "entities not found: {}",
46 missing.join(", ")
47 ))));
48 }
49 Ok(())
50}
51
52pub fn run(config_path: &Path, options: RunOptions) -> FloeResult<RunOutcome> {
53 let validate_options = ValidateOptions {
54 entities: options.entities.clone(),
55 };
56 crate::validate(config_path, validate_options)?;
57
58 let context = RunContext::new(config_path, &options)?;
59 if !options.entities.is_empty() {
60 validate_entities(&context.config, &options.entities)?;
61 }
62
63 let mut entity_outcomes = Vec::new();
64 let mut abort_run = false;
65 let mut cloud = CloudClient::new();
66 for entity in &context.config.entities {
67 let EntityRunResult {
68 outcome,
69 abort_run: aborted,
70 } = run_entity(&context, &mut cloud, entity)?;
71 entity_outcomes.push(outcome);
72 abort_run = abort_run || aborted;
73 if abort_run {
74 break;
75 }
76 }
77
78 let summary = build_run_summary(&context, &entity_outcomes);
79 if let Some(report_dir) = &context.report_dir {
80 report::ReportWriter::write_summary(report_dir, &context.run_id, &summary)?;
81 }
82
83 Ok(RunOutcome {
84 run_id: context.run_id.clone(),
85 report_base_path: context.report_base_path.clone(),
86 entity_outcomes,
87 summary,
88 })
89}
90
91fn build_run_summary(
92 context: &RunContext,
93 entity_outcomes: &[EntityOutcome],
94) -> report::RunSummaryReport {
95 let mut totals = report::ResultsTotals {
96 files_total: 0,
97 rows_total: 0,
98 accepted_total: 0,
99 rejected_total: 0,
100 warnings_total: 0,
101 errors_total: 0,
102 };
103 let mut statuses = Vec::new();
104 let mut entities = Vec::with_capacity(entity_outcomes.len());
105
106 for outcome in entity_outcomes {
107 let report = &outcome.report;
108 totals.files_total += report.results.files_total;
109 totals.rows_total += report.results.rows_total;
110 totals.accepted_total += report.results.accepted_total;
111 totals.rejected_total += report.results.rejected_total;
112 totals.warnings_total += report.results.warnings_total;
113 totals.errors_total += report.results.errors_total;
114
115 let file_statuses = report
116 .files
117 .iter()
118 .map(|file| file.status)
119 .collect::<Vec<_>>();
120 let (mut status, _) = report::compute_run_outcome(&file_statuses);
121 if status == report::RunStatus::Success && report.results.warnings_total > 0 {
122 status = report::RunStatus::SuccessWithWarnings;
123 }
124 statuses.extend(file_statuses);
125
126 let report_file = context
127 .report_dir
128 .as_ref()
129 .map(|dir| report::ReportWriter::report_path(dir, &context.run_id, &report.entity.name))
130 .map(|path| path.display().to_string())
131 .unwrap_or_else(|| "disabled".to_string());
132 entities.push(report::EntitySummary {
133 name: report.entity.name.clone(),
134 status,
135 results: report.results.clone(),
136 report_file,
137 });
138 }
139
140 let (mut status, exit_code) = report::compute_run_outcome(&statuses);
141 if status == report::RunStatus::Success && totals.warnings_total > 0 {
142 status = report::RunStatus::SuccessWithWarnings;
143 }
144
145 let finished_at = report::now_rfc3339();
146 let duration_ms = context.run_timer.elapsed().as_millis() as u64;
147 let report_base_path = context
148 .report_base_path
149 .clone()
150 .unwrap_or_else(|| "disabled".to_string());
151 let report_file = context
152 .report_dir
153 .as_ref()
154 .map(|dir| report::ReportWriter::summary_path(dir, &context.run_id))
155 .map(|path| path.display().to_string())
156 .unwrap_or_else(|| "disabled".to_string());
157
158 report::RunSummaryReport {
159 spec_version: context.config.version.clone(),
160 tool: report::ToolInfo {
161 name: "floe".to_string(),
162 version: env!("CARGO_PKG_VERSION").to_string(),
163 git: None,
164 },
165 run: report::RunInfo {
166 run_id: context.run_id.clone(),
167 started_at: context.started_at.clone(),
168 finished_at,
169 duration_ms,
170 status,
171 exit_code,
172 },
173 config: report::ConfigEcho {
174 path: context.config_path.display().to_string(),
175 version: context.config.version.clone(),
176 metadata: context.config.metadata.as_ref().map(project_metadata_json),
177 },
178 report: report::ReportEcho {
179 path: report_base_path,
180 report_file,
181 },
182 results: totals,
183 entities,
184 }
185}