Skip to main content

floe_core/report/
mod.rs

1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
10#[serde(rename_all = "snake_case")]
11pub struct RunReport {
12    pub spec_version: String,
13    pub entity: EntityEcho,
14    pub source: SourceEcho,
15    pub sink: SinkEcho,
16    pub policy: PolicyEcho,
17    pub results: ResultsTotals,
18    pub files: Vec<FileReport>,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
22#[serde(rename_all = "snake_case")]
23pub struct RunSummaryReport {
24    pub spec_version: String,
25    pub tool: ToolInfo,
26    pub run: RunInfo,
27    pub config: ConfigEcho,
28    pub report: ReportEcho,
29    pub results: ResultsTotals,
30    pub entities: Vec<EntitySummary>,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34#[serde(rename_all = "snake_case")]
35pub struct EntitySummary {
36    pub name: String,
37    pub status: RunStatus,
38    pub results: ResultsTotals,
39    pub report_file: String,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
43#[serde(rename_all = "snake_case")]
44pub struct ToolInfo {
45    pub name: String,
46    pub version: String,
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub git: Option<GitInfo>,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
52#[serde(rename_all = "snake_case")]
53pub struct GitInfo {
54    pub commit: String,
55    pub dirty: bool,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
59#[serde(rename_all = "snake_case")]
60pub struct RunInfo {
61    pub run_id: String,
62    pub started_at: String,
63    pub finished_at: String,
64    pub duration_ms: u64,
65    pub status: RunStatus,
66    pub exit_code: i32,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
70#[serde(rename_all = "snake_case")]
71pub struct ConfigEcho {
72    pub path: String,
73    pub version: String,
74    #[serde(skip_serializing_if = "Option::is_none")]
75    pub metadata: Option<serde_json::Value>,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79#[serde(rename_all = "snake_case")]
80pub struct EntityEcho {
81    pub name: String,
82    #[serde(skip_serializing_if = "Option::is_none")]
83    pub metadata: Option<serde_json::Value>,
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
87#[serde(rename_all = "snake_case")]
88pub struct SourceEcho {
89    pub format: String,
90    pub path: String,
91    #[serde(skip_serializing_if = "Option::is_none")]
92    pub options: Option<serde_json::Value>,
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub cast_mode: Option<String>,
95    pub read_plan: SourceReadPlan,
96    pub resolved_inputs: ResolvedInputs,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
100#[serde(rename_all = "snake_case")]
101pub struct ResolvedInputs {
102    pub mode: ResolvedInputMode,
103    pub file_count: u64,
104    pub files: Vec<String>,
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
108#[serde(rename_all = "snake_case")]
109pub struct SinkEcho {
110    pub accepted: SinkTargetEcho,
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub rejected: Option<SinkTargetEcho>,
113    pub archive: SinkArchiveEcho,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
117#[serde(rename_all = "snake_case")]
118pub struct SinkTargetEcho {
119    pub format: String,
120    pub path: String,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
124#[serde(rename_all = "snake_case")]
125pub struct ReportEcho {
126    pub path: String,
127    pub report_file: String,
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize)]
131#[serde(rename_all = "snake_case")]
132pub struct SinkArchiveEcho {
133    pub enabled: bool,
134    #[serde(skip_serializing_if = "Option::is_none")]
135    pub path: Option<String>,
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
139#[serde(rename_all = "snake_case")]
140pub struct PolicyEcho {
141    pub severity: Severity,
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
145#[serde(rename_all = "snake_case")]
146pub struct ResultsTotals {
147    pub files_total: u64,
148    pub rows_total: u64,
149    pub accepted_total: u64,
150    pub rejected_total: u64,
151    pub warnings_total: u64,
152    pub errors_total: u64,
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize)]
156#[serde(rename_all = "snake_case")]
157pub struct FileReport {
158    pub input_file: String,
159    pub status: FileStatus,
160    pub row_count: u64,
161    pub accepted_count: u64,
162    pub rejected_count: u64,
163    pub mismatch: FileMismatch,
164    pub output: FileOutput,
165    pub validation: FileValidation,
166}
167
168#[derive(Debug, Clone, Serialize, Deserialize)]
169#[serde(rename_all = "snake_case")]
170pub struct FileMismatch {
171    pub declared_columns_count: u64,
172    pub input_columns_count: u64,
173    pub missing_columns: Vec<String>,
174    pub extra_columns: Vec<String>,
175    pub mismatch_action: MismatchAction,
176    #[serde(skip_serializing_if = "Option::is_none")]
177    pub error: Option<MismatchIssue>,
178    #[serde(skip_serializing_if = "Option::is_none")]
179    pub warning: Option<String>,
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize)]
183#[serde(rename_all = "snake_case")]
184pub struct MismatchIssue {
185    pub rule: String,
186    pub message: String,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize)]
190#[serde(rename_all = "snake_case")]
191pub struct FileOutput {
192    pub accepted_path: Option<String>,
193    pub rejected_path: Option<String>,
194    pub errors_path: Option<String>,
195    pub archived_path: Option<String>,
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
199#[serde(rename_all = "snake_case")]
200pub struct FileValidation {
201    pub errors: u64,
202    pub warnings: u64,
203    pub rules: Vec<RuleSummary>,
204}
205
206#[derive(Debug, Clone, Serialize, Deserialize)]
207#[serde(rename_all = "snake_case")]
208pub struct RuleSummary {
209    pub rule: RuleName,
210    pub severity: Severity,
211    pub violations: u64,
212    pub columns: Vec<ColumnSummary>,
213}
214
215#[derive(Debug, Clone, Serialize, Deserialize)]
216#[serde(rename_all = "snake_case")]
217pub struct ColumnSummary {
218    pub column: String,
219    pub violations: u64,
220    #[serde(skip_serializing_if = "Option::is_none")]
221    pub target_type: Option<String>,
222}
223
224#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
225#[serde(rename_all = "snake_case")]
226pub enum FileStatus {
227    Success,
228    Rejected,
229    Aborted,
230    Failed,
231}
232
233#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
234#[serde(rename_all = "snake_case")]
235pub enum RunStatus {
236    Success,
237    SuccessWithWarnings,
238    Rejected,
239    Aborted,
240    Failed,
241}
242
243#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
244#[serde(rename_all = "snake_case")]
245pub enum Severity {
246    Warn,
247    Reject,
248    Abort,
249}
250
251#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
252#[serde(rename_all = "snake_case")]
253pub enum RuleName {
254    NotNull,
255    CastError,
256    Unique,
257    SchemaError,
258}
259
260#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
261#[serde(rename_all = "snake_case")]
262pub enum MismatchAction {
263    None,
264    FilledNulls,
265    IgnoredExtras,
266    RejectedFile,
267    Aborted,
268}
269
270#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
271#[serde(rename_all = "snake_case")]
272pub enum ResolvedInputMode {
273    Directory,
274    File,
275    Glob,
276}
277
278#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
279#[serde(rename_all = "snake_case")]
280pub enum SourceReadPlan {
281    RawAndTyped,
282}
283
284#[derive(Debug)]
285pub enum ReportError {
286    Io(std::io::Error),
287    Serialize(serde_json::Error),
288}
289
290impl std::fmt::Display for ReportError {
291    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292        match self {
293            ReportError::Io(err) => write!(f, "report io error: {err}"),
294            ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
295        }
296    }
297}
298
299impl std::error::Error for ReportError {}
300
301impl From<std::io::Error> for ReportError {
302    fn from(err: std::io::Error) -> Self {
303        Self::Io(err)
304    }
305}
306
307impl From<serde_json::Error> for ReportError {
308    fn from(err: serde_json::Error) -> Self {
309        Self::Serialize(err)
310    }
311}
312
313pub fn now_rfc3339() -> String {
314    OffsetDateTime::now_utc()
315        .format(&Rfc3339)
316        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
317}
318
/// Derives a run id from a timestamp string by turning every `:` into `-`.
///
/// All other characters are copied unchanged.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}
322
323pub struct ReportWriter;
324
325impl ReportWriter {
326    pub fn run_dir_name(run_id: &str) -> String {
327        format!("run_{run_id}")
328    }
329
330    pub fn report_file_name() -> String {
331        "run.json".to_string()
332    }
333
334    pub fn summary_file_name() -> String {
335        "run.summary.json".to_string()
336    }
337
338    pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
339        report_dir
340            .join(Self::run_dir_name(run_id))
341            .join(entity_name)
342    }
343
344    pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
345        Self::entity_report_dir(report_dir, run_id, entity_name).join(Self::report_file_name())
346    }
347
348    pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
349        report_dir
350            .join(Self::run_dir_name(run_id))
351            .join(Self::summary_file_name())
352    }
353
354    pub fn write_report(
355        report_dir: &Path,
356        run_id: &str,
357        entity_name: &str,
358        report: &RunReport,
359    ) -> Result<PathBuf, ReportError> {
360        let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
361        std::fs::create_dir_all(&entity_dir)?;
362        let report_path = Self::report_path(report_dir, run_id, entity_name);
363        let tmp_path = entity_dir.join(format!(
364            "{}.tmp-{}",
365            Self::report_file_name(),
366            unique_suffix()
367        ));
368
369        let json = serde_json::to_string_pretty(report)?;
370        let mut file = File::create(&tmp_path)?;
371        file.write_all(json.as_bytes())?;
372        file.sync_all()?;
373        std::fs::rename(&tmp_path, &report_path)?;
374
375        Ok(report_path)
376    }
377
378    pub fn write_summary(
379        report_dir: &Path,
380        run_id: &str,
381        report: &RunSummaryReport,
382    ) -> Result<PathBuf, ReportError> {
383        let run_dir = report_dir.join(Self::run_dir_name(run_id));
384        std::fs::create_dir_all(&run_dir)?;
385        let report_path = Self::summary_path(report_dir, run_id);
386        let tmp_path = run_dir.join(format!(
387            "{}.tmp-{}",
388            Self::summary_file_name(),
389            unique_suffix()
390        ));
391
392        let json = serde_json::to_string_pretty(report)?;
393        let mut file = File::create(&tmp_path)?;
394        file.write_all(json.as_bytes())?;
395        file.sync_all()?;
396        std::fs::rename(&tmp_path, &report_path)?;
397
398        Ok(report_path)
399    }
400}
401
402pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
403    if file_statuses.contains(&FileStatus::Failed) {
404        return (RunStatus::Failed, 1);
405    }
406    if file_statuses.contains(&FileStatus::Aborted) {
407        return (RunStatus::Aborted, 2);
408    }
409    if file_statuses.contains(&FileStatus::Rejected) {
410        return (RunStatus::Rejected, 0);
411    }
412    (RunStatus::Success, 0)
413}
414
/// Builds a suffix for temporary file and directory names:
/// `<pid>-<nanos since Unix epoch>-<sequence>`.
///
/// The pid separates processes. The per-process sequence number guarantees
/// that two calls inside one process never return the same value, even when
/// the clock is coarse or the timestamp falls back to `0` because the system
/// time is before the Unix epoch.
fn unique_suffix() -> String {
    // Process-wide counter; only uniqueness of the returned numbers matters,
    // so relaxed ordering is sufficient.
    static SEQUENCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);
    let sequence = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    format!("{}-{}-{}", std::process::id(), nanos, sequence)
}
422
#[cfg(test)]
mod tests {
    use super::*;

    /// Scratch directory below the system temp dir that is removed again when
    /// the guard is dropped, including when a test assertion panics.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        /// Creates `<temp_dir>/<prefix>-<unique suffix>`.
        fn create(prefix: &str) -> Self {
            let mut dir = std::env::temp_dir();
            dir.push(format!("{}-{}", prefix, unique_suffix()));
            std::fs::create_dir_all(&dir).expect("create temp dir");
            Self(dir)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: failing to clean up must not mask the test result.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Names of entries in `dir` that look like staging files (`*.tmp-*`).
    fn leftover_temp_files(dir: &Path) -> Vec<std::ffi::OsString> {
        std::fs::read_dir(dir)
            .expect("read dir")
            .filter_map(|entry| entry.ok())
            .map(|entry| entry.file_name())
            .filter(|name| {
                name.to_str()
                    .map(|name| name.contains(".tmp-"))
                    .unwrap_or(false)
            })
            .collect()
    }

    fn sample_report() -> RunReport {
        RunReport {
            spec_version: "0.1".to_string(),
            entity: EntityEcho {
                name: "customer".to_string(),
                metadata: None,
            },
            source: SourceEcho {
                format: "csv".to_string(),
                path: "/tmp/input".to_string(),
                options: None,
                cast_mode: Some("strict".to_string()),
                read_plan: SourceReadPlan::RawAndTyped,
                resolved_inputs: ResolvedInputs {
                    mode: ResolvedInputMode::Directory,
                    file_count: 1,
                    files: vec!["/tmp/input/file.csv".to_string()],
                },
            },
            sink: SinkEcho {
                accepted: SinkTargetEcho {
                    format: "parquet".to_string(),
                    path: "/tmp/out/accepted".to_string(),
                },
                rejected: Some(SinkTargetEcho {
                    format: "csv".to_string(),
                    path: "/tmp/out/rejected".to_string(),
                }),
                archive: SinkArchiveEcho {
                    enabled: false,
                    path: None,
                },
            },
            policy: PolicyEcho {
                severity: Severity::Warn,
            },
            results: ResultsTotals {
                files_total: 1,
                rows_total: 10,
                accepted_total: 10,
                rejected_total: 0,
                warnings_total: 0,
                errors_total: 0,
            },
            files: vec![FileReport {
                input_file: "/tmp/input/file.csv".to_string(),
                status: FileStatus::Success,
                row_count: 10,
                accepted_count: 10,
                rejected_count: 0,
                mismatch: FileMismatch {
                    declared_columns_count: 1,
                    input_columns_count: 1,
                    missing_columns: Vec::new(),
                    extra_columns: Vec::new(),
                    mismatch_action: MismatchAction::None,
                    error: None,
                    warning: None,
                },
                output: FileOutput {
                    accepted_path: Some("/tmp/out/accepted/file.parquet".to_string()),
                    rejected_path: None,
                    errors_path: None,
                    archived_path: None,
                },
                validation: FileValidation {
                    errors: 0,
                    warnings: 0,
                    rules: Vec::new(),
                },
            }],
        }
    }

    fn sample_summary() -> RunSummaryReport {
        RunSummaryReport {
            spec_version: "0.1".to_string(),
            tool: ToolInfo {
                name: "floe".to_string(),
                version: env!("CARGO_PKG_VERSION").to_string(),
                git: None,
            },
            run: RunInfo {
                run_id: "2026-01-19T10-23-45Z".to_string(),
                started_at: "2026-01-19T10-23-45Z".to_string(),
                finished_at: "2026-01-19T10-23-46Z".to_string(),
                duration_ms: 1000,
                status: RunStatus::Success,
                exit_code: 0,
            },
            config: ConfigEcho {
                path: "/tmp/config.yml".to_string(),
                version: "0.1".to_string(),
                metadata: None,
            },
            report: ReportEcho {
                path: "/tmp/out/reports".to_string(),
                report_file: "/tmp/out/reports/run_2026-01-19T10-23-45Z/run.summary.json"
                    .to_string(),
            },
            results: ResultsTotals {
                files_total: 1,
                rows_total: 10,
                accepted_total: 10,
                rejected_total: 0,
                warnings_total: 0,
                errors_total: 0,
            },
            entities: vec![EntitySummary {
                name: "customer".to_string(),
                status: RunStatus::Success,
                results: ResultsTotals {
                    files_total: 1,
                    rows_total: 10,
                    accepted_total: 10,
                    rejected_total: 0,
                    warnings_total: 0,
                    errors_total: 0,
                },
                report_file: "/tmp/out/reports/run_2026-01-19T10-23-45Z/customer/run.json"
                    .to_string(),
            }],
        }
    }

    #[test]
    fn report_serializes_expected_keys() {
        let report = sample_report();
        let value = serde_json::to_value(&report).expect("serialize report");
        let object = value.as_object().expect("report object");
        assert!(object.contains_key("spec_version"));
        assert!(object.contains_key("entity"));
        assert!(object.contains_key("source"));
        assert!(object.contains_key("sink"));
        assert!(object.contains_key("policy"));
        assert!(object.contains_key("results"));
        assert!(object.contains_key("files"));
    }

    #[test]
    fn report_file_name_matches_format() {
        let run_dir = ReportWriter::run_dir_name("2026-01-19T10-23-45Z");
        assert_eq!(run_dir, "run_2026-01-19T10-23-45Z");
        let name = ReportWriter::report_file_name();
        assert_eq!(name, "run.json");
    }

    #[test]
    fn compute_run_outcome_table() {
        let (status, code) = compute_run_outcome(&[]);
        assert_eq!(status, RunStatus::Success);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Success]);
        assert_eq!(status, RunStatus::Success);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Rejected]);
        assert_eq!(status, RunStatus::Rejected);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Aborted]);
        assert_eq!(status, RunStatus::Aborted);
        assert_eq!(code, 2);

        let (status, code) = compute_run_outcome(&[FileStatus::Failed]);
        assert_eq!(status, RunStatus::Failed);
        assert_eq!(code, 1);

        let (status, code) = compute_run_outcome(&[
            FileStatus::Success,
            FileStatus::Rejected,
            FileStatus::Aborted,
        ]);
        assert_eq!(status, RunStatus::Aborted);
        assert_eq!(code, 2);

        let (status, code) = compute_run_outcome(&[
            FileStatus::Success,
            FileStatus::Rejected,
            FileStatus::Failed,
        ]);
        assert_eq!(status, RunStatus::Failed);
        assert_eq!(code, 1);
    }

    #[test]
    fn write_report_writes_json_file() {
        let report = sample_report();
        let run_id = "2026-01-19T10-23-45Z";
        let scratch = ScratchDir::create("floe-report-tests");

        let report_path = ReportWriter::write_report(scratch.path(), run_id, "customer", &report)
            .expect("write report");

        assert!(report_path.exists());
        let expected = scratch
            .path()
            .join(format!("run_{run_id}"))
            .join("customer")
            .join("run.json");
        assert_eq!(report_path, expected);
        let contents = std::fs::read_to_string(&report_path).expect("read report");
        let value: serde_json::Value = serde_json::from_str(&contents).expect("parse report");
        assert!(value.get("entity").is_some());

        // The staging file must have been renamed away.
        assert!(leftover_temp_files(expected.parent().expect("entity dir")).is_empty());
    }

    #[test]
    fn write_summary_writes_json_file() {
        let summary = sample_summary();
        let run_id = "2026-01-19T10-23-45Z";
        let scratch = ScratchDir::create("floe-summary-tests");

        let report_path = ReportWriter::write_summary(scratch.path(), run_id, &summary)
            .expect("write summary");

        assert!(report_path.exists());
        let expected = scratch
            .path()
            .join(format!("run_{run_id}"))
            .join("run.summary.json");
        assert_eq!(report_path, expected);
        let contents = std::fs::read_to_string(&report_path).expect("read summary");
        let value: serde_json::Value = serde_json::from_str(&contents).expect("parse summary");
        assert!(value.get("run").is_some());

        // The staging file must have been renamed away.
        assert!(leftover_temp_files(expected.parent().expect("run dir")).is_empty());
    }
}