1use arrow::datatypes::DataType;
32use serde::Serialize;
33
34use super::{RivetType, TimeUnit, TypeFidelity, TypeMapping};
35
/// An engine/warehouse the exported Parquet is meant to be loaded into.
///
/// Each target maps Rivet column types to its own native types and reports
/// where its Parquet autoload diverges from them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportTarget {
    /// DuckDB (reads Parquet logical types natively).
    DuckDb,
    /// Google BigQuery.
    BigQuery,
    /// Snowflake.
    Snowflake,
}
50
51impl ExportTarget {
52 pub fn parse(s: &str) -> Option<Self> {
53 match s.to_lowercase().as_str() {
54 "bigquery" | "bq" => Some(Self::BigQuery),
55 "duckdb" | "duck" => Some(Self::DuckDb),
56 "snowflake" | "sf" => Some(Self::Snowflake),
57 _ => None,
58 }
59 }
60
61 pub fn valid_target_names() -> &'static str {
66 "bigquery (bq), duckdb (duck), snowflake (sf)"
67 }
68
69 pub fn label(self) -> &'static str {
70 match self {
71 Self::BigQuery => "bigquery",
72 Self::DuckDb => "duckdb",
73 Self::Snowflake => "snowflake",
74 }
75 }
76
77 pub fn resolve_column(self, input: TargetInput<'_>) -> TargetColumnSpec {
79 let mut spec = match self {
80 ExportTarget::BigQuery => bigquery::resolve(&input),
81 ExportTarget::DuckDb => duckdb::resolve(&input),
82 ExportTarget::Snowflake => snowflake::resolve(&input),
83 };
84 if input.fidelity.is_unsafe_for_strict_mode() && spec.status == TargetStatus::Ok {
88 spec.status = TargetStatus::Warn;
89 }
90 spec
91 }
92
93 pub fn resolve_table(self, mappings: &[TypeMapping]) -> Vec<TargetColumnSpec> {
98 mappings
99 .iter()
100 .map(|m| self.resolve_column(TargetInput::from(m)))
101 .collect()
102 }
103
104 pub fn recovery_sql(self, specs: &[TargetColumnSpec], table: &str) -> Option<String> {
112 match self {
113 ExportTarget::BigQuery => Some(bigquery_recovery_sql(specs, table)),
114 ExportTarget::Snowflake => Some(snowflake_recovery_sql(specs, table)),
115 ExportTarget::DuckDb => None,
116 }
117 }
118}
119
/// Outcome of resolving one column for a target.
///
/// Serialized in snake_case (`"ok"`, `"warn"`, `"fail"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TargetStatus {
    /// Native type and autoload type agree; nothing to do.
    Ok,
    /// Loadable, but something needs attention (see the spec's `note`).
    Warn,
    /// No usable target type; the spec's `note` says why.
    Fail,
}
128
129impl TargetStatus {
130 pub fn label(&self) -> &'static str {
131 match self {
132 Self::Ok => "ok",
133 Self::Warn => "warn",
134 Self::Fail => "fail",
135 }
136 }
137}
138
/// Borrowed view of one column's mapping, as consumed by the per-target
/// resolvers.
#[derive(Debug, Clone, Copy)]
pub struct TargetInput<'a> {
    /// Column name; substituted for `{col}` in cast templates.
    pub column_name: &'a str,
    /// Rivet logical type; the only field the per-target resolvers match on.
    pub rivet_type: &'a RivetType,
    // Not read by any resolver in this file, hence the allow.
    // NOTE(review): presumably kept for future target-specific decisions — confirm.
    #[allow(dead_code)]
    pub arrow_type: Option<&'a DataType>,
    /// Source→Rivet mapping fidelity; strict-mode-unsafe columns are never `Ok`.
    pub fidelity: TypeFidelity,
}
153
154impl<'a> From<&'a TypeMapping> for TargetInput<'a> {
155 fn from(m: &'a TypeMapping) -> Self {
156 TargetInput {
157 column_name: &m.column_name,
158 rivet_type: &m.rivet_type,
159 arrow_type: m.arrow_type.as_ref(),
160 fidelity: m.fidelity,
161 }
162 }
163}
164
/// Resolution of one column for one target, as reported/serialized.
#[derive(Debug, Clone, Serialize)]
pub struct TargetColumnSpec {
    /// Source column name.
    pub column_name: String,
    /// Recommended native type on the target; `"-"` for `Fail` rows.
    pub target_type: String,
    /// Type the target produces when autoloading the Parquet; `"-"` for
    /// `Fail` rows. Differs from `target_type` when a recovery step is needed.
    pub autoload_type: String,
    /// Ok / Warn / Fail.
    pub status: TargetStatus,
    /// Why the row is Warn/Fail and how to recover; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub note: Option<String>,
    /// Post-load SQL expression converting the autoloaded column to
    /// `target_type`, with the column name already substituted; omitted when
    /// `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cast_sql: Option<String>,
}
185
/// Column-independent result of a per-target type resolution; turned into a
/// [`TargetColumnSpec`] by `into_spec` once the column name is known.
struct Resolved {
    target_type: String,
    autoload_type: String,
    status: TargetStatus,
    note: Option<String>,
    // Cast template; every `{col}` is replaced with the column name.
    cast: Option<String>,
}
196
197impl Resolved {
198 fn ok(t: impl Into<String>) -> Self {
199 let t = t.into();
200 Self {
201 autoload_type: t.clone(),
202 target_type: t,
203 status: TargetStatus::Ok,
204 note: None,
205 cast: None,
206 }
207 }
208 fn diverge(
211 native: impl Into<String>,
212 autoload: impl Into<String>,
213 note: impl Into<String>,
214 cast: Option<&str>,
215 ) -> Self {
216 Self {
217 target_type: native.into(),
218 autoload_type: autoload.into(),
219 status: TargetStatus::Warn,
220 note: Some(note.into()),
221 cast: cast.map(str::to_string),
222 }
223 }
224 fn warn(t: impl Into<String>, note: impl Into<String>) -> Self {
225 let t = t.into();
226 Self {
227 autoload_type: t.clone(),
228 target_type: t,
229 status: TargetStatus::Warn,
230 note: Some(note.into()),
231 cast: None,
232 }
233 }
234 fn fail(note: impl Into<String>) -> Self {
235 Self {
236 target_type: "-".into(),
237 autoload_type: "-".into(),
238 status: TargetStatus::Fail,
239 note: Some(note.into()),
240 cast: None,
241 }
242 }
243 fn into_spec(self, input: &TargetInput<'_>) -> TargetColumnSpec {
244 TargetColumnSpec {
245 column_name: input.column_name.to_string(),
246 target_type: self.target_type,
247 autoload_type: self.autoload_type,
248 status: self.status,
249 note: self.note,
250 cast_sql: self.cast.map(|t| t.replace("{col}", input.column_name)),
251 }
252 }
253}
254
255fn unsupported_reason(t: &RivetType) -> String {
256 match t {
257 RivetType::Unsupported { reason, .. } => reason.clone(),
258 _ => "no target mapping".into(),
259 }
260}
261
262fn recovery_projection(specs: &[TargetColumnSpec], passthrough: impl Fn(&str) -> String) -> String {
277 specs
278 .iter()
279 .map(|s| match &s.cast_sql {
280 Some(cast) => format!(" {cast} AS {name}", name = s.column_name),
281 None => passthrough(&s.column_name),
282 })
283 .collect::<Vec<_>>()
284 .join(",\n")
285}
286
287fn bigquery_recovery_sql(specs: &[TargetColumnSpec], table: &str) -> String {
288 let cols = recovery_projection(specs, |name| format!(" {name}"));
289 format!(
290 "-- 1) bq load --autodetect --parquet_enable_list_inference \
291 --source_format=PARQUET {table}__staging <parquet>\n\
292 -- 2) recover native types:\n\
293 CREATE OR REPLACE TABLE `{table}` AS\n\
294 SELECT\n{cols}\n\
295 FROM `{table}__staging`;"
296 )
297}
298
299fn snowflake_recovery_sql(specs: &[TargetColumnSpec], table: &str) -> String {
305 let cols = recovery_projection(specs, |name| format!(" \"{name}\" AS {name}"));
306 format!(
307 "-- 1) ALTER SESSION SET TIMEZONE='UTC';\n\
308 -- 2) CREATE OR REPLACE FILE FORMAT rivet_pq TYPE=PARQUET BINARY_AS_TEXT=FALSE;\n\
309 -- 3) PUT file://<parquet> @<stage> AUTO_COMPRESS=FALSE;\n\
310 -- 4) CREATE OR REPLACE TABLE {table}__staging USING TEMPLATE (SELECT ARRAY_AGG(\n\
311 -- OBJECT_CONSTRUCT(*)) FROM TABLE(INFER_SCHEMA(LOCATION=>'@<stage>', FILE_FORMAT=>'rivet_pq')));\n\
312 -- COPY INTO {table}__staging FROM @<stage> FILE_FORMAT=(FORMAT_NAME='rivet_pq') MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE;\n\
313 -- 5) recover native types:\n\
314 CREATE OR REPLACE TABLE {table} AS\n\
315 SELECT\n{cols}\n\
316 FROM {table}__staging;"
317 )
318}
319
320mod bigquery {
323 use super::*;
324
325 const NUMERIC_MAX_P: u8 = 29;
327 const NUMERIC_MAX_S: i8 = 9;
328 const BIGNUMERIC_MAX_P: u8 = 76;
330 const BIGNUMERIC_MAX_S: i8 = 38;
331
332 pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
333 native(input.rivet_type).into_spec(input)
334 }
335
336 fn native(t: &RivetType) -> Resolved {
337 match t {
338 RivetType::Bool => Resolved::ok("BOOL"),
339 RivetType::Int16 | RivetType::Int32 | RivetType::Int64 => Resolved::ok("INT64"),
340 RivetType::UInt64 => Resolved::diverge(
345 "NUMERIC",
346 "INT64",
347 "UINT64 > INT64_MAX overflows the INT64 autoload and cannot be recovered after \
348 load — map the column to decimal(20,0) with a source column override",
349 None,
350 ),
351 RivetType::Float32 | RivetType::Float64 => Resolved::ok("FLOAT64"),
352 RivetType::Decimal { precision, scale } => decimal(*precision, *scale),
353 RivetType::Date => Resolved::ok("DATE"),
354 RivetType::Time { .. } => Resolved::ok("TIME"),
355 RivetType::Timestamp {
357 timezone: Some(_), ..
358 } => Resolved::ok("TIMESTAMP"),
359 RivetType::Timestamp {
368 unit: TimeUnit::Nanosecond,
369 timezone: None,
370 } => Resolved::diverge(
371 "INT64",
372 "INT64",
373 "nanosecond timestamp has no BigQuery native type — autoloads as INT64 (raw \
374 nanos, lossless); a native TIMESTAMP via TIMESTAMP_MICROS(DIV(col,1000)) drops \
375 sub-µs precision. Prefer `timestamp` (microsecond) for BigQuery targets.",
376 None,
377 ),
378 RivetType::Timestamp { timezone: None, .. } => Resolved::diverge(
382 "DATETIME",
383 "TIMESTAMP",
384 "naive timestamp autoloads as TIMESTAMP (an instant); recover wall-clock with \
385 DATETIME(col) after load",
386 Some("DATETIME({col})"),
387 ),
388 RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("STRING"),
389 RivetType::Binary => Resolved::ok("BYTES"),
390 RivetType::Json => Resolved::diverge(
393 "JSON",
394 "BYTES",
395 "Parquet JSON logical type autoloads as BYTES in BigQuery; recover native JSON \
396 with PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(col)) after load",
397 Some("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING({col}))"),
398 ),
399 RivetType::Uuid => Resolved::diverge(
403 "STRING",
404 "BYTES",
405 "UUID autoloads as 16-byte BYTES in BigQuery; recover hex text with TO_HEX(col) \
406 after load (or keep BYTES)",
407 Some("TO_HEX({col})"),
408 ),
409 RivetType::Interval => Resolved::ok("STRING"),
410 RivetType::List { inner } => list(inner),
411 RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
412 }
413 }
414
415 fn decimal(p: u8, s: i8) -> Resolved {
416 if s < 0 {
417 return Resolved::fail(format!(
418 "BigQuery has no negative scale; decimal({p},{s}) needs a STRING/INT64 cast"
419 ));
420 }
421 let native = if p <= NUMERIC_MAX_P && s <= NUMERIC_MAX_S {
422 "NUMERIC"
423 } else if p <= BIGNUMERIC_MAX_P && s <= BIGNUMERIC_MAX_S {
424 "BIGNUMERIC"
425 } else {
426 return Resolved::fail(format!(
427 "decimal({p},{s}) exceeds BigQuery BIGNUMERIC limits (max 76,38)"
428 ));
429 };
430 Resolved::ok(native)
431 }
432
433 fn list(inner: &RivetType) -> Resolved {
434 let inner_r = native(inner);
435 if inner_r.status == TargetStatus::Fail {
436 return Resolved::fail(format!(
437 "REPEATED of unsupported element: {}",
438 inner_r.target_type
439 ));
440 }
441 Resolved::diverge(
448 format!("REPEATED {}", inner_r.target_type),
449 format!("REPEATED RECORD{{item {}}}", inner_r.autoload_type),
450 "arrays load as REPEATED RECORD{item}; load the staging table with \
451 --parquet_enable_list_inference, then flatten with UNNEST after load",
452 Some("ARRAY(SELECT el.item FROM UNNEST({col}) AS el)"),
453 )
454 }
455}
456
457mod duckdb {
460 use super::*;
461
462 pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
463 native(input.rivet_type).into_spec(input)
464 }
465
466 fn native(t: &RivetType) -> Resolved {
469 match t {
470 RivetType::Bool => Resolved::ok("BOOLEAN"),
471 RivetType::Int16 => Resolved::ok("SMALLINT"),
472 RivetType::Int32 => Resolved::ok("INTEGER"),
473 RivetType::Int64 => Resolved::ok("BIGINT"),
474 RivetType::UInt64 => Resolved::ok("UBIGINT"),
475 RivetType::Float32 => Resolved::ok("FLOAT"),
476 RivetType::Float64 => Resolved::ok("DOUBLE"),
477 RivetType::Decimal { precision, scale } => {
478 if *scale < 0 {
479 Resolved::warn(
480 "DECIMAL",
481 format!(
482 "DuckDB has no negative scale; decimal({precision},{scale}) loads via cast"
483 ),
484 )
485 } else if *precision <= 38 {
486 Resolved::ok(format!("DECIMAL({precision},{scale})"))
487 } else {
488 Resolved::warn(
490 "DECIMAL(38,*)",
491 format!("decimal({precision},{scale}) exceeds DuckDB DECIMAL(38); widens"),
492 )
493 }
494 }
495 RivetType::Date => Resolved::ok("DATE"),
496 RivetType::Time { .. } => Resolved::ok("TIME"),
497 RivetType::Timestamp {
498 timezone: Some(_), ..
499 } => Resolved::ok("TIMESTAMPTZ"),
500 RivetType::Timestamp {
503 unit: TimeUnit::Nanosecond,
504 timezone: None,
505 } => Resolved::ok("TIMESTAMP_NS"),
506 RivetType::Timestamp { timezone: None, .. } => Resolved::ok("TIMESTAMP"),
507 RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("VARCHAR"),
508 RivetType::Binary => Resolved::ok("BLOB"),
509 RivetType::Json => Resolved::ok("JSON"),
510 RivetType::Uuid => Resolved::ok("UUID"),
511 RivetType::Interval => Resolved::ok("INTERVAL"),
512 RivetType::List { inner } => {
513 let inner_r = native(inner);
514 if inner_r.status == TargetStatus::Fail {
515 Resolved::fail(format!(
516 "LIST of unsupported element: {}",
517 inner_r.target_type
518 ))
519 } else {
520 Resolved::ok(format!("{}[]", inner_r.target_type))
521 }
522 }
523 RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
524 }
525 }
526}
527
528mod snowflake {
531 use super::*;
532
533 pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
534 native(input.rivet_type).into_spec(input)
535 }
536
537 fn native(t: &RivetType) -> Resolved {
542 match t {
543 RivetType::Bool => Resolved::ok("BOOLEAN"),
544 RivetType::Int16 | RivetType::Int32 | RivetType::Int64 => Resolved::ok("NUMBER(38,0)"),
545 RivetType::UInt64 => Resolved::diverge(
547 "NUMBER(20,0)",
548 "NUMBER(38,0)",
549 "UINT64 > INT64_MAX overflows the Parquet read; map to decimal(20,0) at source",
550 None,
551 ),
552 RivetType::Float32 | RivetType::Float64 => Resolved::ok("FLOAT"),
553 RivetType::Decimal { precision, scale } => {
554 if *scale < 0 {
555 Resolved::warn(
556 "NUMBER",
557 format!(
558 "Snowflake NUMBER has no negative scale; decimal({precision},{scale}) loads via cast"
559 ),
560 )
561 } else {
562 Resolved::ok(format!("NUMBER({precision},{scale})"))
563 }
564 }
565 RivetType::Date => Resolved::ok("DATE"),
566 RivetType::Time { .. } => Resolved::diverge(
568 "TIME",
569 "NUMBER(38,0)",
570 "TIME autoloads as NUMBER (µs of day); recover with TIME_FROM_PARTS after load",
571 Some(r#"TIME_FROM_PARTS(0,0,FLOOR("{col}"/1000000),MOD("{col}",1000000)*1000)"#),
572 ),
573 RivetType::Timestamp {
575 timezone: Some(_), ..
576 } => Resolved::diverge(
577 "TIMESTAMP_TZ",
578 "TIMESTAMP_NTZ",
579 "tz timestamp autoloads as TIMESTAMP_NTZ — ALTER SESSION SET TIMEZONE='UTC' before COPY so the instant matches",
580 None,
581 ),
582 RivetType::Timestamp {
588 unit: TimeUnit::Nanosecond,
589 timezone: None,
590 } => Resolved::diverge(
591 "TIMESTAMP_NTZ",
592 "NUMBER(38,0)",
593 "nanosecond timestamp autoloads as NUMBER (ns since epoch); recover with \
594 TO_TIMESTAMP_NTZ(col, 9) after load — Snowflake TIMESTAMP_NTZ holds full ns precision",
595 Some(r#"TO_TIMESTAMP_NTZ("{col}", 9)"#),
596 ),
597 RivetType::Timestamp { timezone: None, .. } => Resolved::diverge(
599 "TIMESTAMP_NTZ",
600 "NUMBER(38,0)",
601 "naive timestamp autoloads as NUMBER (µs since epoch); recover with TO_TIMESTAMP_NTZ after load",
602 Some(r#"TO_TIMESTAMP_NTZ("{col}", 6)"#),
603 ),
604 RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("TEXT"),
605 RivetType::Binary => Resolved::warn(
607 "BINARY",
608 "set BINARY_AS_TEXT=FALSE in the Parquet FILE FORMAT or non-UTF8 bytes fail to load",
609 ),
610 RivetType::Json => Resolved::diverge(
612 "VARIANT",
613 "TEXT",
614 "JSON autoloads as TEXT; recover native VARIANT with PARSE_JSON after load",
615 Some(r#"PARSE_JSON("{col}")"#),
616 ),
617 RivetType::Uuid => Resolved::diverge(
619 "TEXT",
620 "BINARY",
621 "UUID autoloads as 16-byte BINARY; recover canonical text with HEX_ENCODE + REGEXP after load",
622 Some(
623 r#"REGEXP_REPLACE(LOWER(HEX_ENCODE("{col}")),'^(.{8})(.{4})(.{4})(.{4})(.{12})$','\\1-\\2-\\3-\\4-\\5')"#,
624 ),
625 ),
626 RivetType::Interval => Resolved::ok("TEXT"),
627 RivetType::List { inner } => {
632 let inner_r = native(inner);
633 if inner_r.status == TargetStatus::Fail {
634 Resolved::fail(format!(
635 "ARRAY of unsupported element: {}",
636 inner_r.target_type
637 ))
638 } else {
639 Resolved::diverge(
640 "ARRAY",
641 "VARIANT",
642 "list autoloads as VARIANT (the JSON array); recover native ARRAY with ::ARRAY after load",
643 Some(r#""{col}"::ARRAY"#),
644 )
645 }
646 }
647 RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
648 }
649 }
650}
651
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `TargetInput` for a column named `c` with `TypeFidelity::Exact`.
    fn input<'a>(rt: &'a RivetType) -> TargetInput<'a> {
        TargetInput {
            column_name: "c",
            rivet_type: rt,
            arrow_type: None,
            fidelity: TypeFidelity::Exact,
        }
    }

    // Per-target shorthands for resolving a single column.
    fn bq(rt: &RivetType) -> TargetColumnSpec {
        ExportTarget::BigQuery.resolve_column(input(rt))
    }
    fn duck(rt: &RivetType) -> TargetColumnSpec {
        ExportTarget::DuckDb.resolve_column(input(rt))
    }
    fn sf(rt: &RivetType) -> TargetColumnSpec {
        ExportTarget::Snowflake.resolve_column(input(rt))
    }

    /// BigQuery keeps nanosecond timestamps as raw INT64 and offers no cast.
    #[test]
    fn bq_nanosecond_timestamp_autoloads_as_int64() {
        let ns = RivetType::Timestamp {
            unit: super::super::TimeUnit::Nanosecond,
            timezone: None,
        };
        let s = bq(&ns);
        assert_eq!(s.target_type, "INT64");
        assert_eq!(s.autoload_type, "INT64");
        assert_eq!(s.status, TargetStatus::Warn);
        assert!(s.cast_sql.is_none(), "ns→BQ has no lossless temporal cast");
    }

    #[test]
    fn duckdb_nanosecond_timestamp_is_native_timestamp_ns() {
        let ns = RivetType::Timestamp {
            unit: super::super::TimeUnit::Nanosecond,
            timezone: None,
        };
        let s = duck(&ns);
        assert_eq!(s.target_type, "TIMESTAMP_NS");
        assert_eq!(s.status, TargetStatus::Ok);
    }

    /// Snowflake recovers nanosecond timestamps with scale 9.
    #[test]
    fn snowflake_nanosecond_timestamp_recovers_losslessly_at_scale_9() {
        let ns = RivetType::Timestamp {
            unit: super::super::TimeUnit::Nanosecond,
            timezone: None,
        };
        let s = sf(&ns);
        assert_eq!(s.target_type, "TIMESTAMP_NTZ");
        assert_eq!(s.autoload_type, "NUMBER(38,0)");
        assert_eq!(s.cast_sql.as_deref(), Some(r#"TO_TIMESTAMP_NTZ("c", 9)"#));
    }

    #[test]
    fn bq_uuid_resolves_not_fails() {
        let s = bq(&RivetType::Uuid);
        assert_eq!(s.target_type, "STRING");
        assert_eq!(s.autoload_type, "BYTES");
        assert_eq!(s.status, TargetStatus::Warn);
        assert!(s.cast_sql.unwrap().contains("c"));
    }

    #[test]
    fn bq_json_native_is_json_autoload_is_bytes() {
        let s = bq(&RivetType::Json);
        assert_eq!(s.target_type, "JSON");
        assert_eq!(s.autoload_type, "BYTES");
        assert_eq!(s.status, TargetStatus::Warn);
        assert!(s.cast_sql.unwrap().starts_with("PARSE_JSON"));
    }

    #[test]
    fn bq_naive_timestamp_is_datetime_native_timestamp_autoload() {
        let naive = RivetType::Timestamp {
            unit: super::super::TimeUnit::Microsecond,
            timezone: None,
        };
        let s = bq(&naive);
        assert_eq!(s.target_type, "DATETIME");
        assert_eq!(s.autoload_type, "TIMESTAMP");
        assert_eq!(s.status, TargetStatus::Warn);
    }

    #[test]
    fn bq_tz_timestamp_is_timestamp_ok() {
        let tz = RivetType::Timestamp {
            unit: super::super::TimeUnit::Microsecond,
            timezone: Some("UTC".into()),
        };
        let s = bq(&tz);
        assert_eq!(s.target_type, "TIMESTAMP");
        assert_eq!(s.autoload_type, "TIMESTAMP");
        assert_eq!(s.status, TargetStatus::Ok);
    }

    #[test]
    fn bq_decimal_within_numeric_is_numeric() {
        let s = bq(&RivetType::Decimal {
            precision: 18,
            scale: 2,
        });
        assert_eq!(s.target_type, "NUMERIC");
        assert_eq!(s.status, TargetStatus::Ok);
    }

    /// Pins the conservative NUMERIC fit test: precision 38 goes to BIGNUMERIC.
    #[test]
    fn bq_decimal_escalates_to_bignumeric() {
        let s = bq(&RivetType::Decimal {
            precision: 38,
            scale: 9,
        });
        assert_eq!(s.target_type, "BIGNUMERIC");
        assert_eq!(s.status, TargetStatus::Ok);
    }

    #[test]
    fn bq_decimal_negative_scale_fails() {
        let s = bq(&RivetType::Decimal {
            precision: 5,
            scale: -2,
        });
        assert_eq!(s.status, TargetStatus::Fail);
    }

    #[test]
    fn bq_uint64_recommends_numeric_warns_overflow() {
        let s = bq(&RivetType::UInt64);
        assert_eq!(s.target_type, "NUMERIC");
        assert_eq!(s.autoload_type, "INT64");
        assert_eq!(s.status, TargetStatus::Warn);
    }

    #[test]
    fn bq_list_is_repeated_native_record_autoload() {
        let t = RivetType::List {
            inner: Box::new(RivetType::String),
        };
        let s = bq(&t);
        assert_eq!(s.target_type, "REPEATED STRING");
        assert!(s.autoload_type.contains("REPEATED RECORD"));
        assert_eq!(s.status, TargetStatus::Warn);
    }

    /// Unsupported types become a Fail row with "-" types instead of panicking.
    #[test]
    fn bq_unsupported_is_fail_row_not_panic() {
        let t = RivetType::Unsupported {
            native_type: "geometry".into(),
            reason: "no mapping".into(),
        };
        let s = bq(&t);
        assert_eq!(s.status, TargetStatus::Fail);
        assert_eq!(s.target_type, "-");
    }

    /// Scalars whose BigQuery autoload already equals the native type.
    #[test]
    fn bq_standard_scalars_ok() {
        for (rt, native) in [
            (RivetType::Bool, "BOOL"),
            (RivetType::Int64, "INT64"),
            (RivetType::Float64, "FLOAT64"),
            (RivetType::Date, "DATE"),
            (RivetType::String, "STRING"),
            (RivetType::Binary, "BYTES"),
            (RivetType::Enum, "STRING"),
        ] {
            let s = bq(&rt);
            assert_eq!(s.target_type, native, "{rt:?}");
            assert_eq!(s.autoload_type, native, "{rt:?}");
            assert_eq!(s.status, TargetStatus::Ok, "{rt:?}");
        }
    }

    /// Types that diverge on other targets load natively in DuckDB.
    #[test]
    fn duckdb_reads_everything_natively() {
        let naive = RivetType::Timestamp {
            unit: super::super::TimeUnit::Microsecond,
            timezone: None,
        };
        for rt in [
            RivetType::Json,
            RivetType::Uuid,
            RivetType::UInt64,
            naive,
            RivetType::List {
                inner: Box::new(RivetType::Int64),
            },
        ] {
            let s = duck(&rt);
            assert_eq!(
                s.target_type, s.autoload_type,
                "DuckDB autoload must equal native for {rt:?}"
            );
            assert_ne!(s.status, TargetStatus::Fail, "{rt:?}");
        }
    }

    #[test]
    fn duckdb_native_type_names() {
        assert_eq!(duck(&RivetType::Json).target_type, "JSON");
        assert_eq!(duck(&RivetType::Uuid).target_type, "UUID");
        assert_eq!(duck(&RivetType::UInt64).target_type, "UBIGINT");
        assert_eq!(
            duck(&RivetType::Decimal {
                precision: 18,
                scale: 2
            })
            .target_type,
            "DECIMAL(18,2)"
        );
        assert_eq!(
            duck(&RivetType::List {
                inner: Box::new(RivetType::Int64)
            })
            .target_type,
            "BIGINT[]"
        );
    }

    #[test]
    fn parse_accepts_aliases() {
        assert_eq!(ExportTarget::parse("bq"), Some(ExportTarget::BigQuery));
        assert_eq!(
            ExportTarget::parse("BigQuery"),
            Some(ExportTarget::BigQuery)
        );
        assert_eq!(ExportTarget::parse("duckdb"), Some(ExportTarget::DuckDb));
        assert_eq!(ExportTarget::parse("nope"), None);
    }

    #[test]
    fn resolve_table_preserves_order_and_names() {
        use super::super::SourceColumn;
        let mappings = vec![
            TypeMapping::from_source(&SourceColumn::simple("a", "int8", true), RivetType::Int64),
            TypeMapping::from_source(&SourceColumn::simple("b", "jsonb", true), RivetType::Json),
        ];
        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
        assert_eq!(specs.len(), 2);
        assert_eq!(specs[0].column_name, "a");
        assert_eq!(specs[1].column_name, "b");
        assert_eq!(specs[1].target_type, "JSON");
    }

    /// An overflowed UINT64 cannot be fixed in SQL; the note must point at the
    /// source-side override instead.
    #[test]
    fn cast_sql_is_none_when_post_load_recovery_is_impossible() {
        let u = bq(&RivetType::UInt64);
        assert!(
            u.cast_sql.is_none(),
            "overflowed UINT64 has no lossless post-load recovery"
        );
        let note = u.note.unwrap().to_lowercase();
        assert!(
            note.contains("override"),
            "UINT64 note must point to the source-side override, got: {note}"
        );
    }

    #[test]
    fn cast_sql_present_only_when_lossless_post_load() {
        assert!(
            bq(&RivetType::Json)
                .cast_sql
                .unwrap()
                .contains("PARSE_JSON")
        );
        assert!(bq(&RivetType::Uuid).cast_sql.unwrap().contains("TO_HEX"));
        let naive = RivetType::Timestamp {
            unit: super::super::TimeUnit::Microsecond,
            timezone: None,
        };
        assert!(bq(&naive).cast_sql.unwrap().contains("DATETIME"));
    }

    /// Every BigQuery divergence must offer either a cast or a note describing
    /// the recovery ("after load" or a source "override").
    #[test]
    fn every_divergence_offers_a_recovery_path() {
        let naive = RivetType::Timestamp {
            unit: super::super::TimeUnit::Microsecond,
            timezone: None,
        };
        let cases = [
            RivetType::Json,
            RivetType::Uuid,
            RivetType::UInt64,
            naive,
            RivetType::List {
                inner: Box::new(RivetType::String),
            },
        ];
        for rt in cases {
            let s = bq(&rt);
            assert_ne!(s.autoload_type, s.target_type, "case must diverge: {rt:?}");
            let has_cast = s.cast_sql.is_some();
            let note = s.note.as_deref().unwrap_or("").to_lowercase();
            let describes_recovery = note.contains("after load") || note.contains("override");
            assert!(
                has_cast || describes_recovery,
                "divergent {rt:?} must offer a recovery (cast_sql or a recovery note)"
            );
        }
    }

    /// Edges of the BIGNUMERIC limits (76,38) and the NUMERIC→BIGNUMERIC step.
    #[test]
    fn bq_decimal_limit_boundaries() {
        assert_eq!(
            bq(&RivetType::Decimal {
                precision: 76,
                scale: 38
            })
            .status,
            TargetStatus::Ok
        );
        assert_eq!(
            bq(&RivetType::Decimal {
                precision: 77,
                scale: 38
            })
            .status,
            TargetStatus::Fail
        );
        assert_eq!(
            bq(&RivetType::Decimal {
                precision: 76,
                scale: 39
            })
            .status,
            TargetStatus::Fail
        );
        assert_eq!(
            bq(&RivetType::Decimal {
                precision: 30,
                scale: 0
            })
            .target_type,
            "BIGNUMERIC"
        );
    }

    #[test]
    fn duckdb_decimal_over_38_warns_not_silently_clamps() {
        let s = duck(&RivetType::Decimal {
            precision: 40,
            scale: 2,
        });
        assert_eq!(s.status, TargetStatus::Warn);
    }

    /// End-to-end: BigQuery recovery SQL applies each divergent column's cast.
    #[test]
    fn bq_recovery_sql_casts_native_types() {
        use super::super::{SourceColumn, TimeUnit};
        let naive = RivetType::Timestamp {
            unit: TimeUnit::Microsecond,
            timezone: None,
        };
        let mappings = vec![
            TypeMapping::from_source(&SourceColumn::simple("id", "int8", true), RivetType::Int64),
            TypeMapping::from_source(
                &SourceColumn::simple("attrs", "jsonb", true),
                RivetType::Json,
            ),
            TypeMapping::from_source(&SourceColumn::simple("uid", "uuid", true), RivetType::Uuid),
            TypeMapping::from_source(
                &SourceColumn::simple("created_at", "timestamp", true),
                naive,
            ),
            TypeMapping::from_source(
                &SourceColumn::simple("tags", "_text", true),
                RivetType::List {
                    inner: Box::new(RivetType::String),
                },
            ),
        ];
        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
        let sql = ExportTarget::BigQuery
            .recovery_sql(&specs, "payments")
            .expect("BigQuery has a recovery SQL");
        assert!(sql.contains("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(attrs)) AS attrs"));
        assert!(sql.contains("TO_HEX(uid) AS uid"));
        assert!(sql.contains("DATETIME(created_at) AS created_at"));
        assert!(sql.contains("ARRAY(SELECT el.item FROM UNNEST(tags) AS el) AS tags"));
        assert!(sql.contains("--parquet_enable_list_inference"));
        assert!(sql.contains("SELECT\n id"));
        assert!(sql.contains("CREATE OR REPLACE TABLE `payments`"));
        assert!(sql.contains("FROM `payments__staging`"));
    }

    #[test]
    fn duckdb_needs_no_recovery() {
        let mappings = vec![TypeMapping::from_source(
            &super::super::SourceColumn::simple("attrs", "json", true),
            RivetType::Json,
        )];
        let specs = ExportTarget::DuckDb.resolve_table(&mappings);
        assert!(
            ExportTarget::DuckDb.recovery_sql(&specs, "t").is_none(),
            "DuckDB autoloads every logical type natively — no recovery needed"
        );
    }

    /// The SELECT body has exactly one line per column, and only divergent
    /// columns get an `AS` alias.
    #[test]
    fn recovery_sql_projects_every_column_once_and_only_casts_divergent() {
        use super::super::{SourceColumn, TimeUnit};
        let naive = RivetType::Timestamp {
            unit: TimeUnit::Microsecond,
            timezone: None,
        };
        let cols: [(&str, RivetType); 6] = [
            ("id", RivetType::Int64),
            (
                "amount",
                RivetType::Decimal {
                    precision: 18,
                    scale: 2,
                },
            ),
            ("attrs", RivetType::Json),
            ("uid", RivetType::Uuid),
            ("created_at", naive),
            (
                "tags",
                RivetType::List {
                    inner: Box::new(RivetType::String),
                },
            ),
        ];
        let mappings: Vec<_> = cols
            .iter()
            .cloned()
            .map(|(n, rt)| TypeMapping::from_source(&SourceColumn::simple(n, "x", true), rt))
            .collect();
        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
        let sql = ExportTarget::BigQuery.recovery_sql(&specs, "t").unwrap();

        // Isolate the projection between "SELECT\n" and "\nFROM".
        let body = sql
            .split("SELECT\n")
            .nth(1)
            .and_then(|s| s.split("\nFROM").next())
            .expect("recovery SQL has a SELECT … FROM body");
        assert_eq!(
            body.split(",\n").count(),
            cols.len(),
            "one projection per column, got:\n{body}"
        );
        for (name, _) in &cols {
            assert!(body.contains(name), "column {name} missing:\n{body}");
        }
        assert!(body.contains(" id,") && !body.contains("AS id"));
        assert!(body.contains(" amount,") && !body.contains("AS amount"));
        assert!(body.contains("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(attrs)) AS attrs"));
        assert!(body.contains("TO_HEX(uid) AS uid"));
        assert!(body.contains("DATETIME(created_at) AS created_at"));
        assert!(body.contains("UNNEST(tags) AS el) AS tags"));
    }

    /// Snowflake native/autoload pairs and the shape of their recovery casts.
    #[test]
    fn snowflake_autoload_degradations_and_native_casts() {
        let j = sf(&RivetType::Json);
        assert_eq!(j.target_type, "VARIANT");
        assert_eq!(j.autoload_type, "TEXT");
        assert!(j.cast_sql.unwrap().starts_with("PARSE_JSON"));
        let u = sf(&RivetType::Uuid);
        assert_eq!(u.target_type, "TEXT");
        assert_eq!(u.autoload_type, "BINARY");
        assert!(u.cast_sql.unwrap().contains("HEX_ENCODE"));
        let naive = RivetType::Timestamp {
            unit: super::super::TimeUnit::Microsecond,
            timezone: None,
        };
        let t = sf(&naive);
        assert_eq!(t.target_type, "TIMESTAMP_NTZ");
        assert_eq!(t.autoload_type, "NUMBER(38,0)");
        assert!(t.cast_sql.unwrap().contains("TO_TIMESTAMP_NTZ"));
        let tm = sf(&RivetType::Time {
            unit: super::super::TimeUnit::Microsecond,
        });
        assert_eq!(tm.target_type, "TIME");
        assert!(tm.cast_sql.unwrap().contains("TIME_FROM_PARTS"));
        let d = sf(&RivetType::Decimal {
            precision: 18,
            scale: 2,
        });
        assert_eq!(d.target_type, "NUMBER(18,2)");
        assert!(d.cast_sql.is_none());
        let l = sf(&RivetType::List {
            inner: Box::new(RivetType::Int64),
        });
        assert_eq!(l.target_type, "ARRAY");
        assert_eq!(l.autoload_type, "VARIANT");
        assert!(l.cast_sql.unwrap().ends_with("::ARRAY"));
    }

    /// Snowflake recovery SQL reads quoted staging columns and includes the
    /// load-setup hints.
    #[test]
    fn snowflake_recovery_sql_quotes_columns_and_casts() {
        use super::super::{SourceColumn, TimeUnit};
        let naive = RivetType::Timestamp {
            unit: TimeUnit::Microsecond,
            timezone: None,
        };
        let mappings = vec![
            TypeMapping::from_source(&SourceColumn::simple("id", "int8", true), RivetType::Int64),
            TypeMapping::from_source(
                &SourceColumn::simple("attrs", "jsonb", true),
                RivetType::Json,
            ),
            TypeMapping::from_source(&SourceColumn::simple("uid", "uuid", true), RivetType::Uuid),
            TypeMapping::from_source(
                &SourceColumn::simple("created_at", "timestamp", true),
                naive,
            ),
        ];
        let specs = ExportTarget::Snowflake.resolve_table(&mappings);
        let sql = ExportTarget::Snowflake.recovery_sql(&specs, "t").unwrap();
        assert!(sql.contains("\"id\" AS id"));
        assert!(sql.contains("PARSE_JSON(\"attrs\") AS attrs"));
        assert!(sql.contains("HEX_ENCODE(\"uid\")"));
        assert!(sql.contains("TO_TIMESTAMP_NTZ(\"created_at\", 6) AS created_at"));
        assert!(sql.contains("BINARY_AS_TEXT=FALSE"));
        assert!(sql.contains("MATCH_BY_COLUMN_NAME"));
        assert!(sql.contains("FROM t__staging"));
    }

    #[test]
    fn parse_accepts_snowflake() {
        assert_eq!(
            ExportTarget::parse("snowflake"),
            Some(ExportTarget::Snowflake)
        );
        assert_eq!(ExportTarget::parse("sf"), Some(ExportTarget::Snowflake));
    }

    /// The help text must mention every canonical target name.
    #[test]
    fn valid_target_names_lists_every_parseable_target() {
        let names = ExportTarget::valid_target_names();
        assert!(names.contains("snowflake"), "got: {names}");
        assert!(names.contains("bigquery"), "got: {names}");
        assert!(names.contains("duckdb"), "got: {names}");
    }
}