Skip to main content

floe_core/report/
mod.rs

1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
9pub mod build;
10pub mod entity;
11pub mod output;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub struct RunReport {
16    pub spec_version: String,
17    pub entity: EntityEcho,
18    pub source: SourceEcho,
19    pub sink: SinkEcho,
20    pub policy: PolicyEcho,
21    pub accepted_output: AcceptedOutputSummary,
22    pub schema_evolution: SchemaEvolutionSummary,
23    #[serde(default)]
24    #[serde(skip_serializing_if = "Vec::is_empty")]
25    pub unique_constraints: Vec<UniqueConstraintReport>,
26    pub results: ResultsTotals,
27    pub files: Vec<FileReport>,
28}
29
/// Summary document for a whole run: tool and run metadata, config and report
/// echoes, aggregate totals, and one [`EntitySummary`] per entity.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunSummaryReport {
    pub spec_version: String,
    pub tool: ToolInfo,
    pub run: RunInfo,
    pub config: ConfigEcho,
    pub report: ReportEcho,
    pub results: ResultsTotals,
    pub entities: Vec<EntitySummary>,
}
41
/// Per-entity entry of a [`RunSummaryReport`]: the entity name, its status,
/// its totals, and a reference to its report file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct EntitySummary {
    pub name: String,
    pub status: RunStatus,
    pub results: ResultsTotals,
    // NOTE(review): presumably the location of the entity's `run.json` — confirm
    // against the code that builds this value.
    pub report_file: String,
}
50
/// Name and version of the tool, with optional git details.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ToolInfo {
    pub name: String,
    pub version: String,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub git: Option<GitInfo>,
}
59
/// Git details attached to [`ToolInfo`]: a commit identifier and a dirty flag.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct GitInfo {
    pub commit: String,
    pub dirty: bool,
}
66
/// Identity, timing, status and exit code of a run.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunInfo {
    pub run_id: String,
    // NOTE(review): timestamps are plain strings here; presumably RFC 3339 as
    // produced by `now_rfc3339` — confirm at the call sites.
    pub started_at: String,
    pub finished_at: String,
    pub duration_ms: u64,
    pub status: RunStatus,
    pub exit_code: i32,
}
77
/// Echo of the configuration: its path and version, plus optional free-form
/// JSON metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ConfigEcho {
    pub path: String,
    pub version: String,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
86
/// Echo of an entity: its name plus optional free-form JSON metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct EntityEcho {
    pub name: String,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
94
/// Echo of the source: format, path, optional options and cast mode, the read
/// plan, and the resolved input files.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SourceEcho {
    pub format: String,
    pub path: String,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub options: Option<serde_json::Value>,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cast_mode: Option<String>,
    pub read_plan: SourceReadPlan,
    pub resolved_inputs: ResolvedInputs,
}
107
/// Resolved inputs of a source: the resolution mode, a file count, and a list
/// of files.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ResolvedInputs {
    pub mode: ResolvedInputMode,
    // NOTE(review): nothing in this type ties `file_count` to `files.len()`;
    // confirm whether `files` may be a truncated listing.
    pub file_count: u64,
    pub files: Vec<String>,
}
115
/// Echo of the sink: the accepted target, an optional rejected target, and the
/// archive settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkEcho {
    pub accepted: SinkTargetEcho,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rejected: Option<SinkTargetEcho>,
    pub archive: SinkArchiveEcho,
}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
126#[serde(rename_all = "snake_case")]
127pub struct AcceptedOutputSummary {
128    pub path: String,
129    #[serde(default)]
130    #[serde(skip_serializing_if = "Option::is_none")]
131    pub table_root_uri: Option<String>,
132    #[serde(default)]
133    #[serde(skip_serializing_if = "Option::is_none")]
134    pub write_mode: Option<String>,
135    pub accepted_rows: u64,
136    #[serde(default)]
137    pub files_written: Option<u64>,
138    pub parts_written: u64,
139    #[serde(default)]
140    #[serde(skip_serializing_if = "Vec::is_empty")]
141    pub part_files: Vec<String>,
142    #[serde(default)]
143    #[serde(skip_serializing_if = "Option::is_none")]
144    pub table_version: Option<i64>,
145    #[serde(default)]
146    #[serde(skip_serializing_if = "Option::is_none")]
147    pub snapshot_id: Option<i64>,
148    #[serde(default)]
149    #[serde(skip_serializing_if = "Option::is_none")]
150    pub iceberg_catalog_name: Option<String>,
151    #[serde(default)]
152    #[serde(skip_serializing_if = "Option::is_none")]
153    pub iceberg_database: Option<String>,
154    #[serde(default)]
155    #[serde(skip_serializing_if = "Option::is_none")]
156    pub iceberg_namespace: Option<String>,
157    #[serde(default)]
158    #[serde(skip_serializing_if = "Option::is_none")]
159    pub iceberg_table: Option<String>,
160    #[serde(default)]
161    pub total_bytes_written: Option<u64>,
162    #[serde(default)]
163    pub avg_file_size_mb: Option<f64>,
164    #[serde(default)]
165    pub small_files_count: Option<u64>,
166    #[serde(default)]
167    #[serde(skip_serializing_if = "Vec::is_empty")]
168    pub merge_key: Vec<String>,
169    #[serde(default)]
170    #[serde(skip_serializing_if = "Option::is_none")]
171    pub inserted_count: Option<u64>,
172    #[serde(default)]
173    #[serde(skip_serializing_if = "Option::is_none")]
174    pub updated_count: Option<u64>,
175    #[serde(default)]
176    #[serde(skip_serializing_if = "Option::is_none")]
177    pub closed_count: Option<u64>,
178    #[serde(default)]
179    #[serde(skip_serializing_if = "Option::is_none")]
180    pub unchanged_count: Option<u64>,
181    #[serde(default)]
182    #[serde(skip_serializing_if = "Option::is_none")]
183    pub target_rows_before: Option<u64>,
184    #[serde(default)]
185    #[serde(skip_serializing_if = "Option::is_none")]
186    pub target_rows_after: Option<u64>,
187    #[serde(default)]
188    #[serde(skip_serializing_if = "Option::is_none")]
189    pub merge_elapsed_ms: Option<u64>,
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize, Default)]
193#[serde(rename_all = "snake_case")]
194pub struct SchemaEvolutionSummary {
195    pub enabled: bool,
196    pub mode: String,
197    pub applied: bool,
198    #[serde(default)]
199    #[serde(skip_serializing_if = "Vec::is_empty")]
200    pub added_columns: Vec<String>,
201    pub incompatible_changes_detected: bool,
202}
203
204#[derive(Debug, Clone, Serialize, Deserialize)]
205#[serde(rename_all = "snake_case")]
206pub struct UniqueConstraintReport {
207    pub columns: Vec<String>,
208    pub duplicates_count: u64,
209    pub affected_rows_count: u64,
210    pub action: String,
211    pub status_effect: String,
212    #[serde(default)]
213    #[serde(skip_serializing_if = "Vec::is_empty")]
214    pub samples: Vec<UniqueConstraintSampleReport>,
215}
216
/// One sample attached to a [`UniqueConstraintReport`]: a string-to-string map
/// of values and a count.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct UniqueConstraintSampleReport {
    // A `BTreeMap` iterates in key order, so the serialized map is emitted in
    // sorted key order.
    // NOTE(review): keys are presumably column names — confirm with the builder.
    pub values: std::collections::BTreeMap<String, String>,
    pub count: u64,
}
223
/// Echo of one sink target: its format and path.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkTargetEcho {
    pub format: String,
    pub path: String,
}
230
/// Echo of the report location used in [`RunSummaryReport`]: a path and a
/// report file name/reference.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ReportEcho {
    pub path: String,
    pub report_file: String,
}
237
/// Echo of the sink's archive settings: an enabled flag and an optional path.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkArchiveEcho {
    pub enabled: bool,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
}
245
/// Echo of the policy; currently holds only its [`Severity`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct PolicyEcho {
    pub severity: Severity,
}
251
/// Aggregate counters (files, rows, accepted, rejected, warnings, errors) used
/// by [`RunReport`], [`RunSummaryReport`] and [`EntitySummary`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ResultsTotals {
    pub files_total: u64,
    pub rows_total: u64,
    pub accepted_total: u64,
    pub rejected_total: u64,
    pub warnings_total: u64,
    pub errors_total: u64,
}
262
/// Per-file section of a [`RunReport`]: the input file, its status and row
/// counts, plus mismatch, output and validation details.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileReport {
    pub input_file: String,
    pub status: FileStatus,
    pub row_count: u64,
    pub accepted_count: u64,
    pub rejected_count: u64,
    pub mismatch: FileMismatch,
    pub output: FileOutput,
    pub validation: FileValidation,
}
275
/// Column-mismatch details for one file: declared vs. input column counts,
/// missing and extra column names, the action recorded, and an optional error
/// or warning.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileMismatch {
    pub declared_columns_count: u64,
    pub input_columns_count: u64,
    pub missing_columns: Vec<String>,
    pub extra_columns: Vec<String>,
    pub mismatch_action: MismatchAction,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<MismatchIssue>,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub warning: Option<String>,
}
289
/// Error payload of [`FileMismatch`]: a rule identifier and a message.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct MismatchIssue {
    pub rule: String,
    pub message: String,
}
296
/// Output paths recorded for one file.
///
/// None of the fields carry `skip_serializing_if`, so each key is always
/// written (as `null` in JSON when the value is `None`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileOutput {
    pub accepted_path: Option<String>,
    pub rejected_path: Option<String>,
    pub errors_path: Option<String>,
    pub archived_path: Option<String>,
}
305
/// Validation results for one file: error and warning counts plus a
/// [`RuleSummary`] list.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileValidation {
    pub errors: u64,
    pub warnings: u64,
    pub rules: Vec<RuleSummary>,
}
313
/// Summary for one validation rule: the rule, its severity, a violation count,
/// and a per-column breakdown.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RuleSummary {
    pub rule: RuleName,
    pub severity: Severity,
    pub violations: u64,
    pub columns: Vec<ColumnSummary>,
}
322
/// Per-column entry of a [`RuleSummary`]: the column name, a violation count,
/// and optional target-type and source strings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ColumnSummary {
    pub column: String,
    pub violations: u64,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target_type: Option<String>,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
}
333
/// Status of a single file, as stored in [`FileReport::status`] and consumed by
/// `compute_run_outcome`.
///
/// Variant names are renamed to snake_case for (de)serialization.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FileStatus {
    Success,
    Rejected,
    Aborted,
    Failed,
}
342
/// Status of a run or of an entity within a run.
///
/// Variant names are renamed to snake_case for (de)serialization, e.g.
/// `SuccessWithWarnings` becomes `success_with_warnings`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RunStatus {
    Success,
    // Not produced by `compute_run_outcome` in this file; presumably assigned
    // elsewhere once warnings are known — confirm.
    SuccessWithWarnings,
    Rejected,
    Aborted,
    Failed,
}
352
/// Severity level carried by [`PolicyEcho`] and [`RuleSummary`].
///
/// Variant names are renamed to snake_case for (de)serialization.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Severity {
    Warn,
    Reject,
    Abort,
}
360
/// Identifier of a validation rule, used by [`RuleSummary::rule`].
///
/// Variant names are renamed to snake_case for (de)serialization, e.g.
/// `NotNull` becomes `not_null`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RuleName {
    NotNull,
    CastError,
    Unique,
    SchemaError,
}
369
/// Action recorded in [`FileMismatch::mismatch_action`].
///
/// Variant names are renamed to snake_case for (de)serialization, e.g.
/// `FilledNulls` becomes `filled_nulls`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MismatchAction {
    None,
    FilledNulls,
    IgnoredExtras,
    RejectedFile,
    Aborted,
}
379
/// Resolution mode recorded in [`ResolvedInputs::mode`].
///
/// Variant names are renamed to snake_case for (de)serialization.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ResolvedInputMode {
    Directory,
    File,
    Glob,
}
387
/// Read plan recorded in [`SourceEcho::read_plan`]; currently has a single
/// variant, serialized as `raw_and_typed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceReadPlan {
    RawAndTyped,
}
393
/// Errors produced while serializing or writing reports.
#[derive(Debug)]
pub enum ReportError {
    /// A filesystem or other I/O operation failed.
    Io(std::io::Error),
    /// `serde_json` reported an error.
    Serialize(serde_json::Error),
}
399
400impl std::fmt::Display for ReportError {
401    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
402        match self {
403            ReportError::Io(err) => write!(f, "report io error: {err}"),
404            ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
405        }
406    }
407}
408
// Marks `ReportError` as a standard error type. No trait methods are
// overridden, so the defaults apply; the wrapped error's message is surfaced
// through the `Display` impl instead.
impl std::error::Error for ReportError {}
410
411impl From<std::io::Error> for ReportError {
412    fn from(err: std::io::Error) -> Self {
413        Self::Io(err)
414    }
415}
416
417impl From<serde_json::Error> for ReportError {
418    fn from(err: serde_json::Error) -> Self {
419        Self::Serialize(err)
420    }
421}
422
/// Turns report documents into their textual representation.
pub trait ReportFormatter {
    /// Returns a static name identifying the output format.
    fn format_name(&self) -> &'static str;
    /// Serializes a per-entity [`RunReport`] to a string.
    ///
    /// # Errors
    /// Returns a [`ReportError`] if serialization fails.
    fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError>;
    /// Serializes a [`RunSummaryReport`] to a string.
    ///
    /// # Errors
    /// Returns a [`ReportError`] if serialization fails.
    fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError>;
}
428
429pub struct JsonReportFormatter;
430
431impl ReportFormatter for JsonReportFormatter {
432    fn format_name(&self) -> &'static str {
433        "json"
434    }
435
436    fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError> {
437        Ok(serde_json::to_string_pretty(report)?)
438    }
439
440    fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError> {
441        Ok(serde_json::to_string_pretty(report)?)
442    }
443}
444
445pub fn now_rfc3339() -> String {
446    OffsetDateTime::now_utc()
447        .format(&Rfc3339)
448        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
449}
450
/// Derives a run id from a timestamp string by replacing every `:` with `-`.
///
/// All other characters are kept unchanged.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    let pieces: Vec<&str> = timestamp.split(':').collect();
    pieces.join("-")
}
454
455pub struct ReportWriter;
456
457impl ReportWriter {
458    pub fn run_dir_name(run_id: &str) -> String {
459        format!("run_{run_id}")
460    }
461
462    pub fn report_file_name() -> String {
463        "run.json".to_string()
464    }
465
466    pub fn summary_file_name() -> String {
467        "run.summary.json".to_string()
468    }
469
470    pub fn report_relative_path(run_id: &str, entity_name: &str) -> String {
471        format!(
472            "{}/{}/{}",
473            Self::run_dir_name(run_id),
474            entity_name,
475            Self::report_file_name()
476        )
477    }
478
479    pub fn summary_relative_path(run_id: &str) -> String {
480        format!(
481            "{}/{}",
482            Self::run_dir_name(run_id),
483            Self::summary_file_name()
484        )
485    }
486
487    pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
488        crate::io::storage::paths::normalize_local_path(
489            &report_dir
490                .join(Self::run_dir_name(run_id))
491                .join(entity_name),
492        )
493    }
494
495    pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
496        crate::io::storage::paths::normalize_local_path(
497            &Self::entity_report_dir(report_dir, run_id, entity_name)
498                .join(Self::report_file_name()),
499        )
500    }
501
502    pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
503        crate::io::storage::paths::normalize_local_path(
504            &report_dir
505                .join(Self::run_dir_name(run_id))
506                .join(Self::summary_file_name()),
507        )
508    }
509
510    pub fn write_report(
511        report_dir: &Path,
512        run_id: &str,
513        entity_name: &str,
514        report: &RunReport,
515    ) -> Result<PathBuf, ReportError> {
516        let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
517        std::fs::create_dir_all(&entity_dir)?;
518        let report_path = Self::report_path(report_dir, run_id, entity_name);
519        let tmp_path = entity_dir.join(format!(
520            "{}.tmp-{}",
521            Self::report_file_name(),
522            unique_suffix()
523        ));
524
525        let json = serde_json::to_string_pretty(report)?;
526        let mut file = File::create(&tmp_path)?;
527        file.write_all(json.as_bytes())?;
528        file.sync_all()?;
529        std::fs::rename(&tmp_path, &report_path)?;
530
531        Ok(report_path)
532    }
533
534    pub fn write_summary(
535        report_dir: &Path,
536        run_id: &str,
537        report: &RunSummaryReport,
538    ) -> Result<PathBuf, ReportError> {
539        let run_dir = report_dir.join(Self::run_dir_name(run_id));
540        let run_dir = crate::io::storage::paths::normalize_local_path(&run_dir);
541        std::fs::create_dir_all(&run_dir)?;
542        let report_path = Self::summary_path(report_dir, run_id);
543        let tmp_path = run_dir.join(format!(
544            "{}.tmp-{}",
545            Self::summary_file_name(),
546            unique_suffix()
547        ));
548
549        let json = serde_json::to_string_pretty(report)?;
550        let mut file = File::create(&tmp_path)?;
551        file.write_all(json.as_bytes())?;
552        file.sync_all()?;
553        std::fs::rename(&tmp_path, &report_path)?;
554
555        Ok(report_path)
556    }
557}
558
559pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
560    if file_statuses.contains(&FileStatus::Failed) {
561        return (RunStatus::Failed, 1);
562    }
563    if file_statuses.contains(&FileStatus::Aborted) {
564        return (RunStatus::Aborted, 2);
565    }
566    if file_statuses.contains(&FileStatus::Rejected) {
567        return (RunStatus::Rejected, 0);
568    }
569    (RunStatus::Success, 0)
570}
571
/// Builds a `<pid>-<nanos>` suffix for temporary file names.
///
/// `<nanos>` is the number of nanoseconds since the Unix epoch, or `0` if the
/// system clock reports a time before the epoch.
fn unique_suffix() -> String {
    let pid = std::process::id();
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    format!("{pid}-{nanos}")
}