Skip to main content

floe_core/report/
mod.rs

1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
9pub mod build;
10pub mod entity;
11pub mod output;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub struct RunReport {
16    pub spec_version: String,
17    pub entity: EntityEcho,
18    pub source: SourceEcho,
19    pub sink: SinkEcho,
20    pub policy: PolicyEcho,
21    pub accepted_output: AcceptedOutputSummary,
22    #[serde(default)]
23    #[serde(skip_serializing_if = "Vec::is_empty")]
24    pub unique_constraints: Vec<UniqueConstraintReport>,
25    pub results: ResultsTotals,
26    pub files: Vec<FileReport>,
27}
28
/// Run-level summary: tool, run and config metadata, aggregated totals and
/// one `EntitySummary` per entity.
///
/// This is the payload that `ReportWriter::write_summary` serializes to JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunSummaryReport {
    pub spec_version: String,
    pub tool: ToolInfo,
    pub run: RunInfo,
    pub config: ConfigEcho,
    pub report: ReportEcho,
    pub results: ResultsTotals,
    pub entities: Vec<EntitySummary>,
}
40
/// Per-entity entry of `RunSummaryReport::entities`: the entity's name, its
/// status, its totals and a reference to its report file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct EntitySummary {
    pub name: String,
    pub status: RunStatus,
    pub results: ResultsTotals,
    // NOTE(review): presumably the location of the entity's `run.json`;
    // confirm against the code that fills this in.
    pub report_file: String,
}
49
/// Name and version of the tool recorded in a `RunSummaryReport`, with
/// optional git details.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ToolInfo {
    pub name: String,
    pub version: String,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub git: Option<GitInfo>,
}
58
/// Git details optionally attached to `ToolInfo`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct GitInfo {
    pub commit: String,
    // NOTE(review): presumably true when the working tree had uncommitted
    // changes at build time; confirm against the code that fills this in.
    pub dirty: bool,
}
65
/// Identity, timing and outcome of a run as recorded in `RunSummaryReport`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunInfo {
    pub run_id: String,
    // NOTE(review): timestamps are plain strings here; presumably RFC 3339
    // values like those produced by `now_rfc3339` — confirm with the caller.
    pub started_at: String,
    pub finished_at: String,
    pub duration_ms: u64,
    pub status: RunStatus,
    pub exit_code: i32,
}
76
/// Echo of the configuration recorded in a `RunSummaryReport`: its path,
/// version and optional free-form metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ConfigEcho {
    pub path: String,
    pub version: String,
    /// Arbitrary JSON value; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
85
/// Echo of the entity a `RunReport` is about: its name and optional
/// free-form metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct EntityEcho {
    pub name: String,
    /// Arbitrary JSON value; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
93
/// Echo of the source section recorded in a `RunReport`: format, path,
/// optional settings, the read plan and the resolved input files.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SourceEcho {
    pub format: String,
    pub path: String,
    /// Arbitrary JSON value; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub options: Option<serde_json::Value>,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cast_mode: Option<String>,
    pub read_plan: SourceReadPlan,
    pub resolved_inputs: ResolvedInputs,
}
106
/// Input files recorded for a source: how the source was resolved, the number
/// of files and their names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ResolvedInputs {
    pub mode: ResolvedInputMode,
    // NOTE(review): nothing in this type keeps `file_count` equal to
    // `files.len()`; confirm whether `files` may be a truncated listing.
    pub file_count: u64,
    pub files: Vec<String>,
}
114
/// Echo of the sink section recorded in a `RunReport`: the accepted target,
/// an optional rejected target and the archive settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkEcho {
    pub accepted: SinkTargetEcho,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rejected: Option<SinkTargetEcho>,
    pub archive: SinkArchiveEcho,
}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
125#[serde(rename_all = "snake_case")]
126pub struct AcceptedOutputSummary {
127    pub path: String,
128    #[serde(default)]
129    #[serde(skip_serializing_if = "Option::is_none")]
130    pub table_root_uri: Option<String>,
131    #[serde(default)]
132    #[serde(skip_serializing_if = "Option::is_none")]
133    pub write_mode: Option<String>,
134    pub accepted_rows: u64,
135    #[serde(default)]
136    pub files_written: u64,
137    pub parts_written: u64,
138    #[serde(default)]
139    #[serde(skip_serializing_if = "Vec::is_empty")]
140    pub part_files: Vec<String>,
141    #[serde(default)]
142    #[serde(skip_serializing_if = "Option::is_none")]
143    pub table_version: Option<i64>,
144    #[serde(default)]
145    #[serde(skip_serializing_if = "Option::is_none")]
146    pub snapshot_id: Option<i64>,
147    #[serde(default)]
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub iceberg_catalog_name: Option<String>,
150    #[serde(default)]
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub iceberg_database: Option<String>,
153    #[serde(default)]
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub iceberg_namespace: Option<String>,
156    #[serde(default)]
157    #[serde(skip_serializing_if = "Option::is_none")]
158    pub iceberg_table: Option<String>,
159    #[serde(default)]
160    #[serde(skip_serializing_if = "Option::is_none")]
161    pub total_bytes_written: Option<u64>,
162    #[serde(default)]
163    #[serde(skip_serializing_if = "Option::is_none")]
164    pub avg_file_size_mb: Option<f64>,
165    #[serde(default)]
166    #[serde(skip_serializing_if = "Option::is_none")]
167    pub small_files_count: Option<u64>,
168    #[serde(default)]
169    #[serde(skip_serializing_if = "Vec::is_empty")]
170    pub merge_key: Vec<String>,
171    #[serde(default)]
172    #[serde(skip_serializing_if = "Option::is_none")]
173    pub inserted_count: Option<u64>,
174    #[serde(default)]
175    #[serde(skip_serializing_if = "Option::is_none")]
176    pub updated_count: Option<u64>,
177    #[serde(default)]
178    #[serde(skip_serializing_if = "Option::is_none")]
179    pub target_rows_before: Option<u64>,
180    #[serde(default)]
181    #[serde(skip_serializing_if = "Option::is_none")]
182    pub target_rows_after: Option<u64>,
183    #[serde(default)]
184    #[serde(skip_serializing_if = "Option::is_none")]
185    pub merge_elapsed_ms: Option<u64>,
186}
187
188#[derive(Debug, Clone, Serialize, Deserialize)]
189#[serde(rename_all = "snake_case")]
190pub struct UniqueConstraintReport {
191    pub columns: Vec<String>,
192    pub duplicates_count: u64,
193    pub affected_rows_count: u64,
194    pub action: String,
195    pub status_effect: String,
196    #[serde(default)]
197    #[serde(skip_serializing_if = "Vec::is_empty")]
198    pub samples: Vec<UniqueConstraintSampleReport>,
199}
200
/// One sample attached to a `UniqueConstraintReport`: a set of string values
/// and a count.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct UniqueConstraintSampleReport {
    // A `BTreeMap` iterates in key order, so the serialized keys come out sorted.
    // NOTE(review): keys are presumably column names; confirm with the producer.
    pub values: std::collections::BTreeMap<String, String>,
    pub count: u64,
}
207
/// Format and path of one sink target, used for the accepted and rejected
/// targets in `SinkEcho`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkTargetEcho {
    pub format: String,
    pub path: String,
}
214
/// Echo of the report location recorded in a `RunSummaryReport`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ReportEcho {
    // NOTE(review): presumably the configured report directory; confirm with
    // the code that builds the summary.
    pub path: String,
    pub report_file: String,
}
221
/// Archive settings echoed in `SinkEcho`: an enabled flag and an optional path.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkArchiveEcho {
    pub enabled: bool,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
}
229
/// Echo of the policy recorded in a `RunReport`; currently just a severity.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct PolicyEcho {
    pub severity: Severity,
}
235
/// Aggregated counters (files, rows, accepted, rejected, warnings, errors).
///
/// Used by `RunReport`, `RunSummaryReport` and `EntitySummary`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ResultsTotals {
    pub files_total: u64,
    pub rows_total: u64,
    pub accepted_total: u64,
    pub rejected_total: u64,
    pub warnings_total: u64,
    pub errors_total: u64,
}
246
/// Per-input-file entry of `RunReport::files`: status, row counts, column
/// mismatch details, output locations and validation results.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileReport {
    pub input_file: String,
    pub status: FileStatus,
    pub row_count: u64,
    pub accepted_count: u64,
    pub rejected_count: u64,
    pub mismatch: FileMismatch,
    pub output: FileOutput,
    pub validation: FileValidation,
}
259
/// Column mismatch details for one file: declared and input column counts,
/// missing and extra column names, the action taken and an optional error or
/// warning.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileMismatch {
    pub declared_columns_count: u64,
    pub input_columns_count: u64,
    pub missing_columns: Vec<String>,
    pub extra_columns: Vec<String>,
    pub mismatch_action: MismatchAction,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<MismatchIssue>,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub warning: Option<String>,
}
273
/// Rule name and message carried by `FileMismatch::error`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct MismatchIssue {
    pub rule: String,
    pub message: String,
}
280
/// Output locations recorded for one input file.
///
/// None of these fields has `skip_serializing_if`, so a `None` value is
/// written as an explicit `null` instead of being left out.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileOutput {
    pub accepted_path: Option<String>,
    pub rejected_path: Option<String>,
    pub errors_path: Option<String>,
    pub archived_path: Option<String>,
}
289
/// Validation results for one file: error and warning counts plus a summary
/// per rule.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileValidation {
    pub errors: u64,
    pub warnings: u64,
    pub rules: Vec<RuleSummary>,
}
297
/// Summary for one validation rule: the rule, its severity, the number of
/// violations and a per-column breakdown.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RuleSummary {
    pub rule: RuleName,
    pub severity: Severity,
    pub violations: u64,
    pub columns: Vec<ColumnSummary>,
}
306
/// Per-column entry of `RuleSummary::columns`: the column name, its violation
/// count and optional extra details.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ColumnSummary {
    pub column: String,
    pub violations: u64,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target_type: Option<String>,
    /// Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
}
317
/// Status of a single input file.
///
/// Serialized in snake_case: `success`, `rejected`, `aborted`, `failed`.
/// `compute_run_outcome` folds a list of these into a run status and exit code.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FileStatus {
    Success,
    Rejected,
    Aborted,
    Failed,
}
326
/// Status of a run or of one entity within it.
///
/// Serialized in snake_case: `success`, `success_with_warnings`, `rejected`,
/// `aborted`, `failed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RunStatus {
    Success,
    // `compute_run_outcome` in this file never returns this variant;
    // NOTE(review): presumably assigned elsewhere when warnings were recorded.
    SuccessWithWarnings,
    Rejected,
    Aborted,
    Failed,
}
336
/// Severity attached to a policy (`PolicyEcho`) or a rule (`RuleSummary`).
///
/// Serialized in snake_case: `warn`, `reject`, `abort`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Severity {
    Warn,
    Reject,
    Abort,
}
344
/// Validation rule identifier used in `RuleSummary`.
///
/// Serialized in snake_case: `not_null`, `cast_error`, `unique`,
/// `schema_error`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RuleName {
    NotNull,
    CastError,
    Unique,
    SchemaError,
}
353
/// Action recorded in `FileMismatch::mismatch_action`.
///
/// Serialized in snake_case: `none`, `filled_nulls`, `ignored_extras`,
/// `rejected_file`, `aborted`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MismatchAction {
    None,
    FilledNulls,
    IgnoredExtras,
    RejectedFile,
    Aborted,
}
363
/// How a source's inputs were resolved, as recorded in `ResolvedInputs::mode`.
///
/// Serialized in snake_case: `directory`, `file`, `glob`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ResolvedInputMode {
    Directory,
    File,
    Glob,
}
371
/// Read plan recorded in `SourceEcho::read_plan`.
///
/// The single variant is serialized as `raw_and_typed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceReadPlan {
    RawAndTyped,
}
377
/// Error returned when formatting or writing a report.
#[derive(Debug)]
pub enum ReportError {
    /// A filesystem operation failed.
    Io(std::io::Error),
    /// JSON serialization of the report failed.
    Serialize(serde_json::Error),
}
383
384impl std::fmt::Display for ReportError {
385    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386        match self {
387            ReportError::Io(err) => write!(f, "report io error: {err}"),
388            ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
389        }
390    }
391}
392
393impl std::error::Error for ReportError {}
394
395impl From<std::io::Error> for ReportError {
396    fn from(err: std::io::Error) -> Self {
397        Self::Io(err)
398    }
399}
400
401impl From<serde_json::Error> for ReportError {
402    fn from(err: serde_json::Error) -> Self {
403        Self::Serialize(err)
404    }
405}
406
/// Turns reports into their textual form for one output format.
pub trait ReportFormatter {
    /// Short static identifier of the format (the JSON implementation in this
    /// file returns `"json"`).
    fn format_name(&self) -> &'static str;
    /// Renders a per-entity `RunReport` as a string.
    ///
    /// # Errors
    /// Returns a `ReportError` when the report cannot be serialized.
    fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError>;
    /// Renders a `RunSummaryReport` as a string.
    ///
    /// # Errors
    /// Returns a `ReportError` when the summary cannot be serialized.
    fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError>;
}
412
/// `ReportFormatter` that renders reports as pretty-printed JSON.
///
/// A stateless unit struct, so it is cheap to copy and has a trivial default.
#[derive(Debug, Clone, Copy, Default)]
pub struct JsonReportFormatter;
414
415impl ReportFormatter for JsonReportFormatter {
416    fn format_name(&self) -> &'static str {
417        "json"
418    }
419
420    fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError> {
421        Ok(serde_json::to_string_pretty(report)?)
422    }
423
424    fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError> {
425        Ok(serde_json::to_string_pretty(report)?)
426    }
427}
428
429pub fn now_rfc3339() -> String {
430    OffsetDateTime::now_utc()
431        .format(&Rfc3339)
432        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
433}
434
/// Derives a run id from a timestamp by replacing every `:` with `-`.
///
/// All other characters are copied through unchanged.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}
438
/// Namespace for report path helpers and for writing report files to disk.
///
/// All functionality is exposed as associated functions; no instance is needed.
pub struct ReportWriter;
440
441impl ReportWriter {
442    pub fn run_dir_name(run_id: &str) -> String {
443        format!("run_{run_id}")
444    }
445
446    pub fn report_file_name() -> String {
447        "run.json".to_string()
448    }
449
450    pub fn summary_file_name() -> String {
451        "run.summary.json".to_string()
452    }
453
454    pub fn report_relative_path(run_id: &str, entity_name: &str) -> String {
455        format!(
456            "{}/{}/{}",
457            Self::run_dir_name(run_id),
458            entity_name,
459            Self::report_file_name()
460        )
461    }
462
463    pub fn summary_relative_path(run_id: &str) -> String {
464        format!(
465            "{}/{}",
466            Self::run_dir_name(run_id),
467            Self::summary_file_name()
468        )
469    }
470
471    pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
472        crate::io::storage::paths::normalize_local_path(
473            &report_dir
474                .join(Self::run_dir_name(run_id))
475                .join(entity_name),
476        )
477    }
478
479    pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
480        crate::io::storage::paths::normalize_local_path(
481            &Self::entity_report_dir(report_dir, run_id, entity_name)
482                .join(Self::report_file_name()),
483        )
484    }
485
486    pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
487        crate::io::storage::paths::normalize_local_path(
488            &report_dir
489                .join(Self::run_dir_name(run_id))
490                .join(Self::summary_file_name()),
491        )
492    }
493
494    pub fn write_report(
495        report_dir: &Path,
496        run_id: &str,
497        entity_name: &str,
498        report: &RunReport,
499    ) -> Result<PathBuf, ReportError> {
500        let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
501        std::fs::create_dir_all(&entity_dir)?;
502        let report_path = Self::report_path(report_dir, run_id, entity_name);
503        let tmp_path = entity_dir.join(format!(
504            "{}.tmp-{}",
505            Self::report_file_name(),
506            unique_suffix()
507        ));
508
509        let json = serde_json::to_string_pretty(report)?;
510        let mut file = File::create(&tmp_path)?;
511        file.write_all(json.as_bytes())?;
512        file.sync_all()?;
513        std::fs::rename(&tmp_path, &report_path)?;
514
515        Ok(report_path)
516    }
517
518    pub fn write_summary(
519        report_dir: &Path,
520        run_id: &str,
521        report: &RunSummaryReport,
522    ) -> Result<PathBuf, ReportError> {
523        let run_dir = report_dir.join(Self::run_dir_name(run_id));
524        let run_dir = crate::io::storage::paths::normalize_local_path(&run_dir);
525        std::fs::create_dir_all(&run_dir)?;
526        let report_path = Self::summary_path(report_dir, run_id);
527        let tmp_path = run_dir.join(format!(
528            "{}.tmp-{}",
529            Self::summary_file_name(),
530            unique_suffix()
531        ));
532
533        let json = serde_json::to_string_pretty(report)?;
534        let mut file = File::create(&tmp_path)?;
535        file.write_all(json.as_bytes())?;
536        file.sync_all()?;
537        std::fs::rename(&tmp_path, &report_path)?;
538
539        Ok(report_path)
540    }
541}
542
543pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
544    if file_statuses.contains(&FileStatus::Failed) {
545        return (RunStatus::Failed, 1);
546    }
547    if file_statuses.contains(&FileStatus::Aborted) {
548        return (RunStatus::Aborted, 2);
549    }
550    if file_statuses.contains(&FileStatus::Rejected) {
551        return (RunStatus::Rejected, 0);
552    }
553    (RunStatus::Success, 0)
554}
555
/// Builds a suffix for temporary file names: `<pid>-<nanos>-<sequence>`.
///
/// `nanos` is the wall-clock time since the Unix epoch, or `0` if the clock
/// reports a time before the epoch. The clock alone cannot guarantee
/// uniqueness (coarse timer resolution, the `0` fallback), so a process-wide
/// counter is appended. Every call within one process therefore returns a
/// distinct value.
fn unique_suffix() -> String {
    static SEQUENCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_nanos())
        .unwrap_or(0);
    // Relaxed is enough: only the distinctness of the fetched values matters,
    // not their ordering relative to other memory operations.
    let sequence = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    format!("{}-{}-{}", std::process::id(), nanos, sequence)
}