use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

use serde::{Deserialize, Serialize};
use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
10#[serde(rename_all = "snake_case")]
11pub struct RunReport {
12 pub spec_version: String,
13 pub tool: ToolInfo,
14 pub run: RunInfo,
15 pub config: ConfigEcho,
16 pub entity: EntityEcho,
17 pub source: SourceEcho,
18 pub sink: SinkEcho,
19 pub report: ReportEcho,
20 pub policy: PolicyEcho,
21 pub results: ResultsTotals,
22 pub files: Vec<FileReport>,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26#[serde(rename_all = "snake_case")]
27pub struct ToolInfo {
28 pub name: String,
29 pub version: String,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 pub git: Option<GitInfo>,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
35#[serde(rename_all = "snake_case")]
36pub struct GitInfo {
37 pub commit: String,
38 pub dirty: bool,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(rename_all = "snake_case")]
43pub struct RunInfo {
44 pub run_id: String,
45 pub started_at: String,
46 pub finished_at: String,
47 pub duration_ms: u64,
48 pub status: RunStatus,
49 pub exit_code: i32,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
53#[serde(rename_all = "snake_case")]
54pub struct ConfigEcho {
55 pub path: String,
56 pub version: String,
57 #[serde(skip_serializing_if = "Option::is_none")]
58 pub metadata: Option<serde_json::Value>,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
62#[serde(rename_all = "snake_case")]
63pub struct EntityEcho {
64 pub name: String,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub metadata: Option<serde_json::Value>,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
70#[serde(rename_all = "snake_case")]
71pub struct SourceEcho {
72 pub format: String,
73 pub path: String,
74 #[serde(skip_serializing_if = "Option::is_none")]
75 pub options: Option<serde_json::Value>,
76 #[serde(skip_serializing_if = "Option::is_none")]
77 pub cast_mode: Option<String>,
78 pub read_plan: SourceReadPlan,
79 pub resolved_inputs: ResolvedInputs,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
83#[serde(rename_all = "snake_case")]
84pub struct ResolvedInputs {
85 pub mode: ResolvedInputMode,
86 pub file_count: u64,
87 pub files: Vec<String>,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(rename_all = "snake_case")]
92pub struct SinkEcho {
93 pub accepted: SinkTargetEcho,
94 #[serde(skip_serializing_if = "Option::is_none")]
95 pub rejected: Option<SinkTargetEcho>,
96 pub archive: SinkArchiveEcho,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
100#[serde(rename_all = "snake_case")]
101pub struct SinkTargetEcho {
102 pub format: String,
103 pub path: String,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[serde(rename_all = "snake_case")]
108pub struct ReportEcho {
109 pub path: String,
110 pub report_file: String,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
114#[serde(rename_all = "snake_case")]
115pub struct SinkArchiveEcho {
116 pub enabled: bool,
117 #[serde(skip_serializing_if = "Option::is_none")]
118 pub path: Option<String>,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
122#[serde(rename_all = "snake_case")]
123pub struct PolicyEcho {
124 pub severity: Severity,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
128#[serde(rename_all = "snake_case")]
129pub struct ResultsTotals {
130 pub files_total: u64,
131 pub rows_total: u64,
132 pub accepted_total: u64,
133 pub rejected_total: u64,
134 pub warnings_total: u64,
135 pub errors_total: u64,
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
139#[serde(rename_all = "snake_case")]
140pub struct FileReport {
141 pub input_file: String,
142 pub status: FileStatus,
143 pub row_count: u64,
144 pub accepted_count: u64,
145 pub rejected_count: u64,
146 pub output: FileOutput,
147 pub validation: FileValidation,
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize)]
151#[serde(rename_all = "snake_case")]
152pub struct FileOutput {
153 pub accepted_path: Option<String>,
154 pub rejected_path: Option<String>,
155 pub errors_path: Option<String>,
156 pub archived_path: Option<String>,
157}
158
159#[derive(Debug, Clone, Serialize, Deserialize)]
160#[serde(rename_all = "snake_case")]
161pub struct FileValidation {
162 pub errors: u64,
163 pub warnings: u64,
164 pub rules: Vec<RuleSummary>,
165 pub examples: ExampleSummary,
166}
167
168#[derive(Debug, Clone, Serialize, Deserialize)]
169#[serde(rename_all = "snake_case")]
170pub struct RuleSummary {
171 pub rule: RuleName,
172 pub severity: Severity,
173 pub violations: u64,
174 pub columns: Vec<ColumnSummary>,
175}
176
177#[derive(Debug, Clone, Serialize, Deserialize)]
178#[serde(rename_all = "snake_case")]
179pub struct ColumnSummary {
180 pub column: String,
181 pub violations: u64,
182 #[serde(skip_serializing_if = "Option::is_none")]
183 pub target_type: Option<String>,
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize)]
187#[serde(rename_all = "snake_case")]
188pub struct ExampleSummary {
189 pub max_examples_per_rule: u64,
190 pub items: Vec<ValidationExample>,
191}
192
193#[derive(Debug, Clone, Serialize, Deserialize)]
194#[serde(rename_all = "snake_case")]
195pub struct ValidationExample {
196 pub rule: RuleName,
197 pub column: String,
198 pub row_index: u64,
199 pub message: String,
200}
201
202#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
203#[serde(rename_all = "snake_case")]
204pub enum FileStatus {
205 Success,
206 Rejected,
207 Aborted,
208 Failed,
209}
210
211#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
212#[serde(rename_all = "snake_case")]
213pub enum RunStatus {
214 Success,
215 SuccessWithWarnings,
216 Rejected,
217 Aborted,
218 Failed,
219}
220
221#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
222#[serde(rename_all = "snake_case")]
223pub enum Severity {
224 Warn,
225 Reject,
226 Abort,
227}
228
229#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
230#[serde(rename_all = "snake_case")]
231pub enum RuleName {
232 NotNull,
233 CastError,
234 Unique,
235 SchemaError,
236}
237
238#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
239#[serde(rename_all = "snake_case")]
240pub enum ResolvedInputMode {
241 Directory,
242 File,
243 Glob,
244}
245
246#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
247#[serde(rename_all = "snake_case")]
248pub enum SourceReadPlan {
249 RawAndTyped,
250}
251
252#[derive(Debug)]
253pub enum ReportError {
254 Io(std::io::Error),
255 Serialize(serde_json::Error),
256}
257
258impl std::fmt::Display for ReportError {
259 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260 match self {
261 ReportError::Io(err) => write!(f, "report io error: {err}"),
262 ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
263 }
264 }
265}
266
267impl std::error::Error for ReportError {}
268
269impl From<std::io::Error> for ReportError {
270 fn from(err: std::io::Error) -> Self {
271 Self::Io(err)
272 }
273}
274
275impl From<serde_json::Error> for ReportError {
276 fn from(err: serde_json::Error) -> Self {
277 Self::Serialize(err)
278 }
279}
280
281pub fn now_rfc3339() -> String {
282 OffsetDateTime::now_utc()
283 .format(&Rfc3339)
284 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
285}
286
/// Derives a run identifier from a timestamp string by replacing every `:`
/// with `-`.
///
/// All other characters are kept as they are, so
/// `"2026-01-19T10:23:45Z"` becomes `"2026-01-19T10-23-45Z"`. The result is
/// used as part of a directory name by [`ReportWriter::run_dir_name`].
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp.replace(':', "-")
}
290
291pub struct ReportWriter;
292
293impl ReportWriter {
294 pub fn run_dir_name(run_id: &str) -> String {
295 format!("run_{run_id}")
296 }
297
298 pub fn report_file_name() -> String {
299 "run.json".to_string()
300 }
301
302 pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
303 report_dir
304 .join(Self::run_dir_name(run_id))
305 .join(entity_name)
306 }
307
308 pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
309 Self::entity_report_dir(report_dir, run_id, entity_name).join(Self::report_file_name())
310 }
311
312 pub fn write_report(
313 report_dir: &Path,
314 run_id: &str,
315 entity_name: &str,
316 report: &RunReport,
317 ) -> Result<PathBuf, ReportError> {
318 let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
319 std::fs::create_dir_all(&entity_dir)?;
320 let report_path = Self::report_path(report_dir, run_id, entity_name);
321 let tmp_path = entity_dir.join(format!(
322 "{}.tmp-{}",
323 Self::report_file_name(),
324 unique_suffix()
325 ));
326
327 let json = serde_json::to_string_pretty(report)?;
328 let mut file = File::create(&tmp_path)?;
329 file.write_all(json.as_bytes())?;
330 file.sync_all()?;
331 std::fs::rename(&tmp_path, &report_path)?;
332
333 Ok(report_path)
334 }
335}
336
337pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
338 if file_statuses.contains(&FileStatus::Failed) {
339 return (RunStatus::Failed, 1);
340 }
341 if file_statuses.contains(&FileStatus::Aborted) {
342 return (RunStatus::Aborted, 2);
343 }
344 if file_statuses.contains(&FileStatus::Rejected) {
345 return (RunStatus::Rejected, 0);
346 }
347 (RunStatus::Success, 0)
348}
349
/// Builds a suffix for temporary file and directory names.
///
/// The result has the form `<pid>-<nanos>-<seq>`:
///
/// * `pid` is the current process id,
/// * `nanos` is the number of nanoseconds since the Unix epoch, or `0` when
///   the system clock reports a time before the epoch,
/// * `seq` is a counter that increases with every call in this process.
///
/// The counter guarantees that two calls in the same process never return the
/// same string, even when the clock has coarse resolution or falls back to `0`.
fn unique_suffix() -> String {
    // Process-wide sequence number; `Relaxed` is sufficient because only the
    // uniqueness of the fetched values matters, not ordering with other memory.
    static SEQUENCE: AtomicUsize = AtomicUsize::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);

    format!("{}-{}-{}", std::process::id(), nanos, seq)
}
357
/// Unit tests for the report model, the path layout and the atomic writer.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fully populated report for one successful CSV file.
    fn sample_report() -> RunReport {
        RunReport {
            spec_version: "0.1".to_string(),
            tool: ToolInfo {
                name: "floe".to_string(),
                version: env!("CARGO_PKG_VERSION").to_string(),
                git: None,
            },
            run: RunInfo {
                run_id: "2026-01-19T10-23-45Z".to_string(),
                started_at: "2026-01-19T10-23-45Z".to_string(),
                finished_at: "2026-01-19T10-23-46Z".to_string(),
                duration_ms: 1000,
                status: RunStatus::Success,
                exit_code: 0,
            },
            config: ConfigEcho {
                path: "/tmp/config.yml".to_string(),
                version: "0.1".to_string(),
                metadata: None,
            },
            entity: EntityEcho {
                name: "customer".to_string(),
                metadata: None,
            },
            source: SourceEcho {
                format: "csv".to_string(),
                path: "/tmp/input".to_string(),
                options: None,
                cast_mode: Some("strict".to_string()),
                read_plan: SourceReadPlan::RawAndTyped,
                resolved_inputs: ResolvedInputs {
                    mode: ResolvedInputMode::Directory,
                    file_count: 1,
                    files: vec!["/tmp/input/file.csv".to_string()],
                },
            },
            sink: SinkEcho {
                accepted: SinkTargetEcho {
                    format: "parquet".to_string(),
                    path: "/tmp/out/accepted".to_string(),
                },
                rejected: Some(SinkTargetEcho {
                    format: "csv".to_string(),
                    path: "/tmp/out/rejected".to_string(),
                }),
                archive: SinkArchiveEcho {
                    enabled: false,
                    path: None,
                },
            },
            report: ReportEcho {
                path: "/tmp/out/reports".to_string(),
                report_file: "/tmp/out/reports/run_2026-01-19T10-23-45Z/customer/run.json"
                    .to_string(),
            },
            policy: PolicyEcho {
                severity: Severity::Warn,
            },
            results: ResultsTotals {
                files_total: 1,
                rows_total: 10,
                accepted_total: 10,
                rejected_total: 0,
                warnings_total: 0,
                errors_total: 0,
            },
            files: vec![FileReport {
                input_file: "/tmp/input/file.csv".to_string(),
                status: FileStatus::Success,
                row_count: 10,
                accepted_count: 10,
                rejected_count: 0,
                output: FileOutput {
                    accepted_path: Some("/tmp/out/accepted/file.parquet".to_string()),
                    rejected_path: None,
                    errors_path: None,
                    archived_path: None,
                },
                validation: FileValidation {
                    errors: 0,
                    warnings: 0,
                    rules: Vec::new(),
                    examples: ExampleSummary {
                        max_examples_per_rule: 3,
                        items: Vec::new(),
                    },
                },
            }],
        }
    }

    /// The serialized report exposes every top-level section under its
    /// snake_case key.
    #[test]
    fn report_serializes_expected_keys() {
        let report = sample_report();
        let value = serde_json::to_value(&report).expect("serialize report");
        let object = value.as_object().expect("report object");
        assert!(object.contains_key("spec_version"));
        assert!(object.contains_key("tool"));
        assert!(object.contains_key("run"));
        assert!(object.contains_key("config"));
        assert!(object.contains_key("entity"));
        assert!(object.contains_key("source"));
        assert!(object.contains_key("sink"));
        assert!(object.contains_key("report"));
        assert!(object.contains_key("policy"));
        assert!(object.contains_key("results"));
        assert!(object.contains_key("files"));

        let report_obj = object.get("report").expect("report");
        let report_map = report_obj.as_object().expect("report map");
        assert!(report_map.contains_key("report_file"));
    }

    /// Directory and file names follow the documented layout.
    #[test]
    fn report_file_name_matches_format() {
        let run_dir = ReportWriter::run_dir_name("2026-01-19T10-23-45Z");
        assert_eq!(run_dir, "run_2026-01-19T10-23-45Z");
        let name = ReportWriter::report_file_name();
        assert_eq!(name, "run.json");
    }

    /// Table test for the status/exit-code mapping and its precedence rules.
    #[test]
    fn compute_run_outcome_table() {
        let (status, code) = compute_run_outcome(&[]);
        assert_eq!(status, RunStatus::Success);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Success]);
        assert_eq!(status, RunStatus::Success);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Rejected]);
        assert_eq!(status, RunStatus::Rejected);
        assert_eq!(code, 0);

        let (status, code) = compute_run_outcome(&[FileStatus::Aborted]);
        assert_eq!(status, RunStatus::Aborted);
        assert_eq!(code, 2);

        let (status, code) = compute_run_outcome(&[FileStatus::Failed]);
        assert_eq!(status, RunStatus::Failed);
        assert_eq!(code, 1);

        let (status, code) = compute_run_outcome(&[
            FileStatus::Success,
            FileStatus::Rejected,
            FileStatus::Aborted,
        ]);
        assert_eq!(status, RunStatus::Aborted);
        assert_eq!(code, 2);

        let (status, code) = compute_run_outcome(&[
            FileStatus::Success,
            FileStatus::Rejected,
            FileStatus::Failed,
        ]);
        assert_eq!(status, RunStatus::Failed);
        assert_eq!(code, 1);
    }

    /// `write_report` stores parseable JSON at the expected path and leaves
    /// no temporary file behind.
    #[test]
    fn write_report_writes_json_file() {
        let report = sample_report();
        let mut dir = std::env::temp_dir();
        dir.push(format!("floe-report-tests-{}", unique_suffix()));
        std::fs::create_dir_all(&dir).expect("create temp dir");

        let report_path = ReportWriter::write_report(&dir, &report.run.run_id, "customer", &report)
            .expect("write report");

        assert!(report_path.exists());
        let expected = dir
            .join("run_2026-01-19T10-23-45Z")
            .join("customer")
            .join("run.json");
        assert_eq!(report_path, expected);
        let contents = std::fs::read_to_string(&report_path).expect("read report");
        let value: serde_json::Value = serde_json::from_str(&contents).expect("parse report");
        assert_eq!(
            value
                .get("run")
                .and_then(|run| run.get("run_id"))
                .and_then(|run_id| run_id.as_str()),
            Some("2026-01-19T10-23-45Z")
        );

        let temp_files: Vec<_> = std::fs::read_dir(expected.parent().expect("entity dir"))
            .expect("read dir")
            .filter_map(|entry| entry.ok())
            .filter(|entry| {
                entry
                    .file_name()
                    .to_str()
                    .map(|name| name.contains(".tmp-"))
                    .unwrap_or(false)
            })
            .collect();
        assert!(temp_files.is_empty());

        // Best-effort cleanup so repeated test runs do not pile up scratch
        // directories in the system temp dir; a cleanup failure must not fail
        // the test.
        let _ = std::fs::remove_dir_all(&dir);
    }
}