// floe_core/report/mod.rs

1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
9pub mod build;
10pub mod entity;
11pub mod output;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub struct RunReport {
16    pub spec_version: String,
17    pub entity: EntityEcho,
18    pub source: SourceEcho,
19    pub sink: SinkEcho,
20    pub policy: PolicyEcho,
21    pub accepted_output: AcceptedOutputSummary,
22    pub results: ResultsTotals,
23    pub files: Vec<FileReport>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27#[serde(rename_all = "snake_case")]
28pub struct RunSummaryReport {
29    pub spec_version: String,
30    pub tool: ToolInfo,
31    pub run: RunInfo,
32    pub config: ConfigEcho,
33    pub report: ReportEcho,
34    pub results: ResultsTotals,
35    pub entities: Vec<EntitySummary>,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(rename_all = "snake_case")]
40pub struct EntitySummary {
41    pub name: String,
42    pub status: RunStatus,
43    pub results: ResultsTotals,
44    pub report_file: String,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48#[serde(rename_all = "snake_case")]
49pub struct ToolInfo {
50    pub name: String,
51    pub version: String,
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub git: Option<GitInfo>,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
57#[serde(rename_all = "snake_case")]
58pub struct GitInfo {
59    pub commit: String,
60    pub dirty: bool,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(rename_all = "snake_case")]
65pub struct RunInfo {
66    pub run_id: String,
67    pub started_at: String,
68    pub finished_at: String,
69    pub duration_ms: u64,
70    pub status: RunStatus,
71    pub exit_code: i32,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75#[serde(rename_all = "snake_case")]
76pub struct ConfigEcho {
77    pub path: String,
78    pub version: String,
79    #[serde(skip_serializing_if = "Option::is_none")]
80    pub metadata: Option<serde_json::Value>,
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
84#[serde(rename_all = "snake_case")]
85pub struct EntityEcho {
86    pub name: String,
87    #[serde(skip_serializing_if = "Option::is_none")]
88    pub metadata: Option<serde_json::Value>,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
92#[serde(rename_all = "snake_case")]
93pub struct SourceEcho {
94    pub format: String,
95    pub path: String,
96    #[serde(skip_serializing_if = "Option::is_none")]
97    pub options: Option<serde_json::Value>,
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub cast_mode: Option<String>,
100    pub read_plan: SourceReadPlan,
101    pub resolved_inputs: ResolvedInputs,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub struct ResolvedInputs {
107    pub mode: ResolvedInputMode,
108    pub file_count: u64,
109    pub files: Vec<String>,
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
113#[serde(rename_all = "snake_case")]
114pub struct SinkEcho {
115    pub accepted: SinkTargetEcho,
116    #[serde(skip_serializing_if = "Option::is_none")]
117    pub rejected: Option<SinkTargetEcho>,
118    pub archive: SinkArchiveEcho,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
122#[serde(rename_all = "snake_case")]
123pub struct AcceptedOutputSummary {
124    pub path: String,
125    pub accepted_rows: u64,
126    pub parts_written: u64,
127    #[serde(default)]
128    #[serde(skip_serializing_if = "Vec::is_empty")]
129    pub part_files: Vec<String>,
130    #[serde(default)]
131    #[serde(skip_serializing_if = "Option::is_none")]
132    pub table_version: Option<i64>,
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize)]
136#[serde(rename_all = "snake_case")]
137pub struct SinkTargetEcho {
138    pub format: String,
139    pub path: String,
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
143#[serde(rename_all = "snake_case")]
144pub struct ReportEcho {
145    pub path: String,
146    pub report_file: String,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
150#[serde(rename_all = "snake_case")]
151pub struct SinkArchiveEcho {
152    pub enabled: bool,
153    #[serde(skip_serializing_if = "Option::is_none")]
154    pub path: Option<String>,
155}
156
157#[derive(Debug, Clone, Serialize, Deserialize)]
158#[serde(rename_all = "snake_case")]
159pub struct PolicyEcho {
160    pub severity: Severity,
161}
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
164#[serde(rename_all = "snake_case")]
165pub struct ResultsTotals {
166    pub files_total: u64,
167    pub rows_total: u64,
168    pub accepted_total: u64,
169    pub rejected_total: u64,
170    pub warnings_total: u64,
171    pub errors_total: u64,
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize)]
175#[serde(rename_all = "snake_case")]
176pub struct FileReport {
177    pub input_file: String,
178    pub status: FileStatus,
179    pub row_count: u64,
180    pub accepted_count: u64,
181    pub rejected_count: u64,
182    pub mismatch: FileMismatch,
183    pub output: FileOutput,
184    pub validation: FileValidation,
185}
186
187#[derive(Debug, Clone, Serialize, Deserialize)]
188#[serde(rename_all = "snake_case")]
189pub struct FileMismatch {
190    pub declared_columns_count: u64,
191    pub input_columns_count: u64,
192    pub missing_columns: Vec<String>,
193    pub extra_columns: Vec<String>,
194    pub mismatch_action: MismatchAction,
195    #[serde(skip_serializing_if = "Option::is_none")]
196    pub error: Option<MismatchIssue>,
197    #[serde(skip_serializing_if = "Option::is_none")]
198    pub warning: Option<String>,
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
202#[serde(rename_all = "snake_case")]
203pub struct MismatchIssue {
204    pub rule: String,
205    pub message: String,
206}
207
208#[derive(Debug, Clone, Serialize, Deserialize)]
209#[serde(rename_all = "snake_case")]
210pub struct FileOutput {
211    pub accepted_path: Option<String>,
212    pub rejected_path: Option<String>,
213    pub errors_path: Option<String>,
214    pub archived_path: Option<String>,
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
218#[serde(rename_all = "snake_case")]
219pub struct FileValidation {
220    pub errors: u64,
221    pub warnings: u64,
222    pub rules: Vec<RuleSummary>,
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize)]
226#[serde(rename_all = "snake_case")]
227pub struct RuleSummary {
228    pub rule: RuleName,
229    pub severity: Severity,
230    pub violations: u64,
231    pub columns: Vec<ColumnSummary>,
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize)]
235#[serde(rename_all = "snake_case")]
236pub struct ColumnSummary {
237    pub column: String,
238    pub violations: u64,
239    #[serde(skip_serializing_if = "Option::is_none")]
240    pub target_type: Option<String>,
241    #[serde(skip_serializing_if = "Option::is_none")]
242    pub source: Option<String>,
243}
244
245#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
246#[serde(rename_all = "snake_case")]
247pub enum FileStatus {
248    Success,
249    Rejected,
250    Aborted,
251    Failed,
252}
253
254#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
255#[serde(rename_all = "snake_case")]
256pub enum RunStatus {
257    Success,
258    SuccessWithWarnings,
259    Rejected,
260    Aborted,
261    Failed,
262}
263
264#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
265#[serde(rename_all = "snake_case")]
266pub enum Severity {
267    Warn,
268    Reject,
269    Abort,
270}
271
272#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
273#[serde(rename_all = "snake_case")]
274pub enum RuleName {
275    NotNull,
276    CastError,
277    Unique,
278    SchemaError,
279}
280
281#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
282#[serde(rename_all = "snake_case")]
283pub enum MismatchAction {
284    None,
285    FilledNulls,
286    IgnoredExtras,
287    RejectedFile,
288    Aborted,
289}
290
291#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
292#[serde(rename_all = "snake_case")]
293pub enum ResolvedInputMode {
294    Directory,
295    File,
296    Glob,
297}
298
299#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
300#[serde(rename_all = "snake_case")]
301pub enum SourceReadPlan {
302    RawAndTyped,
303}
304
305#[derive(Debug)]
306pub enum ReportError {
307    Io(std::io::Error),
308    Serialize(serde_json::Error),
309}
310
311impl std::fmt::Display for ReportError {
312    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313        match self {
314            ReportError::Io(err) => write!(f, "report io error: {err}"),
315            ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
316        }
317    }
318}
319
320impl std::error::Error for ReportError {}
321
322impl From<std::io::Error> for ReportError {
323    fn from(err: std::io::Error) -> Self {
324        Self::Io(err)
325    }
326}
327
328impl From<serde_json::Error> for ReportError {
329    fn from(err: serde_json::Error) -> Self {
330        Self::Serialize(err)
331    }
332}
333
334pub trait ReportFormatter {
335    fn format_name(&self) -> &'static str;
336    fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError>;
337    fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError>;
338}
339
340pub struct JsonReportFormatter;
341
342impl ReportFormatter for JsonReportFormatter {
343    fn format_name(&self) -> &'static str {
344        "json"
345    }
346
347    fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError> {
348        Ok(serde_json::to_string_pretty(report)?)
349    }
350
351    fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError> {
352        Ok(serde_json::to_string_pretty(report)?)
353    }
354}
355
356pub fn now_rfc3339() -> String {
357    OffsetDateTime::now_utc()
358        .format(&Rfc3339)
359        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
360}
361
/// Derives a run id from a timestamp by turning every `:` into `-`.
///
/// The run id becomes part of a directory name (`run_<run_id>`), and colons are
/// not valid in Windows file names. Example: `2024-05-01T12:30:00Z` becomes
/// `2024-05-01T12-30-00Z`. All other characters are kept unchanged.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}
365
366pub struct ReportWriter;
367
368impl ReportWriter {
369    pub fn run_dir_name(run_id: &str) -> String {
370        format!("run_{run_id}")
371    }
372
373    pub fn report_file_name() -> String {
374        "run.json".to_string()
375    }
376
377    pub fn summary_file_name() -> String {
378        "run.summary.json".to_string()
379    }
380
381    pub fn report_relative_path(run_id: &str, entity_name: &str) -> String {
382        format!(
383            "{}/{}/{}",
384            Self::run_dir_name(run_id),
385            entity_name,
386            Self::report_file_name()
387        )
388    }
389
390    pub fn summary_relative_path(run_id: &str) -> String {
391        format!(
392            "{}/{}",
393            Self::run_dir_name(run_id),
394            Self::summary_file_name()
395        )
396    }
397
398    pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
399        report_dir
400            .join(Self::run_dir_name(run_id))
401            .join(entity_name)
402    }
403
404    pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
405        Self::entity_report_dir(report_dir, run_id, entity_name).join(Self::report_file_name())
406    }
407
408    pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
409        report_dir
410            .join(Self::run_dir_name(run_id))
411            .join(Self::summary_file_name())
412    }
413
414    pub fn write_report(
415        report_dir: &Path,
416        run_id: &str,
417        entity_name: &str,
418        report: &RunReport,
419    ) -> Result<PathBuf, ReportError> {
420        let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
421        std::fs::create_dir_all(&entity_dir)?;
422        let report_path = Self::report_path(report_dir, run_id, entity_name);
423        let tmp_path = entity_dir.join(format!(
424            "{}.tmp-{}",
425            Self::report_file_name(),
426            unique_suffix()
427        ));
428
429        let json = serde_json::to_string_pretty(report)?;
430        let mut file = File::create(&tmp_path)?;
431        file.write_all(json.as_bytes())?;
432        file.sync_all()?;
433        std::fs::rename(&tmp_path, &report_path)?;
434
435        Ok(report_path)
436    }
437
438    pub fn write_summary(
439        report_dir: &Path,
440        run_id: &str,
441        report: &RunSummaryReport,
442    ) -> Result<PathBuf, ReportError> {
443        let run_dir = report_dir.join(Self::run_dir_name(run_id));
444        std::fs::create_dir_all(&run_dir)?;
445        let report_path = Self::summary_path(report_dir, run_id);
446        let tmp_path = run_dir.join(format!(
447            "{}.tmp-{}",
448            Self::summary_file_name(),
449            unique_suffix()
450        ));
451
452        let json = serde_json::to_string_pretty(report)?;
453        let mut file = File::create(&tmp_path)?;
454        file.write_all(json.as_bytes())?;
455        file.sync_all()?;
456        std::fs::rename(&tmp_path, &report_path)?;
457
458        Ok(report_path)
459    }
460}
461
462pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
463    if file_statuses.contains(&FileStatus::Failed) {
464        return (RunStatus::Failed, 1);
465    }
466    if file_statuses.contains(&FileStatus::Aborted) {
467        return (RunStatus::Aborted, 2);
468    }
469    if file_statuses.contains(&FileStatus::Rejected) {
470        return (RunStatus::Rejected, 0);
471    }
472    (RunStatus::Success, 0)
473}
474
/// Returns a suffix for temporary report file names.
///
/// Format: `<pid>-<nanos>-<seq>`, where:
/// - `nanos` is wall-clock nanoseconds since the Unix epoch (0 if the clock reads
///   before the epoch);
/// - `seq` is a process-wide counter.
///
/// The counter guarantees distinct values for every call within one process, even
/// when the clock has coarse resolution or two threads read the same instant. The
/// pid keeps concurrent processes apart.
fn unique_suffix() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};

    static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_nanos())
        .unwrap_or(0);
    // Relaxed suffices: `fetch_add` is atomic regardless of ordering, and we only
    // need each caller to observe a distinct value, not to order other memory.
    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);
    format!("{}-{}-{}", std::process::id(), nanos, seq)
}