1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
9pub mod build;
10pub mod entity;
11pub mod output;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub struct RunReport {
16 pub spec_version: String,
17 pub entity: EntityEcho,
18 pub source: SourceEcho,
19 pub sink: SinkEcho,
20 pub policy: PolicyEcho,
21 pub accepted_output: AcceptedOutputSummary,
22 pub results: ResultsTotals,
23 pub files: Vec<FileReport>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27#[serde(rename_all = "snake_case")]
28pub struct RunSummaryReport {
29 pub spec_version: String,
30 pub tool: ToolInfo,
31 pub run: RunInfo,
32 pub config: ConfigEcho,
33 pub report: ReportEcho,
34 pub results: ResultsTotals,
35 pub entities: Vec<EntitySummary>,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(rename_all = "snake_case")]
40pub struct EntitySummary {
41 pub name: String,
42 pub status: RunStatus,
43 pub results: ResultsTotals,
44 pub report_file: String,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48#[serde(rename_all = "snake_case")]
49pub struct ToolInfo {
50 pub name: String,
51 pub version: String,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 pub git: Option<GitInfo>,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
57#[serde(rename_all = "snake_case")]
58pub struct GitInfo {
59 pub commit: String,
60 pub dirty: bool,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(rename_all = "snake_case")]
65pub struct RunInfo {
66 pub run_id: String,
67 pub started_at: String,
68 pub finished_at: String,
69 pub duration_ms: u64,
70 pub status: RunStatus,
71 pub exit_code: i32,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75#[serde(rename_all = "snake_case")]
76pub struct ConfigEcho {
77 pub path: String,
78 pub version: String,
79 #[serde(skip_serializing_if = "Option::is_none")]
80 pub metadata: Option<serde_json::Value>,
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
84#[serde(rename_all = "snake_case")]
85pub struct EntityEcho {
86 pub name: String,
87 #[serde(skip_serializing_if = "Option::is_none")]
88 pub metadata: Option<serde_json::Value>,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
92#[serde(rename_all = "snake_case")]
93pub struct SourceEcho {
94 pub format: String,
95 pub path: String,
96 #[serde(skip_serializing_if = "Option::is_none")]
97 pub options: Option<serde_json::Value>,
98 #[serde(skip_serializing_if = "Option::is_none")]
99 pub cast_mode: Option<String>,
100 pub read_plan: SourceReadPlan,
101 pub resolved_inputs: ResolvedInputs,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub struct ResolvedInputs {
107 pub mode: ResolvedInputMode,
108 pub file_count: u64,
109 pub files: Vec<String>,
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
113#[serde(rename_all = "snake_case")]
114pub struct SinkEcho {
115 pub accepted: SinkTargetEcho,
116 #[serde(skip_serializing_if = "Option::is_none")]
117 pub rejected: Option<SinkTargetEcho>,
118 pub archive: SinkArchiveEcho,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
122#[serde(rename_all = "snake_case")]
123pub struct AcceptedOutputSummary {
124 pub path: String,
125 pub accepted_rows: u64,
126 pub parts_written: u64,
127 #[serde(default)]
128 #[serde(skip_serializing_if = "Vec::is_empty")]
129 pub part_files: Vec<String>,
130 #[serde(default)]
131 #[serde(skip_serializing_if = "Option::is_none")]
132 pub table_version: Option<i64>,
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize)]
136#[serde(rename_all = "snake_case")]
137pub struct SinkTargetEcho {
138 pub format: String,
139 pub path: String,
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
143#[serde(rename_all = "snake_case")]
144pub struct ReportEcho {
145 pub path: String,
146 pub report_file: String,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
150#[serde(rename_all = "snake_case")]
151pub struct SinkArchiveEcho {
152 pub enabled: bool,
153 #[serde(skip_serializing_if = "Option::is_none")]
154 pub path: Option<String>,
155}
156
157#[derive(Debug, Clone, Serialize, Deserialize)]
158#[serde(rename_all = "snake_case")]
159pub struct PolicyEcho {
160 pub severity: Severity,
161}
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
164#[serde(rename_all = "snake_case")]
165pub struct ResultsTotals {
166 pub files_total: u64,
167 pub rows_total: u64,
168 pub accepted_total: u64,
169 pub rejected_total: u64,
170 pub warnings_total: u64,
171 pub errors_total: u64,
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize)]
175#[serde(rename_all = "snake_case")]
176pub struct FileReport {
177 pub input_file: String,
178 pub status: FileStatus,
179 pub row_count: u64,
180 pub accepted_count: u64,
181 pub rejected_count: u64,
182 pub mismatch: FileMismatch,
183 pub output: FileOutput,
184 pub validation: FileValidation,
185}
186
187#[derive(Debug, Clone, Serialize, Deserialize)]
188#[serde(rename_all = "snake_case")]
189pub struct FileMismatch {
190 pub declared_columns_count: u64,
191 pub input_columns_count: u64,
192 pub missing_columns: Vec<String>,
193 pub extra_columns: Vec<String>,
194 pub mismatch_action: MismatchAction,
195 #[serde(skip_serializing_if = "Option::is_none")]
196 pub error: Option<MismatchIssue>,
197 #[serde(skip_serializing_if = "Option::is_none")]
198 pub warning: Option<String>,
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
202#[serde(rename_all = "snake_case")]
203pub struct MismatchIssue {
204 pub rule: String,
205 pub message: String,
206}
207
208#[derive(Debug, Clone, Serialize, Deserialize)]
209#[serde(rename_all = "snake_case")]
210pub struct FileOutput {
211 pub accepted_path: Option<String>,
212 pub rejected_path: Option<String>,
213 pub errors_path: Option<String>,
214 pub archived_path: Option<String>,
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
218#[serde(rename_all = "snake_case")]
219pub struct FileValidation {
220 pub errors: u64,
221 pub warnings: u64,
222 pub rules: Vec<RuleSummary>,
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize)]
226#[serde(rename_all = "snake_case")]
227pub struct RuleSummary {
228 pub rule: RuleName,
229 pub severity: Severity,
230 pub violations: u64,
231 pub columns: Vec<ColumnSummary>,
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize)]
235#[serde(rename_all = "snake_case")]
236pub struct ColumnSummary {
237 pub column: String,
238 pub violations: u64,
239 #[serde(skip_serializing_if = "Option::is_none")]
240 pub target_type: Option<String>,
241 #[serde(skip_serializing_if = "Option::is_none")]
242 pub source: Option<String>,
243}
244
245#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
246#[serde(rename_all = "snake_case")]
247pub enum FileStatus {
248 Success,
249 Rejected,
250 Aborted,
251 Failed,
252}
253
254#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
255#[serde(rename_all = "snake_case")]
256pub enum RunStatus {
257 Success,
258 SuccessWithWarnings,
259 Rejected,
260 Aborted,
261 Failed,
262}
263
264#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
265#[serde(rename_all = "snake_case")]
266pub enum Severity {
267 Warn,
268 Reject,
269 Abort,
270}
271
272#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
273#[serde(rename_all = "snake_case")]
274pub enum RuleName {
275 NotNull,
276 CastError,
277 Unique,
278 SchemaError,
279}
280
281#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
282#[serde(rename_all = "snake_case")]
283pub enum MismatchAction {
284 None,
285 FilledNulls,
286 IgnoredExtras,
287 RejectedFile,
288 Aborted,
289}
290
291#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
292#[serde(rename_all = "snake_case")]
293pub enum ResolvedInputMode {
294 Directory,
295 File,
296 Glob,
297}
298
299#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
300#[serde(rename_all = "snake_case")]
301pub enum SourceReadPlan {
302 RawAndTyped,
303}
304
305#[derive(Debug)]
306pub enum ReportError {
307 Io(std::io::Error),
308 Serialize(serde_json::Error),
309}
310
311impl std::fmt::Display for ReportError {
312 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313 match self {
314 ReportError::Io(err) => write!(f, "report io error: {err}"),
315 ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
316 }
317 }
318}
319
320impl std::error::Error for ReportError {}
321
322impl From<std::io::Error> for ReportError {
323 fn from(err: std::io::Error) -> Self {
324 Self::Io(err)
325 }
326}
327
328impl From<serde_json::Error> for ReportError {
329 fn from(err: serde_json::Error) -> Self {
330 Self::Serialize(err)
331 }
332}
333
334pub trait ReportFormatter {
335 fn format_name(&self) -> &'static str;
336 fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError>;
337 fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError>;
338}
339
/// [`ReportFormatter`] that renders reports as pretty-printed JSON.
///
/// Stateless, so it is cheap to copy and constructible via `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct JsonReportFormatter;
341
342impl ReportFormatter for JsonReportFormatter {
343 fn format_name(&self) -> &'static str {
344 "json"
345 }
346
347 fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError> {
348 Ok(serde_json::to_string_pretty(report)?)
349 }
350
351 fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError> {
352 Ok(serde_json::to_string_pretty(report)?)
353 }
354}
355
356pub fn now_rfc3339() -> String {
357 OffsetDateTime::now_utc()
358 .format(&Rfc3339)
359 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
360}
361
/// Derives a run identifier from a timestamp by turning every `:` into `-`.
///
/// `:` is not allowed in file names on some platforms, and the id is used in
/// directory names (see `ReportWriter::run_dir_name`).
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}
365
/// Namespace for report path conventions and atomic report writes.
///
/// Stateless; all functionality lives in associated functions.
#[derive(Debug, Clone, Copy, Default)]
pub struct ReportWriter;
367
368impl ReportWriter {
369 pub fn run_dir_name(run_id: &str) -> String {
370 format!("run_{run_id}")
371 }
372
373 pub fn report_file_name() -> String {
374 "run.json".to_string()
375 }
376
377 pub fn summary_file_name() -> String {
378 "run.summary.json".to_string()
379 }
380
381 pub fn report_relative_path(run_id: &str, entity_name: &str) -> String {
382 format!(
383 "{}/{}/{}",
384 Self::run_dir_name(run_id),
385 entity_name,
386 Self::report_file_name()
387 )
388 }
389
390 pub fn summary_relative_path(run_id: &str) -> String {
391 format!(
392 "{}/{}",
393 Self::run_dir_name(run_id),
394 Self::summary_file_name()
395 )
396 }
397
398 pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
399 report_dir
400 .join(Self::run_dir_name(run_id))
401 .join(entity_name)
402 }
403
404 pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
405 Self::entity_report_dir(report_dir, run_id, entity_name).join(Self::report_file_name())
406 }
407
408 pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
409 report_dir
410 .join(Self::run_dir_name(run_id))
411 .join(Self::summary_file_name())
412 }
413
414 pub fn write_report(
415 report_dir: &Path,
416 run_id: &str,
417 entity_name: &str,
418 report: &RunReport,
419 ) -> Result<PathBuf, ReportError> {
420 let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
421 std::fs::create_dir_all(&entity_dir)?;
422 let report_path = Self::report_path(report_dir, run_id, entity_name);
423 let tmp_path = entity_dir.join(format!(
424 "{}.tmp-{}",
425 Self::report_file_name(),
426 unique_suffix()
427 ));
428
429 let json = serde_json::to_string_pretty(report)?;
430 let mut file = File::create(&tmp_path)?;
431 file.write_all(json.as_bytes())?;
432 file.sync_all()?;
433 std::fs::rename(&tmp_path, &report_path)?;
434
435 Ok(report_path)
436 }
437
438 pub fn write_summary(
439 report_dir: &Path,
440 run_id: &str,
441 report: &RunSummaryReport,
442 ) -> Result<PathBuf, ReportError> {
443 let run_dir = report_dir.join(Self::run_dir_name(run_id));
444 std::fs::create_dir_all(&run_dir)?;
445 let report_path = Self::summary_path(report_dir, run_id);
446 let tmp_path = run_dir.join(format!(
447 "{}.tmp-{}",
448 Self::summary_file_name(),
449 unique_suffix()
450 ));
451
452 let json = serde_json::to_string_pretty(report)?;
453 let mut file = File::create(&tmp_path)?;
454 file.write_all(json.as_bytes())?;
455 file.sync_all()?;
456 std::fs::rename(&tmp_path, &report_path)?;
457
458 Ok(report_path)
459 }
460}
461
462pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
463 if file_statuses.contains(&FileStatus::Failed) {
464 return (RunStatus::Failed, 1);
465 }
466 if file_statuses.contains(&FileStatus::Aborted) {
467 return (RunStatus::Aborted, 2);
468 }
469 if file_statuses.contains(&FileStatus::Rejected) {
470 return (RunStatus::Rejected, 0);
471 }
472 (RunStatus::Success, 0)
473}
474
/// Suffix for temporary file names, formatted as `<pid>-<unix_nanos>-<seq>`.
///
/// `seq` is a per-process counter. It keeps suffixes distinct when two
/// threads call this within the same clock tick, or when the clock reads
/// before the Unix epoch and the nanosecond part falls back to `0`. Without
/// it, concurrent writers could share, truncate and interleave one temp file.
fn unique_suffix() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};

    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_nanos())
        .unwrap_or(0);
    // Relaxed is enough: fetch_add is an atomic read-modify-write, so every
    // caller gets a distinct value; no other memory is synchronized through it.
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    format!("{}-{}-{}", std::process::id(), nanos, seq)
}