Skip to main content

floe_core/report/
mod.rs

1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
9pub mod build;
10pub mod entity;
11pub mod output;
12
/// Serializable report for a single entity.
///
/// Combines the echoed entity/source/sink/policy sections, a summary of the
/// accepted output, aggregate totals, and one [`FileReport`] per element of
/// `files`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunReport {
    pub spec_version: String,
    pub entity: EntityEcho,
    pub source: SourceEcho,
    pub sink: SinkEcho,
    pub policy: PolicyEcho,
    pub accepted_output: AcceptedOutputSummary,
    pub results: ResultsTotals,
    /// Per-file details, one entry per reported input file.
    pub files: Vec<FileReport>,
}
25
/// Serializable run-level summary.
///
/// Carries tool and run information, echoes of the config and report
/// location, aggregate totals, and one [`EntitySummary`] per element of
/// `entities`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunSummaryReport {
    pub spec_version: String,
    pub tool: ToolInfo,
    pub run: RunInfo,
    pub config: ConfigEcho,
    pub report: ReportEcho,
    pub results: ResultsTotals,
    pub entities: Vec<EntitySummary>,
}
37
/// One entry of [`RunSummaryReport::entities`]: an entity's name, status and
/// totals.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct EntitySummary {
    pub name: String,
    pub status: RunStatus,
    pub results: ResultsTotals,
    // NOTE(review): presumably the location of this entity's report file;
    // confirm whether it is absolute or relative against the code that fills it.
    pub report_file: String,
}
46
/// Name and version of the tool, with optional git details.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ToolInfo {
    pub name: String,
    pub version: String,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub git: Option<GitInfo>,
}
55
/// Git details attached to [`ToolInfo`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct GitInfo {
    pub commit: String,
    // NOTE(review): presumably true when the working tree had uncommitted
    // changes at build time — confirm against the code that populates it.
    pub dirty: bool,
}
62
/// Identification, timing and outcome of a run.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunInfo {
    pub run_id: String,
    // NOTE(review): timestamps are plain strings here; presumably RFC 3339
    // as produced by `now_rfc3339` — confirm at the call sites.
    pub started_at: String,
    pub finished_at: String,
    /// Run duration in milliseconds.
    pub duration_ms: u64,
    pub status: RunStatus,
    pub exit_code: i32,
}
73
/// Echo of the configuration: its path, version and optional free-form
/// metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ConfigEcho {
    pub path: String,
    pub version: String,
    /// Arbitrary JSON value; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
82
/// Echo of an entity: its name and optional free-form metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct EntityEcho {
    pub name: String,
    /// Arbitrary JSON value; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
90
/// Echo of the source section: format, path, optional reader options and cast
/// mode, the read plan, and the resolved input files.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SourceEcho {
    pub format: String,
    pub path: String,
    /// Arbitrary JSON value; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub options: Option<serde_json::Value>,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cast_mode: Option<String>,
    pub read_plan: SourceReadPlan,
    pub resolved_inputs: ResolvedInputs,
}
103
/// Resolved input files for a source, together with how they were resolved.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ResolvedInputs {
    pub mode: ResolvedInputMode,
    // NOTE(review): stored separately from `files`; nothing in this type
    // enforces `file_count == files.len()` — confirm whether `files` may be
    // truncated by the builder.
    pub file_count: u64,
    pub files: Vec<String>,
}
111
/// Echo of the sink section: the accepted target, an optional rejected
/// target, and the archive settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkEcho {
    pub accepted: SinkTargetEcho,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rejected: Option<SinkTargetEcho>,
    pub archive: SinkArchiveEcho,
}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
122#[serde(rename_all = "snake_case")]
123pub struct AcceptedOutputSummary {
124    pub path: String,
125    #[serde(default)]
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub table_root_uri: Option<String>,
128    #[serde(default)]
129    #[serde(skip_serializing_if = "Option::is_none")]
130    pub write_mode: Option<String>,
131    pub accepted_rows: u64,
132    #[serde(default)]
133    pub files_written: u64,
134    pub parts_written: u64,
135    #[serde(default)]
136    #[serde(skip_serializing_if = "Vec::is_empty")]
137    pub part_files: Vec<String>,
138    #[serde(default)]
139    #[serde(skip_serializing_if = "Option::is_none")]
140    pub table_version: Option<i64>,
141    #[serde(default)]
142    #[serde(skip_serializing_if = "Option::is_none")]
143    pub snapshot_id: Option<i64>,
144    #[serde(default)]
145    #[serde(skip_serializing_if = "Option::is_none")]
146    pub iceberg_catalog_name: Option<String>,
147    #[serde(default)]
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub iceberg_database: Option<String>,
150    #[serde(default)]
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub iceberg_namespace: Option<String>,
153    #[serde(default)]
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub iceberg_table: Option<String>,
156    #[serde(default)]
157    #[serde(skip_serializing_if = "Option::is_none")]
158    pub total_bytes_written: Option<u64>,
159    #[serde(default)]
160    #[serde(skip_serializing_if = "Option::is_none")]
161    pub avg_file_size_mb: Option<f64>,
162    #[serde(default)]
163    #[serde(skip_serializing_if = "Option::is_none")]
164    pub small_files_count: Option<u64>,
165}
166
/// Format and path of one sink target (used for both the accepted and the
/// rejected target in [`SinkEcho`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkTargetEcho {
    pub format: String,
    pub path: String,
}
173
/// Echo of the report location carried by [`RunSummaryReport`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ReportEcho {
    // NOTE(review): presumably the configured report directory — confirm.
    pub path: String,
    // NOTE(review): presumably the summary file written for this run — confirm.
    pub report_file: String,
}
180
/// Archive settings echoed in [`SinkEcho`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkArchiveEcho {
    pub enabled: bool,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
}
188
/// Echo of the policy section; currently only its [`Severity`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct PolicyEcho {
    pub severity: Severity,
}
194
/// Aggregate counters, used at run level ([`RunSummaryReport`]), entity level
/// ([`RunReport`]) and per entity summary ([`EntitySummary`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ResultsTotals {
    pub files_total: u64,
    pub rows_total: u64,
    pub accepted_total: u64,
    pub rejected_total: u64,
    pub warnings_total: u64,
    pub errors_total: u64,
}
205
/// Report for a single input file: status, row counts, schema mismatch
/// details, output locations and validation results.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileReport {
    pub input_file: String,
    pub status: FileStatus,
    pub row_count: u64,
    pub accepted_count: u64,
    pub rejected_count: u64,
    pub mismatch: FileMismatch,
    pub output: FileOutput,
    pub validation: FileValidation,
}
218
/// Column mismatch details for one file: declared vs. input column counts,
/// the missing and extra column names, and the action that was taken.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileMismatch {
    pub declared_columns_count: u64,
    pub input_columns_count: u64,
    pub missing_columns: Vec<String>,
    pub extra_columns: Vec<String>,
    pub mismatch_action: MismatchAction,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<MismatchIssue>,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub warning: Option<String>,
}
232
/// Rule name and message attached to [`FileMismatch::error`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct MismatchIssue {
    pub rule: String,
    pub message: String,
}
239
/// Output locations recorded for one file.
///
/// None of these fields carries a `skip_serializing_if` attribute, so a
/// `None` is written explicitly (as `null` in JSON) rather than omitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileOutput {
    pub accepted_path: Option<String>,
    pub rejected_path: Option<String>,
    pub errors_path: Option<String>,
    pub archived_path: Option<String>,
}
248
/// Validation results for one file: error and warning counts plus a
/// [`RuleSummary`] per element of `rules`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileValidation {
    pub errors: u64,
    pub warnings: u64,
    pub rules: Vec<RuleSummary>,
}
256
/// Violations of one rule: the rule, its severity, a violation count, and a
/// [`ColumnSummary`] per element of `columns`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RuleSummary {
    pub rule: RuleName,
    pub severity: Severity,
    pub violations: u64,
    pub columns: Vec<ColumnSummary>,
}
265
/// Violation count for one column within a [`RuleSummary`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ColumnSummary {
    pub column: String,
    pub violations: u64,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target_type: Option<String>,
    /// Left out of the serialized output when `None`.
    // NOTE(review): meaning of `source` is not visible in this file — confirm
    // against the code that fills it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
}
276
/// Status recorded for a single file (see [`FileReport::status`]).
///
/// Serialized in snake_case: `success`, `rejected`, `aborted`, `failed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FileStatus {
    Success,
    Rejected,
    Aborted,
    Failed,
}
285
/// Status of a run or of one entity within it (see [`RunInfo::status`] and
/// [`EntitySummary::status`]).
///
/// Serialized in snake_case, e.g. `SuccessWithWarnings` becomes
/// `success_with_warnings`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RunStatus {
    Success,
    SuccessWithWarnings,
    Rejected,
    Aborted,
    Failed,
}
295
/// Severity level used by [`PolicyEcho`] and [`RuleSummary`].
///
/// Serialized in snake_case: `warn`, `reject`, `abort`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Severity {
    Warn,
    Reject,
    Abort,
}
303
/// Identifies the rule a [`RuleSummary`] refers to.
///
/// Serialized in snake_case: `not_null`, `cast_error`, `unique`,
/// `schema_error`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RuleName {
    NotNull,
    CastError,
    Unique,
    SchemaError,
}
312
/// Action recorded in [`FileMismatch::mismatch_action`].
///
/// Serialized in snake_case: `none`, `filled_nulls`, `ignored_extras`,
/// `rejected_file`, `aborted`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MismatchAction {
    None,
    FilledNulls,
    IgnoredExtras,
    RejectedFile,
    Aborted,
}
322
/// How the inputs in [`ResolvedInputs`] were resolved.
///
/// Serialized in snake_case: `directory`, `file`, `glob`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ResolvedInputMode {
    Directory,
    File,
    Glob,
}
330
/// Read plan echoed in [`SourceEcho::read_plan`].
///
/// Only one variant exists; it serializes as `raw_and_typed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceReadPlan {
    RawAndTyped,
}
336
/// Errors produced while serializing or writing reports.
#[derive(Debug)]
pub enum ReportError {
    /// A filesystem or other I/O operation failed.
    Io(std::io::Error),
    /// `serde_json` failed to serialize a report.
    Serialize(serde_json::Error),
}
342
343impl std::fmt::Display for ReportError {
344    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
345        match self {
346            ReportError::Io(err) => write!(f, "report io error: {err}"),
347            ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
348        }
349    }
350}
351
// Marker impl only: `source()` keeps its default (`None`). The wrapped error's
// message is already included in this type's `Display` output.
impl std::error::Error for ReportError {}
353
354impl From<std::io::Error> for ReportError {
355    fn from(err: std::io::Error) -> Self {
356        Self::Io(err)
357    }
358}
359
360impl From<serde_json::Error> for ReportError {
361    fn from(err: serde_json::Error) -> Self {
362        Self::Serialize(err)
363    }
364}
365
/// Turns reports into a textual representation.
pub trait ReportFormatter {
    /// Short, static name of the output format.
    fn format_name(&self) -> &'static str;
    /// Serializes an entity-level [`RunReport`] to a string.
    fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError>;
    /// Serializes a run-level [`RunSummaryReport`] to a string.
    fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError>;
}
371
372pub struct JsonReportFormatter;
373
374impl ReportFormatter for JsonReportFormatter {
375    fn format_name(&self) -> &'static str {
376        "json"
377    }
378
379    fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError> {
380        Ok(serde_json::to_string_pretty(report)?)
381    }
382
383    fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError> {
384        Ok(serde_json::to_string_pretty(report)?)
385    }
386}
387
388pub fn now_rfc3339() -> String {
389    OffsetDateTime::now_utc()
390        .format(&Rfc3339)
391        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
392}
393
/// Derives a run id from `timestamp` by replacing every `:` with `-`.
///
/// All other characters are copied through unchanged.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}
397
398pub struct ReportWriter;
399
400impl ReportWriter {
401    pub fn run_dir_name(run_id: &str) -> String {
402        format!("run_{run_id}")
403    }
404
405    pub fn report_file_name() -> String {
406        "run.json".to_string()
407    }
408
409    pub fn summary_file_name() -> String {
410        "run.summary.json".to_string()
411    }
412
413    pub fn report_relative_path(run_id: &str, entity_name: &str) -> String {
414        format!(
415            "{}/{}/{}",
416            Self::run_dir_name(run_id),
417            entity_name,
418            Self::report_file_name()
419        )
420    }
421
422    pub fn summary_relative_path(run_id: &str) -> String {
423        format!(
424            "{}/{}",
425            Self::run_dir_name(run_id),
426            Self::summary_file_name()
427        )
428    }
429
430    pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
431        crate::io::storage::paths::normalize_local_path(
432            &report_dir
433                .join(Self::run_dir_name(run_id))
434                .join(entity_name),
435        )
436    }
437
438    pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
439        crate::io::storage::paths::normalize_local_path(
440            &Self::entity_report_dir(report_dir, run_id, entity_name)
441                .join(Self::report_file_name()),
442        )
443    }
444
445    pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
446        crate::io::storage::paths::normalize_local_path(
447            &report_dir
448                .join(Self::run_dir_name(run_id))
449                .join(Self::summary_file_name()),
450        )
451    }
452
453    pub fn write_report(
454        report_dir: &Path,
455        run_id: &str,
456        entity_name: &str,
457        report: &RunReport,
458    ) -> Result<PathBuf, ReportError> {
459        let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
460        std::fs::create_dir_all(&entity_dir)?;
461        let report_path = Self::report_path(report_dir, run_id, entity_name);
462        let tmp_path = entity_dir.join(format!(
463            "{}.tmp-{}",
464            Self::report_file_name(),
465            unique_suffix()
466        ));
467
468        let json = serde_json::to_string_pretty(report)?;
469        let mut file = File::create(&tmp_path)?;
470        file.write_all(json.as_bytes())?;
471        file.sync_all()?;
472        std::fs::rename(&tmp_path, &report_path)?;
473
474        Ok(report_path)
475    }
476
477    pub fn write_summary(
478        report_dir: &Path,
479        run_id: &str,
480        report: &RunSummaryReport,
481    ) -> Result<PathBuf, ReportError> {
482        let run_dir = report_dir.join(Self::run_dir_name(run_id));
483        let run_dir = crate::io::storage::paths::normalize_local_path(&run_dir);
484        std::fs::create_dir_all(&run_dir)?;
485        let report_path = Self::summary_path(report_dir, run_id);
486        let tmp_path = run_dir.join(format!(
487            "{}.tmp-{}",
488            Self::summary_file_name(),
489            unique_suffix()
490        ));
491
492        let json = serde_json::to_string_pretty(report)?;
493        let mut file = File::create(&tmp_path)?;
494        file.write_all(json.as_bytes())?;
495        file.sync_all()?;
496        std::fs::rename(&tmp_path, &report_path)?;
497
498        Ok(report_path)
499    }
500}
501
502pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
503    if file_statuses.contains(&FileStatus::Failed) {
504        return (RunStatus::Failed, 1);
505    }
506    if file_statuses.contains(&FileStatus::Aborted) {
507        return (RunStatus::Aborted, 2);
508    }
509    if file_statuses.contains(&FileStatus::Rejected) {
510        return (RunStatus::Rejected, 0);
511    }
512    (RunStatus::Success, 0)
513}
514
/// Builds a `<pid>-<nanos>` suffix for temporary file names.
///
/// `nanos` is the number of nanoseconds since the Unix epoch, or `0` if the
/// system clock reports a time before the epoch.
fn unique_suffix() -> String {
    let pid = std::process::id();
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    format!("{}-{}", pid, nanos)
}