1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
9pub mod build;
10pub mod entity;
11pub mod output;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub struct RunReport {
16 pub spec_version: String,
17 pub entity: EntityEcho,
18 pub source: SourceEcho,
19 pub sink: SinkEcho,
20 pub policy: PolicyEcho,
21 pub accepted_output: AcceptedOutputSummary,
22 pub results: ResultsTotals,
23 pub files: Vec<FileReport>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27#[serde(rename_all = "snake_case")]
28pub struct RunSummaryReport {
29 pub spec_version: String,
30 pub tool: ToolInfo,
31 pub run: RunInfo,
32 pub config: ConfigEcho,
33 pub report: ReportEcho,
34 pub results: ResultsTotals,
35 pub entities: Vec<EntitySummary>,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(rename_all = "snake_case")]
40pub struct EntitySummary {
41 pub name: String,
42 pub status: RunStatus,
43 pub results: ResultsTotals,
44 pub report_file: String,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48#[serde(rename_all = "snake_case")]
49pub struct ToolInfo {
50 pub name: String,
51 pub version: String,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 pub git: Option<GitInfo>,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
57#[serde(rename_all = "snake_case")]
58pub struct GitInfo {
59 pub commit: String,
60 pub dirty: bool,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(rename_all = "snake_case")]
65pub struct RunInfo {
66 pub run_id: String,
67 pub started_at: String,
68 pub finished_at: String,
69 pub duration_ms: u64,
70 pub status: RunStatus,
71 pub exit_code: i32,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75#[serde(rename_all = "snake_case")]
76pub struct ConfigEcho {
77 pub path: String,
78 pub version: String,
79 #[serde(skip_serializing_if = "Option::is_none")]
80 pub metadata: Option<serde_json::Value>,
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
84#[serde(rename_all = "snake_case")]
85pub struct EntityEcho {
86 pub name: String,
87 #[serde(skip_serializing_if = "Option::is_none")]
88 pub metadata: Option<serde_json::Value>,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
92#[serde(rename_all = "snake_case")]
93pub struct SourceEcho {
94 pub format: String,
95 pub path: String,
96 #[serde(skip_serializing_if = "Option::is_none")]
97 pub options: Option<serde_json::Value>,
98 #[serde(skip_serializing_if = "Option::is_none")]
99 pub cast_mode: Option<String>,
100 pub read_plan: SourceReadPlan,
101 pub resolved_inputs: ResolvedInputs,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub struct ResolvedInputs {
107 pub mode: ResolvedInputMode,
108 pub file_count: u64,
109 pub files: Vec<String>,
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
113#[serde(rename_all = "snake_case")]
114pub struct SinkEcho {
115 pub accepted: SinkTargetEcho,
116 #[serde(skip_serializing_if = "Option::is_none")]
117 pub rejected: Option<SinkTargetEcho>,
118 pub archive: SinkArchiveEcho,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
122#[serde(rename_all = "snake_case")]
123pub struct AcceptedOutputSummary {
124 pub path: String,
125 #[serde(default)]
126 #[serde(skip_serializing_if = "Option::is_none")]
127 pub table_root_uri: Option<String>,
128 #[serde(default)]
129 #[serde(skip_serializing_if = "Option::is_none")]
130 pub write_mode: Option<String>,
131 pub accepted_rows: u64,
132 #[serde(default)]
133 pub files_written: u64,
134 pub parts_written: u64,
135 #[serde(default)]
136 #[serde(skip_serializing_if = "Vec::is_empty")]
137 pub part_files: Vec<String>,
138 #[serde(default)]
139 #[serde(skip_serializing_if = "Option::is_none")]
140 pub table_version: Option<i64>,
141 #[serde(default)]
142 #[serde(skip_serializing_if = "Option::is_none")]
143 pub snapshot_id: Option<i64>,
144 #[serde(default)]
145 #[serde(skip_serializing_if = "Option::is_none")]
146 pub iceberg_catalog_name: Option<String>,
147 #[serde(default)]
148 #[serde(skip_serializing_if = "Option::is_none")]
149 pub iceberg_database: Option<String>,
150 #[serde(default)]
151 #[serde(skip_serializing_if = "Option::is_none")]
152 pub iceberg_namespace: Option<String>,
153 #[serde(default)]
154 #[serde(skip_serializing_if = "Option::is_none")]
155 pub iceberg_table: Option<String>,
156 #[serde(default)]
157 #[serde(skip_serializing_if = "Option::is_none")]
158 pub total_bytes_written: Option<u64>,
159 #[serde(default)]
160 #[serde(skip_serializing_if = "Option::is_none")]
161 pub avg_file_size_mb: Option<f64>,
162 #[serde(default)]
163 #[serde(skip_serializing_if = "Option::is_none")]
164 pub small_files_count: Option<u64>,
165}
166
167#[derive(Debug, Clone, Serialize, Deserialize)]
168#[serde(rename_all = "snake_case")]
169pub struct SinkTargetEcho {
170 pub format: String,
171 pub path: String,
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize)]
175#[serde(rename_all = "snake_case")]
176pub struct ReportEcho {
177 pub path: String,
178 pub report_file: String,
179}
180
181#[derive(Debug, Clone, Serialize, Deserialize)]
182#[serde(rename_all = "snake_case")]
183pub struct SinkArchiveEcho {
184 pub enabled: bool,
185 #[serde(skip_serializing_if = "Option::is_none")]
186 pub path: Option<String>,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize)]
190#[serde(rename_all = "snake_case")]
191pub struct PolicyEcho {
192 pub severity: Severity,
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize)]
196#[serde(rename_all = "snake_case")]
197pub struct ResultsTotals {
198 pub files_total: u64,
199 pub rows_total: u64,
200 pub accepted_total: u64,
201 pub rejected_total: u64,
202 pub warnings_total: u64,
203 pub errors_total: u64,
204}
205
206#[derive(Debug, Clone, Serialize, Deserialize)]
207#[serde(rename_all = "snake_case")]
208pub struct FileReport {
209 pub input_file: String,
210 pub status: FileStatus,
211 pub row_count: u64,
212 pub accepted_count: u64,
213 pub rejected_count: u64,
214 pub mismatch: FileMismatch,
215 pub output: FileOutput,
216 pub validation: FileValidation,
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize)]
220#[serde(rename_all = "snake_case")]
221pub struct FileMismatch {
222 pub declared_columns_count: u64,
223 pub input_columns_count: u64,
224 pub missing_columns: Vec<String>,
225 pub extra_columns: Vec<String>,
226 pub mismatch_action: MismatchAction,
227 #[serde(skip_serializing_if = "Option::is_none")]
228 pub error: Option<MismatchIssue>,
229 #[serde(skip_serializing_if = "Option::is_none")]
230 pub warning: Option<String>,
231}
232
233#[derive(Debug, Clone, Serialize, Deserialize)]
234#[serde(rename_all = "snake_case")]
235pub struct MismatchIssue {
236 pub rule: String,
237 pub message: String,
238}
239
240#[derive(Debug, Clone, Serialize, Deserialize)]
241#[serde(rename_all = "snake_case")]
242pub struct FileOutput {
243 pub accepted_path: Option<String>,
244 pub rejected_path: Option<String>,
245 pub errors_path: Option<String>,
246 pub archived_path: Option<String>,
247}
248
249#[derive(Debug, Clone, Serialize, Deserialize)]
250#[serde(rename_all = "snake_case")]
251pub struct FileValidation {
252 pub errors: u64,
253 pub warnings: u64,
254 pub rules: Vec<RuleSummary>,
255}
256
257#[derive(Debug, Clone, Serialize, Deserialize)]
258#[serde(rename_all = "snake_case")]
259pub struct RuleSummary {
260 pub rule: RuleName,
261 pub severity: Severity,
262 pub violations: u64,
263 pub columns: Vec<ColumnSummary>,
264}
265
266#[derive(Debug, Clone, Serialize, Deserialize)]
267#[serde(rename_all = "snake_case")]
268pub struct ColumnSummary {
269 pub column: String,
270 pub violations: u64,
271 #[serde(skip_serializing_if = "Option::is_none")]
272 pub target_type: Option<String>,
273 #[serde(skip_serializing_if = "Option::is_none")]
274 pub source: Option<String>,
275}
276
277#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
278#[serde(rename_all = "snake_case")]
279pub enum FileStatus {
280 Success,
281 Rejected,
282 Aborted,
283 Failed,
284}
285
286#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
287#[serde(rename_all = "snake_case")]
288pub enum RunStatus {
289 Success,
290 SuccessWithWarnings,
291 Rejected,
292 Aborted,
293 Failed,
294}
295
296#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
297#[serde(rename_all = "snake_case")]
298pub enum Severity {
299 Warn,
300 Reject,
301 Abort,
302}
303
304#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
305#[serde(rename_all = "snake_case")]
306pub enum RuleName {
307 NotNull,
308 CastError,
309 Unique,
310 SchemaError,
311}
312
313#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
314#[serde(rename_all = "snake_case")]
315pub enum MismatchAction {
316 None,
317 FilledNulls,
318 IgnoredExtras,
319 RejectedFile,
320 Aborted,
321}
322
323#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
324#[serde(rename_all = "snake_case")]
325pub enum ResolvedInputMode {
326 Directory,
327 File,
328 Glob,
329}
330
331#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
332#[serde(rename_all = "snake_case")]
333pub enum SourceReadPlan {
334 RawAndTyped,
335}
336
337#[derive(Debug)]
338pub enum ReportError {
339 Io(std::io::Error),
340 Serialize(serde_json::Error),
341}
342
343impl std::fmt::Display for ReportError {
344 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
345 match self {
346 ReportError::Io(err) => write!(f, "report io error: {err}"),
347 ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
348 }
349 }
350}
351
352impl std::error::Error for ReportError {}
353
354impl From<std::io::Error> for ReportError {
355 fn from(err: std::io::Error) -> Self {
356 Self::Io(err)
357 }
358}
359
360impl From<serde_json::Error> for ReportError {
361 fn from(err: serde_json::Error) -> Self {
362 Self::Serialize(err)
363 }
364}
365
366pub trait ReportFormatter {
367 fn format_name(&self) -> &'static str;
368 fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError>;
369 fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError>;
370}
371
372pub struct JsonReportFormatter;
373
374impl ReportFormatter for JsonReportFormatter {
375 fn format_name(&self) -> &'static str {
376 "json"
377 }
378
379 fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError> {
380 Ok(serde_json::to_string_pretty(report)?)
381 }
382
383 fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError> {
384 Ok(serde_json::to_string_pretty(report)?)
385 }
386}
387
388pub fn now_rfc3339() -> String {
389 OffsetDateTime::now_utc()
390 .format(&Rfc3339)
391 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
392}
393
/// Derives a run id from `timestamp` by swapping every `:` for `-`; all other
/// characters are kept as they are.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}
397
398pub struct ReportWriter;
399
400impl ReportWriter {
401 pub fn run_dir_name(run_id: &str) -> String {
402 format!("run_{run_id}")
403 }
404
405 pub fn report_file_name() -> String {
406 "run.json".to_string()
407 }
408
409 pub fn summary_file_name() -> String {
410 "run.summary.json".to_string()
411 }
412
413 pub fn report_relative_path(run_id: &str, entity_name: &str) -> String {
414 format!(
415 "{}/{}/{}",
416 Self::run_dir_name(run_id),
417 entity_name,
418 Self::report_file_name()
419 )
420 }
421
422 pub fn summary_relative_path(run_id: &str) -> String {
423 format!(
424 "{}/{}",
425 Self::run_dir_name(run_id),
426 Self::summary_file_name()
427 )
428 }
429
430 pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
431 crate::io::storage::paths::normalize_local_path(
432 &report_dir
433 .join(Self::run_dir_name(run_id))
434 .join(entity_name),
435 )
436 }
437
438 pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
439 crate::io::storage::paths::normalize_local_path(
440 &Self::entity_report_dir(report_dir, run_id, entity_name)
441 .join(Self::report_file_name()),
442 )
443 }
444
445 pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
446 crate::io::storage::paths::normalize_local_path(
447 &report_dir
448 .join(Self::run_dir_name(run_id))
449 .join(Self::summary_file_name()),
450 )
451 }
452
453 pub fn write_report(
454 report_dir: &Path,
455 run_id: &str,
456 entity_name: &str,
457 report: &RunReport,
458 ) -> Result<PathBuf, ReportError> {
459 let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
460 std::fs::create_dir_all(&entity_dir)?;
461 let report_path = Self::report_path(report_dir, run_id, entity_name);
462 let tmp_path = entity_dir.join(format!(
463 "{}.tmp-{}",
464 Self::report_file_name(),
465 unique_suffix()
466 ));
467
468 let json = serde_json::to_string_pretty(report)?;
469 let mut file = File::create(&tmp_path)?;
470 file.write_all(json.as_bytes())?;
471 file.sync_all()?;
472 std::fs::rename(&tmp_path, &report_path)?;
473
474 Ok(report_path)
475 }
476
477 pub fn write_summary(
478 report_dir: &Path,
479 run_id: &str,
480 report: &RunSummaryReport,
481 ) -> Result<PathBuf, ReportError> {
482 let run_dir = report_dir.join(Self::run_dir_name(run_id));
483 let run_dir = crate::io::storage::paths::normalize_local_path(&run_dir);
484 std::fs::create_dir_all(&run_dir)?;
485 let report_path = Self::summary_path(report_dir, run_id);
486 let tmp_path = run_dir.join(format!(
487 "{}.tmp-{}",
488 Self::summary_file_name(),
489 unique_suffix()
490 ));
491
492 let json = serde_json::to_string_pretty(report)?;
493 let mut file = File::create(&tmp_path)?;
494 file.write_all(json.as_bytes())?;
495 file.sync_all()?;
496 std::fs::rename(&tmp_path, &report_path)?;
497
498 Ok(report_path)
499 }
500}
501
502pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
503 if file_statuses.contains(&FileStatus::Failed) {
504 return (RunStatus::Failed, 1);
505 }
506 if file_statuses.contains(&FileStatus::Aborted) {
507 return (RunStatus::Aborted, 2);
508 }
509 if file_statuses.contains(&FileStatus::Rejected) {
510 return (RunStatus::Rejected, 0);
511 }
512 (RunStatus::Success, 0)
513}
514
/// Builds a suffix for temporary file names: `<pid>-<nanos>-<sequence>`.
///
/// - `pid` is the current process id.
/// - `nanos` is the number of nanoseconds since the Unix epoch, or `0` when the
///   system clock reports a time before the epoch.
/// - `sequence` is a process-wide counter incremented on every call. It keeps two
///   suffixes produced by the same process distinct even when the clock returns
///   the same value twice (coarse clock resolution, or the `0` fallback).
fn unique_suffix() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};

    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);
    // Relaxed is enough: only the uniqueness of the returned value matters, not
    // its ordering relative to other memory operations.
    let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);

    format!("{}-{}-{}", std::process::id(), nanos, sequence)
}