1use serde::Serialize;
8
9use crate::config::{Config, ExportConfig, FormatType, SourceType};
10use crate::error::Result;
11use crate::source;
12use crate::types::{
13 ColumnOverrides, TypeFidelity,
14 policy::{PolicyAction, PolicyViolation, TypePolicy},
15 target::{ExportTarget, TargetInput, TargetStatus},
16};
17
/// One row of the type report: how a single result column maps from the
/// source database through Rivet's type model to Arrow and, when a target
/// was requested, to the export target.
///
/// Serialized as part of [`ExportTypeReport`]; empty/`None` fields are
/// omitted from the JSON output.
#[derive(Serialize)]
pub struct TypeReportRow {
    /// Column name as reported by the source for the export query.
    pub column: String,
    /// The source's native type name for the column.
    pub source_type: String,
    /// Short label for the Rivet type (e.g. `int8`, `decimal(18,2)`).
    pub rivet_type: String,
    /// `Debug` rendering of the Arrow type, or `"-"` when the mapping has none.
    pub arrow_type: String,
    /// Mapping fidelity; forced to `Lossy` when a decimal column override
    /// narrows the source type.
    pub fidelity: TypeFidelity,
    /// Warnings attached to the mapping (including override-narrowing reasons).
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub warnings: Vec<String>,
    /// Column type in the export target; `None` when no target was requested.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target_type: Option<String>,
    /// Outcome of resolving this column against the target.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target_status: Option<TargetStatus>,
    /// Optional note produced by the target resolution.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target_note: Option<String>,
    /// Autoload type reported by the target; only set when it differs from
    /// `target_type`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub autoload_type: Option<String>,
    /// Cast SQL from the target resolution, shown as `recover:` in the table
    /// view (presumably restores `target_type` after autoload — confirm).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cast_sql: Option<String>,
}
44
/// Type report for one export: per-column mappings, policy violations and,
/// when a target was requested, target-resolution results.
///
/// Rendered by `print_table` (human-readable) or `print_json` (one JSON line).
#[derive(Serialize)]
pub struct ExportTypeReport {
    /// Name of the export this report describes.
    pub export: String,
    /// One row per result column, in the order the source reported them.
    pub columns: Vec<TypeReportRow>,
    /// Type-policy violations, plus CSV-serializability violations for CSV
    /// exports.
    pub violations: Vec<PolicyViolation>,
    /// `true` when at least one column resolved to `TargetStatus::Fail`;
    /// omitted from the JSON output when `false`.
    #[serde(skip_serializing_if = "std::ops::Not::not")]
    pub target_failures: bool,
    /// Table-level recovery SQL produced by the target, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub recovery_sql: Option<String>,
}
61
62impl ExportTypeReport {
63 pub fn has_fatal(&self) -> bool {
64 self.violations.iter().any(|v| v.fatal)
65 }
66
67 pub fn has_target_fail(&self) -> bool {
68 self.target_failures
69 }
70}
71
72pub fn collect_report(
74 config: &Config,
75 export: &ExportConfig,
76 column_overrides: &ColumnOverrides,
77 policy: &TypePolicy,
78 target: Option<ExportTarget>,
79 config_dir: &std::path::Path,
80 params: Option<&std::collections::HashMap<String, String>>,
81) -> Result<ExportTypeReport> {
82 let url = config.source.resolve_url()?;
83 let tls = config.source.tls.as_ref();
84 let query = export.resolve_query(config_dir, params)?;
88
89 let mut src: Box<dyn source::Source> = match config.source.source_type {
90 SourceType::Postgres => Box::new(source::postgres::PostgresSource::connect_with_tls(
91 &url, tls,
92 )?),
93 SourceType::Mysql => Box::new(source::mysql::MysqlSource::connect_with_tls(&url, tls)?),
94 SourceType::Mssql => Box::new(source::mssql::MssqlSource::connect_with_tls(&url, tls)?),
95 };
96
97 let mut mappings = src.type_mappings(&query, column_overrides)?;
98
99 if !column_overrides.is_empty() {
110 let source_mappings = src.type_mappings(&query, &ColumnOverrides::new())?;
111 let source_by_name: std::collections::HashMap<&str, &crate::types::RivetType> =
112 source_mappings
113 .iter()
114 .map(|m| (m.column_name.as_str(), &m.rivet_type))
115 .collect();
116 for m in &mut mappings {
117 if !column_overrides.contains_key(&m.column_name) {
118 continue;
119 }
120 if let Some(&src_type) = source_by_name.get(m.column_name.as_str())
121 && let Some(reason) = override_narrows(src_type, &m.rivet_type)
122 {
123 m.fidelity = TypeFidelity::Lossy;
124 m.warnings.push(reason);
125 }
126 }
127 }
128
129 let mut violations = policy.validate(&mappings);
130
131 if export.format == FormatType::Csv {
138 let fatal = policy.on_unsupported_type == PolicyAction::Fail;
139 for m in &mappings {
140 if let Some(dt) = m.arrow_type.as_ref()
141 && !crate::format::csv::csv_serializable(dt)
142 {
143 violations.push(PolicyViolation {
144 column_name: m.column_name.clone(),
145 fidelity: TypeFidelity::Unsupported,
146 message: format!(
147 "column '{}' (Arrow {dt:?}) cannot be serialized to CSV — \
148 use `format: parquet` or drop it from the query",
149 m.column_name
150 ),
151 fatal,
152 });
153 }
154 }
155 }
156
157 let mut target_failures = false;
158 let rows = mappings
159 .iter()
160 .map(|m| {
161 let (target_type, target_status, target_note, autoload_type, cast_sql) =
162 if let Some(tgt) = target {
163 let spec = tgt.resolve_column(TargetInput::from(m));
164 if spec.status == TargetStatus::Fail {
165 target_failures = true;
166 }
167 let autoload =
170 (spec.autoload_type != spec.target_type).then_some(spec.autoload_type);
171 (
172 Some(spec.target_type),
173 Some(spec.status),
174 spec.note,
175 autoload,
176 spec.cast_sql,
177 )
178 } else {
179 (None, None, None, None, None)
180 };
181 TypeReportRow {
182 column: m.column_name.clone(),
183 source_type: m.source_native_type.clone(),
184 rivet_type: rivet_type_label(&m.rivet_type),
185 arrow_type: m
186 .arrow_type
187 .as_ref()
188 .map(|t| format!("{t:?}"))
189 .unwrap_or_else(|| "-".into()),
190 fidelity: m.fidelity,
191 warnings: m.warnings.clone(),
192 target_type,
193 target_status,
194 target_note,
195 autoload_type,
196 cast_sql,
197 }
198 })
199 .collect();
200
201 let recovery_sql =
205 target.and_then(|t| t.recovery_sql(&t.resolve_table(&mappings), &export.name));
206
207 Ok(ExportTypeReport {
208 export: export.name.clone(),
209 columns: rows,
210 violations,
211 target_failures,
212 recovery_sql,
213 })
214}
215
216pub fn print_table(report: &ExportTypeReport, target: Option<ExportTarget>) {
218 let col_w = col_width(&report.columns, |r| r.column.len());
219 let src_w = col_width(&report.columns, |r| r.source_type.len()).max("Source type".len());
220 let rv_w = col_width(&report.columns, |r| r.rivet_type.len()).max("Rivet type".len());
221 let arr_w = col_width(&report.columns, |r| r.arrow_type.len()).max("Arrow type".len());
222 let fid_w = "logical_string".len();
223
224 println!();
225 if let Some(tgt) = target {
226 println!("Export: {} [target: {}]", report.export, tgt.label());
227 } else {
228 println!("Export: {}", report.export);
229 }
230
231 if target.is_some() {
232 let tgt_w = col_width(&report.columns, |r| {
233 r.target_type.as_deref().unwrap_or("-").len()
234 })
235 .max("Target type".len());
236 let sta_w = "Status".len();
237
238 println!(
239 " {:<col_w$} {:<src_w$} {:<rv_w$} {:<arr_w$} {:<fid_w$} {:<tgt_w$} {:<sta_w$}",
240 "Column",
241 "Source type",
242 "Rivet type",
243 "Arrow type",
244 "Fidelity",
245 "Target type",
246 "Status"
247 );
248 println!(
249 " {:-<col_w$} {:-<src_w$} {:-<rv_w$} {:-<arr_w$} {:-<fid_w$} {:-<tgt_w$} {:-<sta_w$}",
250 "", "", "", "", "", "", ""
251 );
252 for row in &report.columns {
253 let status_label = row.target_status.as_ref().map(|s| s.label()).unwrap_or("-");
254 let tgt_type = row.target_type.as_deref().unwrap_or("-");
255 let status_marker = match &row.target_status {
256 Some(TargetStatus::Fail) => " ✗",
257 Some(TargetStatus::Warn) => " ~",
258 _ => "",
259 };
260 println!(
261 " {:<col_w$} {:<src_w$} {:<rv_w$} {:<arr_w$} {}{:<rest$} {:<tgt_w$} {}{}",
262 row.column,
263 row.source_type,
264 row.rivet_type,
265 row.arrow_type,
266 row.fidelity.label(),
267 "",
268 tgt_type,
269 status_label,
270 status_marker,
271 rest = fid_w - row.fidelity.label().len(),
272 );
273 if let Some(autoload) = &row.autoload_type {
274 println!(" {:<col_w$} autoload: {}", "", autoload);
275 }
276 if let Some(note) = &row.target_note {
277 println!(" {:<col_w$} note: {}", "", note);
278 }
279 if let Some(cast) = &row.cast_sql {
280 println!(" {:<col_w$} recover: {}", "", cast);
281 }
282 for w in &row.warnings {
283 println!(" {:<col_w$} warning: {}", "", w);
284 }
285 }
286 } else {
287 println!(
288 " {:<col_w$} {:<src_w$} {:<rv_w$} {:<arr_w$} {:<fid_w$}",
289 "Column", "Source type", "Rivet type", "Arrow type", "Fidelity"
290 );
291 println!(
292 " {:-<col_w$} {:-<src_w$} {:-<rv_w$} {:-<arr_w$} {:-<fid_w$}",
293 "", "", "", "", ""
294 );
295 for row in &report.columns {
296 println!(
297 " {:<col_w$} {:<src_w$} {:<rv_w$} {:<arr_w$} {}{}",
298 row.column,
299 row.source_type,
300 row.rivet_type,
301 row.arrow_type,
302 row.fidelity.label(),
303 fidelity_marker(row.fidelity),
304 );
305 for w in &row.warnings {
306 println!(" {:<col_w$} warning: {}", "", w);
307 }
308 }
309 }
310
311 if !report.violations.is_empty() {
312 println!();
313 for v in &report.violations {
314 let prefix = if v.fatal { " FAIL" } else { " WARN" };
315 println!("{}: {}", prefix, v.message);
316 }
317 }
318
319 if let Some(sql) = &report.recovery_sql {
320 println!();
321 println!(
322 " {} type recovery — bare autoload degrades JSON/UUID→BYTES, naive",
323 target.map(|t| t.label()).unwrap_or("target")
324 );
325 println!(" timestamp→TIMESTAMP, array→RECORD; load with --autodetect then run:");
326 for line in sql.lines() {
327 println!(" {line}");
328 }
329 }
330}
331
332pub fn print_json(report: &ExportTypeReport) -> Result<()> {
334 let s = serde_json::to_string(report)?;
335 println!("{}", s);
336 Ok(())
337}
338
/// Returns the display width for one table column: the largest value of `f`
/// over `rows`, but never less than 8.
///
/// Generic over the row type so any slice of items can be measured; the
/// report passes its rows with a closure that measures a single field.
fn col_width<T>(rows: &[T], f: impl Fn(&T) -> usize) -> usize {
    // Lower bound so narrow or empty columns still get a usable width.
    const MIN_WIDTH: usize = 8;
    rows.iter().map(f).fold(MIN_WIDTH, usize::max)
}
342
343fn fidelity_marker(f: TypeFidelity) -> &'static str {
344 match f {
345 TypeFidelity::Lossy | TypeFidelity::Unsupported => " ✗",
346 TypeFidelity::LogicalString => " ~",
347 _ => "",
348 }
349}
350
351fn override_narrows(
364 source: &crate::types::RivetType,
365 overridden: &crate::types::RivetType,
366) -> Option<String> {
367 use crate::types::RivetType::Decimal;
368 if let (
369 Decimal {
370 precision: sp,
371 scale: ss,
372 },
373 Decimal {
374 precision: op,
375 scale: os,
376 },
377 ) = (source, overridden)
378 {
379 if os < ss {
382 return Some(format!(
383 "override decimal({op},{os}) reduces scale from source numeric({sp},{ss}) — \
384 {} fractional digit(s) are truncated at run; this is lossy, not exact",
385 (*ss as i16) - (*os as i16)
386 ));
387 }
388 let src_int_digits = *sp as i16 - *ss as i16;
391 let ov_int_digits = *op as i16 - *os as i16;
392 if ov_int_digits < src_int_digits {
393 return Some(format!(
394 "override decimal({op},{os}) reduces integer-digit capacity from source \
395 numeric({sp},{ss}) — large values overflow at run; this is lossy, not exact"
396 ));
397 }
398 }
399 None
400}
401
402fn rivet_type_label(t: &crate::types::RivetType) -> String {
403 use crate::types::RivetType::*;
404 match t {
405 Bool => "bool".into(),
406 Int16 => "int2".into(),
407 Int32 => "int4".into(),
408 Int64 => "int8".into(),
409 UInt64 => "uint8".into(),
410 Float32 => "float4".into(),
411 Float64 => "float8".into(),
412 Decimal { precision, scale } => format!("decimal({precision},{scale})"),
413 Date => "date".into(),
414 Time { .. } => "time".into(),
415 Timestamp {
416 timezone: Some(_), ..
417 } => "timestamp_tz".into(),
418 Timestamp { timezone: None, .. } => "timestamp".into(),
419 String => "text".into(),
420 Text => "text".into(),
421 Binary => "binary".into(),
422 Json => "json".into(),
423 Uuid => "uuid".into(),
424 Enum => "enum".into(),
425 Interval => "interval".into(),
426 List { inner } => format!("list<{}>", rivet_type_label(inner)),
427 Unsupported { native_type, .. } => format!("unsupported({native_type})"),
428 }
429}
430
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{RivetType, TypeFidelity};

    /// Shorthand for a decimal Rivet type.
    fn dec(precision: u8, scale: i8) -> RivetType {
        RivetType::Decimal { precision, scale }
    }

    /// A report row for `column` with placeholder types and no target data.
    fn row(column: &str) -> TypeReportRow {
        TypeReportRow {
            column: column.into(),
            source_type: "int8".into(),
            rivet_type: "int8".into(),
            arrow_type: "Int64".into(),
            fidelity: TypeFidelity::Exact,
            warnings: vec![],
            target_type: None,
            target_status: None,
            target_note: None,
            autoload_type: None,
            cast_sql: None,
        }
    }

    /// A policy violation whose only meaningful field is `fatal`.
    fn violation(fatal: bool) -> PolicyViolation {
        PolicyViolation {
            column_name: "c".into(),
            fidelity: TypeFidelity::Lossy,
            message: "m".into(),
            fatal,
        }
    }

    /// An otherwise-empty report carrying `violations`.
    fn report_with(violations: Vec<PolicyViolation>) -> ExportTypeReport {
        ExportTypeReport {
            export: "orders".into(),
            columns: vec![],
            violations,
            target_failures: false,
            recovery_sql: None,
        }
    }

    // ── override_narrows ────────────────────────────────────────────────

    #[test]
    fn narrows_flags_scale_reduction_as_lossy() {
        let reason = override_narrows(&dec(10, 2), &dec(20, 0)).expect("scale drop is lossy");
        assert!(
            reason.contains("scale"),
            "reason should name scale: {reason}"
        );
        assert!(
            reason.contains("lossy"),
            "reason should say lossy: {reason}"
        );
    }

    #[test]
    fn narrows_reports_number_of_truncated_digits() {
        let reason = override_narrows(&dec(10, 4), &dec(10, 1)).expect("scale drop is lossy");
        assert!(reason.contains("3 fractional digit(s)"), "reason: {reason}");
    }

    #[test]
    fn narrows_none_when_scale_preserved() {
        assert!(override_narrows(&dec(10, 2), &dec(20, 2)).is_none());
        assert!(override_narrows(&dec(10, 2), &dec(10, 2)).is_none());
    }

    #[test]
    fn narrows_none_when_scale_widened() {
        assert!(override_narrows(&dec(10, 2), &dec(12, 4)).is_none());
    }

    #[test]
    fn narrows_flags_integer_digit_reduction_as_lossy() {
        let reason =
            override_narrows(&dec(20, 0), &dec(10, 0)).expect("integer-digit drop is lossy");
        assert!(
            reason.contains("integer-digit") && reason.contains("lossy"),
            "reason: {reason}"
        );
    }

    #[test]
    fn narrows_flags_integer_digits_lost_to_a_wider_scale() {
        // Same precision, larger scale: 8 integer digits shrink to 6.
        let reason =
            override_narrows(&dec(10, 2), &dec(10, 4)).expect("integer-digit drop is lossy");
        assert!(reason.contains("integer-digit"), "reason: {reason}");
    }

    #[test]
    fn narrows_accounts_for_negative_source_scale() {
        // numeric(5,-2) holds up to 7 integer digits.
        assert!(override_narrows(&dec(5, -2), &dec(7, 0)).is_none());
        assert!(override_narrows(&dec(5, -2), &dec(6, 0)).is_some());
    }

    #[test]
    fn narrows_none_for_non_decimal_overrides() {
        assert!(override_narrows(&RivetType::Int32, &RivetType::Int64).is_none());
        assert!(override_narrows(&RivetType::Int64, &RivetType::String).is_none());
    }

    // ── fidelity_marker ─────────────────────────────────────────────────

    #[test]
    fn fidelity_marker_lossy_is_cross() {
        assert_eq!(fidelity_marker(TypeFidelity::Lossy), " ✗");
    }

    #[test]
    fn fidelity_marker_unsupported_is_cross() {
        assert_eq!(fidelity_marker(TypeFidelity::Unsupported), " ✗");
    }

    #[test]
    fn fidelity_marker_logical_string_is_tilde() {
        assert_eq!(fidelity_marker(TypeFidelity::LogicalString), " ~");
    }

    #[test]
    fn fidelity_marker_exact_is_empty() {
        assert_eq!(fidelity_marker(TypeFidelity::Exact), "");
    }

    #[test]
    fn fidelity_marker_compatible_is_empty() {
        assert_eq!(fidelity_marker(TypeFidelity::Compatible), "");
    }

    // ── rivet_type_label ────────────────────────────────────────────────

    #[test]
    fn label_bool() {
        assert_eq!(rivet_type_label(&RivetType::Bool), "bool");
    }

    #[test]
    fn label_int64() {
        assert_eq!(rivet_type_label(&RivetType::Int64), "int8");
    }

    #[test]
    fn label_float64() {
        assert_eq!(rivet_type_label(&RivetType::Float64), "float8");
    }

    #[test]
    fn label_decimal_with_precision_and_scale() {
        assert_eq!(
            rivet_type_label(&RivetType::Decimal {
                precision: 18,
                scale: 2
            }),
            "decimal(18,2)"
        );
    }

    #[test]
    fn label_text() {
        assert_eq!(rivet_type_label(&RivetType::Text), "text");
    }

    #[test]
    fn label_string_shares_text_label() {
        assert_eq!(rivet_type_label(&RivetType::String), "text");
    }

    #[test]
    fn label_uuid() {
        assert_eq!(rivet_type_label(&RivetType::Uuid), "uuid");
    }

    #[test]
    fn label_list_of_int64() {
        let t = RivetType::List {
            inner: Box::new(RivetType::Int64),
        };
        assert_eq!(rivet_type_label(&t), "list<int8>");
    }

    #[test]
    fn label_unsupported_native_type() {
        let t = RivetType::Unsupported {
            native_type: "tsvector".into(),
            reason: "not supported".into(),
        };
        assert_eq!(rivet_type_label(&t), "unsupported(tsvector)");
    }

    // ── col_width ───────────────────────────────────────────────────────

    #[test]
    fn col_width_empty_returns_minimum_8() {
        let rows: Vec<TypeReportRow> = vec![];
        assert_eq!(col_width(&rows, |_r| 0), 8);
    }

    #[test]
    fn col_width_short_values_returns_minimum_8() {
        assert_eq!(col_width(&[row("a")], |r| r.column.len()), 8);
    }

    #[test]
    fn col_width_long_value_returns_that_length() {
        let w = col_width(&[row("a_very_long_column_name")], |r| r.column.len());
        assert_eq!(w, "a_very_long_column_name".len());
    }

    #[test]
    fn col_width_takes_the_widest_row() {
        let rows = [row("id"), row("customer_reference"), row("amount")];
        assert_eq!(
            col_width(&rows, |r| r.column.len()),
            "customer_reference".len()
        );
    }

    // ── ExportTypeReport ────────────────────────────────────────────────

    #[test]
    fn has_fatal_requires_a_fatal_violation() {
        assert!(!report_with(vec![]).has_fatal());
        assert!(!report_with(vec![violation(false)]).has_fatal());
        assert!(report_with(vec![violation(false), violation(true)]).has_fatal());
    }
}