1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
/// Report for a single entity within a run.
///
/// `ReportWriter::write_report` serializes this as pretty-printed JSON to
/// `<report_dir>/run_<run_id>/<entity_name>/run.json`. Keys are emitted in
/// field declaration order, in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunReport {
    /// Report schema version string (the test fixture uses `"0.1"`).
    pub spec_version: String,
    /// The entity this report covers.
    pub entity: EntityEcho,
    /// Echo of the source (input) settings plus the inputs that were resolved.
    pub source: SourceEcho,
    /// Echo of the sink (output) targets.
    pub sink: SinkEcho,
    /// Echo of the validation policy.
    pub policy: PolicyEcho,
    /// Counters for this entity (presumably aggregated over `files` — confirm at producer).
    pub results: ResultsTotals,
    /// One report per input file.
    pub files: Vec<FileReport>,
}
20
/// Run-level summary covering every entity processed in a run.
///
/// `ReportWriter::write_summary` serializes this as pretty-printed JSON to
/// `<report_dir>/run_<run_id>/run.summary.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunSummaryReport {
    /// Report schema version string (the test fixture uses `"0.1"`).
    pub spec_version: String,
    /// Identity of the tool that produced the run.
    pub tool: ToolInfo,
    /// Run identity, timing and final outcome.
    pub run: RunInfo,
    /// Echo of the configuration file used for the run.
    pub config: ConfigEcho,
    /// Where reports for this run were written.
    pub report: ReportEcho,
    /// Run-wide counters (presumably summed over `entities` — confirm at producer).
    pub results: ResultsTotals,
    /// One summary line per entity.
    pub entities: Vec<EntitySummary>,
}
32
/// One entity's entry in `RunSummaryReport::entities`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct EntitySummary {
    /// Entity name.
    pub name: String,
    /// Outcome for this entity.
    pub status: RunStatus,
    /// Counters for this entity.
    pub results: ResultsTotals,
    /// Path of the entity's detailed report; in the test fixture this is the
    /// entity's `run_<run_id>/<entity>/run.json` file.
    pub report_file: String,
}
41
/// Identifies the tool build that produced a run summary.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ToolInfo {
    /// Tool name (the test fixture uses `"floe"`).
    pub name: String,
    /// Tool version (the test fixture uses `CARGO_PKG_VERSION`).
    pub version: String,
    /// Source-control details; the `git` key is omitted entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub git: Option<GitInfo>,
}
50
/// Git state of the tool build, embedded in `ToolInfo::git`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct GitInfo {
    /// Commit identifier (presumably a hash — confirm at producer).
    pub commit: String,
    /// Presumably `true` when the working tree had uncommitted changes — confirm at producer.
    pub dirty: bool,
}
57
/// Identity, timing and outcome of one run.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunInfo {
    /// Run identifier; also names the `run_<run_id>` report directory.
    /// NOTE(review): presumably produced by `run_id_from_timestamp` — confirm at caller.
    pub run_id: String,
    /// Start time as text (presumably RFC 3339 from `now_rfc3339` — confirm).
    pub started_at: String,
    /// End time as text (presumably RFC 3339 from `now_rfc3339` — confirm).
    pub finished_at: String,
    /// Wall-clock duration in milliseconds.
    pub duration_ms: u64,
    /// Overall run status.
    pub status: RunStatus,
    /// Process exit code. `compute_run_outcome` returns a `(RunStatus, i32)`
    /// pair matching `status`/`exit_code`; presumably their source — confirm.
    pub exit_code: i32,
}
68
/// Echo of the configuration file a run was started with.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ConfigEcho {
    /// Path of the configuration file.
    pub path: String,
    /// Configuration version string.
    pub version: String,
    /// Free-form JSON metadata; the key is omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
77
/// Echo of the entity a `RunReport` describes.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct EntityEcho {
    /// Entity name.
    pub name: String,
    /// Free-form JSON metadata; the key is omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
85
/// Echo of an entity's source (input) settings and the inputs actually resolved.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SourceEcho {
    /// Input format name (the test fixture uses `"csv"`).
    pub format: String,
    /// Configured input location (file, directory, or pattern; see `resolved_inputs.mode`).
    pub path: String,
    /// Free-form JSON format options; the key is omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub options: Option<serde_json::Value>,
    /// Cast mode as text (the test fixture uses `"strict"`); omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cast_mode: Option<String>,
    /// How the source was read.
    pub read_plan: SourceReadPlan,
    /// Concrete input files the configured `path` resolved to.
    pub resolved_inputs: ResolvedInputs,
}
98
/// Input files resolved from a source path.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ResolvedInputs {
    /// How the source path was interpreted.
    pub mode: ResolvedInputMode,
    /// Number of resolved files.
    /// NOTE(review): stored independently of `files`; nothing here enforces
    /// `file_count == files.len()`.
    pub file_count: u64,
    /// Resolved file paths.
    pub files: Vec<String>,
}
106
/// Echo of an entity's sink (output) configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkEcho {
    /// Target for accepted rows.
    pub accepted: SinkTargetEcho,
    /// Target for rejected rows; the key is omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rejected: Option<SinkTargetEcho>,
    /// Archive settings for processed inputs.
    pub archive: SinkArchiveEcho,
}
115
/// A single output target: its format name and location.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkTargetEcho {
    /// Output format name (the test fixtures use `"parquet"` and `"csv"`).
    pub format: String,
    /// Output location.
    pub path: String,
}
122
/// Where a run's reports were written.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ReportEcho {
    /// Report root directory (the `report_dir` passed to `ReportWriter`, per the test fixture).
    pub path: String,
    /// Path of the summary file itself (in the test fixture:
    /// `<path>/run_<run_id>/run.summary.json`).
    pub report_file: String,
}
129
/// Echo of the archive settings for processed input files.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkArchiveEcho {
    /// Whether archiving is enabled.
    pub enabled: bool,
    /// Archive location; the key is omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
}
137
/// Echo of the validation policy applied to an entity.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct PolicyEcho {
    /// Configured severity.
    pub severity: Severity,
}
143
/// Aggregate counters, shared by `RunReport`, `RunSummaryReport` and `EntitySummary`.
///
/// Every field is always serialized. NOTE(review): the counters are independent;
/// nothing in this type enforces e.g. `accepted_total + rejected_total == rows_total`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ResultsTotals {
    pub files_total: u64,
    pub rows_total: u64,
    pub accepted_total: u64,
    pub rejected_total: u64,
    pub warnings_total: u64,
    pub errors_total: u64,
}
154
/// Outcome of processing one input file, listed in `RunReport::files`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileReport {
    /// Path of the input file.
    pub input_file: String,
    /// Per-file status; `compute_run_outcome` folds these into a run status.
    pub status: FileStatus,
    /// Row counters. NOTE(review): independent fields; consistency is up to the producer.
    pub row_count: u64,
    pub accepted_count: u64,
    pub rejected_count: u64,
    /// Declared-vs-actual column comparison for this file.
    pub mismatch: FileMismatch,
    /// Files produced for this input.
    pub output: FileOutput,
    /// Validation rule results for this file.
    pub validation: FileValidation,
}
167
/// Comparison between the declared columns and the columns found in an input file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileMismatch {
    /// Number of declared columns.
    pub declared_columns_count: u64,
    /// Number of columns present in the input.
    pub input_columns_count: u64,
    /// Declared column names absent from the input.
    pub missing_columns: Vec<String>,
    /// Input column names that were not declared.
    pub extra_columns: Vec<String>,
    /// What was done about the mismatch.
    pub mismatch_action: MismatchAction,
    /// Structured error; the key is omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<MismatchIssue>,
    /// Plain-text warning; the key is omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub warning: Option<String>,
}
181
/// A column-mismatch error: the rule it relates to and a message.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct MismatchIssue {
    /// Rule identifier as free text (not the `RuleName` enum).
    pub rule: String,
    /// Message text.
    pub message: String,
}
188
/// Paths of files produced for one input file.
///
/// Unlike the other optional fields in this module, these have no
/// `skip_serializing_if`: a `None` path serializes as JSON `null`, so all four
/// keys are always present.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileOutput {
    pub accepted_path: Option<String>,
    pub rejected_path: Option<String>,
    pub errors_path: Option<String>,
    pub archived_path: Option<String>,
}
197
/// Validation results for one input file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileValidation {
    /// Error count.
    pub errors: u64,
    /// Warning count.
    pub warnings: u64,
    /// Per-rule breakdown.
    pub rules: Vec<RuleSummary>,
}
205
/// Violations of a single validation rule within one file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RuleSummary {
    /// Which rule.
    pub rule: RuleName,
    /// Severity the rule was evaluated with.
    pub severity: Severity,
    /// Violation count for this rule.
    pub violations: u64,
    /// Per-column breakdown.
    pub columns: Vec<ColumnSummary>,
}
214
/// Violations of one rule in a single column.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ColumnSummary {
    /// Column name.
    pub column: String,
    /// Violation count in this column.
    pub violations: u64,
    /// Target type name (presumably set for cast-related rules — confirm);
    /// the key is omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target_type: Option<String>,
}
223
/// Status of a single input file.
///
/// Serialized as `"success"`, `"rejected"`, `"aborted"`, `"failed"`.
/// `compute_run_outcome` ranks them Failed > Aborted > Rejected > Success.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FileStatus {
    Success,
    Rejected,
    Aborted,
    Failed,
}
232
/// Overall status of a run or of one entity.
///
/// Serialized as `"success"`, `"success_with_warnings"`, `"rejected"`,
/// `"aborted"`, `"failed"`. Note that `compute_run_outcome` never yields
/// `SuccessWithWarnings`; if that variant is used, it is set elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RunStatus {
    Success,
    SuccessWithWarnings,
    Rejected,
    Aborted,
    Failed,
}
242
/// Severity level of a policy or rule. Serialized as `"warn"`, `"reject"`, `"abort"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Severity {
    Warn,
    Reject,
    Abort,
}
250
/// Validation rule identifier.
///
/// Serialized as `"not_null"`, `"cast_error"`, `"unique"`, `"schema_error"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RuleName {
    NotNull,
    CastError,
    Unique,
    SchemaError,
}
259
/// Action taken in response to a column mismatch.
///
/// Serialized as `"none"`, `"filled_nulls"`, `"ignored_extras"`,
/// `"rejected_file"`, `"aborted"`. The `None` variant is unrelated to
/// `Option::None`; always refer to it as `MismatchAction::None`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MismatchAction {
    None,
    FilledNulls,
    IgnoredExtras,
    RejectedFile,
    Aborted,
}
269
/// How a source path was interpreted when resolving input files.
/// Serialized as `"directory"`, `"file"`, `"glob"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ResolvedInputMode {
    Directory,
    File,
    Glob,
}
277
/// How a source is read. Currently a single plan, serialized as `"raw_and_typed"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceReadPlan {
    RawAndTyped,
}
283
284#[derive(Debug)]
285pub enum ReportError {
286 Io(std::io::Error),
287 Serialize(serde_json::Error),
288}
289
290impl std::fmt::Display for ReportError {
291 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292 match self {
293 ReportError::Io(err) => write!(f, "report io error: {err}"),
294 ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
295 }
296 }
297}
298
299impl std::error::Error for ReportError {}
300
301impl From<std::io::Error> for ReportError {
302 fn from(err: std::io::Error) -> Self {
303 Self::Io(err)
304 }
305}
306
307impl From<serde_json::Error> for ReportError {
308 fn from(err: serde_json::Error) -> Self {
309 Self::Serialize(err)
310 }
311}
312
313pub fn now_rfc3339() -> String {
314 OffsetDateTime::now_utc()
315 .format(&Rfc3339)
316 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
317}
318
/// Derives a run id from a timestamp by turning every `:` into `-`; all other
/// characters are kept, e.g. `2026-01-19T10:23:45Z` → `2026-01-19T10-23-45Z`.
///
/// The id is used in directory names (`run_<run_id>`), presumably why colons
/// (invalid in Windows file names) are replaced.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| match ch {
            ':' => '-',
            other => other,
        })
        .collect()
}
322
323pub struct ReportWriter;
324
325impl ReportWriter {
326 pub fn run_dir_name(run_id: &str) -> String {
327 format!("run_{run_id}")
328 }
329
330 pub fn report_file_name() -> String {
331 "run.json".to_string()
332 }
333
334 pub fn summary_file_name() -> String {
335 "run.summary.json".to_string()
336 }
337
338 pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
339 report_dir
340 .join(Self::run_dir_name(run_id))
341 .join(entity_name)
342 }
343
344 pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
345 Self::entity_report_dir(report_dir, run_id, entity_name).join(Self::report_file_name())
346 }
347
348 pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
349 report_dir
350 .join(Self::run_dir_name(run_id))
351 .join(Self::summary_file_name())
352 }
353
354 pub fn write_report(
355 report_dir: &Path,
356 run_id: &str,
357 entity_name: &str,
358 report: &RunReport,
359 ) -> Result<PathBuf, ReportError> {
360 let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
361 std::fs::create_dir_all(&entity_dir)?;
362 let report_path = Self::report_path(report_dir, run_id, entity_name);
363 let tmp_path = entity_dir.join(format!(
364 "{}.tmp-{}",
365 Self::report_file_name(),
366 unique_suffix()
367 ));
368
369 let json = serde_json::to_string_pretty(report)?;
370 let mut file = File::create(&tmp_path)?;
371 file.write_all(json.as_bytes())?;
372 file.sync_all()?;
373 std::fs::rename(&tmp_path, &report_path)?;
374
375 Ok(report_path)
376 }
377
378 pub fn write_summary(
379 report_dir: &Path,
380 run_id: &str,
381 report: &RunSummaryReport,
382 ) -> Result<PathBuf, ReportError> {
383 let run_dir = report_dir.join(Self::run_dir_name(run_id));
384 std::fs::create_dir_all(&run_dir)?;
385 let report_path = Self::summary_path(report_dir, run_id);
386 let tmp_path = run_dir.join(format!(
387 "{}.tmp-{}",
388 Self::summary_file_name(),
389 unique_suffix()
390 ));
391
392 let json = serde_json::to_string_pretty(report)?;
393 let mut file = File::create(&tmp_path)?;
394 file.write_all(json.as_bytes())?;
395 file.sync_all()?;
396 std::fs::rename(&tmp_path, &report_path)?;
397
398 Ok(report_path)
399 }
400}
401
402pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
403 if file_statuses.contains(&FileStatus::Failed) {
404 return (RunStatus::Failed, 1);
405 }
406 if file_statuses.contains(&FileStatus::Aborted) {
407 return (RunStatus::Aborted, 2);
408 }
409 if file_statuses.contains(&FileStatus::Rejected) {
410 return (RunStatus::Rejected, 0);
411 }
412 (RunStatus::Success, 0)
413}
414
/// Returns a suffix `<pid>-<unix_nanos>-<sequence>` used to name staging files
/// (`*.tmp-<suffix>`) and test scratch directories.
///
/// The per-process `sequence` guarantees distinct values for every call in
/// this process. pid + timestamp alone repeat when the clock is coarse, or when
/// it reads before the Unix epoch (the nanosecond part then falls back to `0`).
/// A repeated staging name would let `File::create` truncate another writer's
/// temp file.
fn unique_suffix() -> String {
    // Only uniqueness of the fetched values matters, which fetch_add guarantees
    // under any ordering, so Relaxed is sufficient.
    static SEQUENCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_nanos())
        .unwrap_or(0);
    let sequence = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    format!("{}-{}-{}", std::process::id(), nanos, sequence)
}
422
#[cfg(test)]
mod tests {
    use super::*;

    /// Scratch directory under `std::env::temp_dir()` that is deleted on drop.
    /// Repeated test runs therefore do not accumulate `floe-*-tests-*` directories,
    /// and cleanup still happens when an assertion panics.
    struct TempDir(std::path::PathBuf);

    impl TempDir {
        fn new(prefix: &str) -> Self {
            let dir = std::env::temp_dir().join(format!("{prefix}-{}", unique_suffix()));
            std::fs::create_dir_all(&dir).expect("create temp dir");
            Self(dir)
        }

        fn path(&self) -> &std::path::Path {
            &self.0
        }
    }

    impl Drop for TempDir {
        fn drop(&mut self) {
            // Best effort: failing to clean up scratch space must not fail a test.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Names of leftover `*.tmp-*` staging files directly inside `dir`
    /// (non-UTF-8 names are ignored).
    fn staging_files_in(dir: &std::path::Path) -> Vec<String> {
        std::fs::read_dir(dir)
            .expect("read dir")
            .filter_map(|entry| entry.ok())
            .filter_map(|entry| entry.file_name().into_string().ok())
            .filter(|name| name.contains(".tmp-"))
            .collect()
    }

    fn sample_report() -> RunReport {
        RunReport {
            spec_version: "0.1".to_string(),
            entity: EntityEcho {
                name: "customer".to_string(),
                metadata: None,
            },
            source: SourceEcho {
                format: "csv".to_string(),
                path: "/tmp/input".to_string(),
                options: None,
                cast_mode: Some("strict".to_string()),
                read_plan: SourceReadPlan::RawAndTyped,
                resolved_inputs: ResolvedInputs {
                    mode: ResolvedInputMode::Directory,
                    file_count: 1,
                    files: vec!["/tmp/input/file.csv".to_string()],
                },
            },
            sink: SinkEcho {
                accepted: SinkTargetEcho {
                    format: "parquet".to_string(),
                    path: "/tmp/out/accepted".to_string(),
                },
                rejected: Some(SinkTargetEcho {
                    format: "csv".to_string(),
                    path: "/tmp/out/rejected".to_string(),
                }),
                archive: SinkArchiveEcho {
                    enabled: false,
                    path: None,
                },
            },
            policy: PolicyEcho {
                severity: Severity::Warn,
            },
            results: ResultsTotals {
                files_total: 1,
                rows_total: 10,
                accepted_total: 10,
                rejected_total: 0,
                warnings_total: 0,
                errors_total: 0,
            },
            files: vec![FileReport {
                input_file: "/tmp/input/file.csv".to_string(),
                status: FileStatus::Success,
                row_count: 10,
                accepted_count: 10,
                rejected_count: 0,
                mismatch: FileMismatch {
                    declared_columns_count: 1,
                    input_columns_count: 1,
                    missing_columns: Vec::new(),
                    extra_columns: Vec::new(),
                    mismatch_action: MismatchAction::None,
                    error: None,
                    warning: None,
                },
                output: FileOutput {
                    accepted_path: Some("/tmp/out/accepted/file.parquet".to_string()),
                    rejected_path: None,
                    errors_path: None,
                    archived_path: None,
                },
                validation: FileValidation {
                    errors: 0,
                    warnings: 0,
                    rules: Vec::new(),
                },
            }],
        }
    }

    fn sample_summary() -> RunSummaryReport {
        RunSummaryReport {
            spec_version: "0.1".to_string(),
            tool: ToolInfo {
                name: "floe".to_string(),
                version: env!("CARGO_PKG_VERSION").to_string(),
                git: None,
            },
            run: RunInfo {
                // The run id is the filesystem-safe form; the timestamps are RFC 3339.
                run_id: "2026-01-19T10-23-45Z".to_string(),
                started_at: "2026-01-19T10:23:45Z".to_string(),
                finished_at: "2026-01-19T10:23:46Z".to_string(),
                duration_ms: 1000,
                status: RunStatus::Success,
                exit_code: 0,
            },
            config: ConfigEcho {
                path: "/tmp/config.yml".to_string(),
                version: "0.1".to_string(),
                metadata: None,
            },
            report: ReportEcho {
                path: "/tmp/out/reports".to_string(),
                report_file: "/tmp/out/reports/run_2026-01-19T10-23-45Z/run.summary.json"
                    .to_string(),
            },
            results: ResultsTotals {
                files_total: 1,
                rows_total: 10,
                accepted_total: 10,
                rejected_total: 0,
                warnings_total: 0,
                errors_total: 0,
            },
            entities: vec![EntitySummary {
                name: "customer".to_string(),
                status: RunStatus::Success,
                results: ResultsTotals {
                    files_total: 1,
                    rows_total: 10,
                    accepted_total: 10,
                    rejected_total: 0,
                    warnings_total: 0,
                    errors_total: 0,
                },
                report_file: "/tmp/out/reports/run_2026-01-19T10-23-45Z/customer/run.json"
                    .to_string(),
            }],
        }
    }

    #[test]
    fn report_serializes_expected_keys() {
        let value = serde_json::to_value(sample_report()).expect("serialize report");
        let object = value.as_object().expect("report object");
        for key in [
            "spec_version",
            "entity",
            "source",
            "sink",
            "policy",
            "results",
            "files",
        ] {
            assert!(object.contains_key(key), "missing key {key}");
        }
    }

    #[test]
    fn summary_serializes_expected_keys() {
        let value = serde_json::to_value(sample_summary()).expect("serialize summary");
        let object = value.as_object().expect("summary object");
        for key in [
            "spec_version",
            "tool",
            "run",
            "config",
            "report",
            "results",
            "entities",
        ] {
            assert!(object.contains_key(key), "missing key {key}");
        }
        // `git: None` is skipped rather than serialized as null.
        assert!(value["tool"].get("git").is_none());
    }

    #[test]
    fn report_file_name_matches_format() {
        assert_eq!(
            ReportWriter::run_dir_name("2026-01-19T10-23-45Z"),
            "run_2026-01-19T10-23-45Z"
        );
        assert_eq!(ReportWriter::report_file_name(), "run.json");
        assert_eq!(ReportWriter::summary_file_name(), "run.summary.json");
    }

    #[test]
    fn run_id_from_timestamp_replaces_colons() {
        assert_eq!(
            run_id_from_timestamp("2026-01-19T10:23:45Z"),
            "2026-01-19T10-23-45Z"
        );
    }

    #[test]
    fn compute_run_outcome_table() {
        let (status, code) = compute_run_outcome(&[]);
        assert_eq!(status, RunStatus::Success);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Success]);
        assert_eq!(status, RunStatus::Success);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Rejected]);
        assert_eq!(status, RunStatus::Rejected);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Aborted]);
        assert_eq!(status, RunStatus::Aborted);
        assert_eq!(code, 2);

        let (status, code) = compute_run_outcome(&[FileStatus::Failed]);
        assert_eq!(status, RunStatus::Failed);
        assert_eq!(code, 1);

        let (status, code) = compute_run_outcome(&[
            FileStatus::Success,
            FileStatus::Rejected,
            FileStatus::Aborted,
        ]);
        assert_eq!(status, RunStatus::Aborted);
        assert_eq!(code, 2);

        let (status, code) = compute_run_outcome(&[
            FileStatus::Success,
            FileStatus::Rejected,
            FileStatus::Failed,
        ]);
        assert_eq!(status, RunStatus::Failed);
        assert_eq!(code, 1);
    }

    #[test]
    fn write_report_writes_json_file() {
        let report = sample_report();
        let run_id = "2026-01-19T10-23-45Z";
        let scratch = TempDir::new("floe-report-tests");

        let report_path = ReportWriter::write_report(scratch.path(), run_id, "customer", &report)
            .expect("write report");

        let expected = scratch
            .path()
            .join(format!("run_{run_id}"))
            .join("customer")
            .join("run.json");
        assert_eq!(report_path, expected);
        assert!(report_path.exists());

        let contents = std::fs::read_to_string(&report_path).expect("read report");
        let value: serde_json::Value = serde_json::from_str(&contents).expect("parse report");
        assert!(value.get("entity").is_some());

        assert!(staging_files_in(expected.parent().expect("entity dir")).is_empty());
    }

    #[test]
    fn write_report_replaces_existing_report() {
        let run_id = "2026-01-19T10-23-45Z";
        let scratch = TempDir::new("floe-report-overwrite-tests");

        ReportWriter::write_report(scratch.path(), run_id, "customer", &sample_report())
            .expect("first write");

        let mut updated = sample_report();
        updated.results.rows_total = 20;
        let report_path = ReportWriter::write_report(scratch.path(), run_id, "customer", &updated)
            .expect("second write");

        let contents = std::fs::read_to_string(&report_path).expect("read report");
        let parsed: RunReport = serde_json::from_str(&contents).expect("round-trip report");
        assert_eq!(parsed.results.rows_total, 20);
        assert!(staging_files_in(report_path.parent().expect("entity dir")).is_empty());
    }

    #[test]
    fn write_summary_writes_json_file() {
        let summary = sample_summary();
        let run_id = "2026-01-19T10-23-45Z";
        let scratch = TempDir::new("floe-summary-tests");

        let report_path =
            ReportWriter::write_summary(scratch.path(), run_id, &summary).expect("write summary");

        let expected = scratch
            .path()
            .join(format!("run_{run_id}"))
            .join("run.summary.json");
        assert_eq!(report_path, expected);
        assert!(report_path.exists());

        let contents = std::fs::read_to_string(&report_path).expect("read summary");
        let value: serde_json::Value = serde_json::from_str(&contents).expect("parse summary");
        assert!(value.get("run").is_some());

        assert!(staging_files_in(expected.parent().expect("run dir")).is_empty());
    }
}