1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
9pub mod build;
10pub mod entity;
11pub mod output;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub struct RunReport {
16 pub spec_version: String,
17 pub entity: EntityEcho,
18 pub source: SourceEcho,
19 pub sink: SinkEcho,
20 pub policy: PolicyEcho,
21 pub accepted_output: AcceptedOutputSummary,
22 pub schema_evolution: SchemaEvolutionSummary,
23 #[serde(default)]
24 #[serde(skip_serializing_if = "Vec::is_empty")]
25 pub unique_constraints: Vec<UniqueConstraintReport>,
26 pub results: ResultsTotals,
27 pub files: Vec<FileReport>,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31#[serde(rename_all = "snake_case")]
32pub struct RunSummaryReport {
33 pub spec_version: String,
34 pub tool: ToolInfo,
35 pub run: RunInfo,
36 pub config: ConfigEcho,
37 pub report: ReportEcho,
38 pub results: ResultsTotals,
39 pub entities: Vec<EntitySummary>,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
43#[serde(rename_all = "snake_case")]
44pub struct EntitySummary {
45 pub name: String,
46 pub status: RunStatus,
47 pub results: ResultsTotals,
48 pub report_file: String,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
52#[serde(rename_all = "snake_case")]
53pub struct ToolInfo {
54 pub name: String,
55 pub version: String,
56 #[serde(skip_serializing_if = "Option::is_none")]
57 pub git: Option<GitInfo>,
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
61#[serde(rename_all = "snake_case")]
62pub struct GitInfo {
63 pub commit: String,
64 pub dirty: bool,
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
68#[serde(rename_all = "snake_case")]
69pub struct RunInfo {
70 pub run_id: String,
71 pub started_at: String,
72 pub finished_at: String,
73 pub duration_ms: u64,
74 pub status: RunStatus,
75 pub exit_code: i32,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79#[serde(rename_all = "snake_case")]
80pub struct ConfigEcho {
81 pub path: String,
82 pub version: String,
83 #[serde(skip_serializing_if = "Option::is_none")]
84 pub metadata: Option<serde_json::Value>,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88#[serde(rename_all = "snake_case")]
89pub struct EntityEcho {
90 pub name: String,
91 #[serde(skip_serializing_if = "Option::is_none")]
92 pub metadata: Option<serde_json::Value>,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
96#[serde(rename_all = "snake_case")]
97pub struct SourceEcho {
98 pub format: String,
99 pub path: String,
100 #[serde(skip_serializing_if = "Option::is_none")]
101 pub options: Option<serde_json::Value>,
102 #[serde(skip_serializing_if = "Option::is_none")]
103 pub cast_mode: Option<String>,
104 pub read_plan: SourceReadPlan,
105 pub resolved_inputs: ResolvedInputs,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(rename_all = "snake_case")]
110pub struct ResolvedInputs {
111 pub mode: ResolvedInputMode,
112 pub file_count: u64,
113 pub files: Vec<String>,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
117#[serde(rename_all = "snake_case")]
118pub struct SinkEcho {
119 pub accepted: SinkTargetEcho,
120 #[serde(skip_serializing_if = "Option::is_none")]
121 pub rejected: Option<SinkTargetEcho>,
122 pub archive: SinkArchiveEcho,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
126#[serde(rename_all = "snake_case")]
127pub struct AcceptedOutputSummary {
128 pub path: String,
129 #[serde(default)]
130 #[serde(skip_serializing_if = "Option::is_none")]
131 pub table_root_uri: Option<String>,
132 #[serde(default)]
133 #[serde(skip_serializing_if = "Option::is_none")]
134 pub write_mode: Option<String>,
135 pub accepted_rows: u64,
136 #[serde(default)]
137 pub files_written: Option<u64>,
138 pub parts_written: u64,
139 #[serde(default)]
140 #[serde(skip_serializing_if = "Vec::is_empty")]
141 pub part_files: Vec<String>,
142 #[serde(default)]
143 #[serde(skip_serializing_if = "Option::is_none")]
144 pub table_version: Option<i64>,
145 #[serde(default)]
146 #[serde(skip_serializing_if = "Option::is_none")]
147 pub snapshot_id: Option<i64>,
148 #[serde(default)]
149 #[serde(skip_serializing_if = "Option::is_none")]
150 pub iceberg_catalog_name: Option<String>,
151 #[serde(default)]
152 #[serde(skip_serializing_if = "Option::is_none")]
153 pub iceberg_database: Option<String>,
154 #[serde(default)]
155 #[serde(skip_serializing_if = "Option::is_none")]
156 pub iceberg_namespace: Option<String>,
157 #[serde(default)]
158 #[serde(skip_serializing_if = "Option::is_none")]
159 pub iceberg_table: Option<String>,
160 #[serde(default)]
161 #[serde(skip_serializing_if = "Option::is_none")]
162 pub delta_catalog_name: Option<String>,
163 #[serde(default)]
164 #[serde(skip_serializing_if = "Option::is_none")]
165 pub delta_catalog_schema: Option<String>,
166 #[serde(default)]
167 #[serde(skip_serializing_if = "Option::is_none")]
168 pub delta_catalog_table: Option<String>,
169 #[serde(default)]
170 pub total_bytes_written: Option<u64>,
171 #[serde(default)]
172 pub avg_file_size_mb: Option<f64>,
173 #[serde(default)]
174 pub small_files_count: Option<u64>,
175 #[serde(default)]
176 #[serde(skip_serializing_if = "Vec::is_empty")]
177 pub merge_key: Vec<String>,
178 #[serde(default)]
179 #[serde(skip_serializing_if = "Option::is_none")]
180 pub inserted_count: Option<u64>,
181 #[serde(default)]
182 #[serde(skip_serializing_if = "Option::is_none")]
183 pub updated_count: Option<u64>,
184 #[serde(default)]
185 #[serde(skip_serializing_if = "Option::is_none")]
186 pub closed_count: Option<u64>,
187 #[serde(default)]
188 #[serde(skip_serializing_if = "Option::is_none")]
189 pub unchanged_count: Option<u64>,
190 #[serde(default)]
191 #[serde(skip_serializing_if = "Option::is_none")]
192 pub target_rows_before: Option<u64>,
193 #[serde(default)]
194 #[serde(skip_serializing_if = "Option::is_none")]
195 pub target_rows_after: Option<u64>,
196 #[serde(default)]
197 #[serde(skip_serializing_if = "Option::is_none")]
198 pub merge_elapsed_ms: Option<u64>,
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize, Default)]
202#[serde(rename_all = "snake_case")]
203pub struct SchemaEvolutionSummary {
204 pub enabled: bool,
205 pub mode: String,
206 pub applied: bool,
207 #[serde(default)]
208 #[serde(skip_serializing_if = "Vec::is_empty")]
209 pub added_columns: Vec<String>,
210 pub incompatible_changes_detected: bool,
211}
212
213#[derive(Debug, Clone, Serialize, Deserialize)]
214#[serde(rename_all = "snake_case")]
215pub struct UniqueConstraintReport {
216 pub columns: Vec<String>,
217 pub duplicates_count: u64,
218 #[serde(default)]
219 #[serde(skip_serializing_if = "is_zero_u64")]
220 pub batch_duplicates_count: u64,
221 #[serde(default)]
222 #[serde(skip_serializing_if = "is_zero_u64")]
223 pub target_duplicates_count: u64,
224 pub affected_rows_count: u64,
225 pub action: String,
226 pub status_effect: String,
227 #[serde(default)]
228 #[serde(skip_serializing_if = "Vec::is_empty")]
229 pub samples: Vec<UniqueConstraintSampleReport>,
230}
231
/// `skip_serializing_if` predicate: `true` when the value is zero.
///
/// Takes `&u64` because serde passes a reference to the field being checked.
fn is_zero_u64(n: &u64) -> bool {
    matches!(*n, 0)
}
235
236#[derive(Debug, Clone, Serialize, Deserialize)]
237#[serde(rename_all = "snake_case")]
238pub struct UniqueConstraintSampleReport {
239 pub values: std::collections::BTreeMap<String, String>,
240 pub count: u64,
241}
242
243#[derive(Debug, Clone, Serialize, Deserialize)]
244#[serde(rename_all = "snake_case")]
245pub struct SinkTargetEcho {
246 pub format: String,
247 pub path: String,
248}
249
250#[derive(Debug, Clone, Serialize, Deserialize)]
251#[serde(rename_all = "snake_case")]
252pub struct ReportEcho {
253 pub path: String,
254 pub report_file: String,
255}
256
257#[derive(Debug, Clone, Serialize, Deserialize)]
258#[serde(rename_all = "snake_case")]
259pub struct SinkArchiveEcho {
260 pub enabled: bool,
261 #[serde(skip_serializing_if = "Option::is_none")]
262 pub path: Option<String>,
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize)]
266#[serde(rename_all = "snake_case")]
267pub struct PolicyEcho {
268 pub severity: Severity,
269}
270
271#[derive(Debug, Clone, Serialize, Deserialize)]
272#[serde(rename_all = "snake_case")]
273pub struct ResultsTotals {
274 pub files_total: u64,
275 pub rows_total: u64,
276 pub accepted_total: u64,
277 pub rejected_total: u64,
278 pub warnings_total: u64,
279 pub errors_total: u64,
280}
281
282#[derive(Debug, Clone, Serialize, Deserialize)]
283#[serde(rename_all = "snake_case")]
284pub struct FileReport {
285 pub input_file: String,
286 pub status: FileStatus,
287 pub row_count: u64,
288 pub accepted_count: u64,
289 pub rejected_count: u64,
290 pub mismatch: FileMismatch,
291 pub output: FileOutput,
292 pub validation: FileValidation,
293}
294
295#[derive(Debug, Clone, Serialize, Deserialize)]
296#[serde(rename_all = "snake_case")]
297pub struct FileMismatch {
298 pub declared_columns_count: u64,
299 pub input_columns_count: u64,
300 pub missing_columns: Vec<String>,
301 pub extra_columns: Vec<String>,
302 pub mismatch_action: MismatchAction,
303 #[serde(skip_serializing_if = "Option::is_none")]
304 pub error: Option<MismatchIssue>,
305 #[serde(skip_serializing_if = "Option::is_none")]
306 pub warning: Option<String>,
307}
308
309#[derive(Debug, Clone, Serialize, Deserialize)]
310#[serde(rename_all = "snake_case")]
311pub struct MismatchIssue {
312 pub rule: String,
313 pub message: String,
314}
315
316#[derive(Debug, Clone, Serialize, Deserialize)]
317#[serde(rename_all = "snake_case")]
318pub struct FileOutput {
319 pub accepted_path: Option<String>,
320 pub rejected_path: Option<String>,
321 pub errors_path: Option<String>,
322 pub archived_path: Option<String>,
323}
324
325#[derive(Debug, Clone, Serialize, Deserialize)]
326#[serde(rename_all = "snake_case")]
327pub struct FileValidation {
328 pub errors: u64,
329 pub warnings: u64,
330 pub rules: Vec<RuleSummary>,
331}
332
333#[derive(Debug, Clone, Serialize, Deserialize)]
334#[serde(rename_all = "snake_case")]
335pub struct RuleSummary {
336 pub rule: RuleName,
337 pub severity: Severity,
338 pub violations: u64,
339 pub columns: Vec<ColumnSummary>,
340}
341
342#[derive(Debug, Clone, Serialize, Deserialize)]
343#[serde(rename_all = "snake_case")]
344pub struct ColumnSummary {
345 pub column: String,
346 pub violations: u64,
347 #[serde(skip_serializing_if = "Option::is_none")]
348 pub target_type: Option<String>,
349 #[serde(skip_serializing_if = "Option::is_none")]
350 pub source: Option<String>,
351}
352
353#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
354#[serde(rename_all = "snake_case")]
355pub enum FileStatus {
356 Success,
357 Rejected,
358 Aborted,
359 Failed,
360}
361
362#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
363#[serde(rename_all = "snake_case")]
364pub enum RunStatus {
365 Success,
366 SuccessWithWarnings,
367 Rejected,
368 Aborted,
369 Failed,
370}
371
372#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
373#[serde(rename_all = "snake_case")]
374pub enum Severity {
375 Warn,
376 Reject,
377 Abort,
378}
379
380#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
381#[serde(rename_all = "snake_case")]
382pub enum RuleName {
383 NotNull,
384 CastError,
385 Unique,
386 SchemaError,
387}
388
389#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
390#[serde(rename_all = "snake_case")]
391pub enum MismatchAction {
392 None,
393 FilledNulls,
394 IgnoredExtras,
395 RejectedFile,
396 Aborted,
397}
398
399#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
400#[serde(rename_all = "snake_case")]
401pub enum ResolvedInputMode {
402 Directory,
403 File,
404 Glob,
405}
406
407#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
408#[serde(rename_all = "snake_case")]
409pub enum SourceReadPlan {
410 RawAndTyped,
411}
412
413#[derive(Debug)]
414pub enum ReportError {
415 Io(std::io::Error),
416 Serialize(serde_json::Error),
417}
418
419impl std::fmt::Display for ReportError {
420 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
421 match self {
422 ReportError::Io(err) => write!(f, "report io error: {err}"),
423 ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
424 }
425 }
426}
427
428impl std::error::Error for ReportError {}
429
430impl From<std::io::Error> for ReportError {
431 fn from(err: std::io::Error) -> Self {
432 Self::Io(err)
433 }
434}
435
436impl From<serde_json::Error> for ReportError {
437 fn from(err: serde_json::Error) -> Self {
438 Self::Serialize(err)
439 }
440}
441
/// Renders report structures into a textual representation.
pub trait ReportFormatter {
    /// Short identifier of the output format produced by this formatter.
    fn format_name(&self) -> &'static str;
    /// Renders a per-entity [`RunReport`].
    fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError>;
    /// Renders a run-level [`RunSummaryReport`].
    fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError>;
}
447
/// `ReportFormatter` implementation that renders reports as pretty-printed JSON.
#[derive(Debug, Clone, Copy, Default)]
pub struct JsonReportFormatter;
449
450impl ReportFormatter for JsonReportFormatter {
451 fn format_name(&self) -> &'static str {
452 "json"
453 }
454
455 fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError> {
456 Ok(serde_json::to_string_pretty(report)?)
457 }
458
459 fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError> {
460 Ok(serde_json::to_string_pretty(report)?)
461 }
462}
463
464pub fn now_rfc3339() -> String {
465 OffsetDateTime::now_utc()
466 .format(&Rfc3339)
467 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
468}
469
/// Derives a run id from a timestamp by replacing every `:` with `-`.
///
/// This makes the id usable as a directory name on filesystems that reject `:` in paths.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}
473
/// Namespace for report path conventions and for writing report files to disk.
#[derive(Debug, Clone, Copy, Default)]
pub struct ReportWriter;
475
476impl ReportWriter {
477 pub fn run_dir_name(run_id: &str) -> String {
478 format!("run_{run_id}")
479 }
480
481 pub fn report_file_name() -> String {
482 "run.json".to_string()
483 }
484
485 pub fn summary_file_name() -> String {
486 "run.summary.json".to_string()
487 }
488
489 pub fn report_relative_path(run_id: &str, entity_name: &str) -> String {
490 format!(
491 "{}/{}/{}",
492 Self::run_dir_name(run_id),
493 entity_name,
494 Self::report_file_name()
495 )
496 }
497
498 pub fn summary_relative_path(run_id: &str) -> String {
499 format!(
500 "{}/{}",
501 Self::run_dir_name(run_id),
502 Self::summary_file_name()
503 )
504 }
505
506 pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
507 crate::io::storage::paths::normalize_local_path(
508 &report_dir
509 .join(Self::run_dir_name(run_id))
510 .join(entity_name),
511 )
512 }
513
514 pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
515 crate::io::storage::paths::normalize_local_path(
516 &Self::entity_report_dir(report_dir, run_id, entity_name)
517 .join(Self::report_file_name()),
518 )
519 }
520
521 pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
522 crate::io::storage::paths::normalize_local_path(
523 &report_dir
524 .join(Self::run_dir_name(run_id))
525 .join(Self::summary_file_name()),
526 )
527 }
528
529 pub fn write_report(
530 report_dir: &Path,
531 run_id: &str,
532 entity_name: &str,
533 report: &RunReport,
534 ) -> Result<PathBuf, ReportError> {
535 let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
536 std::fs::create_dir_all(&entity_dir)?;
537 let report_path = Self::report_path(report_dir, run_id, entity_name);
538 let tmp_path = entity_dir.join(format!(
539 "{}.tmp-{}",
540 Self::report_file_name(),
541 unique_suffix()
542 ));
543
544 let json = serde_json::to_string_pretty(report)?;
545 let mut file = File::create(&tmp_path)?;
546 file.write_all(json.as_bytes())?;
547 file.sync_all()?;
548 std::fs::rename(&tmp_path, &report_path)?;
549
550 Ok(report_path)
551 }
552
553 pub fn write_summary(
554 report_dir: &Path,
555 run_id: &str,
556 report: &RunSummaryReport,
557 ) -> Result<PathBuf, ReportError> {
558 let run_dir = report_dir.join(Self::run_dir_name(run_id));
559 let run_dir = crate::io::storage::paths::normalize_local_path(&run_dir);
560 std::fs::create_dir_all(&run_dir)?;
561 let report_path = Self::summary_path(report_dir, run_id);
562 let tmp_path = run_dir.join(format!(
563 "{}.tmp-{}",
564 Self::summary_file_name(),
565 unique_suffix()
566 ));
567
568 let json = serde_json::to_string_pretty(report)?;
569 let mut file = File::create(&tmp_path)?;
570 file.write_all(json.as_bytes())?;
571 file.sync_all()?;
572 std::fs::rename(&tmp_path, &report_path)?;
573
574 Ok(report_path)
575 }
576}
577
578pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
579 if file_statuses.contains(&FileStatus::Failed) {
580 return (RunStatus::Failed, 1);
581 }
582 if file_statuses.contains(&FileStatus::Aborted) {
583 return (RunStatus::Aborted, 2);
584 }
585 if file_statuses.contains(&FileStatus::Rejected) {
586 return (RunStatus::Rejected, 0);
587 }
588 (RunStatus::Success, 0)
589}
590
/// Builds a temp-file suffix of the form `<pid>-<nanos since epoch>-<sequence>`.
///
/// The process id separates concurrently running processes. The per-process sequence number
/// guarantees that two calls in the same process never return the same suffix. Without it,
/// two calls could collide when the clock's resolution is coarse, or when the clock reads
/// before the Unix epoch and `nanos` falls back to `0`. A collision would let
/// `File::create` truncate another writer's in-flight temp file.
fn unique_suffix() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // `Relaxed` suffices: `fetch_add` is an atomic read-modify-write, so every caller gets a
    // distinct value. No other memory is synchronized through this counter.
    static SEQUENCE: AtomicUsize = AtomicUsize::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_nanos())
        .unwrap_or(0);
    let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    format!("{}-{}-{}", std::process::id(), nanos, sequence)
}