Skip to main content

floe_core/report/
mod.rs

1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
/// Root document of a run report.
///
/// `ReportWriter::write_report` serializes this value as pretty-printed JSON.
/// All eleven fields are always present as top-level keys (none is optional);
/// the unit test `report_serializes_expected_keys` pins their names.
9#[derive(Debug, Clone, Serialize, Deserialize)]
10#[serde(rename_all = "snake_case")]
11pub struct RunReport {
    /// Free-form version string; the test fixture uses `"0.1"`.
    /// NOTE(review): presumably the report schema version — confirm.
12    pub spec_version: String,
13    pub tool: ToolInfo,
14    pub run: RunInfo,
15    pub config: ConfigEcho,
16    pub entity: EntityEcho,
17    pub source: SourceEcho,
18    pub sink: SinkEcho,
19    pub report: ReportEcho,
20    pub policy: PolicyEcho,
    /// Aggregate counters for the run.
21    pub results: ResultsTotals,
    /// One entry per input file.
22    pub files: Vec<FileReport>,
23}
24
/// Name and version of the tool, plus optional git details.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26#[serde(rename_all = "snake_case")]
27pub struct ToolInfo {
28    pub name: String,
29    pub version: String,
    /// Left out of the serialized output entirely when `None`.
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub git: Option<GitInfo>,
32}
33
/// Git details carried in `ToolInfo::git`.
34#[derive(Debug, Clone, Serialize, Deserialize)]
35#[serde(rename_all = "snake_case")]
36pub struct GitInfo {
37    pub commit: String,
    /// NOTE(review): presumably true when the tool was built from a working
    /// tree with uncommitted changes — confirm against the code that fills it.
38    pub dirty: bool,
39}
40
/// Identity, timing and outcome of a run.
41#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(rename_all = "snake_case")]
43pub struct RunInfo {
    /// Identifier of the run; the tests pass this same value to
    /// `ReportWriter::write_report` to derive the `run_<run_id>` directory.
44    pub run_id: String,
    /// Timestamps are plain strings; no format is enforced by this type.
    /// NOTE(review): presumably RFC 3339 as produced by `now_rfc3339` — confirm.
45    pub started_at: String,
46    pub finished_at: String,
47    pub duration_ms: u64,
    /// Same types as the `(RunStatus, i32)` pair returned by `compute_run_outcome`.
48    pub status: RunStatus,
49    pub exit_code: i32,
50}
51
/// Configuration section of the report: where the config came from and its version.
52#[derive(Debug, Clone, Serialize, Deserialize)]
53#[serde(rename_all = "snake_case")]
54pub struct ConfigEcho {
55    pub path: String,
56    pub version: String,
    /// Arbitrary JSON; left out of the serialized output when `None`.
57    #[serde(skip_serializing_if = "Option::is_none")]
58    pub metadata: Option<serde_json::Value>,
59}
60
/// Entity section of the report: the entity name and optional metadata.
61#[derive(Debug, Clone, Serialize, Deserialize)]
62#[serde(rename_all = "snake_case")]
63pub struct EntityEcho {
64    pub name: String,
    /// Arbitrary JSON; left out of the serialized output when `None`.
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub metadata: Option<serde_json::Value>,
67}
68
/// Source section of the report: input format, location and how inputs were resolved.
69#[derive(Debug, Clone, Serialize, Deserialize)]
70#[serde(rename_all = "snake_case")]
71pub struct SourceEcho {
    /// Free-form format name; the test fixture uses `"csv"`.
72    pub format: String,
73    pub path: String,
    /// Arbitrary JSON; left out of the serialized output when `None`.
74    #[serde(skip_serializing_if = "Option::is_none")]
75    pub options: Option<serde_json::Value>,
    /// Free-form string (the fixture uses `"strict"`); left out when `None`.
76    #[serde(skip_serializing_if = "Option::is_none")]
77    pub cast_mode: Option<String>,
78    pub read_plan: SourceReadPlan,
79    pub resolved_inputs: ResolvedInputs,
80}
81
/// The concrete input files a source path resolved to.
82#[derive(Debug, Clone, Serialize, Deserialize)]
83#[serde(rename_all = "snake_case")]
84pub struct ResolvedInputs {
85    pub mode: ResolvedInputMode,
    /// Stored separately from `files`; nothing in this type keeps the two in sync.
86    pub file_count: u64,
87    pub files: Vec<String>,
88}
89
/// Sink section of the report: accepted target, optional rejected target, archive settings.
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(rename_all = "snake_case")]
92pub struct SinkEcho {
93    pub accepted: SinkTargetEcho,
    /// Left out of the serialized output when `None`.
94    #[serde(skip_serializing_if = "Option::is_none")]
95    pub rejected: Option<SinkTargetEcho>,
96    pub archive: SinkArchiveEcho,
97}
98
/// Format and path of one sink target (used for both accepted and rejected output).
99#[derive(Debug, Clone, Serialize, Deserialize)]
100#[serde(rename_all = "snake_case")]
101pub struct SinkTargetEcho {
    /// Free-form format name; the test fixture uses `"parquet"` and `"csv"`.
102    pub format: String,
103    pub path: String,
104}
105
/// Report section: where reports are written.
///
/// In the test fixture `path` is the report root directory and `report_file`
/// is the full `.../run_<run_id>/<entity>/run.json` path beneath it.
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[serde(rename_all = "snake_case")]
108pub struct ReportEcho {
109    pub path: String,
110    pub report_file: String,
111}
112
/// Archive settings carried in `SinkEcho::archive`.
113#[derive(Debug, Clone, Serialize, Deserialize)]
114#[serde(rename_all = "snake_case")]
115pub struct SinkArchiveEcho {
116    pub enabled: bool,
    /// Left out of the serialized output when `None`; the type does not tie
    /// its presence to `enabled`.
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub path: Option<String>,
119}
120
/// Policy section of the report; currently just a single `Severity`.
121#[derive(Debug, Clone, Serialize, Deserialize)]
122#[serde(rename_all = "snake_case")]
123pub struct PolicyEcho {
124    pub severity: Severity,
125}
126
/// Run-wide counters.
///
/// NOTE(review): presumably the sums of the per-file figures in
/// `FileReport` / `FileValidation`; this type does not compute or check them.
127#[derive(Debug, Clone, Serialize, Deserialize)]
128#[serde(rename_all = "snake_case")]
129pub struct ResultsTotals {
130    pub files_total: u64,
131    pub rows_total: u64,
132    pub accepted_total: u64,
133    pub rejected_total: u64,
134    pub warnings_total: u64,
135    pub errors_total: u64,
136}
137
/// Per-input-file section of the report.
138#[derive(Debug, Clone, Serialize, Deserialize)]
139#[serde(rename_all = "snake_case")]
140pub struct FileReport {
141    pub input_file: String,
    /// Outcome for this file; `compute_run_outcome` folds a slice of these
    /// into the run-level status.
142    pub status: FileStatus,
143    pub row_count: u64,
144    pub accepted_count: u64,
145    pub rejected_count: u64,
    /// Column comparison between the declared schema and the input.
146    pub mismatch: FileMismatch,
    /// Paths of the artifacts produced for this file.
147    pub output: FileOutput,
    /// Validation counters, per-rule summaries and example violations.
148    pub validation: FileValidation,
149}
150
/// Declared-versus-input column comparison for one file, and what was done about it.
151#[derive(Debug, Clone, Serialize, Deserialize)]
152#[serde(rename_all = "snake_case")]
153pub struct FileMismatch {
154    pub declared_columns_count: u64,
155    pub input_columns_count: u64,
    /// NOTE(review): presumably declared columns absent from the input — confirm.
156    pub missing_columns: Vec<String>,
    /// NOTE(review): presumably input columns that were not declared — confirm.
157    pub extra_columns: Vec<String>,
158    pub mismatch_action: MismatchAction,
    /// Left out of the serialized output when `None`.
159    #[serde(skip_serializing_if = "Option::is_none")]
160    pub error: Option<MismatchIssue>,
    /// Left out of the serialized output when `None`.
161    #[serde(skip_serializing_if = "Option::is_none")]
162    pub warning: Option<String>,
163}
164
/// Error detail carried in `FileMismatch::error`.
///
/// `rule` is a free-form string here, not the `RuleName` enum.
165#[derive(Debug, Clone, Serialize, Deserialize)]
166#[serde(rename_all = "snake_case")]
167pub struct MismatchIssue {
168    pub rule: String,
169    pub message: String,
170}
171
/// Paths of the artifacts produced for one input file.
///
/// Unlike the other optional fields in this file, these carry no
/// `skip_serializing_if`, so a `None` is serialized as JSON `null` and the
/// key is always present.
172#[derive(Debug, Clone, Serialize, Deserialize)]
173#[serde(rename_all = "snake_case")]
174pub struct FileOutput {
175    pub accepted_path: Option<String>,
176    pub rejected_path: Option<String>,
177    pub errors_path: Option<String>,
178    pub archived_path: Option<String>,
179}
180
/// Validation results for one input file.
181#[derive(Debug, Clone, Serialize, Deserialize)]
182#[serde(rename_all = "snake_case")]
183pub struct FileValidation {
184    pub errors: u64,
185    pub warnings: u64,
    /// Per-rule violation summaries.
186    pub rules: Vec<RuleSummary>,
    /// Sample violations plus the per-rule cap that applied.
187    pub examples: ExampleSummary,
188}
189
/// Violation count for one rule, broken down by column.
190#[derive(Debug, Clone, Serialize, Deserialize)]
191#[serde(rename_all = "snake_case")]
192pub struct RuleSummary {
193    pub rule: RuleName,
194    pub severity: Severity,
    /// Stored independently of the per-column counts in `columns`; this type
    /// does not check that they add up.
195    pub violations: u64,
196    pub columns: Vec<ColumnSummary>,
197}
198
/// Violation count for one column within a `RuleSummary`.
199#[derive(Debug, Clone, Serialize, Deserialize)]
200#[serde(rename_all = "snake_case")]
201pub struct ColumnSummary {
202    pub column: String,
203    pub violations: u64,
    /// Left out of the serialized output when `None`.
    /// NOTE(review): presumably only set for cast-related rules — confirm.
204    #[serde(skip_serializing_if = "Option::is_none")]
205    pub target_type: Option<String>,
206}
207
/// Example violations for one file, together with the per-rule cap.
208#[derive(Debug, Clone, Serialize, Deserialize)]
209#[serde(rename_all = "snake_case")]
210pub struct ExampleSummary {
    /// Recorded cap only; this type does not enforce it on `items`.
211    pub max_examples_per_rule: u64,
212    pub items: Vec<ValidationExample>,
213}
214
/// One concrete rule violation, kept as an example in `ExampleSummary::items`.
215#[derive(Debug, Clone, Serialize, Deserialize)]
216#[serde(rename_all = "snake_case")]
217pub struct ValidationExample {
218    pub rule: RuleName,
219    pub column: String,
    /// NOTE(review): whether this index is 0- or 1-based, and whether it
    /// counts a header row, is not visible here — confirm with the producer.
220    pub row_index: u64,
221    pub message: String,
222}
223
/// Outcome of processing one input file; serialized with snake_case variant names.
///
/// `compute_run_outcome` reduces a slice of these to the run-level outcome,
/// with `Failed` taking precedence over `Aborted`, and `Aborted` over `Rejected`.
224#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
225#[serde(rename_all = "snake_case")]
226pub enum FileStatus {
227    Success,
228    Rejected,
229    Aborted,
230    Failed,
231}
232
/// Outcome of a whole run; serialized with snake_case variant names
/// (e.g. `SuccessWithWarnings` becomes `"success_with_warnings"`).
233#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
234#[serde(rename_all = "snake_case")]
235pub enum RunStatus {
236    Success,
    /// Never returned by `compute_run_outcome`; any use of this variant must
    /// be set by code outside this file's visible functions.
237    SuccessWithWarnings,
238    Rejected,
239    Aborted,
240    Failed,
241}
242
/// Severity level used by `PolicyEcho` and `RuleSummary`; serialized as
/// `"warn"`, `"reject"` or `"abort"`.
///
/// NOTE(review): the variants presumably correspond to warning only,
/// rejecting, and aborting — the handling itself is not in this file.
243#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
244#[serde(rename_all = "snake_case")]
245pub enum Severity {
246    Warn,
247    Reject,
248    Abort,
249}
250
/// Validation rules that can appear in rule summaries and examples;
/// serialized as `"not_null"`, `"cast_error"`, `"unique"`, `"schema_error"`.
251#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
252#[serde(rename_all = "snake_case")]
253pub enum RuleName {
254    NotNull,
255    CastError,
256    Unique,
257    SchemaError,
258}
259
/// Action recorded in `FileMismatch::mismatch_action`; serialized with
/// snake_case variant names (e.g. `"filled_nulls"`).
260#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
261#[serde(rename_all = "snake_case")]
262pub enum MismatchAction {
    /// The value the test fixture uses when declared and input columns agree.
263    None,
264    FilledNulls,
265    IgnoredExtras,
266    RejectedFile,
267    Aborted,
268}
269
/// How the source path was interpreted when resolving inputs; serialized as
/// `"directory"`, `"file"` or `"glob"`.
270#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
271#[serde(rename_all = "snake_case")]
272pub enum ResolvedInputMode {
273    Directory,
274    File,
275    Glob,
276}
277
/// Read plan recorded in `SourceEcho::read_plan`; serialized as `"raw_and_typed"`.
///
/// Only one variant exists at present.
/// NOTE(review): presumably means the source is read both untyped and with
/// declared types — confirm with the reader implementation.
278#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
279#[serde(rename_all = "snake_case")]
280pub enum SourceReadPlan {
281    RawAndTyped,
282}
283
/// Errors returned by `ReportWriter::write_report`.
284#[derive(Debug)]
285pub enum ReportError {
    /// A filesystem operation failed (creating directories, writing, syncing, renaming).
286    Io(std::io::Error),
    /// The report could not be serialized to JSON.
287    Serialize(serde_json::Error),
288}
289
290impl std::fmt::Display for ReportError {
291    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292        match self {
293            ReportError::Io(err) => write!(f, "report io error: {err}"),
294            ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
295        }
296    }
297}
298
// Relies on the trait defaults, so `source()` returns `None`; the wrapped
// error's message is included in the `Display` output instead.
299impl std::error::Error for ReportError {}
300
301impl From<std::io::Error> for ReportError {
302    fn from(err: std::io::Error) -> Self {
303        Self::Io(err)
304    }
305}
306
307impl From<serde_json::Error> for ReportError {
308    fn from(err: serde_json::Error) -> Self {
309        Self::Serialize(err)
310    }
311}
312
313pub fn now_rfc3339() -> String {
314    OffsetDateTime::now_utc()
315        .format(&Rfc3339)
316        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
317}
318
/// Derives a run id from a timestamp by turning every `:` into `-`.
///
/// All other characters are copied through unchanged, so
/// `2026-01-19T10:23:45Z` becomes `2026-01-19T10-23-45Z`.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    let mut run_id = String::with_capacity(timestamp.len());
    for ch in timestamp.chars() {
        run_id.push(if ch == ':' { '-' } else { ch });
    }
    run_id
}
322
323pub struct ReportWriter;
324
325impl ReportWriter {
326    pub fn run_dir_name(run_id: &str) -> String {
327        format!("run_{run_id}")
328    }
329
330    pub fn report_file_name() -> String {
331        "run.json".to_string()
332    }
333
334    pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
335        report_dir
336            .join(Self::run_dir_name(run_id))
337            .join(entity_name)
338    }
339
340    pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
341        Self::entity_report_dir(report_dir, run_id, entity_name).join(Self::report_file_name())
342    }
343
344    pub fn write_report(
345        report_dir: &Path,
346        run_id: &str,
347        entity_name: &str,
348        report: &RunReport,
349    ) -> Result<PathBuf, ReportError> {
350        let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
351        std::fs::create_dir_all(&entity_dir)?;
352        let report_path = Self::report_path(report_dir, run_id, entity_name);
353        let tmp_path = entity_dir.join(format!(
354            "{}.tmp-{}",
355            Self::report_file_name(),
356            unique_suffix()
357        ));
358
359        let json = serde_json::to_string_pretty(report)?;
360        let mut file = File::create(&tmp_path)?;
361        file.write_all(json.as_bytes())?;
362        file.sync_all()?;
363        std::fs::rename(&tmp_path, &report_path)?;
364
365        Ok(report_path)
366    }
367}
368
369pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
370    if file_statuses.contains(&FileStatus::Failed) {
371        return (RunStatus::Failed, 1);
372    }
373    if file_statuses.contains(&FileStatus::Aborted) {
374        return (RunStatus::Aborted, 2);
375    }
376    if file_statuses.contains(&FileStatus::Rejected) {
377        return (RunStatus::Rejected, 0);
378    }
379    (RunStatus::Success, 0)
380}
381
/// Builds a suffix for temporary file and directory names:
/// `<pid>-<nanos since Unix epoch>-<sequence>`.
///
/// The trailing sequence number comes from a process-wide counter, so two
/// calls in the same process never return the same string — even when the
/// clock is too coarse to tell them apart, or reads before the Unix epoch (in
/// which case the nanosecond part falls back to `0`).
fn unique_suffix() -> String {
    static SEQUENCE: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_nanos())
        .unwrap_or(0);
    // Relaxed is enough: only the distinctness of the returned values matters,
    // not their ordering relative to other memory operations.
    let sequence = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    format!("{}-{}-{}", std::process::id(), nanos, sequence)
}
389
#[cfg(test)]
mod tests {
    use super::*;

    /// Scratch directory under the system temp dir that is removed (best
    /// effort) when dropped, so a test leaves nothing behind even if one of
    /// its assertions panics.
    struct ScratchDir(std::path::PathBuf);

    impl ScratchDir {
        fn create() -> Self {
            let mut dir = std::env::temp_dir();
            dir.push(format!("floe-report-tests-{}", unique_suffix()));
            std::fs::create_dir_all(&dir).expect("create temp dir");
            Self(dir)
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Builds a fully populated report for a single successful CSV file.
    fn sample_report() -> RunReport {
        RunReport {
            spec_version: "0.1".to_string(),
            tool: ToolInfo {
                name: "floe".to_string(),
                version: env!("CARGO_PKG_VERSION").to_string(),
                git: None,
            },
            run: RunInfo {
                // The run id is the start timestamp with ':' replaced by '-'
                // (see `run_id_from_timestamp`); the timestamps themselves
                // stay in RFC 3339 form.
                run_id: "2026-01-19T10-23-45Z".to_string(),
                started_at: "2026-01-19T10:23:45Z".to_string(),
                finished_at: "2026-01-19T10:23:46Z".to_string(),
                duration_ms: 1000,
                status: RunStatus::Success,
                exit_code: 0,
            },
            config: ConfigEcho {
                path: "/tmp/config.yml".to_string(),
                version: "0.1".to_string(),
                metadata: None,
            },
            entity: EntityEcho {
                name: "customer".to_string(),
                metadata: None,
            },
            source: SourceEcho {
                format: "csv".to_string(),
                path: "/tmp/input".to_string(),
                options: None,
                cast_mode: Some("strict".to_string()),
                read_plan: SourceReadPlan::RawAndTyped,
                resolved_inputs: ResolvedInputs {
                    mode: ResolvedInputMode::Directory,
                    file_count: 1,
                    files: vec!["/tmp/input/file.csv".to_string()],
                },
            },
            sink: SinkEcho {
                accepted: SinkTargetEcho {
                    format: "parquet".to_string(),
                    path: "/tmp/out/accepted".to_string(),
                },
                rejected: Some(SinkTargetEcho {
                    format: "csv".to_string(),
                    path: "/tmp/out/rejected".to_string(),
                }),
                archive: SinkArchiveEcho {
                    enabled: false,
                    path: None,
                },
            },
            report: ReportEcho {
                path: "/tmp/out/reports".to_string(),
                report_file: "/tmp/out/reports/run_2026-01-19T10-23-45Z/customer/run.json"
                    .to_string(),
            },
            policy: PolicyEcho {
                severity: Severity::Warn,
            },
            results: ResultsTotals {
                files_total: 1,
                rows_total: 10,
                accepted_total: 10,
                rejected_total: 0,
                warnings_total: 0,
                errors_total: 0,
            },
            files: vec![FileReport {
                input_file: "/tmp/input/file.csv".to_string(),
                status: FileStatus::Success,
                row_count: 10,
                accepted_count: 10,
                rejected_count: 0,
                mismatch: FileMismatch {
                    declared_columns_count: 1,
                    input_columns_count: 1,
                    missing_columns: Vec::new(),
                    extra_columns: Vec::new(),
                    mismatch_action: MismatchAction::None,
                    error: None,
                    warning: None,
                },
                output: FileOutput {
                    accepted_path: Some("/tmp/out/accepted/file.parquet".to_string()),
                    rejected_path: None,
                    errors_path: None,
                    archived_path: None,
                },
                validation: FileValidation {
                    errors: 0,
                    warnings: 0,
                    rules: Vec::new(),
                    examples: ExampleSummary {
                        max_examples_per_rule: 3,
                        items: Vec::new(),
                    },
                },
            }],
        }
    }

    /// Every top-level section, and `report.report_file`, is present as a key.
    #[test]
    fn report_serializes_expected_keys() {
        let report = sample_report();
        let value = serde_json::to_value(&report).expect("serialize report");
        let object = value.as_object().expect("report object");
        assert!(object.contains_key("spec_version"));
        assert!(object.contains_key("tool"));
        assert!(object.contains_key("run"));
        assert!(object.contains_key("config"));
        assert!(object.contains_key("entity"));
        assert!(object.contains_key("source"));
        assert!(object.contains_key("sink"));
        assert!(object.contains_key("report"));
        assert!(object.contains_key("policy"));
        assert!(object.contains_key("results"));
        assert!(object.contains_key("files"));

        let report_obj = object.get("report").expect("report");
        let report_map = report_obj.as_object().expect("report map");
        assert!(report_map.contains_key("report_file"));
    }

    /// Directory and file names follow the `run_<run_id>` / `run.json` layout.
    #[test]
    fn report_file_name_matches_format() {
        let run_dir = ReportWriter::run_dir_name("2026-01-19T10-23-45Z");
        assert_eq!(run_dir, "run_2026-01-19T10-23-45Z");
        let name = ReportWriter::report_file_name();
        assert_eq!(name, "run.json");
    }

    /// Status precedence: Failed > Aborted > Rejected > Success.
    #[test]
    fn compute_run_outcome_table() {
        let (status, code) = compute_run_outcome(&[]);
        assert_eq!(status, RunStatus::Success);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Success]);
        assert_eq!(status, RunStatus::Success);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Rejected]);
        assert_eq!(status, RunStatus::Rejected);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Aborted]);
        assert_eq!(status, RunStatus::Aborted);
        assert_eq!(code, 2);

        let (status, code) = compute_run_outcome(&[FileStatus::Failed]);
        assert_eq!(status, RunStatus::Failed);
        assert_eq!(code, 1);

        let (status, code) = compute_run_outcome(&[
            FileStatus::Success,
            FileStatus::Rejected,
            FileStatus::Aborted,
        ]);
        assert_eq!(status, RunStatus::Aborted);
        assert_eq!(code, 2);

        let (status, code) = compute_run_outcome(&[
            FileStatus::Success,
            FileStatus::Rejected,
            FileStatus::Failed,
        ]);
        assert_eq!(status, RunStatus::Failed);
        assert_eq!(code, 1);
    }

    /// The report lands at the expected path, parses back as JSON, and no
    /// `.tmp-` file is left next to it.
    #[test]
    fn write_report_writes_json_file() {
        let report = sample_report();
        let scratch = ScratchDir::create();
        let dir = &scratch.0;

        let report_path = ReportWriter::write_report(dir, &report.run.run_id, "customer", &report)
            .expect("write report");

        assert!(report_path.exists());
        let expected = dir
            .join("run_2026-01-19T10-23-45Z")
            .join("customer")
            .join("run.json");
        assert_eq!(report_path, expected);
        let contents = std::fs::read_to_string(&report_path).expect("read report");
        let value: serde_json::Value = serde_json::from_str(&contents).expect("parse report");
        assert_eq!(
            value
                .get("run")
                .and_then(|run| run.get("run_id"))
                .and_then(|run_id| run_id.as_str()),
            Some("2026-01-19T10-23-45Z")
        );

        let temp_files: Vec<_> = std::fs::read_dir(expected.parent().expect("entity dir"))
            .expect("read dir")
            .filter_map(|entry| entry.ok())
            .filter(|entry| {
                entry
                    .file_name()
                    .to_str()
                    .map(|name| name.contains(".tmp-"))
                    .unwrap_or(false)
            })
            .collect();
        assert!(temp_files.is_empty());
    }
}