Skip to main content

floe_core/report/
mod.rs

1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
10#[serde(rename_all = "snake_case")]
11pub struct RunReport {
12    pub spec_version: String,
13    pub entity: EntityEcho,
14    pub source: SourceEcho,
15    pub sink: SinkEcho,
16    pub policy: PolicyEcho,
17    pub accepted_output: AcceptedOutputSummary,
18    pub results: ResultsTotals,
19    pub files: Vec<FileReport>,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(rename_all = "snake_case")]
24pub struct RunSummaryReport {
25    pub spec_version: String,
26    pub tool: ToolInfo,
27    pub run: RunInfo,
28    pub config: ConfigEcho,
29    pub report: ReportEcho,
30    pub results: ResultsTotals,
31    pub entities: Vec<EntitySummary>,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
35#[serde(rename_all = "snake_case")]
36pub struct EntitySummary {
37    pub name: String,
38    pub status: RunStatus,
39    pub results: ResultsTotals,
40    pub report_file: String,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
44#[serde(rename_all = "snake_case")]
45pub struct ToolInfo {
46    pub name: String,
47    pub version: String,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub git: Option<GitInfo>,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
53#[serde(rename_all = "snake_case")]
54pub struct GitInfo {
55    pub commit: String,
56    pub dirty: bool,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
60#[serde(rename_all = "snake_case")]
61pub struct RunInfo {
62    pub run_id: String,
63    pub started_at: String,
64    pub finished_at: String,
65    pub duration_ms: u64,
66    pub status: RunStatus,
67    pub exit_code: i32,
68}
69
70#[derive(Debug, Clone, Serialize, Deserialize)]
71#[serde(rename_all = "snake_case")]
72pub struct ConfigEcho {
73    pub path: String,
74    pub version: String,
75    #[serde(skip_serializing_if = "Option::is_none")]
76    pub metadata: Option<serde_json::Value>,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
80#[serde(rename_all = "snake_case")]
81pub struct EntityEcho {
82    pub name: String,
83    #[serde(skip_serializing_if = "Option::is_none")]
84    pub metadata: Option<serde_json::Value>,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88#[serde(rename_all = "snake_case")]
89pub struct SourceEcho {
90    pub format: String,
91    pub path: String,
92    #[serde(skip_serializing_if = "Option::is_none")]
93    pub options: Option<serde_json::Value>,
94    #[serde(skip_serializing_if = "Option::is_none")]
95    pub cast_mode: Option<String>,
96    pub read_plan: SourceReadPlan,
97    pub resolved_inputs: ResolvedInputs,
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
101#[serde(rename_all = "snake_case")]
102pub struct ResolvedInputs {
103    pub mode: ResolvedInputMode,
104    pub file_count: u64,
105    pub files: Vec<String>,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(rename_all = "snake_case")]
110pub struct SinkEcho {
111    pub accepted: SinkTargetEcho,
112    #[serde(skip_serializing_if = "Option::is_none")]
113    pub rejected: Option<SinkTargetEcho>,
114    pub archive: SinkArchiveEcho,
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
118#[serde(rename_all = "snake_case")]
119pub struct AcceptedOutputSummary {
120    pub path: String,
121    pub accepted_rows: u64,
122    pub parts_written: u64,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
126#[serde(rename_all = "snake_case")]
127pub struct SinkTargetEcho {
128    pub format: String,
129    pub path: String,
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
133#[serde(rename_all = "snake_case")]
134pub struct ReportEcho {
135    pub path: String,
136    pub report_file: String,
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
140#[serde(rename_all = "snake_case")]
141pub struct SinkArchiveEcho {
142    pub enabled: bool,
143    #[serde(skip_serializing_if = "Option::is_none")]
144    pub path: Option<String>,
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize)]
148#[serde(rename_all = "snake_case")]
149pub struct PolicyEcho {
150    pub severity: Severity,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
154#[serde(rename_all = "snake_case")]
155pub struct ResultsTotals {
156    pub files_total: u64,
157    pub rows_total: u64,
158    pub accepted_total: u64,
159    pub rejected_total: u64,
160    pub warnings_total: u64,
161    pub errors_total: u64,
162}
163
164#[derive(Debug, Clone, Serialize, Deserialize)]
165#[serde(rename_all = "snake_case")]
166pub struct FileReport {
167    pub input_file: String,
168    pub status: FileStatus,
169    pub row_count: u64,
170    pub accepted_count: u64,
171    pub rejected_count: u64,
172    pub mismatch: FileMismatch,
173    pub output: FileOutput,
174    pub validation: FileValidation,
175}
176
177#[derive(Debug, Clone, Serialize, Deserialize)]
178#[serde(rename_all = "snake_case")]
179pub struct FileMismatch {
180    pub declared_columns_count: u64,
181    pub input_columns_count: u64,
182    pub missing_columns: Vec<String>,
183    pub extra_columns: Vec<String>,
184    pub mismatch_action: MismatchAction,
185    #[serde(skip_serializing_if = "Option::is_none")]
186    pub error: Option<MismatchIssue>,
187    #[serde(skip_serializing_if = "Option::is_none")]
188    pub warning: Option<String>,
189}
190
191#[derive(Debug, Clone, Serialize, Deserialize)]
192#[serde(rename_all = "snake_case")]
193pub struct MismatchIssue {
194    pub rule: String,
195    pub message: String,
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
199#[serde(rename_all = "snake_case")]
200pub struct FileOutput {
201    pub accepted_path: Option<String>,
202    pub rejected_path: Option<String>,
203    pub errors_path: Option<String>,
204    pub archived_path: Option<String>,
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
208#[serde(rename_all = "snake_case")]
209pub struct FileValidation {
210    pub errors: u64,
211    pub warnings: u64,
212    pub rules: Vec<RuleSummary>,
213}
214
215#[derive(Debug, Clone, Serialize, Deserialize)]
216#[serde(rename_all = "snake_case")]
217pub struct RuleSummary {
218    pub rule: RuleName,
219    pub severity: Severity,
220    pub violations: u64,
221    pub columns: Vec<ColumnSummary>,
222}
223
224#[derive(Debug, Clone, Serialize, Deserialize)]
225#[serde(rename_all = "snake_case")]
226pub struct ColumnSummary {
227    pub column: String,
228    pub violations: u64,
229    #[serde(skip_serializing_if = "Option::is_none")]
230    pub target_type: Option<String>,
231}
232
233#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
234#[serde(rename_all = "snake_case")]
235pub enum FileStatus {
236    Success,
237    Rejected,
238    Aborted,
239    Failed,
240}
241
242#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
243#[serde(rename_all = "snake_case")]
244pub enum RunStatus {
245    Success,
246    SuccessWithWarnings,
247    Rejected,
248    Aborted,
249    Failed,
250}
251
252#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
253#[serde(rename_all = "snake_case")]
254pub enum Severity {
255    Warn,
256    Reject,
257    Abort,
258}
259
260#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
261#[serde(rename_all = "snake_case")]
262pub enum RuleName {
263    NotNull,
264    CastError,
265    Unique,
266    SchemaError,
267}
268
269#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
270#[serde(rename_all = "snake_case")]
271pub enum MismatchAction {
272    None,
273    FilledNulls,
274    IgnoredExtras,
275    RejectedFile,
276    Aborted,
277}
278
279#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
280#[serde(rename_all = "snake_case")]
281pub enum ResolvedInputMode {
282    Directory,
283    File,
284    Glob,
285}
286
287#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
288#[serde(rename_all = "snake_case")]
289pub enum SourceReadPlan {
290    RawAndTyped,
291}
292
293#[derive(Debug)]
294pub enum ReportError {
295    Io(std::io::Error),
296    Serialize(serde_json::Error),
297}
298
299impl std::fmt::Display for ReportError {
300    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
301        match self {
302            ReportError::Io(err) => write!(f, "report io error: {err}"),
303            ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
304        }
305    }
306}
307
308impl std::error::Error for ReportError {}
309
310impl From<std::io::Error> for ReportError {
311    fn from(err: std::io::Error) -> Self {
312        Self::Io(err)
313    }
314}
315
316impl From<serde_json::Error> for ReportError {
317    fn from(err: serde_json::Error) -> Self {
318        Self::Serialize(err)
319    }
320}
321
322pub fn now_rfc3339() -> String {
323    OffsetDateTime::now_utc()
324        .format(&Rfc3339)
325        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
326}
327
/// Derives a run id from `timestamp` by turning every `:` into `-`.
///
/// All other characters are copied through unchanged.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}
331
332pub struct ReportWriter;
333
334impl ReportWriter {
335    pub fn run_dir_name(run_id: &str) -> String {
336        format!("run_{run_id}")
337    }
338
339    pub fn report_file_name() -> String {
340        "run.json".to_string()
341    }
342
343    pub fn summary_file_name() -> String {
344        "run.summary.json".to_string()
345    }
346
347    pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
348        report_dir
349            .join(Self::run_dir_name(run_id))
350            .join(entity_name)
351    }
352
353    pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
354        Self::entity_report_dir(report_dir, run_id, entity_name).join(Self::report_file_name())
355    }
356
357    pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
358        report_dir
359            .join(Self::run_dir_name(run_id))
360            .join(Self::summary_file_name())
361    }
362
363    pub fn write_report(
364        report_dir: &Path,
365        run_id: &str,
366        entity_name: &str,
367        report: &RunReport,
368    ) -> Result<PathBuf, ReportError> {
369        let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
370        std::fs::create_dir_all(&entity_dir)?;
371        let report_path = Self::report_path(report_dir, run_id, entity_name);
372        let tmp_path = entity_dir.join(format!(
373            "{}.tmp-{}",
374            Self::report_file_name(),
375            unique_suffix()
376        ));
377
378        let json = serde_json::to_string_pretty(report)?;
379        let mut file = File::create(&tmp_path)?;
380        file.write_all(json.as_bytes())?;
381        file.sync_all()?;
382        std::fs::rename(&tmp_path, &report_path)?;
383
384        Ok(report_path)
385    }
386
387    pub fn write_summary(
388        report_dir: &Path,
389        run_id: &str,
390        report: &RunSummaryReport,
391    ) -> Result<PathBuf, ReportError> {
392        let run_dir = report_dir.join(Self::run_dir_name(run_id));
393        std::fs::create_dir_all(&run_dir)?;
394        let report_path = Self::summary_path(report_dir, run_id);
395        let tmp_path = run_dir.join(format!(
396            "{}.tmp-{}",
397            Self::summary_file_name(),
398            unique_suffix()
399        ));
400
401        let json = serde_json::to_string_pretty(report)?;
402        let mut file = File::create(&tmp_path)?;
403        file.write_all(json.as_bytes())?;
404        file.sync_all()?;
405        std::fs::rename(&tmp_path, &report_path)?;
406
407        Ok(report_path)
408    }
409}
410
411pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
412    if file_statuses.contains(&FileStatus::Failed) {
413        return (RunStatus::Failed, 1);
414    }
415    if file_statuses.contains(&FileStatus::Aborted) {
416        return (RunStatus::Aborted, 2);
417    }
418    if file_statuses.contains(&FileStatus::Rejected) {
419        return (RunStatus::Rejected, 0);
420    }
421    (RunStatus::Success, 0)
422}
423
/// Builds a suffix of the form `<pid>-<nanos since epoch>-<sequence>` for
/// temp file and directory names.
///
/// The process-wide sequence number keeps suffixes distinct within one process
/// even when the clock is too coarse to separate two calls, or when the system
/// time is before the Unix epoch and the nanos component falls back to `0`.
fn unique_suffix() -> String {
    static SEQUENCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);
    // Only uniqueness matters here, not ordering relative to other memory.
    let sequence = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    format!("{}-{}-{}", std::process::id(), nanos, sequence)
}
431
#[cfg(test)]
mod tests {
    use super::*;

    /// Scratch directory under the system temp dir that is removed again when
    /// dropped, so test runs (including failing ones) do not pile up
    /// directories.
    struct ScratchDir(std::path::PathBuf);

    impl ScratchDir {
        fn new(prefix: &str) -> Self {
            let mut dir = std::env::temp_dir();
            dir.push(format!("{}-{}", prefix, unique_suffix()));
            std::fs::create_dir_all(&dir).expect("create temp dir");
            Self(dir)
        }

        fn path(&self) -> &std::path::Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best-effort: a failed cleanup must not mask the test result.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Names of entries in `dir` that look like leftover temp files
    /// (contain `.tmp-`).
    fn leftover_temp_files(dir: &std::path::Path) -> Vec<String> {
        std::fs::read_dir(dir)
            .expect("read dir")
            .filter_map(|entry| entry.ok())
            .filter_map(|entry| entry.file_name().to_str().map(str::to_owned))
            .filter(|name| name.contains(".tmp-"))
            .collect()
    }

    /// Minimal, fully populated per-entity report used by the tests.
    fn sample_report() -> RunReport {
        RunReport {
            spec_version: "0.1".to_string(),
            entity: EntityEcho {
                name: "customer".to_string(),
                metadata: None,
            },
            source: SourceEcho {
                format: "csv".to_string(),
                path: "/tmp/input".to_string(),
                options: None,
                cast_mode: Some("strict".to_string()),
                read_plan: SourceReadPlan::RawAndTyped,
                resolved_inputs: ResolvedInputs {
                    mode: ResolvedInputMode::Directory,
                    file_count: 1,
                    files: vec!["/tmp/input/file.csv".to_string()],
                },
            },
            sink: SinkEcho {
                accepted: SinkTargetEcho {
                    format: "parquet".to_string(),
                    path: "/tmp/out/accepted".to_string(),
                },
                rejected: Some(SinkTargetEcho {
                    format: "csv".to_string(),
                    path: "/tmp/out/rejected".to_string(),
                }),
                archive: SinkArchiveEcho {
                    enabled: false,
                    path: None,
                },
            },
            policy: PolicyEcho {
                severity: Severity::Warn,
            },
            accepted_output: AcceptedOutputSummary {
                path: "/tmp/out/accepted".to_string(),
                accepted_rows: 10,
                parts_written: 1,
            },
            results: ResultsTotals {
                files_total: 1,
                rows_total: 10,
                accepted_total: 10,
                rejected_total: 0,
                warnings_total: 0,
                errors_total: 0,
            },
            files: vec![FileReport {
                input_file: "/tmp/input/file.csv".to_string(),
                status: FileStatus::Success,
                row_count: 10,
                accepted_count: 10,
                rejected_count: 0,
                mismatch: FileMismatch {
                    declared_columns_count: 1,
                    input_columns_count: 1,
                    missing_columns: Vec::new(),
                    extra_columns: Vec::new(),
                    mismatch_action: MismatchAction::None,
                    error: None,
                    warning: None,
                },
                output: FileOutput {
                    accepted_path: Some("/tmp/out/accepted".to_string()),
                    rejected_path: None,
                    errors_path: None,
                    archived_path: None,
                },
                validation: FileValidation {
                    errors: 0,
                    warnings: 0,
                    rules: Vec::new(),
                },
            }],
        }
    }

    /// Minimal, fully populated run summary used by the tests.
    fn sample_summary() -> RunSummaryReport {
        RunSummaryReport {
            spec_version: "0.1".to_string(),
            tool: ToolInfo {
                name: "floe".to_string(),
                version: env!("CARGO_PKG_VERSION").to_string(),
                git: None,
            },
            run: RunInfo {
                run_id: "2026-01-19T10-23-45Z".to_string(),
                // Timestamps keep their colons; only the run id has them replaced.
                started_at: "2026-01-19T10:23:45Z".to_string(),
                finished_at: "2026-01-19T10:23:46Z".to_string(),
                duration_ms: 1000,
                status: RunStatus::Success,
                exit_code: 0,
            },
            config: ConfigEcho {
                path: "/tmp/config.yml".to_string(),
                version: "0.1".to_string(),
                metadata: None,
            },
            report: ReportEcho {
                path: "/tmp/out/reports".to_string(),
                report_file: "/tmp/out/reports/run_2026-01-19T10-23-45Z/run.summary.json"
                    .to_string(),
            },
            results: ResultsTotals {
                files_total: 1,
                rows_total: 10,
                accepted_total: 10,
                rejected_total: 0,
                warnings_total: 0,
                errors_total: 0,
            },
            entities: vec![EntitySummary {
                name: "customer".to_string(),
                status: RunStatus::Success,
                results: ResultsTotals {
                    files_total: 1,
                    rows_total: 10,
                    accepted_total: 10,
                    rejected_total: 0,
                    warnings_total: 0,
                    errors_total: 0,
                },
                report_file: "/tmp/out/reports/run_2026-01-19T10-23-45Z/customer/run.json"
                    .to_string(),
            }],
        }
    }

    #[test]
    fn report_serializes_expected_keys() {
        let report = sample_report();
        let value = serde_json::to_value(&report).expect("serialize report");
        let object = value.as_object().expect("report object");
        for key in [
            "spec_version",
            "entity",
            "source",
            "sink",
            "policy",
            "accepted_output",
            "results",
            "files",
        ] {
            assert!(object.contains_key(key), "missing key `{key}`");
        }
    }

    #[test]
    fn report_file_name_matches_format() {
        let run_dir = ReportWriter::run_dir_name("2026-01-19T10-23-45Z");
        assert_eq!(run_dir, "run_2026-01-19T10-23-45Z");
        let name = ReportWriter::report_file_name();
        assert_eq!(name, "run.json");
    }

    #[test]
    fn run_id_from_timestamp_replaces_colons() {
        assert_eq!(
            run_id_from_timestamp("2026-01-19T10:23:45Z"),
            "2026-01-19T10-23-45Z"
        );
        assert_eq!(run_id_from_timestamp(""), "");
    }

    #[test]
    fn compute_run_outcome_table() {
        let (status, code) = compute_run_outcome(&[]);
        assert_eq!(status, RunStatus::Success);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Success]);
        assert_eq!(status, RunStatus::Success);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Rejected]);
        assert_eq!(status, RunStatus::Rejected);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Aborted]);
        assert_eq!(status, RunStatus::Aborted);
        assert_eq!(code, 2);

        let (status, code) = compute_run_outcome(&[FileStatus::Failed]);
        assert_eq!(status, RunStatus::Failed);
        assert_eq!(code, 1);

        let (status, code) = compute_run_outcome(&[
            FileStatus::Success,
            FileStatus::Rejected,
            FileStatus::Aborted,
        ]);
        assert_eq!(status, RunStatus::Aborted);
        assert_eq!(code, 2);

        let (status, code) = compute_run_outcome(&[
            FileStatus::Success,
            FileStatus::Rejected,
            FileStatus::Failed,
        ]);
        assert_eq!(status, RunStatus::Failed);
        assert_eq!(code, 1);
    }

    #[test]
    fn write_report_writes_json_file() {
        let report = sample_report();
        let run_id = "2026-01-19T10-23-45Z";
        let scratch = ScratchDir::new("floe-report-tests");

        let report_path = ReportWriter::write_report(scratch.path(), run_id, "customer", &report)
            .expect("write report");

        assert!(report_path.exists());
        let expected = scratch
            .path()
            .join(format!("run_{run_id}"))
            .join("customer")
            .join("run.json");
        assert_eq!(report_path, expected);
        let contents = std::fs::read_to_string(&report_path).expect("read report");
        let value: serde_json::Value = serde_json::from_str(&contents).expect("parse report");
        assert!(value.get("entity").is_some());

        let leftovers = leftover_temp_files(expected.parent().expect("entity dir"));
        assert!(leftovers.is_empty(), "leftover temp files: {leftovers:?}");
    }

    #[test]
    fn write_summary_writes_json_file() {
        let summary = sample_summary();
        let run_id = "2026-01-19T10-23-45Z";
        let scratch = ScratchDir::new("floe-summary-tests");

        let report_path = ReportWriter::write_summary(scratch.path(), run_id, &summary)
            .expect("write summary");

        assert!(report_path.exists());
        let expected = scratch
            .path()
            .join(format!("run_{run_id}"))
            .join("run.summary.json");
        assert_eq!(report_path, expected);
        let contents = std::fs::read_to_string(&report_path).expect("read summary");
        let value: serde_json::Value = serde_json::from_str(&contents).expect("parse summary");
        assert!(value.get("run").is_some());

        let leftovers = leftover_temp_files(expected.parent().expect("run dir"));
        assert!(leftovers.is_empty(), "leftover temp files: {leftovers:?}");
    }
}