1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
/// Per-entity run report.
///
/// Serialized as pretty JSON by `ReportWriter::write_report` to
/// `<report_dir>/run_<run_id>/<entity_name>/run.json`. With the derived
/// `Serialize`, JSON keys appear in field declaration order.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunReport {
    /// Spec version string (`"0.1"` in this file's tests).
    pub spec_version: String,
    /// Entity name and optional metadata.
    pub entity: EntityEcho,
    /// Source settings and the inputs they resolved to.
    pub source: SourceEcho,
    /// Accepted / rejected sink targets and archive settings.
    pub sink: SinkEcho,
    /// Policy settings (currently only a severity).
    pub policy: PolicyEcho,
    /// Accepted-output path, row count and parts written.
    pub accepted_output: AcceptedOutputSummary,
    /// Counters for this entity.
    pub results: ResultsTotals,
    /// Per-file reports (presumably one per resolved input file).
    pub files: Vec<FileReport>,
}
21
/// Run-level summary with one entry per entity.
///
/// Serialized as pretty JSON by `ReportWriter::write_summary` to
/// `<report_dir>/run_<run_id>/run.summary.json`. JSON keys follow field
/// declaration order.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunSummaryReport {
    /// Spec version string (`"0.1"` in this file's tests).
    pub spec_version: String,
    /// Name, version and optional git info of the producing tool.
    pub tool: ToolInfo,
    /// Run id, timing, overall status and exit code.
    pub run: RunInfo,
    /// Path, version and optional metadata of the config used.
    pub config: ConfigEcho,
    /// Report directory and the path of this summary file.
    pub report: ReportEcho,
    /// Run-wide counters (presumably summed over `entities`; not enforced here).
    pub results: ResultsTotals,
    /// Per-entity summaries.
    pub entities: Vec<EntitySummary>,
}
33
/// Summary of a single entity, as listed in `RunSummaryReport::entities`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct EntitySummary {
    /// Entity name.
    pub name: String,
    /// Outcome for this entity.
    pub status: RunStatus,
    /// Counters for this entity.
    pub results: ResultsTotals,
    /// Path of the entity's `run.json` (presumably as returned by
    /// `ReportWriter::write_report`).
    pub report_file: String,
}
42
/// Identity of the tool that produced a run.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ToolInfo {
    /// Tool name (`"floe"` in this file's tests).
    pub name: String,
    /// Tool version string (tests use `CARGO_PKG_VERSION`).
    pub version: String,
    /// Git information; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub git: Option<GitInfo>,
}
51
/// Git state recorded for the tool (presumably captured at build time).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct GitInfo {
    /// Commit identifier.
    pub commit: String,
    /// Presumably `true` when the working tree had uncommitted changes.
    pub dirty: bool,
}
58
/// Identity, timing and outcome of one run.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunInfo {
    /// Run identifier. In this file's tests it is a timestamp with `:`
    /// replaced by `-`, the shape `run_id_from_timestamp` produces.
    pub run_id: String,
    /// Start time as a string (presumably RFC 3339 from `now_rfc3339`).
    pub started_at: String,
    /// Finish time as a string (presumably RFC 3339 from `now_rfc3339`).
    pub finished_at: String,
    /// Run duration in milliseconds.
    pub duration_ms: u64,
    /// Overall run status.
    pub status: RunStatus,
    /// Process exit code (presumably from `compute_run_outcome`).
    pub exit_code: i32,
}
69
/// Echo of the configuration file used for the run.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ConfigEcho {
    /// Config file path.
    pub path: String,
    /// Config version string.
    pub version: String,
    /// Free-form JSON metadata; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
78
/// Echo of the entity a report describes.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct EntityEcho {
    /// Entity name.
    pub name: String,
    /// Free-form JSON metadata; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
86
/// Echo of the source configuration plus the inputs it resolved to.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SourceEcho {
    /// Source format name (e.g. `"csv"` in this file's tests).
    pub format: String,
    /// Configured source path.
    pub path: String,
    /// Free-form format options; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub options: Option<serde_json::Value>,
    /// Cast mode name (e.g. `"strict"` in tests); omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cast_mode: Option<String>,
    /// Read plan; `SourceReadPlan` currently has a single variant.
    pub read_plan: SourceReadPlan,
    /// Resolution mode and the concrete files found.
    pub resolved_inputs: ResolvedInputs,
}
99
/// Concrete input files resolved from the source path.
///
/// `file_count` is stored independently of `files`; nothing here enforces
/// `file_count == files.len()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ResolvedInputs {
    /// How the source path was resolved (directory, file or glob).
    pub mode: ResolvedInputMode,
    /// Number of resolved files.
    pub file_count: u64,
    /// Resolved file paths.
    pub files: Vec<String>,
}
107
/// Echo of the sink configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkEcho {
    /// Accepted-output target (format and path).
    pub accepted: SinkTargetEcho,
    /// Rejected-output target; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rejected: Option<SinkTargetEcho>,
    /// Archive settings.
    pub archive: SinkArchiveEcho,
}
116
/// Summary of what was written to the accepted output.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct AcceptedOutputSummary {
    /// Accepted output path.
    pub path: String,
    /// Number of accepted rows.
    pub accepted_rows: u64,
    /// Number of output parts written.
    pub parts_written: u64,
}
124
/// A single sink target: output format name and path.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkTargetEcho {
    /// Output format name (e.g. `"parquet"` or `"csv"` in tests).
    pub format: String,
    /// Output path.
    pub path: String,
}
131
/// Where the run's reports were written.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ReportEcho {
    /// Report root directory (in tests, the parent of `run_<run_id>`).
    pub path: String,
    /// Path of the summary file (in tests, `.../run_<run_id>/run.summary.json`).
    pub report_file: String,
}
138
/// Archive settings of the sink.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkArchiveEcho {
    /// Whether archiving is enabled.
    pub enabled: bool,
    /// Archive path; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
}
146
/// Echo of the policy settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct PolicyEcho {
    /// Configured severity.
    pub severity: Severity,
}
152
/// Counters shared by `RunReport`, `RunSummaryReport` and `EntitySummary`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ResultsTotals {
    /// Number of input files.
    pub files_total: u64,
    /// Number of rows read.
    pub rows_total: u64,
    /// Number of rows accepted.
    pub accepted_total: u64,
    /// Number of rows rejected.
    pub rejected_total: u64,
    /// Number of warnings.
    pub warnings_total: u64,
    /// Number of errors.
    pub errors_total: u64,
}
163
/// Outcome of processing one input file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileReport {
    /// Input file path.
    pub input_file: String,
    /// File-level status.
    pub status: FileStatus,
    /// Rows read from the file.
    pub row_count: u64,
    /// Rows accepted.
    pub accepted_count: u64,
    /// Rows rejected.
    pub rejected_count: u64,
    /// Declared-vs-input column comparison.
    pub mismatch: FileMismatch,
    /// Output paths produced for this file.
    pub output: FileOutput,
    /// Validation counts and per-rule breakdown.
    pub validation: FileValidation,
}
176
/// Comparison between the declared columns and the columns found in a file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileMismatch {
    /// Number of declared columns.
    pub declared_columns_count: u64,
    /// Number of columns in the input file.
    pub input_columns_count: u64,
    /// Columns reported as missing (presumably declared but absent in the input).
    pub missing_columns: Vec<String>,
    /// Columns reported as extra (presumably present in the input but not declared).
    pub extra_columns: Vec<String>,
    /// Action taken for the mismatch.
    pub mismatch_action: MismatchAction,
    /// Mismatch error; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<MismatchIssue>,
    /// Mismatch warning text; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub warning: Option<String>,
}
190
/// A mismatch error: the rule identifier and a human-readable message.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct MismatchIssue {
    /// Rule identifier (free-form string, unlike `RuleName`).
    pub rule: String,
    /// Human-readable message.
    pub message: String,
}
197
/// Output paths produced for one input file.
///
/// Unlike most `Option` fields in this file, these have no
/// `skip_serializing_if`. A missing path is therefore serialized as JSON
/// `null` rather than omitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileOutput {
    /// Accepted output path, if any.
    pub accepted_path: Option<String>,
    /// Rejected output path, if any.
    pub rejected_path: Option<String>,
    /// Errors output path, if any.
    pub errors_path: Option<String>,
    /// Archived input path, if any.
    pub archived_path: Option<String>,
}
206
/// Validation results for one input file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileValidation {
    /// Number of errors.
    pub errors: u64,
    /// Number of warnings.
    pub warnings: u64,
    /// Per-rule breakdown.
    pub rules: Vec<RuleSummary>,
}
214
/// Violations of one validation rule within a file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RuleSummary {
    /// Rule that was checked.
    pub rule: RuleName,
    /// Severity applied to this rule.
    pub severity: Severity,
    /// Number of violations.
    pub violations: u64,
    /// Per-column breakdown.
    pub columns: Vec<ColumnSummary>,
}
223
/// Violations of a rule in a single column.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ColumnSummary {
    /// Column name.
    pub column: String,
    /// Number of violations in this column.
    pub violations: u64,
    /// Target type name (presumably set for cast errors); omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target_type: Option<String>,
}
232
/// Status of a single input file.
///
/// Serialized as `"success"`, `"rejected"`, `"aborted"` or `"failed"`.
/// `compute_run_outcome` folds these into a `RunStatus` with precedence
/// `Failed` > `Aborted` > `Rejected` > `Success`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FileStatus {
    Success,
    Rejected,
    Aborted,
    Failed,
}
241
/// Overall status of a run or of an entity.
///
/// Serialized as `"success"`, `"success_with_warnings"`, `"rejected"`,
/// `"aborted"` or `"failed"`. `compute_run_outcome` never returns
/// `SuccessWithWarnings`; presumably it is set elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RunStatus {
    Success,
    SuccessWithWarnings,
    Rejected,
    Aborted,
    Failed,
}
251
/// Severity level of a policy or rule.
///
/// Serialized as `"warn"`, `"reject"` or `"abort"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Severity {
    Warn,
    Reject,
    Abort,
}
259
/// Identifier of a validation rule.
///
/// Serialized as `"not_null"`, `"cast_error"`, `"unique"` or `"schema_error"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RuleName {
    NotNull,
    CastError,
    Unique,
    SchemaError,
}
268
/// Action taken in response to a column mismatch.
///
/// Serialized as `"none"`, `"filled_nulls"`, `"ignored_extras"`,
/// `"rejected_file"` or `"aborted"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MismatchAction {
    None,
    FilledNulls,
    IgnoredExtras,
    RejectedFile,
    Aborted,
}
278
/// How the source path was resolved into input files.
///
/// Serialized as `"directory"`, `"file"` or `"glob"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ResolvedInputMode {
    Directory,
    File,
    Glob,
}
286
/// Read plan used for the source.
///
/// There is currently a single variant, serialized as `"raw_and_typed"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceReadPlan {
    RawAndTyped,
}
292
/// Failure while writing a report or summary file.
#[derive(Debug)]
pub enum ReportError {
    /// I/O failure, e.g. creating directories or writing, syncing or renaming files.
    Io(std::io::Error),
    /// JSON serialization failure from `serde_json`.
    Serialize(serde_json::Error),
}
298
299impl std::fmt::Display for ReportError {
300 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
301 match self {
302 ReportError::Io(err) => write!(f, "report io error: {err}"),
303 ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
304 }
305 }
306}
307
/// Marks `ReportError` as a standard error type.
///
/// `source()` is left at its default (`None`) because the `Display` impl
/// already includes the wrapped error's message. Returning it here as well
/// would make error-chain printers show that message twice.
impl std::error::Error for ReportError {}
309
310impl From<std::io::Error> for ReportError {
311 fn from(err: std::io::Error) -> Self {
312 Self::Io(err)
313 }
314}
315
316impl From<serde_json::Error> for ReportError {
317 fn from(err: serde_json::Error) -> Self {
318 Self::Serialize(err)
319 }
320}
321
322pub fn now_rfc3339() -> String {
323 OffsetDateTime::now_utc()
324 .format(&Rfc3339)
325 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
326}
327
/// Derives a run id from a timestamp by replacing every `:` with `-`.
///
/// Run ids end up as path components (`run_<run_id>`) in `ReportWriter`.
/// Colons are not allowed in Windows file names, which is presumably why
/// they are replaced. All other characters are kept unchanged.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}
331
/// Stateless namespace for report path helpers and report/summary writers.
pub struct ReportWriter;
333
334impl ReportWriter {
335 pub fn run_dir_name(run_id: &str) -> String {
336 format!("run_{run_id}")
337 }
338
339 pub fn report_file_name() -> String {
340 "run.json".to_string()
341 }
342
343 pub fn summary_file_name() -> String {
344 "run.summary.json".to_string()
345 }
346
347 pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
348 report_dir
349 .join(Self::run_dir_name(run_id))
350 .join(entity_name)
351 }
352
353 pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
354 Self::entity_report_dir(report_dir, run_id, entity_name).join(Self::report_file_name())
355 }
356
357 pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
358 report_dir
359 .join(Self::run_dir_name(run_id))
360 .join(Self::summary_file_name())
361 }
362
363 pub fn write_report(
364 report_dir: &Path,
365 run_id: &str,
366 entity_name: &str,
367 report: &RunReport,
368 ) -> Result<PathBuf, ReportError> {
369 let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
370 std::fs::create_dir_all(&entity_dir)?;
371 let report_path = Self::report_path(report_dir, run_id, entity_name);
372 let tmp_path = entity_dir.join(format!(
373 "{}.tmp-{}",
374 Self::report_file_name(),
375 unique_suffix()
376 ));
377
378 let json = serde_json::to_string_pretty(report)?;
379 let mut file = File::create(&tmp_path)?;
380 file.write_all(json.as_bytes())?;
381 file.sync_all()?;
382 std::fs::rename(&tmp_path, &report_path)?;
383
384 Ok(report_path)
385 }
386
387 pub fn write_summary(
388 report_dir: &Path,
389 run_id: &str,
390 report: &RunSummaryReport,
391 ) -> Result<PathBuf, ReportError> {
392 let run_dir = report_dir.join(Self::run_dir_name(run_id));
393 std::fs::create_dir_all(&run_dir)?;
394 let report_path = Self::summary_path(report_dir, run_id);
395 let tmp_path = run_dir.join(format!(
396 "{}.tmp-{}",
397 Self::summary_file_name(),
398 unique_suffix()
399 ));
400
401 let json = serde_json::to_string_pretty(report)?;
402 let mut file = File::create(&tmp_path)?;
403 file.write_all(json.as_bytes())?;
404 file.sync_all()?;
405 std::fs::rename(&tmp_path, &report_path)?;
406
407 Ok(report_path)
408 }
409}
410
411pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
412 if file_statuses.contains(&FileStatus::Failed) {
413 return (RunStatus::Failed, 1);
414 }
415 if file_statuses.contains(&FileStatus::Aborted) {
416 return (RunStatus::Aborted, 2);
417 }
418 if file_statuses.contains(&FileStatus::Rejected) {
419 return (RunStatus::Rejected, 0);
420 }
421 (RunStatus::Success, 0)
422}
423
/// Builds a suffix for temporary file and directory names.
///
/// The format is `<pid>-<unix_nanos>-<seq>`. `seq` is a per-process counter,
/// so every call within one process yields a distinct suffix even when the
/// wall clock returns the same value for back-to-back calls. This can happen
/// on platforms with coarse timer resolution. `unix_nanos` falls back to `0`
/// if the clock reports a time before the Unix epoch. The pid separates
/// concurrent processes.
fn unique_suffix() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};

    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_nanos())
        .unwrap_or(0);
    // `fetch_add` is a single atomic read-modify-write, so each caller gets a
    // distinct value. No cross-thread ordering is needed, hence `Relaxed`.
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    format!("{}-{}-{}", std::process::id(), nanos, seq)
}
431
#[cfg(test)]
mod tests {
    use super::*;

    /// Scratch directory under the system temp dir that is removed on drop.
    /// Without it, repeated test runs accumulate `floe-*-tests-*` directories.
    struct TempDir(PathBuf);

    impl TempDir {
        fn new(prefix: &str) -> Self {
            let path = std::env::temp_dir().join(format!("{prefix}-{}", unique_suffix()));
            std::fs::create_dir_all(&path).expect("create temp dir");
            Self(path)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempDir {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not turn a passing test into a panic.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// UTF-8 names of entries in `dir` that look like leftover `*.tmp-*` files.
    fn leftover_temp_files(dir: &Path) -> Vec<String> {
        std::fs::read_dir(dir)
            .expect("read dir")
            .filter_map(|entry| entry.ok())
            .filter_map(|entry| entry.file_name().into_string().ok())
            .filter(|name| name.contains(".tmp-"))
            .collect()
    }

    fn sample_report() -> RunReport {
        RunReport {
            spec_version: "0.1".to_string(),
            entity: EntityEcho {
                name: "customer".to_string(),
                metadata: None,
            },
            source: SourceEcho {
                format: "csv".to_string(),
                path: "/tmp/input".to_string(),
                options: None,
                cast_mode: Some("strict".to_string()),
                read_plan: SourceReadPlan::RawAndTyped,
                resolved_inputs: ResolvedInputs {
                    mode: ResolvedInputMode::Directory,
                    file_count: 1,
                    files: vec!["/tmp/input/file.csv".to_string()],
                },
            },
            sink: SinkEcho {
                accepted: SinkTargetEcho {
                    format: "parquet".to_string(),
                    path: "/tmp/out/accepted".to_string(),
                },
                rejected: Some(SinkTargetEcho {
                    format: "csv".to_string(),
                    path: "/tmp/out/rejected".to_string(),
                }),
                archive: SinkArchiveEcho {
                    enabled: false,
                    path: None,
                },
            },
            policy: PolicyEcho {
                severity: Severity::Warn,
            },
            accepted_output: AcceptedOutputSummary {
                path: "/tmp/out/accepted".to_string(),
                accepted_rows: 10,
                parts_written: 1,
            },
            results: ResultsTotals {
                files_total: 1,
                rows_total: 10,
                accepted_total: 10,
                rejected_total: 0,
                warnings_total: 0,
                errors_total: 0,
            },
            files: vec![FileReport {
                input_file: "/tmp/input/file.csv".to_string(),
                status: FileStatus::Success,
                row_count: 10,
                accepted_count: 10,
                rejected_count: 0,
                mismatch: FileMismatch {
                    declared_columns_count: 1,
                    input_columns_count: 1,
                    missing_columns: Vec::new(),
                    extra_columns: Vec::new(),
                    mismatch_action: MismatchAction::None,
                    error: None,
                    warning: None,
                },
                output: FileOutput {
                    accepted_path: Some("/tmp/out/accepted".to_string()),
                    rejected_path: None,
                    errors_path: None,
                    archived_path: None,
                },
                validation: FileValidation {
                    errors: 0,
                    warnings: 0,
                    rules: Vec::new(),
                },
            }],
        }
    }

    fn sample_summary() -> RunSummaryReport {
        RunSummaryReport {
            spec_version: "0.1".to_string(),
            tool: ToolInfo {
                name: "floe".to_string(),
                version: env!("CARGO_PKG_VERSION").to_string(),
                git: None,
            },
            run: RunInfo {
                run_id: "2026-01-19T10-23-45Z".to_string(),
                // Real RFC 3339 timestamps (as `now_rfc3339` produces); only
                // the run id has its colons replaced.
                started_at: "2026-01-19T10:23:45Z".to_string(),
                finished_at: "2026-01-19T10:23:46Z".to_string(),
                duration_ms: 1000,
                status: RunStatus::Success,
                exit_code: 0,
            },
            config: ConfigEcho {
                path: "/tmp/config.yml".to_string(),
                version: "0.1".to_string(),
                metadata: None,
            },
            report: ReportEcho {
                path: "/tmp/out/reports".to_string(),
                report_file: "/tmp/out/reports/run_2026-01-19T10-23-45Z/run.summary.json"
                    .to_string(),
            },
            results: ResultsTotals {
                files_total: 1,
                rows_total: 10,
                accepted_total: 10,
                rejected_total: 0,
                warnings_total: 0,
                errors_total: 0,
            },
            entities: vec![EntitySummary {
                name: "customer".to_string(),
                status: RunStatus::Success,
                results: ResultsTotals {
                    files_total: 1,
                    rows_total: 10,
                    accepted_total: 10,
                    rejected_total: 0,
                    warnings_total: 0,
                    errors_total: 0,
                },
                report_file: "/tmp/out/reports/run_2026-01-19T10-23-45Z/customer/run.json"
                    .to_string(),
            }],
        }
    }

    #[test]
    fn report_serializes_expected_keys() {
        let report = sample_report();
        let value = serde_json::to_value(&report).expect("serialize report");
        let object = value.as_object().expect("report object");
        assert!(object.contains_key("spec_version"));
        assert!(object.contains_key("entity"));
        assert!(object.contains_key("source"));
        assert!(object.contains_key("sink"));
        assert!(object.contains_key("policy"));
        assert!(object.contains_key("accepted_output"));
        assert!(object.contains_key("results"));
        assert!(object.contains_key("files"));
    }

    #[test]
    fn report_file_name_matches_format() {
        let run_dir = ReportWriter::run_dir_name("2026-01-19T10-23-45Z");
        assert_eq!(run_dir, "run_2026-01-19T10-23-45Z");
        let name = ReportWriter::report_file_name();
        assert_eq!(name, "run.json");
        assert_eq!(ReportWriter::summary_file_name(), "run.summary.json");
    }

    #[test]
    fn run_id_is_start_timestamp_with_colons_replaced() {
        let summary = sample_summary();
        assert_eq!(
            run_id_from_timestamp(&summary.run.started_at),
            summary.run.run_id
        );
    }

    #[test]
    fn compute_run_outcome_table() {
        let (status, code) = compute_run_outcome(&[]);
        assert_eq!(status, RunStatus::Success);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Success]);
        assert_eq!(status, RunStatus::Success);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Rejected]);
        assert_eq!(status, RunStatus::Rejected);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Aborted]);
        assert_eq!(status, RunStatus::Aborted);
        assert_eq!(code, 2);

        let (status, code) = compute_run_outcome(&[FileStatus::Failed]);
        assert_eq!(status, RunStatus::Failed);
        assert_eq!(code, 1);

        let (status, code) = compute_run_outcome(&[
            FileStatus::Success,
            FileStatus::Rejected,
            FileStatus::Aborted,
        ]);
        assert_eq!(status, RunStatus::Aborted);
        assert_eq!(code, 2);

        let (status, code) = compute_run_outcome(&[
            FileStatus::Success,
            FileStatus::Rejected,
            FileStatus::Failed,
        ]);
        assert_eq!(status, RunStatus::Failed);
        assert_eq!(code, 1);
    }

    #[test]
    fn write_report_writes_json_file() {
        let report = sample_report();
        let run_id = "2026-01-19T10-23-45Z";
        let temp = TempDir::new("floe-report-tests");

        let report_path = ReportWriter::write_report(temp.path(), run_id, "customer", &report)
            .expect("write report");

        assert!(report_path.exists());
        let expected = temp
            .path()
            .join(format!("run_{run_id}"))
            .join("customer")
            .join("run.json");
        assert_eq!(report_path, expected);
        let contents = std::fs::read_to_string(&report_path).expect("read report");
        let value: serde_json::Value = serde_json::from_str(&contents).expect("parse report");
        assert!(value.get("entity").is_some());

        let leftovers = leftover_temp_files(expected.parent().expect("entity dir"));
        assert!(leftovers.is_empty(), "leftover temp files: {leftovers:?}");
    }

    #[test]
    fn write_summary_writes_json_file() {
        let summary = sample_summary();
        let run_id = "2026-01-19T10-23-45Z";
        let temp = TempDir::new("floe-summary-tests");

        let report_path =
            ReportWriter::write_summary(temp.path(), run_id, &summary).expect("write summary");

        assert!(report_path.exists());
        let expected = temp
            .path()
            .join(format!("run_{run_id}"))
            .join("run.summary.json");
        assert_eq!(report_path, expected);
        let contents = std::fs::read_to_string(&report_path).expect("read summary");
        let value: serde_json::Value = serde_json::from_str(&contents).expect("parse summary");
        assert!(value.get("run").is_some());

        let leftovers = leftover_temp_files(expected.parent().expect("run dir"));
        assert!(leftovers.is_empty(), "leftover temp files: {leftovers:?}");
    }
}