1use std::fs::File;
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7use time::{format_description::well_known::Rfc3339, OffsetDateTime};
8
9pub mod build;
10pub mod entity;
11pub mod output;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub struct RunReport {
16 pub spec_version: String,
17 pub entity: EntityEcho,
18 pub source: SourceEcho,
19 pub sink: SinkEcho,
20 pub policy: PolicyEcho,
21 pub accepted_output: AcceptedOutputSummary,
22 pub results: ResultsTotals,
23 pub files: Vec<FileReport>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27#[serde(rename_all = "snake_case")]
28pub struct RunSummaryReport {
29 pub spec_version: String,
30 pub tool: ToolInfo,
31 pub run: RunInfo,
32 pub config: ConfigEcho,
33 pub report: ReportEcho,
34 pub results: ResultsTotals,
35 pub entities: Vec<EntitySummary>,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(rename_all = "snake_case")]
40pub struct EntitySummary {
41 pub name: String,
42 pub status: RunStatus,
43 pub results: ResultsTotals,
44 pub report_file: String,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48#[serde(rename_all = "snake_case")]
49pub struct ToolInfo {
50 pub name: String,
51 pub version: String,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 pub git: Option<GitInfo>,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
57#[serde(rename_all = "snake_case")]
58pub struct GitInfo {
59 pub commit: String,
60 pub dirty: bool,
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(rename_all = "snake_case")]
65pub struct RunInfo {
66 pub run_id: String,
67 pub started_at: String,
68 pub finished_at: String,
69 pub duration_ms: u64,
70 pub status: RunStatus,
71 pub exit_code: i32,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75#[serde(rename_all = "snake_case")]
76pub struct ConfigEcho {
77 pub path: String,
78 pub version: String,
79 #[serde(skip_serializing_if = "Option::is_none")]
80 pub metadata: Option<serde_json::Value>,
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
84#[serde(rename_all = "snake_case")]
85pub struct EntityEcho {
86 pub name: String,
87 #[serde(skip_serializing_if = "Option::is_none")]
88 pub metadata: Option<serde_json::Value>,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
92#[serde(rename_all = "snake_case")]
93pub struct SourceEcho {
94 pub format: String,
95 pub path: String,
96 #[serde(skip_serializing_if = "Option::is_none")]
97 pub options: Option<serde_json::Value>,
98 #[serde(skip_serializing_if = "Option::is_none")]
99 pub cast_mode: Option<String>,
100 pub read_plan: SourceReadPlan,
101 pub resolved_inputs: ResolvedInputs,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105#[serde(rename_all = "snake_case")]
106pub struct ResolvedInputs {
107 pub mode: ResolvedInputMode,
108 pub file_count: u64,
109 pub files: Vec<String>,
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
113#[serde(rename_all = "snake_case")]
114pub struct SinkEcho {
115 pub accepted: SinkTargetEcho,
116 #[serde(skip_serializing_if = "Option::is_none")]
117 pub rejected: Option<SinkTargetEcho>,
118 pub archive: SinkArchiveEcho,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
122#[serde(rename_all = "snake_case")]
123pub struct AcceptedOutputSummary {
124 pub path: String,
125 pub accepted_rows: u64,
126 pub parts_written: u64,
127 #[serde(default)]
128 #[serde(skip_serializing_if = "Vec::is_empty")]
129 pub part_files: Vec<String>,
130 #[serde(default)]
131 #[serde(skip_serializing_if = "Option::is_none")]
132 pub table_version: Option<i64>,
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize)]
136#[serde(rename_all = "snake_case")]
137pub struct SinkTargetEcho {
138 pub format: String,
139 pub path: String,
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
143#[serde(rename_all = "snake_case")]
144pub struct ReportEcho {
145 pub path: String,
146 pub report_file: String,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
150#[serde(rename_all = "snake_case")]
151pub struct SinkArchiveEcho {
152 pub enabled: bool,
153 #[serde(skip_serializing_if = "Option::is_none")]
154 pub path: Option<String>,
155}
156
157#[derive(Debug, Clone, Serialize, Deserialize)]
158#[serde(rename_all = "snake_case")]
159pub struct PolicyEcho {
160 pub severity: Severity,
161}
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
164#[serde(rename_all = "snake_case")]
165pub struct ResultsTotals {
166 pub files_total: u64,
167 pub rows_total: u64,
168 pub accepted_total: u64,
169 pub rejected_total: u64,
170 pub warnings_total: u64,
171 pub errors_total: u64,
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize)]
175#[serde(rename_all = "snake_case")]
176pub struct FileReport {
177 pub input_file: String,
178 pub status: FileStatus,
179 pub row_count: u64,
180 pub accepted_count: u64,
181 pub rejected_count: u64,
182 pub mismatch: FileMismatch,
183 pub output: FileOutput,
184 pub validation: FileValidation,
185}
186
187#[derive(Debug, Clone, Serialize, Deserialize)]
188#[serde(rename_all = "snake_case")]
189pub struct FileMismatch {
190 pub declared_columns_count: u64,
191 pub input_columns_count: u64,
192 pub missing_columns: Vec<String>,
193 pub extra_columns: Vec<String>,
194 pub mismatch_action: MismatchAction,
195 #[serde(skip_serializing_if = "Option::is_none")]
196 pub error: Option<MismatchIssue>,
197 #[serde(skip_serializing_if = "Option::is_none")]
198 pub warning: Option<String>,
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
202#[serde(rename_all = "snake_case")]
203pub struct MismatchIssue {
204 pub rule: String,
205 pub message: String,
206}
207
208#[derive(Debug, Clone, Serialize, Deserialize)]
209#[serde(rename_all = "snake_case")]
210pub struct FileOutput {
211 pub accepted_path: Option<String>,
212 pub rejected_path: Option<String>,
213 pub errors_path: Option<String>,
214 pub archived_path: Option<String>,
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
218#[serde(rename_all = "snake_case")]
219pub struct FileValidation {
220 pub errors: u64,
221 pub warnings: u64,
222 pub rules: Vec<RuleSummary>,
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize)]
226#[serde(rename_all = "snake_case")]
227pub struct RuleSummary {
228 pub rule: RuleName,
229 pub severity: Severity,
230 pub violations: u64,
231 pub columns: Vec<ColumnSummary>,
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize)]
235#[serde(rename_all = "snake_case")]
236pub struct ColumnSummary {
237 pub column: String,
238 pub violations: u64,
239 #[serde(skip_serializing_if = "Option::is_none")]
240 pub target_type: Option<String>,
241}
242
243#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
244#[serde(rename_all = "snake_case")]
245pub enum FileStatus {
246 Success,
247 Rejected,
248 Aborted,
249 Failed,
250}
251
252#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
253#[serde(rename_all = "snake_case")]
254pub enum RunStatus {
255 Success,
256 SuccessWithWarnings,
257 Rejected,
258 Aborted,
259 Failed,
260}
261
262#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
263#[serde(rename_all = "snake_case")]
264pub enum Severity {
265 Warn,
266 Reject,
267 Abort,
268}
269
270#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
271#[serde(rename_all = "snake_case")]
272pub enum RuleName {
273 NotNull,
274 CastError,
275 Unique,
276 SchemaError,
277}
278
279#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
280#[serde(rename_all = "snake_case")]
281pub enum MismatchAction {
282 None,
283 FilledNulls,
284 IgnoredExtras,
285 RejectedFile,
286 Aborted,
287}
288
289#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
290#[serde(rename_all = "snake_case")]
291pub enum ResolvedInputMode {
292 Directory,
293 File,
294 Glob,
295}
296
297#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
298#[serde(rename_all = "snake_case")]
299pub enum SourceReadPlan {
300 RawAndTyped,
301}
302
303#[derive(Debug)]
304pub enum ReportError {
305 Io(std::io::Error),
306 Serialize(serde_json::Error),
307}
308
309impl std::fmt::Display for ReportError {
310 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311 match self {
312 ReportError::Io(err) => write!(f, "report io error: {err}"),
313 ReportError::Serialize(err) => write!(f, "report serialize error: {err}"),
314 }
315 }
316}
317
318impl std::error::Error for ReportError {}
319
320impl From<std::io::Error> for ReportError {
321 fn from(err: std::io::Error) -> Self {
322 Self::Io(err)
323 }
324}
325
326impl From<serde_json::Error> for ReportError {
327 fn from(err: serde_json::Error) -> Self {
328 Self::Serialize(err)
329 }
330}
331
332pub trait ReportFormatter {
333 fn format_name(&self) -> &'static str;
334 fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError>;
335 fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError>;
336}
337
338pub struct JsonReportFormatter;
339
340impl ReportFormatter for JsonReportFormatter {
341 fn format_name(&self) -> &'static str {
342 "json"
343 }
344
345 fn serialize_run(&self, report: &RunReport) -> Result<String, ReportError> {
346 Ok(serde_json::to_string_pretty(report)?)
347 }
348
349 fn serialize_summary(&self, report: &RunSummaryReport) -> Result<String, ReportError> {
350 Ok(serde_json::to_string_pretty(report)?)
351 }
352}
353
354pub fn now_rfc3339() -> String {
355 OffsetDateTime::now_utc()
356 .format(&Rfc3339)
357 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
358}
359
/// Derive a run identifier from a timestamp string.
///
/// Every `:` is replaced with `-`; all other characters are kept. The id is
/// embedded in a directory name (`run_<id>`), and `:` is presumably avoided
/// because it is not allowed in Windows file names.
pub fn run_id_from_timestamp(timestamp: &str) -> String {
    timestamp
        .chars()
        .map(|ch| if ch == ':' { '-' } else { ch })
        .collect()
}
363
364pub struct ReportWriter;
365
366impl ReportWriter {
367 pub fn run_dir_name(run_id: &str) -> String {
368 format!("run_{run_id}")
369 }
370
371 pub fn report_file_name() -> String {
372 "run.json".to_string()
373 }
374
375 pub fn summary_file_name() -> String {
376 "run.summary.json".to_string()
377 }
378
379 pub fn report_relative_path(run_id: &str, entity_name: &str) -> String {
380 format!(
381 "{}/{}/{}",
382 Self::run_dir_name(run_id),
383 entity_name,
384 Self::report_file_name()
385 )
386 }
387
388 pub fn summary_relative_path(run_id: &str) -> String {
389 format!(
390 "{}/{}",
391 Self::run_dir_name(run_id),
392 Self::summary_file_name()
393 )
394 }
395
396 pub fn entity_report_dir(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
397 report_dir
398 .join(Self::run_dir_name(run_id))
399 .join(entity_name)
400 }
401
402 pub fn report_path(report_dir: &Path, run_id: &str, entity_name: &str) -> PathBuf {
403 Self::entity_report_dir(report_dir, run_id, entity_name).join(Self::report_file_name())
404 }
405
406 pub fn summary_path(report_dir: &Path, run_id: &str) -> PathBuf {
407 report_dir
408 .join(Self::run_dir_name(run_id))
409 .join(Self::summary_file_name())
410 }
411
412 pub fn write_report(
413 report_dir: &Path,
414 run_id: &str,
415 entity_name: &str,
416 report: &RunReport,
417 ) -> Result<PathBuf, ReportError> {
418 let entity_dir = Self::entity_report_dir(report_dir, run_id, entity_name);
419 std::fs::create_dir_all(&entity_dir)?;
420 let report_path = Self::report_path(report_dir, run_id, entity_name);
421 let tmp_path = entity_dir.join(format!(
422 "{}.tmp-{}",
423 Self::report_file_name(),
424 unique_suffix()
425 ));
426
427 let json = serde_json::to_string_pretty(report)?;
428 let mut file = File::create(&tmp_path)?;
429 file.write_all(json.as_bytes())?;
430 file.sync_all()?;
431 std::fs::rename(&tmp_path, &report_path)?;
432
433 Ok(report_path)
434 }
435
436 pub fn write_summary(
437 report_dir: &Path,
438 run_id: &str,
439 report: &RunSummaryReport,
440 ) -> Result<PathBuf, ReportError> {
441 let run_dir = report_dir.join(Self::run_dir_name(run_id));
442 std::fs::create_dir_all(&run_dir)?;
443 let report_path = Self::summary_path(report_dir, run_id);
444 let tmp_path = run_dir.join(format!(
445 "{}.tmp-{}",
446 Self::summary_file_name(),
447 unique_suffix()
448 ));
449
450 let json = serde_json::to_string_pretty(report)?;
451 let mut file = File::create(&tmp_path)?;
452 file.write_all(json.as_bytes())?;
453 file.sync_all()?;
454 std::fs::rename(&tmp_path, &report_path)?;
455
456 Ok(report_path)
457 }
458}
459
460pub fn compute_run_outcome(file_statuses: &[FileStatus]) -> (RunStatus, i32) {
461 if file_statuses.contains(&FileStatus::Failed) {
462 return (RunStatus::Failed, 1);
463 }
464 if file_statuses.contains(&FileStatus::Aborted) {
465 return (RunStatus::Aborted, 2);
466 }
467 if file_statuses.contains(&FileStatus::Rejected) {
468 return (RunStatus::Rejected, 0);
469 }
470 (RunStatus::Success, 0)
471}
472
/// Build a suffix for temporary file names.
///
/// Format: `<pid>-<nanos since UNIX epoch>-<sequence>`. The process-wide
/// sequence counter keeps values distinct for calls on the same clock tick
/// (coarse clock resolution, concurrent writers, or a clock set before the
/// epoch, which yields `0` nanos). The timestamp alone cannot guarantee
/// that, and a collision would make two writers truncate and share one temp
/// file.
fn unique_suffix() -> String {
    static SEQUENCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);
    // Relaxed suffices: fetch_add is atomic, so each caller gets a distinct value.
    let seq = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    format!("{}-{}-{}", std::process::id(), nanos, seq)
}