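//! `floe_core/report/mod.rs` — run report data model (per-entity `run.json` and run-level
//! `run.summary.json`), the `ReportWriter` that persists them, and run-outcome helpers.
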
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

use serde::{Deserialize, Serialize};
use time::{format_description::well_known::Rfc3339, OffsetDateTime};

9#[derive(Debug, Clone, Serialize, Deserialize)]
10#[serde(rename_all = "snake_case")]
11pub struct RunReport {
12    pub spec_version: String,
13    pub entity: EntityEcho,
14    pub source: SourceEcho,
15    pub sink: SinkEcho,
16    pub policy: PolicyEcho,
17    pub accepted_output: AcceptedOutputSummary,
18    pub results: ResultsTotals,
19    pub files: Vec<FileReport>,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(rename_all = "snake_case")]
24pub struct RunSummaryReport {
25    pub spec_version: String,
26    pub tool: ToolInfo,
27    pub run: RunInfo,
28    pub config: ConfigEcho,
29    pub report: ReportEcho,
30    pub results: ResultsTotals,
31    pub entities: Vec<EntitySummary>,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
35#[serde(rename_all = "snake_case")]
36pub struct EntitySummary {
37    pub name: String,
38    pub status: RunStatus,
39    pub results: ResultsTotals,
40    pub report_file: String,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
44#[serde(rename_all = "snake_case")]
45pub struct ToolInfo {
46    pub name: String,
47    pub version: String,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub git: Option<GitInfo>,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
53#[serde(rename_all = "snake_case")]
54pub struct GitInfo {
55    pub commit: String,
56    pub dirty: bool,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
60#[serde(rename_all = "snake_case")]
61pub struct RunInfo {
62    pub run_id: String,
63    pub started_at: String,
64    pub finished_at: String,
65    pub duration_ms: u64,
66    pub status: RunStatus,
67    pub exit_code: i32,
68}
69
70#[derive(Debug, Clone, Serialize, Deserialize)]
71#[serde(rename_all = "snake_case")]
72pub struct ConfigEcho {
73    pub path: String,
74    pub version: String,
75    #[serde(skip_serializing_if = "Option::is_none")]
76    pub metadata: Option<serde_json::Value>,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
80#[serde(rename_all = "snake_case")]
81pub struct EntityEcho {
82    pub name: String,
83    #[serde(skip_serializing_if = "Option::is_none")]
84    pub metadata: Option<serde_json::Value>,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88#[serde(rename_all = "snake_case")]
89pub struct SourceEcho {
90    pub format: String,
91    pub path: String,
92    #[serde(skip_serializing_if = "Option::is_none")]
93    pub options: Option<serde_json::Value>,
94    #[serde(skip_serializing_if = "Option::is_none")]
95    pub cast_mode: Option<String>,
96    pub read_plan: SourceReadPlan,
97    pub resolved_inputs: ResolvedInputs,
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
101#[serde(rename_all = "snake_case")]
102pub struct ResolvedInputs {
103    pub mode: ResolvedInputMode,
104    pub file_count: u64,
105    pub files: Vec<String>,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(rename_all = "snake_case")]
110pub struct SinkEcho {
111    pub accepted: SinkTargetEcho,
112    #[serde(skip_serializing_if = "Option::is_none")]
113    pub rejected: Option<SinkTargetEcho>,
114    pub archive: SinkArchiveEcho,
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
118#[serde(rename_all = "snake_case")]
119pub struct AcceptedOutputSummary {
120    pub path: String,
121    pub accepted_rows: u64,
122    pub parts_written: u64,
123    #[serde(default)]
124    #[serde(skip_serializing_if = "Vec::is_empty")]
125    pub part_files: Vec<String>,
126    #[serde(default)]
127    #[serde(skip_serializing_if = "Option::is_none")]
128    pub table_version: Option<i64>,
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
132#[serde(rename_all = "snake_case")]
133pub struct SinkTargetEcho {
134    pub format: String,
135    pub path: String,
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
139#[serde(rename_all = "snake_case")]
140pub struct ReportEcho {
141    pub path: String,
142    pub report_file: String,
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize)]
146#[serde(rename_all = "snake_case")]
147pub struct SinkArchiveEcho {
148    pub enabled: bool,
149    #[serde(skip_serializing_if = "Option::is_none")]
150    pub path: Option<String>,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
154#[serde(rename_all = "snake_case")]
155pub struct PolicyEcho {
156    pub severity: Severity,
157}
158
159#[derive(Debug, Clone, Serialize, Deserialize)]
160#[serde(rename_all = "snake_case")]
161pub struct ResultsTotals {
162    pub files_total: u64,
163    pub rows_total: u64,
164    pub accepted_total: u64,
165    pub rejected_total: u64,
166    pub warnings_total: u64,
167    pub errors_total: u64,
168}
169
170#[derive(Debug, Clone, Serialize, Deserialize)]
171#[serde(rename_all = "snake_case")]
172pub struct FileReport {
173    pub input_file: String,
174    pub status: FileStatus,
175    pub row_count: u64,
176    pub accepted_count: u64,
177    pub rejected_count: u64,
178    pub mismatch: FileMismatch,
179    pub output: FileOutput,
180    pub validation: FileValidation,
181}
182
183#[derive(Debug, Clone, Serialize, Deserialize)]
184#[serde(rename_all = "snake_case")]
185pub struct FileMismatch {
186    pub declared_columns_count: u64,
187    pub input_columns_count: u64,
188    pub missing_columns: Vec<String>,
189    pub extra_columns: Vec<String>,
190    pub mismatch_action: MismatchAction,
191    #[serde(skip_serializing_if = "Option::is_none")]
192    pub error: Option<MismatchIssue>,
193    #[serde(skip_serializing_if = "Option::is_none")]
194    pub warning: Option<String>,
195}
196
197#[derive(Debug, Clone, Serialize, Deserialize)]
198#[serde(rename_all = "snake_case")]
199pub struct MismatchIssue {
200    pub rule: String,
201    pub message: String,
202}
203
204#[derive(Debug, Clone, Serialize, Deserialize)]
205#[serde(rename_all = "snake_case")]
206pub struct FileOutput {
207    pub accepted_path: Option<String>,
208    pub rejected_path: Option<String>,
209    pub errors_path: Option<String>,
210    pub archived_path: Option<String>,
211}
212
213#[derive(Debug, Clone, Serialize, Deserialize)]
214#[serde(rename_all = "snake_case")]
215pub struct FileValidation {
216    pub errors: u64,
217    pub warnings: u64,
218    pub rules: Vec<RuleSummary>,
219}
220
221#[derive(Debug, Clone, Serialize, Deserialize)]
222#[serde(rename_all = "snake_case")]
223pub struct RuleSummary {
224    pub rule: RuleName,
225    pub severity: Severity,
226    pub violations: u64,
227    pub columns: Vec<ColumnSummary>,
228}
229
230#[derive(Debug, Clone, Serialize, Deserialize)]
231#[serde(rename_all = "snake_case")]
232pub struct ColumnSummary {
233    pub column: String,
234    pub violations: u64,
235    #[serde(skip_serializing_if = "Option::is_none")]
236    pub target_type: Option<String>,
237}
238
239#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
240#[serde(rename_all = "snake_case")]
241pub enum FileStatus {
242    Success,
243    Rejected,
244    Aborted,
245    Failed,
246}
247
248#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
249#[serde(rename_all = "snake_case")]
250pub enum RunStatus {
251    Success,
252    SuccessWithWarnings,
253    Rejected,
254    Aborted,
255    Failed,
256}
257
258#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
259#[serde(rename_all = "snake_case")]
260pub enum Severity {
261    Warn,
262    Reject,
263    Abort,
264}
265
266#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
267#[serde(rename_all = "snake_case")]
268pub enum RuleName {
269    NotNull,
270    CastError,
271    Unique,
272    SchemaError,
273}
274
275#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
276#[serde(rename_all = "snake_case")]
277pub enum MismatchAction {
278    None,
279    FilledNulls,
280    IgnoredExtras,
281    RejectedFile,
282    Aborted,
283}
284
285#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
286#[serde(rename_all = "snake_case")]
287pub enum ResolvedInputMode {
288    Directory,
289    File,
290    Glob,
291}
292
293#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
294#[serde(rename_all = "snake_case")]
295pub enum SourceReadPlan {
296    RawAndTyped,
297}
298
299#[derive(Debug)]
300pub enum ReportError {
301    Io(std::io::Error),
302    Serialize(serde_json::Error),
303}
304
305impl std::fmt::Display for ReportError {
306    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307        match self {
308            ReportError::Io(err) => write!(f, "report io error: {err}"),
309            ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
310        }
311    }
312}
313
314impl std::error::Error for ReportError {}
315
316impl From<std::io::Error> for ReportError {
317    fn from(err: std::io::Error) -> Self {
318        Self::Io(err)
319    }
320}
321
322impl From<serde_json::Error> for ReportError {
323    fn from(err: serde_json::Error) -> Self {
324        Self::Serialize(err)
325    }
326}
327
328pub fn now_rfc3339() -> String {
329    OffsetDateTime::now_utc()
330        .format(&Rfc3339)
331        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
332}
333
/// Turns a timestamp into a run id by replacing every `:` with `-`.
///
/// The id is embedded in directory names (`run_<run_id>`), and `:` is not a valid
/// path character on every platform (e.g. Windows). All other characters pass through.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}

338pub struct ReportWriter;
339
340impl ReportWriter {
341    pub fn run_dir_name(run_id: &str) -> String {
342        format!("run_{run_id}")
343    }
344
345    pub fn report_file_name() -> String {
346        "run.json".to_string()
347    }
348
349    pub fn summary_file_name() -> String {
350        "run.summary.json".to_string()
351    }
352
353    pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
354        report_dir
355            .join(Self::run_dir_name(run_id))
356            .join(entity_name)
357    }
358
359    pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
360        Self::entity_report_dir(report_dir, run_id, entity_name).join(Self::report_file_name())
361    }
362
363    pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
364        report_dir
365            .join(Self::run_dir_name(run_id))
366            .join(Self::summary_file_name())
367    }
368
369    pub fn write_report(
370        report_dir: &Path,
371        run_id: &str,
372        entity_name: &str,
373        report: &RunReport,
374    ) -> Result<PathBuf, ReportError> {
375        let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
376        std::fs::create_dir_all(&entity_dir)?;
377        let report_path = Self::report_path(report_dir, run_id, entity_name);
378        let tmp_path = entity_dir.join(format!(
379            "{}.tmp-{}",
380            Self::report_file_name(),
381            unique_suffix()
382        ));
383
384        let json = serde_json::to_string_pretty(report)?;
385        let mut file = File::create(&tmp_path)?;
386        file.write_all(json.as_bytes())?;
387        file.sync_all()?;
388        std::fs::rename(&tmp_path, &report_path)?;
389
390        Ok(report_path)
391    }
392
393    pub fn write_summary(
394        report_dir: &Path,
395        run_id: &str,
396        report: &RunSummaryReport,
397    ) -> Result<PathBuf, ReportError> {
398        let run_dir = report_dir.join(Self::run_dir_name(run_id));
399        std::fs::create_dir_all(&run_dir)?;
400        let report_path = Self::summary_path(report_dir, run_id);
401        let tmp_path = run_dir.join(format!(
402            "{}.tmp-{}",
403            Self::summary_file_name(),
404            unique_suffix()
405        ));
406
407        let json = serde_json::to_string_pretty(report)?;
408        let mut file = File::create(&tmp_path)?;
409        file.write_all(json.as_bytes())?;
410        file.sync_all()?;
411        std::fs::rename(&tmp_path, &report_path)?;
412
413        Ok(report_path)
414    }
415}
416
417pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
418    if file_statuses.contains(&FileStatus::Failed) {
419        return (RunStatus::Failed, 1);
420    }
421    if file_statuses.contains(&FileStatus::Aborted) {
422        return (RunStatus::Aborted, 2);
423    }
424    if file_statuses.contains(&FileStatus::Rejected) {
425        return (RunStatus::Rejected, 0);
426    }
427    (RunStatus::Success, 0)
428}
429
/// Returns a suffix that differs on every call, used to name temporary report files.
///
/// Format: `<pid>-<nanos>-<seq>`:
/// - `nanos` is wall-clock time since the Unix epoch (0 if the clock reads earlier).
/// - `seq` is a per-process counter. It keeps suffixes distinct even when two calls
///   observe the same clock value (coarse clock resolution, concurrent threads, or a
///   pre-epoch clock). Without it, those calls would share and clobber one temp file.
fn unique_suffix() -> String {
    static COUNTER: AtomicUsize = AtomicUsize::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_nanos())
        .unwrap_or(0);
    // Relaxed is enough: only the uniqueness of each fetch_add result matters.
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{}-{}-{}", std::process::id(), nanos, seq)
}