//! `floe_core/report/mod.rs`: run report data model, JSON formatting, and report file writing.

1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
9pub mod build;
10pub mod entity;
11pub mod output;
12
/// Detailed report for a single entity within a run.
///
/// [`ReportWriter::write_report`] serializes this as pretty-printed JSON to
/// `run_<run_id>/<entity_name>/run.json` under the report directory.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunReport {
    pub spec_version: String,
    /// Echo of the entity this report describes.
    pub entity: EntityEcho,
    /// Echo of the source settings and the inputs that were resolved.
    pub source: SourceEcho,
    /// Echo of the sink targets (accepted, optional rejected, archive).
    pub sink: SinkEcho,
    /// Echo of the policy (currently only its severity).
    pub policy: PolicyEcho,
    /// Summary of what was written to the accepted output.
    pub accepted_output: AcceptedOutputSummary,
    /// Aggregated counters.
    pub results: ResultsTotals,
    /// One entry per input file.
    pub files: Vec<FileReport>,
}
25
/// Run-level summary spanning every entity of a run.
///
/// [`ReportWriter::write_summary`] serializes this as pretty-printed JSON to
/// `run_<run_id>/run.summary.json` under the report directory.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunSummaryReport {
    pub spec_version: String,
    /// Name, version and optional git details of the tool.
    pub tool: ToolInfo,
    /// Identity, timing, status and exit code of the run.
    pub run: RunInfo,
    /// Echo of the configuration that was used.
    pub config: ConfigEcho,
    /// Echo of the report location.
    pub report: ReportEcho,
    /// Aggregated counters.
    pub results: ResultsTotals,
    /// One summary entry per entity.
    pub entities: Vec<EntitySummary>,
}
37
/// Per-entity entry of [`RunSummaryReport::entities`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct EntitySummary {
    pub name: String,
    pub status: RunStatus,
    /// Counters for this entity only.
    pub results: ResultsTotals,
    // NOTE(review): presumably the location of the entity's `run.json`; confirm
    // against the code that builds the summary.
    pub report_file: String,
}
46
/// Name and version of the tool, with optional git details.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ToolInfo {
    pub name: String,
    pub version: String,
    /// Left out of the serialized output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub git: Option<GitInfo>,
}
55
/// Git details attached to [`ToolInfo`]: a commit identifier and a dirty flag.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct GitInfo {
    pub commit: String,
    pub dirty: bool,
}
62
/// Identity, timing and outcome of a run.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunInfo {
    pub run_id: String,
    // NOTE(review): timestamps are plain strings here; presumably RFC 3339 as
    // produced by `now_rfc3339`; confirm at the call site.
    pub started_at: String,
    pub finished_at: String,
    pub duration_ms: u64,
    pub status: RunStatus,
    pub exit_code: i32,
}
73
/// Echo of the configuration: its path, version and optional free-form metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ConfigEcho {
    pub path: String,
    pub version: String,
    /// Arbitrary JSON value; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
82
/// Echo of an entity: its name and optional free-form metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct EntityEcho {
    pub name: String,
    /// Arbitrary JSON value; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
90
/// Echo of the source settings together with the inputs that were resolved.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SourceEcho {
    pub format: String,
    pub path: String,
    /// Arbitrary JSON value; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub options: Option<serde_json::Value>,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cast_mode: Option<String>,
    pub read_plan: SourceReadPlan,
    pub resolved_inputs: ResolvedInputs,
}
103
/// Inputs resolved from the source: the resolution mode, a count and a file list.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ResolvedInputs {
    pub mode: ResolvedInputMode,
    // NOTE(review): stored separately from `files`; nothing in this type forces
    // `file_count == files.len()`; confirm whether `files` may be truncated.
    pub file_count: u64,
    pub files: Vec<String>,
}
111
/// Echo of the sink: the accepted target, an optional rejected target and the
/// archive settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkEcho {
    pub accepted: SinkTargetEcho,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rejected: Option<SinkTargetEcho>,
    pub archive: SinkArchiveEcho,
}
120
/// Summary of the accepted output: its path, row count and the parts written.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct AcceptedOutputSummary {
    pub path: String,
    pub accepted_rows: u64,
    pub parts_written: u64,
    /// Defaults to an empty list when missing on input and is omitted from the
    /// output when empty.
    #[serde(default)]
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub part_files: Vec<String>,
    /// Defaults to `None` when missing on input and is omitted from the output
    /// when `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub table_version: Option<i64>,
}
134
/// Echo of a single sink target: a format name and a path.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkTargetEcho {
    pub format: String,
    pub path: String,
}
141
/// Echo of the report location carried by [`RunSummaryReport`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ReportEcho {
    // NOTE(review): presumably the report directory; confirm against the builder.
    pub path: String,
    // NOTE(review): presumably the summary file location; confirm against the builder.
    pub report_file: String,
}
148
/// Echo of the archive settings: an enabled flag and an optional path.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkArchiveEcho {
    pub enabled: bool,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
}
156
/// Echo of the policy; currently carries only its [`Severity`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct PolicyEcho {
    pub severity: Severity,
}
162
/// Aggregated counters, used by both the per-entity report and the run summary.
///
/// All fields are required on deserialization (no defaults are declared).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ResultsTotals {
    pub files_total: u64,
    pub rows_total: u64,
    pub accepted_total: u64,
    pub rejected_total: u64,
    pub warnings_total: u64,
    pub errors_total: u64,
}
173
/// Result for one input file; element type of [`RunReport::files`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileReport {
    pub input_file: String,
    pub status: FileStatus,
    pub row_count: u64,
    pub accepted_count: u64,
    pub rejected_count: u64,
    /// Column mismatch details for this file.
    pub mismatch: FileMismatch,
    /// Paths of the outputs produced for this file, where present.
    pub output: FileOutput,
    /// Validation counters and per-rule breakdown.
    pub validation: FileValidation,
}
186
/// Column mismatch details for one input file: declared and input column
/// counts, the missing and extra column names, and the action recorded.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileMismatch {
    pub declared_columns_count: u64,
    pub input_columns_count: u64,
    pub missing_columns: Vec<String>,
    pub extra_columns: Vec<String>,
    pub mismatch_action: MismatchAction,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<MismatchIssue>,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub warning: Option<String>,
}
200
/// Error attached to a [`FileMismatch`]: a rule name and a message, both free-form strings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct MismatchIssue {
    pub rule: String,
    pub message: String,
}
207
/// Output paths recorded for one input file.
///
/// Unlike most optional fields in this module, these carry no
/// `skip_serializing_if`, so a `None` is serialized as JSON `null` rather than
/// being omitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileOutput {
    pub accepted_path: Option<String>,
    pub rejected_path: Option<String>,
    pub errors_path: Option<String>,
    pub archived_path: Option<String>,
}
216
/// Validation result for one input file: error and warning counts plus a
/// per-rule breakdown.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileValidation {
    pub errors: u64,
    pub warnings: u64,
    pub rules: Vec<RuleSummary>,
}
224
/// Violations of a single rule, with its severity and a per-column breakdown.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RuleSummary {
    pub rule: RuleName,
    pub severity: Severity,
    pub violations: u64,
    pub columns: Vec<ColumnSummary>,
}
233
/// Violation count for one column within a [`RuleSummary`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ColumnSummary {
    pub column: String,
    pub violations: u64,
    /// Left out of the serialized output when `None`.
    // NOTE(review): presumably only set for cast-related rules; confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target_type: Option<String>,
}
242
/// Outcome of processing one input file.
///
/// Serialized in snake_case (`success`, `rejected`, `aborted`, `failed`).
/// [`compute_run_outcome`] folds a list of these into a run-level status.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FileStatus {
    Success,
    Rejected,
    Aborted,
    Failed,
}
251
/// Outcome of a run or of one entity within it.
///
/// Serialized in snake_case, e.g. `success_with_warnings`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RunStatus {
    Success,
    // Not produced by `compute_run_outcome`, which only sees file statuses.
    SuccessWithWarnings,
    Rejected,
    Aborted,
    Failed,
}
261
/// Severity level used by [`PolicyEcho`] and [`RuleSummary`].
///
/// Serialized in snake_case (`warn`, `reject`, `abort`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Severity {
    Warn,
    Reject,
    Abort,
}
269
/// Identifier of a validation rule in a [`RuleSummary`].
///
/// Serialized in snake_case (`not_null`, `cast_error`, `unique`, `schema_error`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RuleName {
    NotNull,
    CastError,
    Unique,
    SchemaError,
}
278
/// Action recorded in [`FileMismatch::mismatch_action`].
///
/// Serialized in snake_case (`none`, `filled_nulls`, `ignored_extras`,
/// `rejected_file`, `aborted`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MismatchAction {
    // This is a variant of this enum, not `Option::None`.
    None,
    FilledNulls,
    IgnoredExtras,
    RejectedFile,
    Aborted,
}
288
/// How the inputs in [`ResolvedInputs`] were resolved.
///
/// Serialized in snake_case (`directory`, `file`, `glob`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ResolvedInputMode {
    Directory,
    File,
    Glob,
}
296
/// Read plan echoed in [`SourceEcho::read_plan`].
///
/// Only one variant exists at present; it serializes as `raw_and_typed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceReadPlan {
    RawAndTyped,
}
302
/// Errors raised while serializing or writing reports.
#[derive(Debug)]
pub enum ReportError {
    /// A filesystem operation failed.
    Io(std::io::Error),
    /// JSON serialization failed.
    Serialize(serde_json::Error),
}
308
309impl std::fmt::Display for ReportError {
310    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311        match self {
312            ReportError::Io(err) => write!(f, "report io error: {err}"),
313            ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
314        }
315    }
316}
317
// Relies on the trait's default methods, so `source()` returns `None`; the
// wrapped error's message is already embedded in the `Display` output.
impl std::error::Error for ReportError {}
319
320impl From<std::io::Error> for ReportError {
321    fn from(err: std::io::Error) -> Self {
322        Self::Io(err)
323    }
324}
325
326impl From<serde_json::Error> for ReportError {
327    fn from(err: serde_json::Error) -> Self {
328        Self::Serialize(err)
329    }
330}
331
/// Turns reports into text in some output format.
pub trait ReportFormatter {
    /// Short static name of the format (the JSON implementation returns `"json"`).
    fn format_name(&self) -> &'static str;
    /// Serializes a per-entity [`RunReport`].
    ///
    /// # Errors
    /// Returns a [`ReportError`] when serialization fails.
    fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError>;
    /// Serializes a run-level [`RunSummaryReport`].
    ///
    /// # Errors
    /// Returns a [`ReportError`] when serialization fails.
    fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError>;
}
337
/// Stateless [`ReportFormatter`] that emits pretty-printed JSON.
pub struct JsonReportFormatter;
339
340impl ReportFormatter for JsonReportFormatter {
341    fn format_name(&self) -> &'static str {
342        "json"
343    }
344
345    fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError> {
346        Ok(serde_json::to_string_pretty(report)?)
347    }
348
349    fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError> {
350        Ok(serde_json::to_string_pretty(report)?)
351    }
352}
353
354pub fn now_rfc3339() -> String {
355    OffsetDateTime::now_utc()
356        .format(&Rfc3339)
357        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
358}
359
/// Derives a run id from a timestamp by turning every `:` into `-`.
///
/// All other characters are passed through untouched.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}
363
/// Stateless namespace for report path helpers and report file writers.
pub struct ReportWriter;
365
366impl ReportWriter {
367    pub fn run_dir_name(run_id: &str) -> String {
368        format!("run_{run_id}")
369    }
370
371    pub fn report_file_name() -> String {
372        "run.json".to_string()
373    }
374
375    pub fn summary_file_name() -> String {
376        "run.summary.json".to_string()
377    }
378
379    pub fn report_relative_path(run_id: &str, entity_name: &str) -> String {
380        format!(
381            "{}/{}/{}",
382            Self::run_dir_name(run_id),
383            entity_name,
384            Self::report_file_name()
385        )
386    }
387
388    pub fn summary_relative_path(run_id: &str) -> String {
389        format!(
390            "{}/{}",
391            Self::run_dir_name(run_id),
392            Self::summary_file_name()
393        )
394    }
395
396    pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
397        report_dir
398            .join(Self::run_dir_name(run_id))
399            .join(entity_name)
400    }
401
402    pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
403        Self::entity_report_dir(report_dir, run_id, entity_name).join(Self::report_file_name())
404    }
405
406    pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
407        report_dir
408            .join(Self::run_dir_name(run_id))
409            .join(Self::summary_file_name())
410    }
411
412    pub fn write_report(
413        report_dir: &Path,
414        run_id: &str,
415        entity_name: &str,
416        report: &RunReport,
417    ) -> Result<PathBuf, ReportError> {
418        let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
419        std::fs::create_dir_all(&entity_dir)?;
420        let report_path = Self::report_path(report_dir, run_id, entity_name);
421        let tmp_path = entity_dir.join(format!(
422            "{}.tmp-{}",
423            Self::report_file_name(),
424            unique_suffix()
425        ));
426
427        let json = serde_json::to_string_pretty(report)?;
428        let mut file = File::create(&tmp_path)?;
429        file.write_all(json.as_bytes())?;
430        file.sync_all()?;
431        std::fs::rename(&tmp_path, &report_path)?;
432
433        Ok(report_path)
434    }
435
436    pub fn write_summary(
437        report_dir: &Path,
438        run_id: &str,
439        report: &RunSummaryReport,
440    ) -> Result<PathBuf, ReportError> {
441        let run_dir = report_dir.join(Self::run_dir_name(run_id));
442        std::fs::create_dir_all(&run_dir)?;
443        let report_path = Self::summary_path(report_dir, run_id);
444        let tmp_path = run_dir.join(format!(
445            "{}.tmp-{}",
446            Self::summary_file_name(),
447            unique_suffix()
448        ));
449
450        let json = serde_json::to_string_pretty(report)?;
451        let mut file = File::create(&tmp_path)?;
452        file.write_all(json.as_bytes())?;
453        file.sync_all()?;
454        std::fs::rename(&tmp_path, &report_path)?;
455
456        Ok(report_path)
457    }
458}
459
460pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
461    if file_statuses.contains(&FileStatus::Failed) {
462        return (RunStatus::Failed, 1);
463    }
464    if file_statuses.contains(&FileStatus::Aborted) {
465        return (RunStatus::Aborted, 2);
466    }
467    if file_statuses.contains(&FileStatus::Rejected) {
468        return (RunStatus::Rejected, 0);
469    }
470    (RunStatus::Success, 0)
471}
472
/// Builds a suffix for temporary file names: `<pid>-<nanos>-<sequence>`.
///
/// `nanos` is the wall-clock time since the Unix epoch, or `0` if the clock
/// reads earlier than the epoch. The clock alone is not enough to tell two
/// calls apart (coarse timer resolution, or the `0` fallback), so a
/// process-wide counter is appended to keep every suffix produced by this
/// process distinct.
fn unique_suffix() -> String {
    // `Relaxed` is sufficient: only the uniqueness of each returned value
    // matters, not its ordering relative to other memory operations.
    static SEQUENCE: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    let sequence = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    format!("{}-{nanos}-{sequence}", std::process::id())
}