// floe_core/report/mod.rs

use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

use serde::{Deserialize, Serialize};
use time::{format_description::well_known::Rfc3339, OffsetDateTime};

pub mod build;
pub mod entity;
pub mod output;

13#[derive(Debug, Clone, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub struct RunReport {
16    pub spec_version: String,
17    pub entity: EntityEcho,
18    pub source: SourceEcho,
19    pub sink: SinkEcho,
20    pub policy: PolicyEcho,
21    pub accepted_output: AcceptedOutputSummary,
22    pub schema_evolution: SchemaEvolutionSummary,
23    #[serde(default)]
24    #[serde(skip_serializing_if = "Vec::is_empty")]
25    pub unique_constraints: Vec<UniqueConstraintReport>,
26    pub results: ResultsTotals,
27    pub files: Vec<FileReport>,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31#[serde(rename_all = "snake_case")]
32pub struct RunSummaryReport {
33    pub spec_version: String,
34    pub tool: ToolInfo,
35    pub run: RunInfo,
36    pub config: ConfigEcho,
37    pub report: ReportEcho,
38    pub results: ResultsTotals,
39    pub entities: Vec<EntitySummary>,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
43#[serde(rename_all = "snake_case")]
44pub struct EntitySummary {
45    pub name: String,
46    pub status: RunStatus,
47    pub results: ResultsTotals,
48    pub report_file: String,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
52#[serde(rename_all = "snake_case")]
53pub struct ToolInfo {
54    pub name: String,
55    pub version: String,
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub git: Option<GitInfo>,
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
61#[serde(rename_all = "snake_case")]
62pub struct GitInfo {
63    pub commit: String,
64    pub dirty: bool,
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
68#[serde(rename_all = "snake_case")]
69pub struct RunInfo {
70    pub run_id: String,
71    pub started_at: String,
72    pub finished_at: String,
73    pub duration_ms: u64,
74    pub status: RunStatus,
75    pub exit_code: i32,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79#[serde(rename_all = "snake_case")]
80pub struct ConfigEcho {
81    pub path: String,
82    pub version: String,
83    #[serde(skip_serializing_if = "Option::is_none")]
84    pub metadata: Option<serde_json::Value>,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88#[serde(rename_all = "snake_case")]
89pub struct EntityEcho {
90    pub name: String,
91    #[serde(skip_serializing_if = "Option::is_none")]
92    pub metadata: Option<serde_json::Value>,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
96#[serde(rename_all = "snake_case")]
97pub struct SourceEcho {
98    pub format: String,
99    pub path: String,
100    #[serde(skip_serializing_if = "Option::is_none")]
101    pub options: Option<serde_json::Value>,
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub cast_mode: Option<String>,
104    pub read_plan: SourceReadPlan,
105    pub resolved_inputs: ResolvedInputs,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(rename_all = "snake_case")]
110pub struct ResolvedInputs {
111    pub mode: ResolvedInputMode,
112    pub file_count: u64,
113    pub files: Vec<String>,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
117#[serde(rename_all = "snake_case")]
118pub struct SinkEcho {
119    pub accepted: SinkTargetEcho,
120    #[serde(skip_serializing_if = "Option::is_none")]
121    pub rejected: Option<SinkTargetEcho>,
122    pub archive: SinkArchiveEcho,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
126#[serde(rename_all = "snake_case")]
127pub struct AcceptedOutputSummary {
128    pub path: String,
129    #[serde(default)]
130    #[serde(skip_serializing_if = "Option::is_none")]
131    pub table_root_uri: Option<String>,
132    #[serde(default)]
133    #[serde(skip_serializing_if = "Option::is_none")]
134    pub write_mode: Option<String>,
135    pub accepted_rows: u64,
136    #[serde(default)]
137    pub files_written: Option<u64>,
138    pub parts_written: u64,
139    #[serde(default)]
140    #[serde(skip_serializing_if = "Vec::is_empty")]
141    pub part_files: Vec<String>,
142    #[serde(default)]
143    #[serde(skip_serializing_if = "Option::is_none")]
144    pub table_version: Option<i64>,
145    #[serde(default)]
146    #[serde(skip_serializing_if = "Option::is_none")]
147    pub snapshot_id: Option<i64>,
148    #[serde(default)]
149    #[serde(skip_serializing_if = "Option::is_none")]
150    pub iceberg_catalog_name: Option<String>,
151    #[serde(default)]
152    #[serde(skip_serializing_if = "Option::is_none")]
153    pub iceberg_database: Option<String>,
154    #[serde(default)]
155    #[serde(skip_serializing_if = "Option::is_none")]
156    pub iceberg_namespace: Option<String>,
157    #[serde(default)]
158    #[serde(skip_serializing_if = "Option::is_none")]
159    pub iceberg_table: Option<String>,
160    #[serde(default)]
161    #[serde(skip_serializing_if = "Option::is_none")]
162    pub delta_catalog_name: Option<String>,
163    #[serde(default)]
164    #[serde(skip_serializing_if = "Option::is_none")]
165    pub delta_catalog_schema: Option<String>,
166    #[serde(default)]
167    #[serde(skip_serializing_if = "Option::is_none")]
168    pub delta_catalog_table: Option<String>,
169    #[serde(default)]
170    pub total_bytes_written: Option<u64>,
171    #[serde(default)]
172    pub avg_file_size_mb: Option<f64>,
173    #[serde(default)]
174    pub small_files_count: Option<u64>,
175    #[serde(default)]
176    #[serde(skip_serializing_if = "Vec::is_empty")]
177    pub merge_key: Vec<String>,
178    #[serde(default)]
179    #[serde(skip_serializing_if = "Option::is_none")]
180    pub inserted_count: Option<u64>,
181    #[serde(default)]
182    #[serde(skip_serializing_if = "Option::is_none")]
183    pub updated_count: Option<u64>,
184    #[serde(default)]
185    #[serde(skip_serializing_if = "Option::is_none")]
186    pub closed_count: Option<u64>,
187    #[serde(default)]
188    #[serde(skip_serializing_if = "Option::is_none")]
189    pub unchanged_count: Option<u64>,
190    #[serde(default)]
191    #[serde(skip_serializing_if = "Option::is_none")]
192    pub target_rows_before: Option<u64>,
193    #[serde(default)]
194    #[serde(skip_serializing_if = "Option::is_none")]
195    pub target_rows_after: Option<u64>,
196    #[serde(default)]
197    #[serde(skip_serializing_if = "Option::is_none")]
198    pub merge_elapsed_ms: Option<u64>,
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize, Default)]
202#[serde(rename_all = "snake_case")]
203pub struct SchemaEvolutionSummary {
204    pub enabled: bool,
205    pub mode: String,
206    pub applied: bool,
207    #[serde(default)]
208    #[serde(skip_serializing_if = "Vec::is_empty")]
209    pub added_columns: Vec<String>,
210    pub incompatible_changes_detected: bool,
211}
212
213#[derive(Debug, Clone, Serialize, Deserialize)]
214#[serde(rename_all = "snake_case")]
215pub struct UniqueConstraintReport {
216    pub columns: Vec<String>,
217    pub duplicates_count: u64,
218    #[serde(default)]
219    #[serde(skip_serializing_if = "is_zero_u64")]
220    pub batch_duplicates_count: u64,
221    #[serde(default)]
222    #[serde(skip_serializing_if = "is_zero_u64")]
223    pub target_duplicates_count: u64,
224    pub affected_rows_count: u64,
225    pub action: String,
226    pub status_effect: String,
227    #[serde(default)]
228    #[serde(skip_serializing_if = "Vec::is_empty")]
229    pub samples: Vec<UniqueConstraintSampleReport>,
230}
231
/// `skip_serializing_if` predicate that is `true` for a zero counter.
///
/// Serde passes the field to the predicate by reference, which is why this takes `&u64`.
fn is_zero_u64(n: &u64) -> bool {
    matches!(*n, 0)
}

236#[derive(Debug, Clone, Serialize, Deserialize)]
237#[serde(rename_all = "snake_case")]
238pub struct UniqueConstraintSampleReport {
239    pub values: std::collections::BTreeMap<String, String>,
240    pub count: u64,
241}
242
243#[derive(Debug, Clone, Serialize, Deserialize)]
244#[serde(rename_all = "snake_case")]
245pub struct SinkTargetEcho {
246    pub format: String,
247    pub path: String,
248}
249
250#[derive(Debug, Clone, Serialize, Deserialize)]
251#[serde(rename_all = "snake_case")]
252pub struct ReportEcho {
253    pub path: String,
254    pub report_file: String,
255}
256
257#[derive(Debug, Clone, Serialize, Deserialize)]
258#[serde(rename_all = "snake_case")]
259pub struct SinkArchiveEcho {
260    pub enabled: bool,
261    #[serde(skip_serializing_if = "Option::is_none")]
262    pub path: Option<String>,
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize)]
266#[serde(rename_all = "snake_case")]
267pub struct PolicyEcho {
268    pub severity: Severity,
269}
270
271#[derive(Debug, Clone, Serialize, Deserialize)]
272#[serde(rename_all = "snake_case")]
273pub struct ResultsTotals {
274    pub files_total: u64,
275    #[serde(default)]
276    pub files_skipped: u64,
277    pub rows_total: u64,
278    pub accepted_total: u64,
279    pub rejected_total: u64,
280    pub warnings_total: u64,
281    pub errors_total: u64,
282}
283
284#[derive(Debug, Clone, Serialize, Deserialize)]
285#[serde(rename_all = "snake_case")]
286pub struct FileReport {
287    pub input_file: String,
288    pub status: FileStatus,
289    pub row_count: u64,
290    pub accepted_count: u64,
291    pub rejected_count: u64,
292    #[serde(skip_serializing_if = "Option::is_none")]
293    pub skip_reason: Option<String>,
294    pub mismatch: FileMismatch,
295    pub output: FileOutput,
296    pub validation: FileValidation,
297}
298
299#[derive(Debug, Clone, Serialize, Deserialize)]
300#[serde(rename_all = "snake_case")]
301pub struct FileMismatch {
302    pub declared_columns_count: u64,
303    pub input_columns_count: u64,
304    pub missing_columns: Vec<String>,
305    pub extra_columns: Vec<String>,
306    pub mismatch_action: MismatchAction,
307    #[serde(skip_serializing_if = "Option::is_none")]
308    pub error: Option<MismatchIssue>,
309    #[serde(skip_serializing_if = "Option::is_none")]
310    pub warning: Option<String>,
311}
312
313#[derive(Debug, Clone, Serialize, Deserialize)]
314#[serde(rename_all = "snake_case")]
315pub struct MismatchIssue {
316    pub rule: String,
317    pub message: String,
318}
319
320#[derive(Debug, Clone, Serialize, Deserialize)]
321#[serde(rename_all = "snake_case")]
322pub struct FileOutput {
323    pub accepted_path: Option<String>,
324    pub rejected_path: Option<String>,
325    pub errors_path: Option<String>,
326    pub archived_path: Option<String>,
327}
328
329#[derive(Debug, Clone, Serialize, Deserialize)]
330#[serde(rename_all = "snake_case")]
331pub struct FileValidation {
332    pub errors: u64,
333    pub warnings: u64,
334    pub rules: Vec<RuleSummary>,
335}
336
337#[derive(Debug, Clone, Serialize, Deserialize)]
338#[serde(rename_all = "snake_case")]
339pub struct RuleSummary {
340    pub rule: RuleName,
341    pub severity: Severity,
342    pub violations: u64,
343    pub columns: Vec<ColumnSummary>,
344}
345
346#[derive(Debug, Clone, Serialize, Deserialize)]
347#[serde(rename_all = "snake_case")]
348pub struct ColumnSummary {
349    pub column: String,
350    pub violations: u64,
351    #[serde(skip_serializing_if = "Option::is_none")]
352    pub target_type: Option<String>,
353    #[serde(skip_serializing_if = "Option::is_none")]
354    pub source: Option<String>,
355}
356
357#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
358#[serde(rename_all = "snake_case")]
359pub enum FileStatus {
360    Success,
361    Rejected,
362    Aborted,
363    Failed,
364    Skipped,
365}
366
367#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
368#[serde(rename_all = "snake_case")]
369pub enum RunStatus {
370    Success,
371    SuccessWithWarnings,
372    Rejected,
373    Aborted,
374    Failed,
375}
376
377#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
378#[serde(rename_all = "snake_case")]
379pub enum Severity {
380    Warn,
381    Reject,
382    Abort,
383}
384
385#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
386#[serde(rename_all = "snake_case")]
387pub enum RuleName {
388    NotNull,
389    CastError,
390    Unique,
391    SchemaError,
392}
393
394#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
395#[serde(rename_all = "snake_case")]
396pub enum MismatchAction {
397    None,
398    FilledNulls,
399    IgnoredExtras,
400    RejectedFile,
401    Aborted,
402}
403
404#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
405#[serde(rename_all = "snake_case")]
406pub enum ResolvedInputMode {
407    Directory,
408    File,
409    Glob,
410}
411
412#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
413#[serde(rename_all = "snake_case")]
414pub enum SourceReadPlan {
415    RawAndTyped,
416}
417
418#[derive(Debug)]
419pub enum ReportError {
420    Io(std::io::Error),
421    Serialize(serde_json::Error),
422}
423
424impl std::fmt::Display for ReportError {
425    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
426        match self {
427            ReportError::Io(err) => write!(f, "report io error: {err}"),
428            ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
429        }
430    }
431}
432
433impl std::error::Error for ReportError {}
434
435impl From<std::io::Error> for ReportError {
436    fn from(err: std::io::Error) -> Self {
437        Self::Io(err)
438    }
439}
440
441impl From<serde_json::Error> for ReportError {
442    fn from(err: serde_json::Error) -> Self {
443        Self::Serialize(err)
444    }
445}
446
447pub trait ReportFormatter {
448    fn format_name(&self) -> &'static str;
449    fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError>;
450    fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError>;
451}
452
453pub struct JsonReportFormatter;
454
455impl ReportFormatter for JsonReportFormatter {
456    fn format_name(&self) -> &'static str {
457        "json"
458    }
459
460    fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError> {
461        Ok(serde_json::to_string_pretty(report)?)
462    }
463
464    fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError> {
465        Ok(serde_json::to_string_pretty(report)?)
466    }
467}
468
469pub fn now_rfc3339() -> String {
470    OffsetDateTime::now_utc()
471        .format(&Rfc3339)
472        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
473}
474
/// Derives a run id from a timestamp by turning every `:` into `-`.
///
/// All other characters are kept unchanged.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}

479pub struct ReportWriter;
480
481impl ReportWriter {
482    pub fn run_dir_name(run_id: &str) -> String {
483        format!("run_{run_id}")
484    }
485
486    pub fn report_file_name() -> String {
487        "run.json".to_string()
488    }
489
490    pub fn summary_file_name() -> String {
491        "run.summary.json".to_string()
492    }
493
494    pub fn report_relative_path(run_id: &str, entity_name: &str) -> String {
495        format!(
496            "{}/{}/{}",
497            Self::run_dir_name(run_id),
498            entity_name,
499            Self::report_file_name()
500        )
501    }
502
503    pub fn summary_relative_path(run_id: &str) -> String {
504        format!(
505            "{}/{}",
506            Self::run_dir_name(run_id),
507            Self::summary_file_name()
508        )
509    }
510
511    pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
512        crate::io::storage::paths::normalize_local_path(
513            &report_dir
514                .join(Self::run_dir_name(run_id))
515                .join(entity_name),
516        )
517    }
518
519    pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
520        crate::io::storage::paths::normalize_local_path(
521            &Self::entity_report_dir(report_dir, run_id, entity_name)
522                .join(Self::report_file_name()),
523        )
524    }
525
526    pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
527        crate::io::storage::paths::normalize_local_path(
528            &report_dir
529                .join(Self::run_dir_name(run_id))
530                .join(Self::summary_file_name()),
531        )
532    }
533
534    pub fn write_report(
535        report_dir: &Path,
536        run_id: &str,
537        entity_name: &str,
538        report: &RunReport,
539    ) -> Result<PathBuf, ReportError> {
540        let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
541        std::fs::create_dir_all(&entity_dir)?;
542        let report_path = Self::report_path(report_dir, run_id, entity_name);
543        let tmp_path = entity_dir.join(format!(
544            "{}.tmp-{}",
545            Self::report_file_name(),
546            unique_suffix()
547        ));
548
549        let json = serde_json::to_string_pretty(report)?;
550        let mut file = File::create(&tmp_path)?;
551        file.write_all(json.as_bytes())?;
552        file.sync_all()?;
553        std::fs::rename(&tmp_path, &report_path)?;
554
555        Ok(report_path)
556    }
557
558    pub fn write_summary(
559        report_dir: &Path,
560        run_id: &str,
561        report: &RunSummaryReport,
562    ) -> Result<PathBuf, ReportError> {
563        let run_dir = report_dir.join(Self::run_dir_name(run_id));
564        let run_dir = crate::io::storage::paths::normalize_local_path(&run_dir);
565        std::fs::create_dir_all(&run_dir)?;
566        let report_path = Self::summary_path(report_dir, run_id);
567        let tmp_path = run_dir.join(format!(
568            "{}.tmp-{}",
569            Self::summary_file_name(),
570            unique_suffix()
571        ));
572
573        let json = serde_json::to_string_pretty(report)?;
574        let mut file = File::create(&tmp_path)?;
575        file.write_all(json.as_bytes())?;
576        file.sync_all()?;
577        std::fs::rename(&tmp_path, &report_path)?;
578
579        Ok(report_path)
580    }
581}
582
583pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
584    if file_statuses.contains(&FileStatus::Failed) {
585        return (RunStatus::Failed, 1);
586    }
587    if file_statuses.contains(&FileStatus::Aborted) {
588        return (RunStatus::Aborted, 2);
589    }
590    if file_statuses.contains(&FileStatus::Rejected) {
591        return (RunStatus::Rejected, 0);
592    }
593    (RunStatus::Success, 0)
594}
595
/// Returns a temp-file suffix of the form `<pid>-<unix_nanos>-<seq>`.
///
/// `seq` comes from a process-wide counter, so every call in this process yields a distinct
/// suffix. The timestamp alone cannot guarantee that: clock resolution may be coarser than
/// the interval between calls, and a clock before the Unix epoch maps to `0`. Without the
/// counter, colliding suffixes would let `File::create` truncate another writer's temp file.
fn unique_suffix() -> String {
    // Relaxed is enough: `fetch_add` is atomic, so values are unique whatever the ordering.
    static SEQUENCE: AtomicU64 = AtomicU64::new(0);
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_nanos())
        .unwrap_or(0);
    format!("{}-{}-{}", std::process::id(), nanos, seq)
}