1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
9pub mod build;
10pub mod entity;
11pub mod output;
12
/// Per-entity report, written as `run.json` by [`ReportWriter::write_report`].
///
/// serde serializes struct fields in declaration order, so the order of the
/// fields below is the key order of the emitted JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunReport {
    pub spec_version: String,
    /// Entity this report describes.
    pub entity: EntityEcho,
    /// Source section (format, path, resolved inputs).
    pub source: SourceEcho,
    /// Sink section (accepted / rejected / archive targets).
    pub sink: SinkEcho,
    pub policy: PolicyEcho,
    /// Summary of what was written for accepted rows.
    pub accepted_output: AcceptedOutputSummary,
    /// Aggregate counters for this entity's run.
    pub results: ResultsTotals,
    /// Per-input-file details.
    pub files: Vec<FileReport>,
}
25
/// Run-level summary, written as `run.summary.json` by
/// [`ReportWriter::write_summary`].
///
/// Field declaration order is the JSON key order.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunSummaryReport {
    pub spec_version: String,
    /// Name/version (and optional git info) of the tool that produced the run.
    pub tool: ToolInfo,
    /// Run identity, timing, status and exit code.
    pub run: RunInfo,
    pub config: ConfigEcho,
    pub report: ReportEcho,
    /// Aggregate counters for the whole run.
    pub results: ResultsTotals,
    /// One summary entry per entity.
    pub entities: Vec<EntitySummary>,
}
37
/// Entry for one entity in [`RunSummaryReport::entities`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct EntitySummary {
    pub name: String,
    pub status: RunStatus,
    pub results: ResultsTotals,
    /// Location of the entity's `run.json`. NOTE(review): presumably the value
    /// of `ReportWriter::report_relative_path` — confirm with the caller.
    pub report_file: String,
}
46
/// Identification of the tool that produced a [`RunSummaryReport`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ToolInfo {
    pub name: String,
    pub version: String,
    /// Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub git: Option<GitInfo>,
}
55
/// Git revision information embedded in [`ToolInfo`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct GitInfo {
    pub commit: String,
    /// NOTE(review): presumably `true` when the build tree had uncommitted
    /// changes — confirm where this is populated.
    pub dirty: bool,
}
62
/// Identity, timing and outcome of a run.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RunInfo {
    /// NOTE(review): presumably produced by `run_id_from_timestamp` — confirm.
    pub run_id: String,
    /// Timestamps stored as strings; presumably RFC 3339 from `now_rfc3339`.
    pub started_at: String,
    pub finished_at: String,
    pub duration_ms: u64,
    pub status: RunStatus,
    /// Process exit code; presumably the second element of `compute_run_outcome`.
    pub exit_code: i32,
}
73
/// Echo of the configuration file used for the run.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ConfigEcho {
    pub path: String,
    pub version: String,
    /// Free-form JSON; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
82
/// Entity section of a [`RunReport`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct EntityEcho {
    pub name: String,
    /// Free-form JSON; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
90
/// Source section of a [`RunReport`]: configured input plus what it resolved to.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SourceEcho {
    pub format: String,
    pub path: String,
    /// Format-specific options as free-form JSON; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub options: Option<serde_json::Value>,
    /// Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cast_mode: Option<String>,
    pub read_plan: SourceReadPlan,
    /// Concrete input files found for `path`.
    pub resolved_inputs: ResolvedInputs,
}
103
/// Input files a source path resolved to, and how it was resolved.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ResolvedInputs {
    pub mode: ResolvedInputMode,
    /// NOTE(review): presumably always `files.len()`; it is stored separately
    /// and nothing here enforces that — confirm at construction site.
    pub file_count: u64,
    pub files: Vec<String>,
}
111
/// Sink section of a [`RunReport`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkEcho {
    pub accepted: SinkTargetEcho,
    /// Omitted from the JSON when no rejected sink is configured (`None`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rejected: Option<SinkTargetEcho>,
    pub archive: SinkArchiveEcho,
}
120
/// What was written for accepted rows of an entity.
///
/// Fields marked `#[serde(default)]` may be absent when deserializing (they
/// fall back to `None` / `0` / empty), so older reports without them still
/// parse. `path`, `accepted_rows` and `parts_written` have no default and are
/// required keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct AcceptedOutputSummary {
    pub path: String,
    /// Omitted from the JSON when `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub table_root_uri: Option<String>,
    /// Omitted from the JSON when `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub write_mode: Option<String>,
    pub accepted_rows: u64,
    #[serde(default)]
    pub files_written: u64,
    pub parts_written: u64,
    /// Omitted from the JSON when empty.
    #[serde(default)]
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub part_files: Vec<String>,
    /// Omitted from the JSON when `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub table_version: Option<i64>,
    /// Omitted from the JSON when `None`.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub snapshot_id: Option<i64>,
}
145
/// Format and path of one sink target (accepted or rejected rows).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkTargetEcho {
    pub format: String,
    pub path: String,
}
152
/// Report location section of a [`RunSummaryReport`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ReportEcho {
    pub path: String,
    /// NOTE(review): presumably the value of `ReportWriter::summary_relative_path`
    /// — confirm with the caller.
    pub report_file: String,
}
159
/// Archive settings of a sink.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SinkArchiveEcho {
    pub enabled: bool,
    /// Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
}
167
/// Policy section of a [`RunReport`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct PolicyEcho {
    pub severity: Severity,
}
173
/// Aggregate counters, used both per entity and for a whole run.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ResultsTotals {
    pub files_total: u64,
    pub rows_total: u64,
    pub accepted_total: u64,
    pub rejected_total: u64,
    pub warnings_total: u64,
    pub errors_total: u64,
}
184
/// Outcome for a single input file within a [`RunReport`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileReport {
    pub input_file: String,
    pub status: FileStatus,
    pub row_count: u64,
    pub accepted_count: u64,
    pub rejected_count: u64,
    /// Column mismatch between the declared schema and the file.
    pub mismatch: FileMismatch,
    pub output: FileOutput,
    pub validation: FileValidation,
}
197
/// Comparison of declared columns against the columns found in an input file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileMismatch {
    pub declared_columns_count: u64,
    pub input_columns_count: u64,
    pub missing_columns: Vec<String>,
    pub extra_columns: Vec<String>,
    /// What was done about the mismatch.
    pub mismatch_action: MismatchAction,
    /// Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<MismatchIssue>,
    /// Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub warning: Option<String>,
}
211
/// Rule identifier and human-readable message for a mismatch error.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct MismatchIssue {
    pub rule: String,
    pub message: String,
}
218
/// Paths written for one input file.
///
/// Unlike most optional fields in this module, these have no
/// `skip_serializing_if`, so `None` is emitted as JSON `null` and the keys
/// are always present.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileOutput {
    pub accepted_path: Option<String>,
    pub rejected_path: Option<String>,
    pub errors_path: Option<String>,
    pub archived_path: Option<String>,
}
227
/// Validation counters and per-rule breakdown for one input file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FileValidation {
    pub errors: u64,
    pub warnings: u64,
    pub rules: Vec<RuleSummary>,
}
235
/// Violations of one validation rule, broken down by column.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RuleSummary {
    pub rule: RuleName,
    pub severity: Severity,
    pub violations: u64,
    pub columns: Vec<ColumnSummary>,
}
244
/// Violation count of a rule for a single column.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ColumnSummary {
    pub column: String,
    pub violations: u64,
    /// Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target_type: Option<String>,
    /// Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
}
255
/// Outcome of processing one input file.
///
/// Serialized as `"success"`, `"rejected"`, `"aborted"`, `"failed"`.
/// See `compute_run_outcome` for how these fold into a [`RunStatus`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FileStatus {
    Success,
    Rejected,
    Aborted,
    Failed,
}
264
/// Outcome of a whole run (or of one entity within it).
///
/// Serialized in snake_case, e.g. `SuccessWithWarnings` → `"success_with_warnings"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RunStatus {
    Success,
    SuccessWithWarnings,
    Rejected,
    Aborted,
    Failed,
}
274
/// Severity of a rule or policy; serialized as `"warn"`, `"reject"`, `"abort"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Severity {
    Warn,
    Reject,
    Abort,
}
282
/// Validation rule identifier; serialized as `"not_null"`, `"cast_error"`,
/// `"unique"`, `"schema_error"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RuleName {
    NotNull,
    CastError,
    Unique,
    SchemaError,
}
291
/// Action recorded for a column mismatch (see [`FileMismatch`]).
///
/// Serialized in snake_case, e.g. `FilledNulls` → `"filled_nulls"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MismatchAction {
    None,
    FilledNulls,
    IgnoredExtras,
    RejectedFile,
    Aborted,
}
301
/// How a source path was resolved to files; serialized as `"directory"`,
/// `"file"`, `"glob"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ResolvedInputMode {
    Directory,
    File,
    Glob,
}
309
/// Read plan recorded in [`SourceEcho`]; currently the single variant
/// `RawAndTyped`, serialized as `"raw_and_typed"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceReadPlan {
    RawAndTyped,
}
315
316#[derive(Debug)]
317pub enum ReportError {
318 Io(std::io::Error),
319 Serialize(serde_json::Error),
320}
321
322impl std::fmt::Display for ReportError {
323 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
324 match self {
325 ReportError::Io(err) => write!(f, "report io error: {err}"),
326 ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
327 }
328 }
329}
330
331impl std::error::Error for ReportError {}
332
333impl From<std::io::Error> for ReportError {
334 fn from(err: std::io::Error) -> Self {
335 Self::Io(err)
336 }
337}
338
339impl From<serde_json::Error> for ReportError {
340 fn from(err: serde_json::Error) -> Self {
341 Self::Serialize(err)
342 }
343}
344
345pub trait ReportFormatter {
346 fn format_name(&self) -> &'static str;
347 fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError>;
348 fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError>;
349}
350
351pub struct JsonReportFormatter;
352
353impl ReportFormatter for JsonReportFormatter {
354 fn format_name(&self) -> &'static str {
355 "json"
356 }
357
358 fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError> {
359 Ok(serde_json::to_string_pretty(report)?)
360 }
361
362 fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError> {
363 Ok(serde_json::to_string_pretty(report)?)
364 }
365}
366
367pub fn now_rfc3339() -> String {
368 OffsetDateTime::now_utc()
369 .format(&Rfc3339)
370 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
371}
372
/// Derives a run id from a timestamp by replacing every `:` with `-`.
///
/// The id ends up in a directory name (`ReportWriter::run_dir_name`).
/// Presumably the substitution exists because `:` is not allowed in Windows
/// file names — confirm.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}
376
377pub struct ReportWriter;
378
379impl ReportWriter {
380 pub fn run_dir_name(run_id: &str) -> String {
381 format!("run_{run_id}")
382 }
383
384 pub fn report_file_name() -> String {
385 "run.json".to_string()
386 }
387
388 pub fn summary_file_name() -> String {
389 "run.summary.json".to_string()
390 }
391
392 pub fn report_relative_path(run_id: &str, entity_name: &str) -> String {
393 format!(
394 "{}/{}/{}",
395 Self::run_dir_name(run_id),
396 entity_name,
397 Self::report_file_name()
398 )
399 }
400
401 pub fn summary_relative_path(run_id: &str) -> String {
402 format!(
403 "{}/{}",
404 Self::run_dir_name(run_id),
405 Self::summary_file_name()
406 )
407 }
408
409 pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
410 report_dir
411 .join(Self::run_dir_name(run_id))
412 .join(entity_name)
413 }
414
415 pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
416 Self::entity_report_dir(report_dir, run_id, entity_name).join(Self::report_file_name())
417 }
418
419 pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
420 report_dir
421 .join(Self::run_dir_name(run_id))
422 .join(Self::summary_file_name())
423 }
424
425 pub fn write_report(
426 report_dir: &Path,
427 run_id: &str,
428 entity_name: &str,
429 report: &RunReport,
430 ) -> Result<PathBuf, ReportError> {
431 let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
432 std::fs::create_dir_all(&entity_dir)?;
433 let report_path = Self::report_path(report_dir, run_id, entity_name);
434 let tmp_path = entity_dir.join(format!(
435 "{}.tmp-{}",
436 Self::report_file_name(),
437 unique_suffix()
438 ));
439
440 let json = serde_json::to_string_pretty(report)?;
441 let mut file = File::create(&tmp_path)?;
442 file.write_all(json.as_bytes())?;
443 file.sync_all()?;
444 std::fs::rename(&tmp_path, &report_path)?;
445
446 Ok(report_path)
447 }
448
449 pub fn write_summary(
450 report_dir: &Path,
451 run_id: &str,
452 report: &RunSummaryReport,
453 ) -> Result<PathBuf, ReportError> {
454 let run_dir = report_dir.join(Self::run_dir_name(run_id));
455 std::fs::create_dir_all(&run_dir)?;
456 let report_path = Self::summary_path(report_dir, run_id);
457 let tmp_path = run_dir.join(format!(
458 "{}.tmp-{}",
459 Self::summary_file_name(),
460 unique_suffix()
461 ));
462
463 let json = serde_json::to_string_pretty(report)?;
464 let mut file = File::create(&tmp_path)?;
465 file.write_all(json.as_bytes())?;
466 file.sync_all()?;
467 std::fs::rename(&tmp_path, &report_path)?;
468
469 Ok(report_path)
470 }
471}
472
473pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
474 if file_statuses.contains(&FileStatus::Failed) {
475 return (RunStatus::Failed, 1);
476 }
477 if file_statuses.contains(&FileStatus::Aborted) {
478 return (RunStatus::Aborted, 2);
479 }
480 if file_statuses.contains(&FileStatus::Rejected) {
481 return (RunStatus::Rejected, 0);
482 }
483 (RunStatus::Success, 0)
484}
485
/// Returns a suffix for temporary file names that is unique within this
/// process and unlikely to collide across processes.
///
/// The format is `<pid>-<nanos since Unix epoch>-<sequence>`:
///
/// - The process id distinguishes concurrent processes.
/// - The per-process sequence counter keeps two calls distinct even when the
///   clock has coarse resolution, or reads before the epoch (`nanos` is then 0).
///   The timestamp alone cannot guarantee this.
fn unique_suffix() -> String {
    static SEQUENCE: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_nanos())
        .unwrap_or(0);
    // Relaxed ordering is enough: only the uniqueness of the fetched value
    // matters, not its ordering relative to other memory operations.
    let sequence = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    format!("{}-{}-{}", std::process::id(), nanos, sequence)
}