1use arrow::datatypes::DataType;
32use serde::Serialize;
33
34use super::{RivetType, TimeUnit, TypeFidelity, TypeMapping};
35
/// Warehouse or engine that an exported Parquet file is meant to be loaded into.
///
/// Each target infers ("autoloads") Parquet logical types differently, so the
/// same [`RivetType`] can resolve to a different native and autoload type per
/// target.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportTarget {
    /// DuckDB; its autoload type always matches the native type, so it needs
    /// no recovery SQL.
    DuckDb,
    /// Google BigQuery, loaded with `bq load --autodetect`.
    BigQuery,
    /// Snowflake, loaded with `COPY INTO` from a stage.
    Snowflake,
}
50
51impl ExportTarget {
52 pub fn parse(s: &str) -> Option<Self> {
53 match s.to_lowercase().as_str() {
54 "bigquery" | "bq" => Some(Self::BigQuery),
55 "duckdb" | "duck" => Some(Self::DuckDb),
56 "snowflake" | "sf" => Some(Self::Snowflake),
57 _ => None,
58 }
59 }
60
61 pub fn label(self) -> &'static str {
62 match self {
63 Self::BigQuery => "bigquery",
64 Self::DuckDb => "duckdb",
65 Self::Snowflake => "snowflake",
66 }
67 }
68
69 pub fn resolve_column(self, input: TargetInput<'_>) -> TargetColumnSpec {
71 let mut spec = match self {
72 ExportTarget::BigQuery => bigquery::resolve(&input),
73 ExportTarget::DuckDb => duckdb::resolve(&input),
74 ExportTarget::Snowflake => snowflake::resolve(&input),
75 };
76 if input.fidelity.is_unsafe_for_strict_mode() && spec.status == TargetStatus::Ok {
80 spec.status = TargetStatus::Warn;
81 }
82 spec
83 }
84
85 pub fn resolve_table(self, mappings: &[TypeMapping]) -> Vec<TargetColumnSpec> {
90 mappings
91 .iter()
92 .map(|m| self.resolve_column(TargetInput::from(m)))
93 .collect()
94 }
95
96 pub fn recovery_sql(self, specs: &[TargetColumnSpec], table: &str) -> Option<String> {
104 match self {
105 ExportTarget::BigQuery => Some(bigquery_recovery_sql(specs, table)),
106 ExportTarget::Snowflake => Some(snowflake_recovery_sql(specs, table)),
107 ExportTarget::DuckDb => None,
108 }
109 }
110}
111
112#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
114#[serde(rename_all = "snake_case")]
115pub enum TargetStatus {
116 Ok,
117 Warn,
118 Fail,
119}
120
121impl TargetStatus {
122 pub fn label(&self) -> &'static str {
123 match self {
124 Self::Ok => "ok",
125 Self::Warn => "warn",
126 Self::Fail => "fail",
127 }
128 }
129}
130
131#[derive(Debug, Clone, Copy)]
136pub struct TargetInput<'a> {
137 pub column_name: &'a str,
138 pub rivet_type: &'a RivetType,
139 #[allow(dead_code)]
142 pub arrow_type: Option<&'a DataType>,
143 pub fidelity: TypeFidelity,
144}
145
146impl<'a> From<&'a TypeMapping> for TargetInput<'a> {
147 fn from(m: &'a TypeMapping) -> Self {
148 TargetInput {
149 column_name: &m.column_name,
150 rivet_type: &m.rivet_type,
151 arrow_type: m.arrow_type.as_ref(),
152 fidelity: m.fidelity,
153 }
154 }
155}
156
157#[derive(Debug, Clone, Serialize)]
161pub struct TargetColumnSpec {
162 pub column_name: String,
164 pub target_type: String,
166 pub autoload_type: String,
169 pub status: TargetStatus,
170 #[serde(skip_serializing_if = "Option::is_none")]
171 pub note: Option<String>,
172 #[serde(skip_serializing_if = "Option::is_none")]
175 pub cast_sql: Option<String>,
176}
177
178struct Resolved {
181 target_type: String,
182 autoload_type: String,
183 status: TargetStatus,
184 note: Option<String>,
185 cast: Option<String>,
187}
188
189impl Resolved {
190 fn ok(t: impl Into<String>) -> Self {
191 let t = t.into();
192 Self {
193 autoload_type: t.clone(),
194 target_type: t,
195 status: TargetStatus::Ok,
196 note: None,
197 cast: None,
198 }
199 }
200 fn diverge(
203 native: impl Into<String>,
204 autoload: impl Into<String>,
205 note: impl Into<String>,
206 cast: Option<&str>,
207 ) -> Self {
208 Self {
209 target_type: native.into(),
210 autoload_type: autoload.into(),
211 status: TargetStatus::Warn,
212 note: Some(note.into()),
213 cast: cast.map(str::to_string),
214 }
215 }
216 fn warn(t: impl Into<String>, note: impl Into<String>) -> Self {
217 let t = t.into();
218 Self {
219 autoload_type: t.clone(),
220 target_type: t,
221 status: TargetStatus::Warn,
222 note: Some(note.into()),
223 cast: None,
224 }
225 }
226 fn fail(note: impl Into<String>) -> Self {
227 Self {
228 target_type: "-".into(),
229 autoload_type: "-".into(),
230 status: TargetStatus::Fail,
231 note: Some(note.into()),
232 cast: None,
233 }
234 }
235 fn into_spec(self, input: &TargetInput<'_>) -> TargetColumnSpec {
236 TargetColumnSpec {
237 column_name: input.column_name.to_string(),
238 target_type: self.target_type,
239 autoload_type: self.autoload_type,
240 status: self.status,
241 note: self.note,
242 cast_sql: self.cast.map(|t| t.replace("{col}", input.column_name)),
243 }
244 }
245}
246
247fn unsupported_reason(t: &RivetType) -> String {
248 match t {
249 RivetType::Unsupported { reason, .. } => reason.clone(),
250 _ => "no target mapping".into(),
251 }
252}
253
254fn recovery_projection(specs: &[TargetColumnSpec], passthrough: impl Fn(&str) -> String) -> String {
269 specs
270 .iter()
271 .map(|s| match &s.cast_sql {
272 Some(cast) => format!(" {cast} AS {name}", name = s.column_name),
273 None => passthrough(&s.column_name),
274 })
275 .collect::<Vec<_>>()
276 .join(",\n")
277}
278
279fn bigquery_recovery_sql(specs: &[TargetColumnSpec], table: &str) -> String {
280 let cols = recovery_projection(specs, |name| format!(" {name}"));
281 format!(
282 "-- 1) bq load --autodetect --parquet_enable_list_inference \
283 --source_format=PARQUET {table}__staging <parquet>\n\
284 -- 2) recover native types:\n\
285 CREATE OR REPLACE TABLE `{table}` AS\n\
286 SELECT\n{cols}\n\
287 FROM `{table}__staging`;"
288 )
289}
290
291fn snowflake_recovery_sql(specs: &[TargetColumnSpec], table: &str) -> String {
297 let cols = recovery_projection(specs, |name| format!(" \"{name}\" AS {name}"));
298 format!(
299 "-- 1) ALTER SESSION SET TIMEZONE='UTC';\n\
300 -- 2) CREATE OR REPLACE FILE FORMAT rivet_pq TYPE=PARQUET BINARY_AS_TEXT=FALSE;\n\
301 -- 3) PUT file://<parquet> @<stage> AUTO_COMPRESS=FALSE;\n\
302 -- 4) CREATE OR REPLACE TABLE {table}__staging USING TEMPLATE (SELECT ARRAY_AGG(\n\
303 -- OBJECT_CONSTRUCT(*)) FROM TABLE(INFER_SCHEMA(LOCATION=>'@<stage>', FILE_FORMAT=>'rivet_pq')));\n\
304 -- COPY INTO {table}__staging FROM @<stage> FILE_FORMAT=(FORMAT_NAME='rivet_pq') MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE;\n\
305 -- 5) recover native types:\n\
306 CREATE OR REPLACE TABLE {table} AS\n\
307 SELECT\n{cols}\n\
308 FROM {table}__staging;"
309 )
310}
311
312mod bigquery {
315 use super::*;
316
317 const NUMERIC_MAX_P: u8 = 29;
319 const NUMERIC_MAX_S: i8 = 9;
320 const BIGNUMERIC_MAX_P: u8 = 76;
322 const BIGNUMERIC_MAX_S: i8 = 38;
323
324 pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
325 native(input.rivet_type).into_spec(input)
326 }
327
328 fn native(t: &RivetType) -> Resolved {
329 match t {
330 RivetType::Bool => Resolved::ok("BOOL"),
331 RivetType::Int16 | RivetType::Int32 | RivetType::Int64 => Resolved::ok("INT64"),
332 RivetType::UInt64 => Resolved::diverge(
337 "NUMERIC",
338 "INT64",
339 "UINT64 > INT64_MAX overflows the INT64 autoload and cannot be recovered after \
340 load — map the column to decimal(20,0) with a source column override",
341 None,
342 ),
343 RivetType::Float32 | RivetType::Float64 => Resolved::ok("FLOAT64"),
344 RivetType::Decimal { precision, scale } => decimal(*precision, *scale),
345 RivetType::Date => Resolved::ok("DATE"),
346 RivetType::Time { .. } => Resolved::ok("TIME"),
347 RivetType::Timestamp {
349 timezone: Some(_), ..
350 } => Resolved::ok("TIMESTAMP"),
351 RivetType::Timestamp {
360 unit: TimeUnit::Nanosecond,
361 timezone: None,
362 } => Resolved::diverge(
363 "INT64",
364 "INT64",
365 "nanosecond timestamp has no BigQuery native type — autoloads as INT64 (raw \
366 nanos, lossless); a native TIMESTAMP via TIMESTAMP_MICROS(DIV(col,1000)) drops \
367 sub-µs precision. Prefer `timestamp` (microsecond) for BigQuery targets.",
368 None,
369 ),
370 RivetType::Timestamp { timezone: None, .. } => Resolved::diverge(
374 "DATETIME",
375 "TIMESTAMP",
376 "naive timestamp autoloads as TIMESTAMP (an instant); recover wall-clock with \
377 DATETIME(col) after load",
378 Some("DATETIME({col})"),
379 ),
380 RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("STRING"),
381 RivetType::Binary => Resolved::ok("BYTES"),
382 RivetType::Json => Resolved::diverge(
385 "JSON",
386 "BYTES",
387 "Parquet JSON logical type autoloads as BYTES in BigQuery; recover native JSON \
388 with PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(col)) after load",
389 Some("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING({col}))"),
390 ),
391 RivetType::Uuid => Resolved::diverge(
395 "STRING",
396 "BYTES",
397 "UUID autoloads as 16-byte BYTES in BigQuery; recover hex text with TO_HEX(col) \
398 after load (or keep BYTES)",
399 Some("TO_HEX({col})"),
400 ),
401 RivetType::Interval => Resolved::ok("STRING"),
402 RivetType::List { inner } => list(inner),
403 RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
404 }
405 }
406
407 fn decimal(p: u8, s: i8) -> Resolved {
408 if s < 0 {
409 return Resolved::fail(format!(
410 "BigQuery has no negative scale; decimal({p},{s}) needs a STRING/INT64 cast"
411 ));
412 }
413 let native = if p <= NUMERIC_MAX_P && s <= NUMERIC_MAX_S {
414 "NUMERIC"
415 } else if p <= BIGNUMERIC_MAX_P && s <= BIGNUMERIC_MAX_S {
416 "BIGNUMERIC"
417 } else {
418 return Resolved::fail(format!(
419 "decimal({p},{s}) exceeds BigQuery BIGNUMERIC limits (max 76,38)"
420 ));
421 };
422 Resolved::ok(native)
423 }
424
425 fn list(inner: &RivetType) -> Resolved {
426 let inner_r = native(inner);
427 if inner_r.status == TargetStatus::Fail {
428 return Resolved::fail(format!(
429 "REPEATED of unsupported element: {}",
430 inner_r.target_type
431 ));
432 }
433 Resolved::diverge(
440 format!("REPEATED {}", inner_r.target_type),
441 format!("REPEATED RECORD{{item {}}}", inner_r.autoload_type),
442 "arrays load as REPEATED RECORD{item}; load the staging table with \
443 --parquet_enable_list_inference, then flatten with UNNEST after load",
444 Some("ARRAY(SELECT el.item FROM UNNEST({col}) AS el)"),
445 )
446 }
447}
448
449mod duckdb {
452 use super::*;
453
454 pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
455 native(input.rivet_type).into_spec(input)
456 }
457
458 fn native(t: &RivetType) -> Resolved {
461 match t {
462 RivetType::Bool => Resolved::ok("BOOLEAN"),
463 RivetType::Int16 => Resolved::ok("SMALLINT"),
464 RivetType::Int32 => Resolved::ok("INTEGER"),
465 RivetType::Int64 => Resolved::ok("BIGINT"),
466 RivetType::UInt64 => Resolved::ok("UBIGINT"),
467 RivetType::Float32 => Resolved::ok("FLOAT"),
468 RivetType::Float64 => Resolved::ok("DOUBLE"),
469 RivetType::Decimal { precision, scale } => {
470 if *scale < 0 {
471 Resolved::warn(
472 "DECIMAL",
473 format!(
474 "DuckDB has no negative scale; decimal({precision},{scale}) loads via cast"
475 ),
476 )
477 } else if *precision <= 38 {
478 Resolved::ok(format!("DECIMAL({precision},{scale})"))
479 } else {
480 Resolved::warn(
482 "DECIMAL(38,*)",
483 format!("decimal({precision},{scale}) exceeds DuckDB DECIMAL(38); widens"),
484 )
485 }
486 }
487 RivetType::Date => Resolved::ok("DATE"),
488 RivetType::Time { .. } => Resolved::ok("TIME"),
489 RivetType::Timestamp {
490 timezone: Some(_), ..
491 } => Resolved::ok("TIMESTAMPTZ"),
492 RivetType::Timestamp {
495 unit: TimeUnit::Nanosecond,
496 timezone: None,
497 } => Resolved::ok("TIMESTAMP_NS"),
498 RivetType::Timestamp { timezone: None, .. } => Resolved::ok("TIMESTAMP"),
499 RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("VARCHAR"),
500 RivetType::Binary => Resolved::ok("BLOB"),
501 RivetType::Json => Resolved::ok("JSON"),
502 RivetType::Uuid => Resolved::ok("UUID"),
503 RivetType::Interval => Resolved::ok("INTERVAL"),
504 RivetType::List { inner } => {
505 let inner_r = native(inner);
506 if inner_r.status == TargetStatus::Fail {
507 Resolved::fail(format!(
508 "LIST of unsupported element: {}",
509 inner_r.target_type
510 ))
511 } else {
512 Resolved::ok(format!("{}[]", inner_r.target_type))
513 }
514 }
515 RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
516 }
517 }
518}
519
520mod snowflake {
523 use super::*;
524
525 pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
526 native(input.rivet_type).into_spec(input)
527 }
528
529 fn native(t: &RivetType) -> Resolved {
534 match t {
535 RivetType::Bool => Resolved::ok("BOOLEAN"),
536 RivetType::Int16 | RivetType::Int32 | RivetType::Int64 => Resolved::ok("NUMBER(38,0)"),
537 RivetType::UInt64 => Resolved::diverge(
539 "NUMBER(20,0)",
540 "NUMBER(38,0)",
541 "UINT64 > INT64_MAX overflows the Parquet read; map to decimal(20,0) at source",
542 None,
543 ),
544 RivetType::Float32 | RivetType::Float64 => Resolved::ok("FLOAT"),
545 RivetType::Decimal { precision, scale } => {
546 if *scale < 0 {
547 Resolved::warn(
548 "NUMBER",
549 format!(
550 "Snowflake NUMBER has no negative scale; decimal({precision},{scale}) loads via cast"
551 ),
552 )
553 } else {
554 Resolved::ok(format!("NUMBER({precision},{scale})"))
555 }
556 }
557 RivetType::Date => Resolved::ok("DATE"),
558 RivetType::Time { .. } => Resolved::diverge(
560 "TIME",
561 "NUMBER(38,0)",
562 "TIME autoloads as NUMBER (µs of day); recover with TIME_FROM_PARTS after load",
563 Some(r#"TIME_FROM_PARTS(0,0,FLOOR("{col}"/1000000),MOD("{col}",1000000)*1000)"#),
564 ),
565 RivetType::Timestamp {
567 timezone: Some(_), ..
568 } => Resolved::diverge(
569 "TIMESTAMP_TZ",
570 "TIMESTAMP_NTZ",
571 "tz timestamp autoloads as TIMESTAMP_NTZ — ALTER SESSION SET TIMEZONE='UTC' before COPY so the instant matches",
572 None,
573 ),
574 RivetType::Timestamp {
580 unit: TimeUnit::Nanosecond,
581 timezone: None,
582 } => Resolved::diverge(
583 "TIMESTAMP_NTZ",
584 "NUMBER(38,0)",
585 "nanosecond timestamp autoloads as NUMBER (ns since epoch); recover with \
586 TO_TIMESTAMP_NTZ(col, 9) after load — Snowflake TIMESTAMP_NTZ holds full ns precision",
587 Some(r#"TO_TIMESTAMP_NTZ("{col}", 9)"#),
588 ),
589 RivetType::Timestamp { timezone: None, .. } => Resolved::diverge(
591 "TIMESTAMP_NTZ",
592 "NUMBER(38,0)",
593 "naive timestamp autoloads as NUMBER (µs since epoch); recover with TO_TIMESTAMP_NTZ after load",
594 Some(r#"TO_TIMESTAMP_NTZ("{col}", 6)"#),
595 ),
596 RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("TEXT"),
597 RivetType::Binary => Resolved::warn(
599 "BINARY",
600 "set BINARY_AS_TEXT=FALSE in the Parquet FILE FORMAT or non-UTF8 bytes fail to load",
601 ),
602 RivetType::Json => Resolved::diverge(
604 "VARIANT",
605 "TEXT",
606 "JSON autoloads as TEXT; recover native VARIANT with PARSE_JSON after load",
607 Some(r#"PARSE_JSON("{col}")"#),
608 ),
609 RivetType::Uuid => Resolved::diverge(
611 "TEXT",
612 "BINARY",
613 "UUID autoloads as 16-byte BINARY; recover canonical text with HEX_ENCODE + REGEXP after load",
614 Some(
615 r#"REGEXP_REPLACE(LOWER(HEX_ENCODE("{col}")),'^(.{8})(.{4})(.{4})(.{4})(.{12})$','\\1-\\2-\\3-\\4-\\5')"#,
616 ),
617 ),
618 RivetType::Interval => Resolved::ok("TEXT"),
619 RivetType::List { inner } => {
624 let inner_r = native(inner);
625 if inner_r.status == TargetStatus::Fail {
626 Resolved::fail(format!(
627 "ARRAY of unsupported element: {}",
628 inner_r.target_type
629 ))
630 } else {
631 Resolved::diverge(
632 "ARRAY",
633 "VARIANT",
634 "list autoloads as VARIANT (the JSON array); recover native ARRAY with ::ARRAY after load",
635 Some(r#""{col}"::ARRAY"#),
636 )
637 }
638 }
639 RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
640 }
641 }
642}
643
#[cfg(test)]
mod tests {
    //! Per-target resolver expectations plus recovery-SQL shape checks.

    use super::super::SourceColumn;
    use super::*;

    /// Exact-fidelity input for a column named `c`.
    fn input(rt: &RivetType) -> TargetInput<'_> {
        TargetInput {
            column_name: "c",
            rivet_type: rt,
            arrow_type: None,
            fidelity: TypeFidelity::Exact,
        }
    }

    fn bq(rt: &RivetType) -> TargetColumnSpec {
        ExportTarget::BigQuery.resolve_column(input(rt))
    }

    fn duck(rt: &RivetType) -> TargetColumnSpec {
        ExportTarget::DuckDb.resolve_column(input(rt))
    }

    fn sf(rt: &RivetType) -> TargetColumnSpec {
        ExportTarget::Snowflake.resolve_column(input(rt))
    }

    /// Timestamp without a timezone at `unit`.
    fn naive_ts(unit: TimeUnit) -> RivetType {
        RivetType::Timestamp {
            unit,
            timezone: None,
        }
    }

    fn dec(precision: u8, scale: i8) -> RivetType {
        RivetType::Decimal { precision, scale }
    }

    fn list_of(inner: RivetType) -> RivetType {
        RivetType::List {
            inner: Box::new(inner),
        }
    }

    /// Nullable source column `name` of source type `native`, mapped to `rt`.
    fn mapping(name: &str, native: &str, rt: RivetType) -> TypeMapping {
        TypeMapping::from_source(&SourceColumn::simple(name, native, true), rt)
    }

    #[test]
    fn bq_nanosecond_timestamp_autoloads_as_int64() {
        let spec = bq(&naive_ts(TimeUnit::Nanosecond));
        assert_eq!(spec.target_type, "INT64");
        assert_eq!(spec.autoload_type, "INT64");
        assert_eq!(spec.status, TargetStatus::Warn);
        assert!(spec.cast_sql.is_none(), "ns→BQ has no lossless temporal cast");
    }

    #[test]
    fn duckdb_nanosecond_timestamp_is_native_timestamp_ns() {
        let spec = duck(&naive_ts(TimeUnit::Nanosecond));
        assert_eq!(spec.target_type, "TIMESTAMP_NS");
        assert_eq!(spec.status, TargetStatus::Ok);
    }

    #[test]
    fn snowflake_nanosecond_timestamp_recovers_losslessly_at_scale_9() {
        let spec = sf(&naive_ts(TimeUnit::Nanosecond));
        assert_eq!(spec.target_type, "TIMESTAMP_NTZ");
        assert_eq!(spec.autoload_type, "NUMBER(38,0)");
        assert_eq!(spec.cast_sql.as_deref(), Some(r#"TO_TIMESTAMP_NTZ("c", 9)"#));
    }

    #[test]
    fn bq_uuid_resolves_not_fails() {
        let spec = bq(&RivetType::Uuid);
        assert_eq!(spec.target_type, "STRING");
        assert_eq!(spec.autoload_type, "BYTES");
        assert_eq!(spec.status, TargetStatus::Warn);
        let cast = spec.cast_sql.expect("UUID has a recovery cast");
        assert!(cast.contains("c"));
    }

    #[test]
    fn bq_json_native_is_json_autoload_is_bytes() {
        let spec = bq(&RivetType::Json);
        assert_eq!(spec.target_type, "JSON");
        assert_eq!(spec.autoload_type, "BYTES");
        assert_eq!(spec.status, TargetStatus::Warn);
        let cast = spec.cast_sql.expect("JSON has a recovery cast");
        assert!(cast.starts_with("PARSE_JSON"));
    }

    #[test]
    fn bq_naive_timestamp_is_datetime_native_timestamp_autoload() {
        let spec = bq(&naive_ts(TimeUnit::Microsecond));
        assert_eq!(spec.target_type, "DATETIME");
        assert_eq!(spec.autoload_type, "TIMESTAMP");
        assert_eq!(spec.status, TargetStatus::Warn);
    }

    #[test]
    fn bq_tz_timestamp_is_timestamp_ok() {
        let tz = RivetType::Timestamp {
            unit: TimeUnit::Microsecond,
            timezone: Some("UTC".into()),
        };
        let spec = bq(&tz);
        assert_eq!(spec.target_type, "TIMESTAMP");
        assert_eq!(spec.autoload_type, "TIMESTAMP");
        assert_eq!(spec.status, TargetStatus::Ok);
    }

    #[test]
    fn bq_decimal_within_numeric_is_numeric() {
        let spec = bq(&dec(18, 2));
        assert_eq!(spec.target_type, "NUMERIC");
        assert_eq!(spec.status, TargetStatus::Ok);
    }

    #[test]
    fn bq_decimal_escalates_to_bignumeric() {
        let spec = bq(&dec(38, 9));
        assert_eq!(spec.target_type, "BIGNUMERIC");
        assert_eq!(spec.status, TargetStatus::Ok);
    }

    #[test]
    fn bq_decimal_negative_scale_fails() {
        assert_eq!(bq(&dec(5, -2)).status, TargetStatus::Fail);
    }

    #[test]
    fn bq_uint64_recommends_numeric_warns_overflow() {
        let spec = bq(&RivetType::UInt64);
        assert_eq!(spec.target_type, "NUMERIC");
        assert_eq!(spec.autoload_type, "INT64");
        assert_eq!(spec.status, TargetStatus::Warn);
    }

    #[test]
    fn bq_list_is_repeated_native_record_autoload() {
        let spec = bq(&list_of(RivetType::String));
        assert_eq!(spec.target_type, "REPEATED STRING");
        assert!(spec.autoload_type.contains("REPEATED RECORD"));
        assert_eq!(spec.status, TargetStatus::Warn);
    }

    #[test]
    fn bq_unsupported_is_fail_row_not_panic() {
        let unsupported = RivetType::Unsupported {
            native_type: "geometry".into(),
            reason: "no mapping".into(),
        };
        let spec = bq(&unsupported);
        assert_eq!(spec.status, TargetStatus::Fail);
        assert_eq!(spec.target_type, "-");
    }

    #[test]
    fn bq_standard_scalars_ok() {
        let cases = [
            (RivetType::Bool, "BOOL"),
            (RivetType::Int64, "INT64"),
            (RivetType::Float64, "FLOAT64"),
            (RivetType::Date, "DATE"),
            (RivetType::String, "STRING"),
            (RivetType::Binary, "BYTES"),
            (RivetType::Enum, "STRING"),
        ];
        for (rt, expected) in cases {
            let spec = bq(&rt);
            assert_eq!(spec.target_type, expected, "{rt:?}");
            assert_eq!(spec.autoload_type, expected, "{rt:?}");
            assert_eq!(spec.status, TargetStatus::Ok, "{rt:?}");
        }
    }

    #[test]
    fn duckdb_reads_everything_natively() {
        let cases = vec![
            RivetType::Json,
            RivetType::Uuid,
            RivetType::UInt64,
            naive_ts(TimeUnit::Microsecond),
            list_of(RivetType::Int64),
        ];
        for rt in &cases {
            let spec = duck(rt);
            assert_eq!(
                spec.target_type, spec.autoload_type,
                "DuckDB autoload must equal native for {rt:?}"
            );
            assert_ne!(spec.status, TargetStatus::Fail, "{rt:?}");
        }
    }

    #[test]
    fn duckdb_native_type_names() {
        let expectations = [
            (RivetType::Json, "JSON"),
            (RivetType::Uuid, "UUID"),
            (RivetType::UInt64, "UBIGINT"),
            (dec(18, 2), "DECIMAL(18,2)"),
            (list_of(RivetType::Int64), "BIGINT[]"),
        ];
        for (rt, expected) in &expectations {
            assert_eq!(duck(rt).target_type, *expected, "{rt:?}");
        }
    }

    #[test]
    fn parse_accepts_aliases() {
        let cases = [
            ("bq", Some(ExportTarget::BigQuery)),
            ("BigQuery", Some(ExportTarget::BigQuery)),
            ("duckdb", Some(ExportTarget::DuckDb)),
            ("nope", None),
        ];
        for (raw, expected) in cases {
            assert_eq!(ExportTarget::parse(raw), expected, "{raw}");
        }
    }

    #[test]
    fn resolve_table_preserves_order_and_names() {
        let mappings = vec![
            mapping("a", "int8", RivetType::Int64),
            mapping("b", "jsonb", RivetType::Json),
        ];
        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
        let names: Vec<&str> = specs.iter().map(|s| s.column_name.as_str()).collect();
        assert_eq!(names, ["a", "b"]);
        assert_eq!(specs[1].target_type, "JSON");
    }

    #[test]
    fn cast_sql_is_none_when_post_load_recovery_is_impossible() {
        let spec = bq(&RivetType::UInt64);
        assert!(
            spec.cast_sql.is_none(),
            "overflowed UINT64 has no lossless post-load recovery"
        );
        let note = spec.note.expect("UINT64 carries a note").to_lowercase();
        assert!(
            note.contains("override"),
            "UINT64 note must point to the source-side override, got: {note}"
        );
    }

    #[test]
    fn cast_sql_present_only_when_lossless_post_load() {
        let json_cast = bq(&RivetType::Json).cast_sql.expect("JSON cast");
        assert!(json_cast.contains("PARSE_JSON"));
        let uuid_cast = bq(&RivetType::Uuid).cast_sql.expect("UUID cast");
        assert!(uuid_cast.contains("TO_HEX"));
        let naive_cast = bq(&naive_ts(TimeUnit::Microsecond))
            .cast_sql
            .expect("naive timestamp cast");
        assert!(naive_cast.contains("DATETIME"));
    }

    #[test]
    fn every_divergence_offers_a_recovery_path() {
        let cases = vec![
            RivetType::Json,
            RivetType::Uuid,
            RivetType::UInt64,
            naive_ts(TimeUnit::Microsecond),
            list_of(RivetType::String),
        ];
        for rt in &cases {
            let spec = bq(rt);
            assert_ne!(spec.autoload_type, spec.target_type, "case must diverge: {rt:?}");
            let note = spec.note.as_deref().unwrap_or("").to_lowercase();
            let recovers = spec.cast_sql.is_some()
                || note.contains("after load")
                || note.contains("override");
            assert!(
                recovers,
                "divergent {rt:?} must offer a recovery (cast_sql or a recovery note)"
            );
        }
    }

    #[test]
    fn bq_decimal_limit_boundaries() {
        let cases = [
            (76, 38, TargetStatus::Ok),
            (77, 38, TargetStatus::Fail),
            (76, 39, TargetStatus::Fail),
        ];
        for (precision, scale, expected) in cases {
            assert_eq!(
                bq(&dec(precision, scale)).status,
                expected,
                "decimal({precision},{scale})"
            );
        }
        assert_eq!(bq(&dec(30, 0)).target_type, "BIGNUMERIC");
    }

    #[test]
    fn duckdb_decimal_over_38_warns_not_silently_clamps() {
        assert_eq!(duck(&dec(40, 2)).status, TargetStatus::Warn);
    }

    #[test]
    fn bq_recovery_sql_casts_native_types() {
        let mappings = vec![
            mapping("id", "int8", RivetType::Int64),
            mapping("attrs", "jsonb", RivetType::Json),
            mapping("uid", "uuid", RivetType::Uuid),
            mapping("created_at", "timestamp", naive_ts(TimeUnit::Microsecond)),
            mapping("tags", "_text", list_of(RivetType::String)),
        ];
        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
        let sql = ExportTarget::BigQuery
            .recovery_sql(&specs, "payments")
            .expect("BigQuery has a recovery SQL");
        for needle in [
            "PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(attrs)) AS attrs",
            "TO_HEX(uid) AS uid",
            "DATETIME(created_at) AS created_at",
            "ARRAY(SELECT el.item FROM UNNEST(tags) AS el) AS tags",
            "--parquet_enable_list_inference",
            "SELECT\n id",
            "CREATE OR REPLACE TABLE `payments`",
            "FROM `payments__staging`",
        ] {
            assert!(sql.contains(needle), "missing {needle:?} in:\n{sql}");
        }
    }

    #[test]
    fn duckdb_needs_no_recovery() {
        let mappings = vec![mapping("attrs", "json", RivetType::Json)];
        let specs = ExportTarget::DuckDb.resolve_table(&mappings);
        assert!(
            ExportTarget::DuckDb.recovery_sql(&specs, "t").is_none(),
            "DuckDB autoloads every logical type natively — no recovery needed"
        );
    }

    #[test]
    fn recovery_sql_projects_every_column_once_and_only_casts_divergent() {
        let cols = vec![
            ("id", RivetType::Int64),
            ("amount", dec(18, 2)),
            ("attrs", RivetType::Json),
            ("uid", RivetType::Uuid),
            ("created_at", naive_ts(TimeUnit::Microsecond)),
            ("tags", list_of(RivetType::String)),
        ];
        let mappings: Vec<TypeMapping> = cols
            .iter()
            .map(|(name, rt)| mapping(name, "x", rt.clone()))
            .collect();
        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
        let sql = ExportTarget::BigQuery.recovery_sql(&specs, "t").unwrap();

        let body = sql
            .split("SELECT\n")
            .nth(1)
            .and_then(|rest| rest.split("\nFROM").next())
            .expect("recovery SQL has a SELECT … FROM body");
        assert_eq!(
            body.split(",\n").count(),
            cols.len(),
            "one projection per column, got:\n{body}"
        );
        for name in cols.iter().map(|(name, _)| *name) {
            assert!(body.contains(name), "column {name} missing:\n{body}");
        }
        assert!(body.contains(" id,") && !body.contains("AS id"));
        assert!(body.contains(" amount,") && !body.contains("AS amount"));
        for needle in [
            "PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(attrs)) AS attrs",
            "TO_HEX(uid) AS uid",
            "DATETIME(created_at) AS created_at",
            "UNNEST(tags) AS el) AS tags",
        ] {
            assert!(body.contains(needle), "missing {needle:?} in:\n{body}");
        }
    }

    #[test]
    fn snowflake_autoload_degradations_and_native_casts() {
        let json = sf(&RivetType::Json);
        assert_eq!(json.target_type, "VARIANT");
        assert_eq!(json.autoload_type, "TEXT");
        assert!(json.cast_sql.unwrap().starts_with("PARSE_JSON"));

        let uuid = sf(&RivetType::Uuid);
        assert_eq!(uuid.target_type, "TEXT");
        assert_eq!(uuid.autoload_type, "BINARY");
        assert!(uuid.cast_sql.unwrap().contains("HEX_ENCODE"));

        let naive = sf(&naive_ts(TimeUnit::Microsecond));
        assert_eq!(naive.target_type, "TIMESTAMP_NTZ");
        assert_eq!(naive.autoload_type, "NUMBER(38,0)");
        assert!(naive.cast_sql.unwrap().contains("TO_TIMESTAMP_NTZ"));

        let time = sf(&RivetType::Time {
            unit: TimeUnit::Microsecond,
        });
        assert_eq!(time.target_type, "TIME");
        assert!(time.cast_sql.unwrap().contains("TIME_FROM_PARTS"));

        let decimal = sf(&dec(18, 2));
        assert_eq!(decimal.target_type, "NUMBER(18,2)");
        assert!(decimal.cast_sql.is_none());

        let list = sf(&list_of(RivetType::Int64));
        assert_eq!(list.target_type, "ARRAY");
        assert_eq!(list.autoload_type, "VARIANT");
        assert!(list.cast_sql.unwrap().ends_with("::ARRAY"));
    }

    #[test]
    fn snowflake_recovery_sql_quotes_columns_and_casts() {
        let mappings = vec![
            mapping("id", "int8", RivetType::Int64),
            mapping("attrs", "jsonb", RivetType::Json),
            mapping("uid", "uuid", RivetType::Uuid),
            mapping("created_at", "timestamp", naive_ts(TimeUnit::Microsecond)),
        ];
        let specs = ExportTarget::Snowflake.resolve_table(&mappings);
        let sql = ExportTarget::Snowflake.recovery_sql(&specs, "t").unwrap();
        for needle in [
            "\"id\" AS id",
            "PARSE_JSON(\"attrs\") AS attrs",
            "HEX_ENCODE(\"uid\")",
            "TO_TIMESTAMP_NTZ(\"created_at\", 6) AS created_at",
            "BINARY_AS_TEXT=FALSE",
            "MATCH_BY_COLUMN_NAME",
            "FROM t__staging",
        ] {
            assert!(sql.contains(needle), "missing {needle:?} in:\n{sql}");
        }
    }

    #[test]
    fn parse_accepts_snowflake() {
        for raw in ["snowflake", "sf"] {
            assert_eq!(ExportTarget::parse(raw), Some(ExportTarget::Snowflake), "{raw}");
        }
    }
}