floe_core/report/mod.rs

1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
10#[serde(rename_all = "snake_case")]
11pub struct RunReport {
12    pub spec_version: String,
13    pub tool: ToolInfo,
14    pub run: RunInfo,
15    pub config: ConfigEcho,
16    pub entity: EntityEcho,
17    pub source: SourceEcho,
18    pub sink: SinkEcho,
19    pub report: ReportEcho,
20    pub policy: PolicyEcho,
21    pub results: ResultsTotals,
22    pub files: Vec<FileReport>,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26#[serde(rename_all = "snake_case")]
27pub struct ToolInfo {
28    pub name: String,
29    pub version: String,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub git: Option<GitInfo>,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
35#[serde(rename_all = "snake_case")]
36pub struct GitInfo {
37    pub commit: String,
38    pub dirty: bool,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(rename_all = "snake_case")]
43pub struct RunInfo {
44    pub run_id: String,
45    pub started_at: String,
46    pub finished_at: String,
47    pub duration_ms: u64,
48    pub status: RunStatus,
49    pub exit_code: i32,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
53#[serde(rename_all = "snake_case")]
54pub struct ConfigEcho {
55    pub path: String,
56    pub version: String,
57    #[serde(skip_serializing_if = "Option::is_none")]
58    pub metadata: Option<serde_json::Value>,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
62#[serde(rename_all = "snake_case")]
63pub struct EntityEcho {
64    pub name: String,
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub metadata: Option<serde_json::Value>,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
70#[serde(rename_all = "snake_case")]
71pub struct SourceEcho {
72    pub format: String,
73    pub path: String,
74    #[serde(skip_serializing_if = "Option::is_none")]
75    pub options: Option<serde_json::Value>,
76    #[serde(skip_serializing_if = "Option::is_none")]
77    pub cast_mode: Option<String>,
78    pub read_plan: SourceReadPlan,
79    pub resolved_inputs: ResolvedInputs,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
83#[serde(rename_all = "snake_case")]
84pub struct ResolvedInputs {
85    pub mode: ResolvedInputMode,
86    pub file_count: u64,
87    pub files: Vec<String>,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(rename_all = "snake_case")]
92pub struct SinkEcho {
93    pub accepted: SinkTargetEcho,
94    #[serde(skip_serializing_if = "Option::is_none")]
95    pub rejected: Option<SinkTargetEcho>,
96    pub archive: SinkArchiveEcho,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
100#[serde(rename_all = "snake_case")]
101pub struct SinkTargetEcho {
102    pub format: String,
103    pub path: String,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[serde(rename_all = "snake_case")]
108pub struct ReportEcho {
109    pub path: String,
110    pub report_file: String,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
114#[serde(rename_all = "snake_case")]
115pub struct SinkArchiveEcho {
116    pub enabled: bool,
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub path: Option<String>,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
122#[serde(rename_all = "snake_case")]
123pub struct PolicyEcho {
124    pub severity: Severity,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
128#[serde(rename_all = "snake_case")]
129pub struct ResultsTotals {
130    pub files_total: u64,
131    pub rows_total: u64,
132    pub accepted_total: u64,
133    pub rejected_total: u64,
134    pub warnings_total: u64,
135    pub errors_total: u64,
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
139#[serde(rename_all = "snake_case")]
140pub struct FileReport {
141    pub input_file: String,
142    pub status: FileStatus,
143    pub row_count: u64,
144    pub accepted_count: u64,
145    pub rejected_count: u64,
146    pub output: FileOutput,
147    pub validation: FileValidation,
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize)]
151#[serde(rename_all = "snake_case")]
152pub struct FileOutput {
153    pub accepted_path: Option<String>,
154    pub rejected_path: Option<String>,
155    pub errors_path: Option<String>,
156    pub archived_path: Option<String>,
157}
158
159#[derive(Debug, Clone, Serialize, Deserialize)]
160#[serde(rename_all = "snake_case")]
161pub struct FileValidation {
162    pub errors: u64,
163    pub warnings: u64,
164    pub rules: Vec<RuleSummary>,
165    pub examples: ExampleSummary,
166}
167
168#[derive(Debug, Clone, Serialize, Deserialize)]
169#[serde(rename_all = "snake_case")]
170pub struct RuleSummary {
171    pub rule: RuleName,
172    pub severity: Severity,
173    pub violations: u64,
174    pub columns: Vec<ColumnSummary>,
175}
176
177#[derive(Debug, Clone, Serialize, Deserialize)]
178#[serde(rename_all = "snake_case")]
179pub struct ColumnSummary {
180    pub column: String,
181    pub violations: u64,
182    #[serde(skip_serializing_if = "Option::is_none")]
183    pub target_type: Option<String>,
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize)]
187#[serde(rename_all = "snake_case")]
188pub struct ExampleSummary {
189    pub max_examples_per_rule: u64,
190    pub items: Vec<ValidationExample>,
191}
192
193#[derive(Debug, Clone, Serialize, Deserialize)]
194#[serde(rename_all = "snake_case")]
195pub struct ValidationExample {
196    pub rule: RuleName,
197    pub column: String,
198    pub row_index: u64,
199    pub message: String,
200}
201
202#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
203#[serde(rename_all = "snake_case")]
204pub enum FileStatus {
205    Success,
206    Rejected,
207    Aborted,
208    Failed,
209}
210
211#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
212#[serde(rename_all = "snake_case")]
213pub enum RunStatus {
214    Success,
215    SuccessWithWarnings,
216    Rejected,
217    Aborted,
218    Failed,
219}
220
221#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
222#[serde(rename_all = "snake_case")]
223pub enum Severity {
224    Warn,
225    Reject,
226    Abort,
227}
228
229#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
230#[serde(rename_all = "snake_case")]
231pub enum RuleName {
232    NotNull,
233    CastError,
234    Unique,
235    SchemaError,
236}
237
238
239#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
240#[serde(rename_all = "snake_case")]
241pub enum ResolvedInputMode {
242    Directory,
243    File,
244    Glob,
245}
246
247#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
248#[serde(rename_all = "snake_case")]
249pub enum SourceReadPlan {
250    RawAndTyped,
251}
252
253#[derive(Debug)]
254pub enum ReportError {
255    Io(std::io::Error),
256    Serialize(serde_json::Error),
257}
258
259impl std::fmt::Display for ReportError {
260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261        match self {
262            ReportError::Io(err) => write!(f, "report io error: {err}"),
263            ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
264        }
265    }
266}
267
268impl std::error::Error for ReportError {}
269
270impl From<std::io::Error> for ReportError {
271    fn from(err: std::io::Error) -> Self {
272        Self::Io(err)
273    }
274}
275
276impl From<serde_json::Error> for ReportError {
277    fn from(err: serde_json::Error) -> Self {
278        Self::Serialize(err)
279    }
280}
281
282pub fn now_rfc3339() -> String {
283    OffsetDateTime::now_utc()
284        .format(&Rfc3339)
285        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
286}
287
/// Derives a run identifier from a timestamp by replacing every `:` with `-`.
///
/// The identifier ends up in a directory name (`run_<run_id>`), and `:` is
/// not permitted in Windows file names — presumably the reason for the swap.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}
291
292pub struct ReportWriter;
293
294impl ReportWriter {
295    pub fn run_dir_name(run_id: &str) -> String {
296        format!("run_{run_id}")
297    }
298
299    pub fn report_file_name() -> String {
300        "run.json".to_string()
301    }
302
303    pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
304        report_dir
305            .join(Self::run_dir_name(run_id))
306            .join(entity_name)
307    }
308
309    pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
310        Self::entity_report_dir(report_dir, run_id, entity_name).join(Self::report_file_name())
311    }
312
313    pub fn write_report(
314        report_dir: &Path,
315        run_id: &str,
316        entity_name: &str,
317        report: &RunReport,
318    ) -> Result<PathBuf, ReportError> {
319        let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
320        std::fs::create_dir_all(&entity_dir)?;
321        let report_path = Self::report_path(report_dir, run_id, entity_name);
322        let tmp_path = entity_dir.join(format!(
323            "{}.tmp-{}",
324            Self::report_file_name(),
325            unique_suffix()
326        ));
327
328        let json = serde_json::to_string_pretty(report)?;
329        let mut file = File::create(&tmp_path)?;
330        file.write_all(json.as_bytes())?;
331        file.sync_all()?;
332        std::fs::rename(&tmp_path, &report_path)?;
333
334        Ok(report_path)
335    }
336}
337
338pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
339    if file_statuses.iter().any(|status| *status == FileStatus::Failed) {
340        return (RunStatus::Failed, 1);
341    }
342    if file_statuses
343        .iter()
344        .any(|status| *status == FileStatus::Aborted)
345    {
346        return (RunStatus::Aborted, 2);
347    }
348    if file_statuses
349        .iter()
350        .any(|status| *status == FileStatus::Rejected)
351    {
352        return (RunStatus::Rejected, 0);
353    }
354    (RunStatus::Success, 0)
355}
356
/// Returns a suffix of the form `<pid>-<nanos since Unix epoch>-<seq>`.
///
/// `seq` comes from a per-process counter, so every call within one process
/// yields a distinct value even when the clock is coarse or reads before the
/// epoch (which falls back to `0` nanos). The pid and timestamp make clashes
/// across processes unlikely.
fn unique_suffix() -> String {
    // Relaxed is sufficient: `fetch_add` is an atomic read-modify-write, so
    // each caller gets a distinct value regardless of memory ordering.
    static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_nanos())
        .unwrap_or(0);
    format!("{}-{}-{}", std::process::id(), nanos, seq)
}
364
#[cfg(test)]
mod tests {
    use super::*;

    /// Scratch directory under the system temp dir that is deleted on drop,
    /// so filesystem tests clean up even when an assertion fails midway.
    struct ScratchDir(std::path::PathBuf);

    impl ScratchDir {
        fn new(prefix: &str) -> Self {
            let path = std::env::temp_dir().join(format!("{}-{}", prefix, unique_suffix()));
            std::fs::create_dir_all(&path).expect("create temp dir");
            Self(path)
        }

        fn path(&self) -> &std::path::Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not mask the real test outcome.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    fn sample_report() -> RunReport {
        RunReport {
            spec_version: "0.1".to_string(),
            tool: ToolInfo {
                name: "floe".to_string(),
                version: "0.1.0".to_string(),
                git: None,
            },
            run: RunInfo {
                run_id: "2026-01-19T10-23-45Z".to_string(),
                // RFC 3339, matching what `now_rfc3339` produces; `run_id` is
                // the `run_id_from_timestamp` form of `started_at`.
                started_at: "2026-01-19T10:23:45Z".to_string(),
                finished_at: "2026-01-19T10:23:46Z".to_string(),
                duration_ms: 1000,
                status: RunStatus::Success,
                exit_code: 0,
            },
            config: ConfigEcho {
                path: "/tmp/config.yml".to_string(),
                version: "0.1".to_string(),
                metadata: None,
            },
            entity: EntityEcho {
                name: "customer".to_string(),
                metadata: None,
            },
            source: SourceEcho {
                format: "csv".to_string(),
                path: "/tmp/input".to_string(),
                options: None,
                cast_mode: Some("strict".to_string()),
                read_plan: SourceReadPlan::RawAndTyped,
                resolved_inputs: ResolvedInputs {
                    mode: ResolvedInputMode::Directory,
                    file_count: 1,
                    files: vec!["/tmp/input/file.csv".to_string()],
                },
            },
            sink: SinkEcho {
                accepted: SinkTargetEcho {
                    format: "parquet".to_string(),
                    path: "/tmp/out/accepted".to_string(),
                },
                rejected: Some(SinkTargetEcho {
                    format: "csv".to_string(),
                    path: "/tmp/out/rejected".to_string(),
                }),
                archive: SinkArchiveEcho {
                    enabled: false,
                    path: None,
                },
            },
            report: ReportEcho {
                path: "/tmp/out/reports".to_string(),
                report_file: "/tmp/out/reports/run_2026-01-19T10-23-45Z/customer/run.json"
                    .to_string(),
            },
            policy: PolicyEcho {
                severity: Severity::Warn,
            },
            results: ResultsTotals {
                files_total: 1,
                rows_total: 10,
                accepted_total: 10,
                rejected_total: 0,
                warnings_total: 0,
                errors_total: 0,
            },
            files: vec![FileReport {
                input_file: "/tmp/input/file.csv".to_string(),
                status: FileStatus::Success,
                row_count: 10,
                accepted_count: 10,
                rejected_count: 0,
                output: FileOutput {
                    accepted_path: Some("/tmp/out/accepted/file.parquet".to_string()),
                    rejected_path: None,
                    errors_path: None,
                    archived_path: None,
                },
                validation: FileValidation {
                    errors: 0,
                    warnings: 0,
                    rules: Vec::new(),
                    examples: ExampleSummary {
                        max_examples_per_rule: 3,
                        items: Vec::new(),
                    },
                },
            }],
        }
    }

    #[test]
    fn report_serializes_expected_keys() {
        let report = sample_report();
        let value = serde_json::to_value(&report).expect("serialize report");
        let object = value.as_object().expect("report object");
        for key in [
            "spec_version",
            "tool",
            "run",
            "config",
            "entity",
            "source",
            "sink",
            "report",
            "policy",
            "results",
            "files",
        ] {
            assert!(object.contains_key(key), "missing top-level key {key}");
        }

        let report_map = object
            .get("report")
            .and_then(|report| report.as_object())
            .expect("report map");
        assert!(report_map.contains_key("report_file"));

        // `git: None` is skipped entirely rather than serialized as null.
        let tool_map = object
            .get("tool")
            .and_then(|tool| tool.as_object())
            .expect("tool map");
        assert!(!tool_map.contains_key("git"));

        // `FileOutput` keeps absent paths as explicit nulls.
        let rejected_path = value
            .pointer("/files/0/output/rejected_path")
            .expect("rejected_path key present");
        assert!(rejected_path.is_null());
    }

    #[test]
    fn report_round_trips_through_json() {
        let report = sample_report();
        let json = serde_json::to_string(&report).expect("serialize report");
        let parsed: RunReport = serde_json::from_str(&json).expect("deserialize report");
        assert_eq!(parsed.run.run_id, report.run.run_id);
        assert_eq!(parsed.run.status, RunStatus::Success);
        assert!(parsed.tool.git.is_none());
        assert_eq!(parsed.files.len(), 1);
        assert_eq!(parsed.files[0].status, FileStatus::Success);
    }

    #[test]
    fn report_file_name_matches_format() {
        let run_dir = ReportWriter::run_dir_name("2026-01-19T10-23-45Z");
        assert_eq!(run_dir, "run_2026-01-19T10-23-45Z");
        let name = ReportWriter::report_file_name();
        assert_eq!(name, "run.json");

        let root = std::path::Path::new("reports");
        assert_eq!(
            ReportWriter::report_path(root, "r1", "customer"),
            root.join("run_r1").join("customer").join("run.json")
        );
    }

    #[test]
    fn run_id_replaces_colons() {
        assert_eq!(
            run_id_from_timestamp("2026-01-19T10:23:45Z"),
            "2026-01-19T10-23-45Z"
        );
    }

    #[test]
    fn compute_run_outcome_table() {
        let cases: Vec<(Vec<FileStatus>, RunStatus, i32)> = vec![
            (vec![], RunStatus::Success, 0),
            (vec![FileStatus::Success], RunStatus::Success, 0),
            (vec![FileStatus::Rejected], RunStatus::Rejected, 0),
            (vec![FileStatus::Aborted], RunStatus::Aborted, 2),
            (vec![FileStatus::Failed], RunStatus::Failed, 1),
            (
                vec![FileStatus::Success, FileStatus::Rejected, FileStatus::Aborted],
                RunStatus::Aborted,
                2,
            ),
            (
                vec![FileStatus::Success, FileStatus::Rejected, FileStatus::Failed],
                RunStatus::Failed,
                1,
            ),
        ];
        for (statuses, expected_status, expected_code) in cases {
            assert_eq!(
                compute_run_outcome(&statuses),
                (expected_status, expected_code),
                "statuses: {statuses:?}"
            );
        }
    }

    #[test]
    fn write_report_writes_json_file() {
        let scratch = ScratchDir::new("floe-report-tests");
        let mut report = sample_report();

        let report_path =
            ReportWriter::write_report(scratch.path(), &report.run.run_id, "customer", &report)
                .expect("write report");

        assert!(report_path.exists());
        let expected = scratch
            .path()
            .join("run_2026-01-19T10-23-45Z")
            .join("customer")
            .join("run.json");
        assert_eq!(report_path, expected);

        let contents = std::fs::read_to_string(&report_path).expect("read report");
        let value: serde_json::Value = serde_json::from_str(&contents).expect("parse report");
        assert_eq!(
            value
                .get("run")
                .and_then(|run| run.get("run_id"))
                .and_then(|run_id| run_id.as_str()),
            Some("2026-01-19T10-23-45Z")
        );

        // A second write replaces the existing report in place.
        report.results.rows_total = 11;
        let second_path =
            ReportWriter::write_report(scratch.path(), &report.run.run_id, "customer", &report)
                .expect("rewrite report");
        assert_eq!(second_path, expected);
        let contents = std::fs::read_to_string(&second_path).expect("read rewritten report");
        let value: serde_json::Value = serde_json::from_str(&contents).expect("parse report");
        assert_eq!(
            value
                .pointer("/results/rows_total")
                .and_then(|rows| rows.as_u64()),
            Some(11)
        );

        let entity_dir = expected.parent().expect("entity dir");
        let leftover_temp_files = std::fs::read_dir(entity_dir)
            .expect("read dir")
            .filter_map(|entry| entry.ok())
            .filter(|entry| entry.file_name().to_string_lossy().contains(".tmp-"))
            .count();
        assert_eq!(leftover_temp_files, 0);
    }
}