Skip to main content

floe_core/report/
mod.rs

1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
9pub mod build;
10pub mod entity;
11pub mod output;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub struct RunReport {
16    pub spec_version: String,
17    pub entity: EntityEcho,
18    pub source: SourceEcho,
19    pub sink: SinkEcho,
20    pub policy: PolicyEcho,
21    pub accepted_output: AcceptedOutputSummary,
22    pub results: ResultsTotals,
23    pub files: Vec<FileReport>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27#[serde(rename_all = "snake_case")]
28pub struct RunSummaryReport {
29    pub spec_version: String,
30    pub tool: ToolInfo,
31    pub run: RunInfo,
32    pub config: ConfigEcho,
33    pub report: ReportEcho,
34    pub results: ResultsTotals,
35    pub entities: Vec<EntitySummary>,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(rename_all = "snake_case")]
40pub struct EntitySummary {
41    pub name: String,
42    pub status: RunStatus,
43    pub results: ResultsTotals,
44    pub report_file: String,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48#[serde(rename_all = "snake_case")]
49pub struct ToolInfo {
50    pub name: String,
51    pub version: String,
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub git: Option<GitInfo>,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
57#[serde(rename_all = "snake_case")]
58pub struct GitInfo {
59    pub commit: String,
60    pub dirty: bool,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(rename_all = "snake_case")]
65pub struct RunInfo {
66    pub run_id: String,
67    pub started_at: String,
68    pub finished_at: String,
69    pub duration_ms: u64,
70    pub status: RunStatus,
71    pub exit_code: i32,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75#[serde(rename_all = "snake_case")]
76pub struct ConfigEcho {
77    pub path: String,
78    pub version: String,
79    #[serde(skip_serializing_if = "Option::is_none")]
80    pub metadata: Option<serde_json::Value>,
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
84#[serde(rename_all = "snake_case")]
85pub struct EntityEcho {
86    pub name: String,
87    #[serde(skip_serializing_if = "Option::is_none")]
88    pub metadata: Option<serde_json::Value>,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
92#[serde(rename_all = "snake_case")]
93pub struct SourceEcho {
94    pub format: String,
95    pub path: String,
96    #[serde(skip_serializing_if = "Option::is_none")]
97    pub options: Option<serde_json::Value>,
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub cast_mode: Option<String>,
100    pub read_plan: SourceReadPlan,
101    pub resolved_inputs: ResolvedInputs,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub struct ResolvedInputs {
107    pub mode: ResolvedInputMode,
108    pub file_count: u64,
109    pub files: Vec<String>,
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
113#[serde(rename_all = "snake_case")]
114pub struct SinkEcho {
115    pub accepted: SinkTargetEcho,
116    #[serde(skip_serializing_if = "Option::is_none")]
117    pub rejected: Option<SinkTargetEcho>,
118    pub archive: SinkArchiveEcho,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
122#[serde(rename_all = "snake_case")]
123pub struct AcceptedOutputSummary {
124    pub path: String,
125    #[serde(default)]
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub table_root_uri: Option<String>,
128    #[serde(default)]
129    #[serde(skip_serializing_if = "Option::is_none")]
130    pub write_mode: Option<String>,
131    pub accepted_rows: u64,
132    #[serde(default)]
133    pub files_written: u64,
134    pub parts_written: u64,
135    #[serde(default)]
136    #[serde(skip_serializing_if = "Vec::is_empty")]
137    pub part_files: Vec<String>,
138    #[serde(default)]
139    #[serde(skip_serializing_if = "Option::is_none")]
140    pub table_version: Option<i64>,
141    #[serde(default)]
142    #[serde(skip_serializing_if = "Option::is_none")]
143    pub snapshot_id: Option<i64>,
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
147#[serde(rename_all = "snake_case")]
148pub struct SinkTargetEcho {
149    pub format: String,
150    pub path: String,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
154#[serde(rename_all = "snake_case")]
155pub struct ReportEcho {
156    pub path: String,
157    pub report_file: String,
158}
159
160#[derive(Debug, Clone, Serialize, Deserialize)]
161#[serde(rename_all = "snake_case")]
162pub struct SinkArchiveEcho {
163    pub enabled: bool,
164    #[serde(skip_serializing_if = "Option::is_none")]
165    pub path: Option<String>,
166}
167
168#[derive(Debug, Clone, Serialize, Deserialize)]
169#[serde(rename_all = "snake_case")]
170pub struct PolicyEcho {
171    pub severity: Severity,
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize)]
175#[serde(rename_all = "snake_case")]
176pub struct ResultsTotals {
177    pub files_total: u64,
178    pub rows_total: u64,
179    pub accepted_total: u64,
180    pub rejected_total: u64,
181    pub warnings_total: u64,
182    pub errors_total: u64,
183}
184
185#[derive(Debug, Clone, Serialize, Deserialize)]
186#[serde(rename_all = "snake_case")]
187pub struct FileReport {
188    pub input_file: String,
189    pub status: FileStatus,
190    pub row_count: u64,
191    pub accepted_count: u64,
192    pub rejected_count: u64,
193    pub mismatch: FileMismatch,
194    pub output: FileOutput,
195    pub validation: FileValidation,
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
199#[serde(rename_all = "snake_case")]
200pub struct FileMismatch {
201    pub declared_columns_count: u64,
202    pub input_columns_count: u64,
203    pub missing_columns: Vec<String>,
204    pub extra_columns: Vec<String>,
205    pub mismatch_action: MismatchAction,
206    #[serde(skip_serializing_if = "Option::is_none")]
207    pub error: Option<MismatchIssue>,
208    #[serde(skip_serializing_if = "Option::is_none")]
209    pub warning: Option<String>,
210}
211
212#[derive(Debug, Clone, Serialize, Deserialize)]
213#[serde(rename_all = "snake_case")]
214pub struct MismatchIssue {
215    pub rule: String,
216    pub message: String,
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize)]
220#[serde(rename_all = "snake_case")]
221pub struct FileOutput {
222    pub accepted_path: Option<String>,
223    pub rejected_path: Option<String>,
224    pub errors_path: Option<String>,
225    pub archived_path: Option<String>,
226}
227
228#[derive(Debug, Clone, Serialize, Deserialize)]
229#[serde(rename_all = "snake_case")]
230pub struct FileValidation {
231    pub errors: u64,
232    pub warnings: u64,
233    pub rules: Vec<RuleSummary>,
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize)]
237#[serde(rename_all = "snake_case")]
238pub struct RuleSummary {
239    pub rule: RuleName,
240    pub severity: Severity,
241    pub violations: u64,
242    pub columns: Vec<ColumnSummary>,
243}
244
245#[derive(Debug, Clone, Serialize, Deserialize)]
246#[serde(rename_all = "snake_case")]
247pub struct ColumnSummary {
248    pub column: String,
249    pub violations: u64,
250    #[serde(skip_serializing_if = "Option::is_none")]
251    pub target_type: Option<String>,
252    #[serde(skip_serializing_if = "Option::is_none")]
253    pub source: Option<String>,
254}
255
256#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
257#[serde(rename_all = "snake_case")]
258pub enum FileStatus {
259    Success,
260    Rejected,
261    Aborted,
262    Failed,
263}
264
265#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
266#[serde(rename_all = "snake_case")]
267pub enum RunStatus {
268    Success,
269    SuccessWithWarnings,
270    Rejected,
271    Aborted,
272    Failed,
273}
274
275#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
276#[serde(rename_all = "snake_case")]
277pub enum Severity {
278    Warn,
279    Reject,
280    Abort,
281}
282
283#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
284#[serde(rename_all = "snake_case")]
285pub enum RuleName {
286    NotNull,
287    CastError,
288    Unique,
289    SchemaError,
290}
291
292#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
293#[serde(rename_all = "snake_case")]
294pub enum MismatchAction {
295    None,
296    FilledNulls,
297    IgnoredExtras,
298    RejectedFile,
299    Aborted,
300}
301
302#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
303#[serde(rename_all = "snake_case")]
304pub enum ResolvedInputMode {
305    Directory,
306    File,
307    Glob,
308}
309
310#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
311#[serde(rename_all = "snake_case")]
312pub enum SourceReadPlan {
313    RawAndTyped,
314}
315
316#[derive(Debug)]
317pub enum ReportError {
318    Io(std::io::Error),
319    Serialize(serde_json::Error),
320}
321
322impl std::fmt::Display for ReportError {
323    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
324        match self {
325            ReportError::Io(err) => write!(f, "report io error: {err}"),
326            ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
327        }
328    }
329}
330
331impl std::error::Error for ReportError {}
332
333impl From<std::io::Error> for ReportError {
334    fn from(err: std::io::Error) -> Self {
335        Self::Io(err)
336    }
337}
338
339impl From<serde_json::Error> for ReportError {
340    fn from(err: serde_json::Error) -> Self {
341        Self::Serialize(err)
342    }
343}
344
345pub trait ReportFormatter {
346    fn format_name(&self) -> &'static str;
347    fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError>;
348    fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError>;
349}
350
351pub struct JsonReportFormatter;
352
353impl ReportFormatter for JsonReportFormatter {
354    fn format_name(&self) -> &'static str {
355        "json"
356    }
357
358    fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError> {
359        Ok(serde_json::to_string_pretty(report)?)
360    }
361
362    fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError> {
363        Ok(serde_json::to_string_pretty(report)?)
364    }
365}
366
367pub fn now_rfc3339() -> String {
368    OffsetDateTime::now_utc()
369        .format(&Rfc3339)
370        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
371}
372
/// Builds a run id from a timestamp by replacing every `:` with `-`
/// (e.g. `2024-01-02T03:04:05Z` -> `2024-01-02T03-04-05Z`).
// NOTE(review): presumably done so the id can be used in directory names such as
// `run_<run_id>`, where `:` is not portable — confirm.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}
376
377pub struct ReportWriter;
378
379impl ReportWriter {
380    pub fn run_dir_name(run_id: &str) -> String {
381        format!("run_{run_id}")
382    }
383
384    pub fn report_file_name() -> String {
385        "run.json".to_string()
386    }
387
388    pub fn summary_file_name() -> String {
389        "run.summary.json".to_string()
390    }
391
392    pub fn report_relative_path(run_id: &str, entity_name: &str) -> String {
393        format!(
394            "{}/{}/{}",
395            Self::run_dir_name(run_id),
396            entity_name,
397            Self::report_file_name()
398        )
399    }
400
401    pub fn summary_relative_path(run_id: &str) -> String {
402        format!(
403            "{}/{}",
404            Self::run_dir_name(run_id),
405            Self::summary_file_name()
406        )
407    }
408
409    pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
410        report_dir
411            .join(Self::run_dir_name(run_id))
412            .join(entity_name)
413    }
414
415    pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
416        Self::entity_report_dir(report_dir, run_id, entity_name).join(Self::report_file_name())
417    }
418
419    pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
420        report_dir
421            .join(Self::run_dir_name(run_id))
422            .join(Self::summary_file_name())
423    }
424
425    pub fn write_report(
426        report_dir: &Path,
427        run_id: &str,
428        entity_name: &str,
429        report: &RunReport,
430    ) -> Result<PathBuf, ReportError> {
431        let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
432        std::fs::create_dir_all(&entity_dir)?;
433        let report_path = Self::report_path(report_dir, run_id, entity_name);
434        let tmp_path = entity_dir.join(format!(
435            "{}.tmp-{}",
436            Self::report_file_name(),
437            unique_suffix()
438        ));
439
440        let json = serde_json::to_string_pretty(report)?;
441        let mut file = File::create(&tmp_path)?;
442        file.write_all(json.as_bytes())?;
443        file.sync_all()?;
444        std::fs::rename(&tmp_path, &report_path)?;
445
446        Ok(report_path)
447    }
448
449    pub fn write_summary(
450        report_dir: &Path,
451        run_id: &str,
452        report: &RunSummaryReport,
453    ) -> Result<PathBuf, ReportError> {
454        let run_dir = report_dir.join(Self::run_dir_name(run_id));
455        std::fs::create_dir_all(&run_dir)?;
456        let report_path = Self::summary_path(report_dir, run_id);
457        let tmp_path = run_dir.join(format!(
458            "{}.tmp-{}",
459            Self::summary_file_name(),
460            unique_suffix()
461        ));
462
463        let json = serde_json::to_string_pretty(report)?;
464        let mut file = File::create(&tmp_path)?;
465        file.write_all(json.as_bytes())?;
466        file.sync_all()?;
467        std::fs::rename(&tmp_path, &report_path)?;
468
469        Ok(report_path)
470    }
471}
472
473pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
474    if file_statuses.contains(&FileStatus::Failed) {
475        return (RunStatus::Failed, 1);
476    }
477    if file_statuses.contains(&FileStatus::Aborted) {
478        return (RunStatus::Aborted, 2);
479    }
480    if file_statuses.contains(&FileStatus::Rejected) {
481        return (RunStatus::Rejected, 0);
482    }
483    (RunStatus::Success, 0)
484}
485
/// Returns a suffix for temporary file names: `<pid>-<unix_nanos>-<seq>`.
///
/// The pid and wall-clock nanoseconds alone can repeat within a process when the
/// clock has coarse resolution or steps backwards. The per-process sequence number
/// guarantees that every call in this process yields a distinct suffix.
fn unique_suffix() -> String {
    static SEQUENCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    // A clock before the Unix epoch degrades to 0; uniqueness then rests on `seq`.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_nanos())
        .unwrap_or(0);
    // Only uniqueness matters here, not ordering with other memory operations.
    let seq = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    format!("{}-{}-{}", std::process::id(), nanos, seq)
}