1use arrow::datatypes::DataType;
32use serde::Serialize;
33
34use super::{RivetType, TimeUnit, TypeFidelity, TypeMapping};
35
/// Warehouse / engine a Parquet export is being prepared for.
///
/// Each target has its own native type system and its own way of inferring
/// types when it auto-loads a Parquet file; `ExportTarget::resolve_column`
/// reports both for every column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportTarget {
    /// DuckDB (accepted as `duckdb` / `duck`).
    DuckDb,
    /// Google BigQuery (accepted as `bigquery` / `bq`).
    BigQuery,
    /// Snowflake (accepted as `snowflake` / `sf`).
    Snowflake,
    /// ClickHouse (accepted as `clickhouse` / `ch`).
    ClickHouse,
}
56
57impl ExportTarget {
58 pub fn parse(s: &str) -> Option<Self> {
59 match s.to_lowercase().as_str() {
60 "bigquery" | "bq" => Some(Self::BigQuery),
61 "duckdb" | "duck" => Some(Self::DuckDb),
62 "snowflake" | "sf" => Some(Self::Snowflake),
63 "clickhouse" | "ch" => Some(Self::ClickHouse),
64 _ => None,
65 }
66 }
67
68 pub fn valid_target_names() -> &'static str {
73 "bigquery (bq), duckdb (duck), snowflake (sf), clickhouse (ch)"
74 }
75
76 pub fn label(self) -> &'static str {
77 match self {
78 Self::BigQuery => "bigquery",
79 Self::DuckDb => "duckdb",
80 Self::Snowflake => "snowflake",
81 Self::ClickHouse => "clickhouse",
82 }
83 }
84
85 pub fn resolve_column(self, input: TargetInput<'_>) -> TargetColumnSpec {
87 let mut spec = match self {
88 ExportTarget::BigQuery => bigquery::resolve(&input),
89 ExportTarget::DuckDb => duckdb::resolve(&input),
90 ExportTarget::Snowflake => snowflake::resolve(&input),
91 ExportTarget::ClickHouse => clickhouse::resolve(&input),
92 };
93 if input.fidelity.is_unsafe_for_strict_mode() && spec.status == TargetStatus::Ok {
97 spec.status = TargetStatus::Warn;
98 }
99 spec
100 }
101
102 pub fn resolve_table(self, mappings: &[TypeMapping]) -> Vec<TargetColumnSpec> {
107 mappings
108 .iter()
109 .map(|m| self.resolve_column(TargetInput::from(m)))
110 .collect()
111 }
112
113 pub fn recovery_sql(self, specs: &[TargetColumnSpec], table: &str) -> Option<String> {
121 match self {
122 ExportTarget::BigQuery => Some(bigquery_recovery_sql(specs, table)),
123 ExportTarget::Snowflake => Some(snowflake_recovery_sql(specs, table)),
124 ExportTarget::ClickHouse => Some(clickhouse_recovery_sql(specs, table)),
125 ExportTarget::DuckDb => None,
126 }
127 }
128}
129
/// Outcome of resolving one column for a target.
///
/// Serialized in snake_case: `"ok"`, `"warn"`, `"fail"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TargetStatus {
    /// The target autoloads the column as its native type.
    Ok,
    /// The column loads but needs attention: the autoload type diverges from
    /// the native one (`Resolved::diverge`), a resolver flagged a caveat
    /// (`Resolved::warn`), or `ExportTarget::resolve_column` downgraded an `Ok`
    /// because the upstream fidelity is unsafe for strict mode.
    Warn,
    /// No usable mapping exists for this target (`Resolved::fail`).
    Fail,
}
138
139impl TargetStatus {
140 pub fn label(&self) -> &'static str {
141 match self {
142 Self::Ok => "ok",
143 Self::Warn => "warn",
144 Self::Fail => "fail",
145 }
146 }
147}
148
/// Borrowed view of one column's type information, as consumed by the
/// per-target resolvers.
#[derive(Debug, Clone, Copy)]
pub struct TargetInput<'a> {
    /// Column name; substituted for every `{col}` in cast templates.
    pub column_name: &'a str,
    /// Rivet logical type — the only type the resolvers in this file match on.
    pub rivet_type: &'a RivetType,
    /// Arrow type of the column, when known.
    // Not read by any resolver in this file; carried through so callers can
    // supply it, hence the dead_code allowance.
    #[allow(dead_code)]
    pub arrow_type: Option<&'a DataType>,
    /// Fidelity of the upstream type mapping (copied from `TypeMapping`);
    /// `ExportTarget::resolve_column` downgrades `Ok` to `Warn` when it is
    /// unsafe for strict mode.
    pub fidelity: TypeFidelity,
}
163
164impl<'a> From<&'a TypeMapping> for TargetInput<'a> {
165 fn from(m: &'a TypeMapping) -> Self {
166 TargetInput {
167 column_name: &m.column_name,
168 rivet_type: &m.rivet_type,
169 arrow_type: m.arrow_type.as_ref(),
170 fidelity: m.fidelity,
171 }
172 }
173}
174
/// Resolution result for one column on one export target.
#[derive(Debug, Clone, Serialize)]
pub struct TargetColumnSpec {
    /// Source column name.
    pub column_name: String,
    /// Native type the column should end up with in the target (`"-"` on failure).
    pub target_type: String,
    /// Type the target infers when auto-loading the Parquet file; differs from
    /// `target_type` when a post-load recovery is needed (`"-"` on failure).
    pub autoload_type: String,
    /// Whether the mapping is clean, needs attention, or failed.
    pub status: TargetStatus,
    /// Explanation supplied by the resolver; omitted from serialized output
    /// when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub note: Option<String>,
    /// SQL expression (column name already substituted) that converts the
    /// autoloaded column to `target_type`; omitted when no cast is provided.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cast_sql: Option<String>,
}
195
/// Intermediate result of a per-target resolver, before the column name is
/// attached by `Resolved::into_spec`.
struct Resolved {
    /// Native type in the target.
    target_type: String,
    /// Type produced by the target's Parquet autoload.
    autoload_type: String,
    status: TargetStatus,
    note: Option<String>,
    /// Cast template; every `{col}` is replaced by the column name in `into_spec`.
    cast: Option<String>,
}
206
207impl Resolved {
208 fn ok(t: impl Into<String>) -> Self {
209 let t = t.into();
210 Self {
211 autoload_type: t.clone(),
212 target_type: t,
213 status: TargetStatus::Ok,
214 note: None,
215 cast: None,
216 }
217 }
218 fn diverge(
221 native: impl Into<String>,
222 autoload: impl Into<String>,
223 note: impl Into<String>,
224 cast: Option<&str>,
225 ) -> Self {
226 Self {
227 target_type: native.into(),
228 autoload_type: autoload.into(),
229 status: TargetStatus::Warn,
230 note: Some(note.into()),
231 cast: cast.map(str::to_string),
232 }
233 }
234 fn warn(t: impl Into<String>, note: impl Into<String>) -> Self {
235 let t = t.into();
236 Self {
237 autoload_type: t.clone(),
238 target_type: t,
239 status: TargetStatus::Warn,
240 note: Some(note.into()),
241 cast: None,
242 }
243 }
244 fn fail(note: impl Into<String>) -> Self {
245 Self {
246 target_type: "-".into(),
247 autoload_type: "-".into(),
248 status: TargetStatus::Fail,
249 note: Some(note.into()),
250 cast: None,
251 }
252 }
253 fn into_spec(self, input: &TargetInput<'_>) -> TargetColumnSpec {
254 TargetColumnSpec {
255 column_name: input.column_name.to_string(),
256 target_type: self.target_type,
257 autoload_type: self.autoload_type,
258 status: self.status,
259 note: self.note,
260 cast_sql: self.cast.map(|t| t.replace("{col}", input.column_name)),
261 }
262 }
263}
264
265fn unsupported_reason(t: &RivetType) -> String {
266 match t {
267 RivetType::Unsupported { reason, .. } => reason.clone(),
268 _ => "no target mapping".into(),
269 }
270}
271
272fn recovery_projection(specs: &[TargetColumnSpec], passthrough: impl Fn(&str) -> String) -> String {
287 specs
288 .iter()
289 .map(|s| match &s.cast_sql {
290 Some(cast) => format!(" {cast} AS {name}", name = s.column_name),
291 None => passthrough(&s.column_name),
292 })
293 .collect::<Vec<_>>()
294 .join(",\n")
295}
296
297fn bigquery_recovery_sql(specs: &[TargetColumnSpec], table: &str) -> String {
298 let cols = recovery_projection(specs, |name| format!(" {name}"));
299 format!(
300 "-- 1) bq load --autodetect --parquet_enable_list_inference \
301 --source_format=PARQUET {table}__staging <parquet>\n\
302 -- 2) recover native types:\n\
303 CREATE OR REPLACE TABLE `{table}` AS\n\
304 SELECT\n{cols}\n\
305 FROM `{table}__staging`;"
306 )
307}
308
/// Snowflake recovery script: commented staging steps (UTC session, Parquet
/// file format with `BINARY_AS_TEXT=FALSE`, PUT, `INFER_SCHEMA` template table,
/// `COPY INTO … MATCH_BY_COLUMN_NAME`), then a `CREATE OR REPLACE TABLE … AS
/// SELECT` that casts divergent columns.
fn snowflake_recovery_sql(specs: &[TargetColumnSpec], table: &str) -> String {
    // Staging columns keep their Parquet (case-sensitive) names, so they are
    // read quoted; the unquoted alias gives the final table ordinary
    // identifiers.
    let cols = recovery_projection(specs, |name| format!(" \"{name}\" AS {name}"));
    format!(
        "-- 1) ALTER SESSION SET TIMEZONE='UTC';\n\
         -- 2) CREATE OR REPLACE FILE FORMAT rivet_pq TYPE=PARQUET BINARY_AS_TEXT=FALSE;\n\
         -- 3) PUT file://<parquet> @<stage> AUTO_COMPRESS=FALSE;\n\
         -- 4) CREATE OR REPLACE TABLE {table}__staging USING TEMPLATE (SELECT ARRAY_AGG(\n\
         -- OBJECT_CONSTRUCT(*)) FROM TABLE(INFER_SCHEMA(LOCATION=>'@<stage>', FILE_FORMAT=>'rivet_pq')));\n\
         -- COPY INTO {table}__staging FROM @<stage> FILE_FORMAT=(FORMAT_NAME='rivet_pq') MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE;\n\
         -- 5) recover native types:\n\
         CREATE OR REPLACE TABLE {table} AS\n\
         SELECT\n{cols}\n\
         FROM {table}__staging;"
    )
}
329
330mod bigquery {
333 use super::*;
334
335 const NUMERIC_MAX_P: u8 = 29;
337 const NUMERIC_MAX_S: i8 = 9;
338 const BIGNUMERIC_MAX_P: u8 = 76;
340 const BIGNUMERIC_MAX_S: i8 = 38;
341
342 pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
343 native(input.rivet_type).into_spec(input)
344 }
345
346 fn native(t: &RivetType) -> Resolved {
347 match t {
348 RivetType::Bool => Resolved::ok("BOOL"),
349 RivetType::Int16 | RivetType::Int32 | RivetType::Int64 => Resolved::ok("INT64"),
350 RivetType::UInt64 => Resolved::diverge(
355 "NUMERIC",
356 "INT64",
357 "UINT64 > INT64_MAX overflows the INT64 autoload and cannot be recovered after \
358 load — map the column to decimal(20,0) with a source column override",
359 None,
360 ),
361 RivetType::Float32 | RivetType::Float64 => Resolved::ok("FLOAT64"),
362 RivetType::Decimal { precision, scale } => decimal(*precision, *scale),
363 RivetType::Date => Resolved::ok("DATE"),
364 RivetType::Time { .. } => Resolved::ok("TIME"),
365 RivetType::Timestamp {
367 timezone: Some(_), ..
368 } => Resolved::ok("TIMESTAMP"),
369 RivetType::Timestamp {
378 unit: TimeUnit::Nanosecond,
379 timezone: None,
380 } => Resolved::diverge(
381 "INT64",
382 "INT64",
383 "nanosecond timestamp has no BigQuery native type — autoloads as INT64 (raw \
384 nanos, lossless); a native TIMESTAMP via TIMESTAMP_MICROS(DIV(col,1000)) drops \
385 sub-µs precision. Prefer `timestamp` (microsecond) for BigQuery targets.",
386 None,
387 ),
388 RivetType::Timestamp { timezone: None, .. } => Resolved::diverge(
392 "DATETIME",
393 "TIMESTAMP",
394 "naive timestamp autoloads as TIMESTAMP (an instant); recover wall-clock with \
395 DATETIME(col) after load",
396 Some("DATETIME({col})"),
397 ),
398 RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("STRING"),
399 RivetType::Binary => Resolved::ok("BYTES"),
400 RivetType::Json => Resolved::diverge(
403 "JSON",
404 "BYTES",
405 "Parquet JSON logical type autoloads as BYTES in BigQuery; recover native JSON \
406 with PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(col)) after load",
407 Some("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING({col}))"),
408 ),
409 RivetType::Uuid => Resolved::diverge(
413 "STRING",
414 "BYTES",
415 "UUID autoloads as 16-byte BYTES in BigQuery; recover hex text with TO_HEX(col) \
416 after load (or keep BYTES)",
417 Some("TO_HEX({col})"),
418 ),
419 RivetType::Interval => Resolved::ok("STRING"),
420 RivetType::List { inner } => list(inner),
421 RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
422 }
423 }
424
425 fn decimal(p: u8, s: i8) -> Resolved {
426 if s < 0 {
427 return Resolved::fail(format!(
428 "BigQuery has no negative scale; decimal({p},{s}) needs a STRING/INT64 cast"
429 ));
430 }
431 let native = if p <= NUMERIC_MAX_P && s <= NUMERIC_MAX_S {
432 "NUMERIC"
433 } else if p <= BIGNUMERIC_MAX_P && s <= BIGNUMERIC_MAX_S {
434 "BIGNUMERIC"
435 } else {
436 return Resolved::fail(format!(
437 "decimal({p},{s}) exceeds BigQuery BIGNUMERIC limits (max 76,38)"
438 ));
439 };
440 Resolved::ok(native)
441 }
442
443 fn list(inner: &RivetType) -> Resolved {
444 let inner_r = native(inner);
445 if inner_r.status == TargetStatus::Fail {
446 return Resolved::fail(format!(
447 "REPEATED of unsupported element: {}",
448 inner_r.target_type
449 ));
450 }
451 Resolved::diverge(
458 format!("REPEATED {}", inner_r.target_type),
459 format!("REPEATED RECORD{{item {}}}", inner_r.autoload_type),
460 "arrays load as REPEATED RECORD{item}; load the staging table with \
461 --parquet_enable_list_inference, then flatten with UNNEST after load",
462 Some("ARRAY(SELECT el.item FROM UNNEST({col}) AS el)"),
463 )
464 }
465}
466
467mod duckdb {
470 use super::*;
471
472 pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
473 native(input.rivet_type).into_spec(input)
474 }
475
476 fn native(t: &RivetType) -> Resolved {
479 match t {
480 RivetType::Bool => Resolved::ok("BOOLEAN"),
481 RivetType::Int16 => Resolved::ok("SMALLINT"),
482 RivetType::Int32 => Resolved::ok("INTEGER"),
483 RivetType::Int64 => Resolved::ok("BIGINT"),
484 RivetType::UInt64 => Resolved::ok("UBIGINT"),
485 RivetType::Float32 => Resolved::ok("FLOAT"),
486 RivetType::Float64 => Resolved::ok("DOUBLE"),
487 RivetType::Decimal { precision, scale } => {
488 if *scale < 0 {
489 Resolved::warn(
490 "DECIMAL",
491 format!(
492 "DuckDB has no negative scale; decimal({precision},{scale}) loads via cast"
493 ),
494 )
495 } else if *precision <= 38 {
496 Resolved::ok(format!("DECIMAL({precision},{scale})"))
497 } else {
498 Resolved::warn(
500 "DECIMAL(38,*)",
501 format!("decimal({precision},{scale}) exceeds DuckDB DECIMAL(38); widens"),
502 )
503 }
504 }
505 RivetType::Date => Resolved::ok("DATE"),
506 RivetType::Time { .. } => Resolved::ok("TIME"),
507 RivetType::Timestamp {
508 timezone: Some(_), ..
509 } => Resolved::ok("TIMESTAMPTZ"),
510 RivetType::Timestamp {
513 unit: TimeUnit::Nanosecond,
514 timezone: None,
515 } => Resolved::ok("TIMESTAMP_NS"),
516 RivetType::Timestamp { timezone: None, .. } => Resolved::ok("TIMESTAMP"),
517 RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("VARCHAR"),
518 RivetType::Binary => Resolved::ok("BLOB"),
519 RivetType::Json => Resolved::ok("JSON"),
520 RivetType::Uuid => Resolved::ok("UUID"),
521 RivetType::Interval => Resolved::ok("INTERVAL"),
522 RivetType::List { inner } => {
523 let inner_r = native(inner);
524 if inner_r.status == TargetStatus::Fail {
525 Resolved::fail(format!(
526 "LIST of unsupported element: {}",
527 inner_r.target_type
528 ))
529 } else {
530 Resolved::ok(format!("{}[]", inner_r.target_type))
531 }
532 }
533 RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
534 }
535 }
536}
537
538mod snowflake {
541 use super::*;
542
543 pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
544 native(input.rivet_type).into_spec(input)
545 }
546
547 fn native(t: &RivetType) -> Resolved {
552 match t {
553 RivetType::Bool => Resolved::ok("BOOLEAN"),
554 RivetType::Int16 | RivetType::Int32 | RivetType::Int64 => Resolved::ok("NUMBER(38,0)"),
555 RivetType::UInt64 => Resolved::diverge(
557 "NUMBER(20,0)",
558 "NUMBER(38,0)",
559 "UINT64 > INT64_MAX overflows the Parquet read; map to decimal(20,0) at source",
560 None,
561 ),
562 RivetType::Float32 | RivetType::Float64 => Resolved::ok("FLOAT"),
563 RivetType::Decimal { precision, scale } => {
564 if *scale < 0 {
565 Resolved::warn(
566 "NUMBER",
567 format!(
568 "Snowflake NUMBER has no negative scale; decimal({precision},{scale}) loads via cast"
569 ),
570 )
571 } else {
572 Resolved::ok(format!("NUMBER({precision},{scale})"))
573 }
574 }
575 RivetType::Date => Resolved::ok("DATE"),
576 RivetType::Time { .. } => Resolved::diverge(
578 "TIME",
579 "NUMBER(38,0)",
580 "TIME autoloads as NUMBER (µs of day); recover with TIME_FROM_PARTS after load",
581 Some(r#"TIME_FROM_PARTS(0,0,FLOOR("{col}"/1000000),MOD("{col}",1000000)*1000)"#),
582 ),
583 RivetType::Timestamp {
585 timezone: Some(_), ..
586 } => Resolved::diverge(
587 "TIMESTAMP_TZ",
588 "TIMESTAMP_NTZ",
589 "tz timestamp autoloads as TIMESTAMP_NTZ — ALTER SESSION SET TIMEZONE='UTC' before COPY so the instant matches",
590 None,
591 ),
592 RivetType::Timestamp {
598 unit: TimeUnit::Nanosecond,
599 timezone: None,
600 } => Resolved::diverge(
601 "TIMESTAMP_NTZ",
602 "NUMBER(38,0)",
603 "nanosecond timestamp autoloads as NUMBER (ns since epoch); recover with \
604 TO_TIMESTAMP_NTZ(col, 9) after load — Snowflake TIMESTAMP_NTZ holds full ns precision",
605 Some(r#"TO_TIMESTAMP_NTZ("{col}", 9)"#),
606 ),
607 RivetType::Timestamp { timezone: None, .. } => Resolved::diverge(
609 "TIMESTAMP_NTZ",
610 "NUMBER(38,0)",
611 "naive timestamp autoloads as NUMBER (µs since epoch); recover with TO_TIMESTAMP_NTZ after load",
612 Some(r#"TO_TIMESTAMP_NTZ("{col}", 6)"#),
613 ),
614 RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("TEXT"),
615 RivetType::Binary => Resolved::warn(
617 "BINARY",
618 "set BINARY_AS_TEXT=FALSE in the Parquet FILE FORMAT or non-UTF8 bytes fail to load",
619 ),
620 RivetType::Json => Resolved::diverge(
622 "VARIANT",
623 "TEXT",
624 "JSON autoloads as TEXT; recover native VARIANT with PARSE_JSON after load",
625 Some(r#"PARSE_JSON("{col}")"#),
626 ),
627 RivetType::Uuid => Resolved::diverge(
629 "TEXT",
630 "BINARY",
631 "UUID autoloads as 16-byte BINARY; recover canonical text with HEX_ENCODE + REGEXP after load",
632 Some(
633 r#"REGEXP_REPLACE(LOWER(HEX_ENCODE("{col}")),'^(.{8})(.{4})(.{4})(.{4})(.{12})$','\\1-\\2-\\3-\\4-\\5')"#,
634 ),
635 ),
636 RivetType::Interval => Resolved::ok("TEXT"),
637 RivetType::List { inner } => {
642 let inner_r = native(inner);
643 if inner_r.status == TargetStatus::Fail {
644 Resolved::fail(format!(
645 "ARRAY of unsupported element: {}",
646 inner_r.target_type
647 ))
648 } else {
649 Resolved::diverge(
650 "ARRAY",
651 "VARIANT",
652 "list autoloads as VARIANT (the JSON array); recover native ARRAY with ::ARRAY after load",
653 Some(r#""{col}"::ARRAY"#),
654 )
655 }
656 }
657 RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
658 }
659 }
660}
661
662fn clickhouse_recovery_sql(specs: &[TargetColumnSpec], table: &str) -> String {
665 let cols = recovery_projection(specs, |name| format!(" {name}"));
666 format!(
667 "-- 1) load the Parquet into a staging table, e.g.\n\
668 -- CREATE TABLE {table}__staging ENGINE = MergeTree ORDER BY tuple() AS\n\
669 -- SELECT * FROM file('<parquet>', 'Parquet');\n\
670 -- 2) recover native types:\n\
671 CREATE TABLE {table} ENGINE = MergeTree ORDER BY tuple() AS\n\
672 SELECT\n{cols}\n\
673 FROM {table}__staging;"
674 )
675}
676
677mod clickhouse {
678 use super::*;
679
680 pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
681 native(input.rivet_type).into_spec(input)
682 }
683
684 fn native(t: &RivetType) -> Resolved {
690 match t {
691 RivetType::Bool => Resolved::ok("Bool"),
692 RivetType::Int16 => Resolved::ok("Int16"),
693 RivetType::Int32 => Resolved::ok("Int32"),
694 RivetType::Int64 => Resolved::ok("Int64"),
695 RivetType::UInt64 => Resolved::ok("UInt64"),
699 RivetType::Float32 => Resolved::ok("Float32"),
700 RivetType::Float64 => Resolved::ok("Float64"),
701 RivetType::Decimal { precision, scale } => {
702 if *scale < 0 {
703 Resolved::warn(
704 "Decimal",
705 format!(
706 "ClickHouse Decimal has no negative scale; decimal({precision},{scale}) needs a declared schema"
707 ),
708 )
709 } else {
710 Resolved::ok(format!("Decimal({precision}, {scale})"))
711 }
712 }
713 RivetType::Date => Resolved::ok("Date32"),
714 RivetType::Time { .. } => Resolved::warn(
717 "Int64",
718 "ClickHouse has no TIME type; time-of-day autoloads as Int64 (µs of day)",
719 ),
720 RivetType::Timestamp { unit, timezone } => {
723 let p = match unit {
724 TimeUnit::Second => 0,
725 TimeUnit::Millisecond => 3,
726 TimeUnit::Microsecond => 6,
727 TimeUnit::Nanosecond => 9,
728 };
729 match timezone {
730 Some(tz) => Resolved::ok(format!("DateTime64({p}, '{tz}')")),
731 None => Resolved::ok(format!("DateTime64({p})")),
732 }
733 }
734 RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("String"),
735 RivetType::Binary => Resolved::ok("String"),
738 RivetType::Json => Resolved::diverge(
743 "JSON",
744 "String",
745 "JSON autoloads as String (valid JSON text); declare a JSON column at load for the native type",
746 None,
747 ),
748 RivetType::Uuid => Resolved::diverge(
752 "UUID",
753 "FixedString(16)",
754 "UUID autoloads as FixedString(16); recover the native UUID with toUUID after load",
755 Some(
756 "toUUID(concat(substring(lower(hex({col})),1,8),'-',substring(lower(hex({col})),9,4),'-',substring(lower(hex({col})),13,4),'-',substring(lower(hex({col})),17,4),'-',substring(lower(hex({col})),21,12)))",
757 ),
758 ),
759 RivetType::Interval => Resolved::ok("String"),
760 RivetType::List { inner } => {
763 let inner_r = native(inner);
764 if inner_r.status == TargetStatus::Fail {
765 Resolved::fail(format!(
766 "Array of unsupported element: {}",
767 inner_r.target_type
768 ))
769 } else {
770 Resolved::ok(format!("Array(Nullable({}))", inner_r.target_type))
771 }
772 }
773 RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
774 }
775 }
776}
777
#[cfg(test)]
mod tests {
    //! Unit tests for per-target column resolution and recovery-SQL generation.
    use super::*;

    /// Input for a column named `c` with `TypeFidelity::Exact`. Tests that
    /// assert `TargetStatus::Ok` rely on `Exact` not triggering the fidelity
    /// downgrade in `ExportTarget::resolve_column`.
    fn input<'a>(rt: &'a RivetType) -> TargetInput<'a> {
        TargetInput {
            column_name: "c",
            rivet_type: rt,
            arrow_type: None,
            fidelity: TypeFidelity::Exact,
        }
    }

    // Per-target shorthands over `input`.
    fn bq(rt: &RivetType) -> TargetColumnSpec {
        ExportTarget::BigQuery.resolve_column(input(rt))
    }
    fn duck(rt: &RivetType) -> TargetColumnSpec {
        ExportTarget::DuckDb.resolve_column(input(rt))
    }
    fn sf(rt: &RivetType) -> TargetColumnSpec {
        ExportTarget::Snowflake.resolve_column(input(rt))
    }

    // BigQuery: naive ns timestamps stay INT64 (raw nanos) with no cast.
    #[test]
    fn bq_nanosecond_timestamp_autoloads_as_int64() {
        let ns = RivetType::Timestamp {
            unit: super::super::TimeUnit::Nanosecond,
            timezone: None,
        };
        let s = bq(&ns);
        assert_eq!(s.target_type, "INT64");
        assert_eq!(s.autoload_type, "INT64");
        assert_eq!(s.status, TargetStatus::Warn);
        assert!(s.cast_sql.is_none(), "ns→BQ has no lossless temporal cast");
    }

    #[test]
    fn duckdb_nanosecond_timestamp_is_native_timestamp_ns() {
        let ns = RivetType::Timestamp {
            unit: super::super::TimeUnit::Nanosecond,
            timezone: None,
        };
        let s = duck(&ns);
        assert_eq!(s.target_type, "TIMESTAMP_NS");
        assert_eq!(s.status, TargetStatus::Ok);
    }

    // Snowflake: ns timestamps recover with scale 9 (`{col}` expanded to "c").
    #[test]
    fn snowflake_nanosecond_timestamp_recovers_losslessly_at_scale_9() {
        let ns = RivetType::Timestamp {
            unit: super::super::TimeUnit::Nanosecond,
            timezone: None,
        };
        let s = sf(&ns);
        assert_eq!(s.target_type, "TIMESTAMP_NTZ");
        assert_eq!(s.autoload_type, "NUMBER(38,0)");
        assert_eq!(s.cast_sql.as_deref(), Some(r#"TO_TIMESTAMP_NTZ("c", 9)"#));
    }

    #[test]
    fn bq_uuid_resolves_not_fails() {
        let s = bq(&RivetType::Uuid);
        assert_eq!(s.target_type, "STRING");
        assert_eq!(s.autoload_type, "BYTES");
        assert_eq!(s.status, TargetStatus::Warn);
        assert!(s.cast_sql.unwrap().contains("c"));
    }

    #[test]
    fn bq_json_native_is_json_autoload_is_bytes() {
        let s = bq(&RivetType::Json);
        assert_eq!(s.target_type, "JSON");
        assert_eq!(s.autoload_type, "BYTES");
        assert_eq!(s.status, TargetStatus::Warn);
        assert!(s.cast_sql.unwrap().starts_with("PARSE_JSON"));
    }

    #[test]
    fn bq_naive_timestamp_is_datetime_native_timestamp_autoload() {
        let naive = RivetType::Timestamp {
            unit: super::super::TimeUnit::Microsecond,
            timezone: None,
        };
        let s = bq(&naive);
        assert_eq!(s.target_type, "DATETIME");
        assert_eq!(s.autoload_type, "TIMESTAMP");
        assert_eq!(s.status, TargetStatus::Warn);
    }

    #[test]
    fn bq_tz_timestamp_is_timestamp_ok() {
        let tz = RivetType::Timestamp {
            unit: super::super::TimeUnit::Microsecond,
            timezone: Some("UTC".into()),
        };
        let s = bq(&tz);
        assert_eq!(s.target_type, "TIMESTAMP");
        assert_eq!(s.autoload_type, "TIMESTAMP");
        assert_eq!(s.status, TargetStatus::Ok);
    }

    #[test]
    fn bq_decimal_within_numeric_is_numeric() {
        let s = bq(&RivetType::Decimal {
            precision: 18,
            scale: 2,
        });
        assert_eq!(s.target_type, "NUMERIC");
        assert_eq!(s.status, TargetStatus::Ok);
    }

    // Pins the conservative NUMERIC check: total precision 38 exceeds 29.
    #[test]
    fn bq_decimal_escalates_to_bignumeric() {
        let s = bq(&RivetType::Decimal {
            precision: 38,
            scale: 9,
        });
        assert_eq!(s.target_type, "BIGNUMERIC");
        assert_eq!(s.status, TargetStatus::Ok);
    }

    #[test]
    fn bq_decimal_negative_scale_fails() {
        let s = bq(&RivetType::Decimal {
            precision: 5,
            scale: -2,
        });
        assert_eq!(s.status, TargetStatus::Fail);
    }

    #[test]
    fn bq_uint64_recommends_numeric_warns_overflow() {
        let s = bq(&RivetType::UInt64);
        assert_eq!(s.target_type, "NUMERIC");
        assert_eq!(s.autoload_type, "INT64");
        assert_eq!(s.status, TargetStatus::Warn);
    }

    #[test]
    fn bq_list_is_repeated_native_record_autoload() {
        let t = RivetType::List {
            inner: Box::new(RivetType::String),
        };
        let s = bq(&t);
        assert_eq!(s.target_type, "REPEATED STRING");
        assert!(s.autoload_type.contains("REPEATED RECORD"));
        assert_eq!(s.status, TargetStatus::Warn);
    }

    // Unsupported types surface as a Fail row with "-" placeholders.
    #[test]
    fn bq_unsupported_is_fail_row_not_panic() {
        let t = RivetType::Unsupported {
            native_type: "geometry".into(),
            reason: "no mapping".into(),
        };
        let s = bq(&t);
        assert_eq!(s.status, TargetStatus::Fail);
        assert_eq!(s.target_type, "-");
    }

    #[test]
    fn bq_standard_scalars_ok() {
        for (rt, native) in [
            (RivetType::Bool, "BOOL"),
            (RivetType::Int64, "INT64"),
            (RivetType::Float64, "FLOAT64"),
            (RivetType::Date, "DATE"),
            (RivetType::String, "STRING"),
            (RivetType::Binary, "BYTES"),
            (RivetType::Enum, "STRING"),
        ] {
            let s = bq(&rt);
            assert_eq!(s.target_type, native, "{rt:?}");
            assert_eq!(s.autoload_type, native, "{rt:?}");
            assert_eq!(s.status, TargetStatus::Ok, "{rt:?}");
        }
    }

    // Types that diverge on BigQuery must not diverge on DuckDB.
    #[test]
    fn duckdb_reads_everything_natively() {
        let naive = RivetType::Timestamp {
            unit: super::super::TimeUnit::Microsecond,
            timezone: None,
        };
        for rt in [
            RivetType::Json,
            RivetType::Uuid,
            RivetType::UInt64,
            naive,
            RivetType::List {
                inner: Box::new(RivetType::Int64),
            },
        ] {
            let s = duck(&rt);
            assert_eq!(
                s.target_type, s.autoload_type,
                "DuckDB autoload must equal native for {rt:?}"
            );
            assert_ne!(s.status, TargetStatus::Fail, "{rt:?}");
        }
    }

    #[test]
    fn duckdb_native_type_names() {
        assert_eq!(duck(&RivetType::Json).target_type, "JSON");
        assert_eq!(duck(&RivetType::Uuid).target_type, "UUID");
        assert_eq!(duck(&RivetType::UInt64).target_type, "UBIGINT");
        assert_eq!(
            duck(&RivetType::Decimal {
                precision: 18,
                scale: 2
            })
            .target_type,
            "DECIMAL(18,2)"
        );
        assert_eq!(
            duck(&RivetType::List {
                inner: Box::new(RivetType::Int64)
            })
            .target_type,
            "BIGINT[]"
        );
    }

    #[test]
    fn parse_accepts_aliases() {
        assert_eq!(ExportTarget::parse("bq"), Some(ExportTarget::BigQuery));
        assert_eq!(
            ExportTarget::parse("BigQuery"),
            Some(ExportTarget::BigQuery)
        );
        assert_eq!(ExportTarget::parse("duckdb"), Some(ExportTarget::DuckDb));
        assert_eq!(ExportTarget::parse("nope"), None);
    }

    #[test]
    fn resolve_table_preserves_order_and_names() {
        use super::super::SourceColumn;
        let mappings = vec![
            TypeMapping::from_source(&SourceColumn::simple("a", "int8", true), RivetType::Int64),
            TypeMapping::from_source(&SourceColumn::simple("b", "jsonb", true), RivetType::Json),
        ];
        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
        assert_eq!(specs.len(), 2);
        assert_eq!(specs[0].column_name, "a");
        assert_eq!(specs[1].column_name, "b");
        assert_eq!(specs[1].target_type, "JSON");
    }

    // UINT64 overflow cannot be undone after load, so only a note is offered.
    #[test]
    fn cast_sql_is_none_when_post_load_recovery_is_impossible() {
        let u = bq(&RivetType::UInt64);
        assert!(
            u.cast_sql.is_none(),
            "overflowed UINT64 has no lossless post-load recovery"
        );
        let note = u.note.unwrap().to_lowercase();
        assert!(
            note.contains("override"),
            "UINT64 note must point to the source-side override, got: {note}"
        );
    }

    #[test]
    fn cast_sql_present_only_when_lossless_post_load() {
        assert!(
            bq(&RivetType::Json)
                .cast_sql
                .unwrap()
                .contains("PARSE_JSON")
        );
        assert!(bq(&RivetType::Uuid).cast_sql.unwrap().contains("TO_HEX"));
        let naive = RivetType::Timestamp {
            unit: super::super::TimeUnit::Microsecond,
            timezone: None,
        };
        assert!(bq(&naive).cast_sql.unwrap().contains("DATETIME"));
    }

    // Each BigQuery divergence must carry a cast or a note describing recovery.
    #[test]
    fn every_divergence_offers_a_recovery_path() {
        let naive = RivetType::Timestamp {
            unit: super::super::TimeUnit::Microsecond,
            timezone: None,
        };
        let cases = [
            RivetType::Json,
            RivetType::Uuid,
            RivetType::UInt64,
            naive,
            RivetType::List {
                inner: Box::new(RivetType::String),
            },
        ];
        for rt in cases {
            let s = bq(&rt);
            assert_ne!(s.autoload_type, s.target_type, "case must diverge: {rt:?}");
            let has_cast = s.cast_sql.is_some();
            let note = s.note.as_deref().unwrap_or("").to_lowercase();
            let describes_recovery = note.contains("after load") || note.contains("override");
            assert!(
                has_cast || describes_recovery,
                "divergent {rt:?} must offer a recovery (cast_sql or a recovery note)"
            );
        }
    }

    // Edges of the BIGNUMERIC range and the NUMERIC → BIGNUMERIC escalation.
    #[test]
    fn bq_decimal_limit_boundaries() {
        assert_eq!(
            bq(&RivetType::Decimal {
                precision: 76,
                scale: 38
            })
            .status,
            TargetStatus::Ok
        );
        assert_eq!(
            bq(&RivetType::Decimal {
                precision: 77,
                scale: 38
            })
            .status,
            TargetStatus::Fail
        );
        assert_eq!(
            bq(&RivetType::Decimal {
                precision: 76,
                scale: 39
            })
            .status,
            TargetStatus::Fail
        );
        assert_eq!(
            bq(&RivetType::Decimal {
                precision: 30,
                scale: 0
            })
            .target_type,
            "BIGNUMERIC"
        );
    }

    #[test]
    fn duckdb_decimal_over_38_warns_not_silently_clamps() {
        let s = duck(&RivetType::Decimal {
            precision: 40,
            scale: 2,
        });
        assert_eq!(s.status, TargetStatus::Warn);
    }

    // End-to-end BigQuery script: casts, load flag, projection layout, names.
    #[test]
    fn bq_recovery_sql_casts_native_types() {
        use super::super::{SourceColumn, TimeUnit};
        let naive = RivetType::Timestamp {
            unit: TimeUnit::Microsecond,
            timezone: None,
        };
        let mappings = vec![
            TypeMapping::from_source(&SourceColumn::simple("id", "int8", true), RivetType::Int64),
            TypeMapping::from_source(
                &SourceColumn::simple("attrs", "jsonb", true),
                RivetType::Json,
            ),
            TypeMapping::from_source(&SourceColumn::simple("uid", "uuid", true), RivetType::Uuid),
            TypeMapping::from_source(
                &SourceColumn::simple("created_at", "timestamp", true),
                naive,
            ),
            TypeMapping::from_source(
                &SourceColumn::simple("tags", "_text", true),
                RivetType::List {
                    inner: Box::new(RivetType::String),
                },
            ),
        ];
        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
        let sql = ExportTarget::BigQuery
            .recovery_sql(&specs, "payments")
            .expect("BigQuery has a recovery SQL");
        assert!(sql.contains("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(attrs)) AS attrs"));
        assert!(sql.contains("TO_HEX(uid) AS uid"));
        assert!(sql.contains("DATETIME(created_at) AS created_at"));
        assert!(sql.contains("ARRAY(SELECT el.item FROM UNNEST(tags) AS el) AS tags"));
        assert!(sql.contains("--parquet_enable_list_inference"));
        assert!(sql.contains("SELECT\n id"));
        assert!(sql.contains("CREATE OR REPLACE TABLE `payments`"));
        assert!(sql.contains("FROM `payments__staging`"));
    }

    #[test]
    fn duckdb_needs_no_recovery() {
        let mappings = vec![TypeMapping::from_source(
            &super::super::SourceColumn::simple("attrs", "json", true),
            RivetType::Json,
        )];
        let specs = ExportTarget::DuckDb.resolve_table(&mappings);
        assert!(
            ExportTarget::DuckDb.recovery_sql(&specs, "t").is_none(),
            "DuckDB autoloads every logical type natively — no recovery needed"
        );
    }

    // Slices out the SELECT … FROM body and checks one line per column, with
    // `AS` aliases only on cast (divergent) columns.
    #[test]
    fn recovery_sql_projects_every_column_once_and_only_casts_divergent() {
        use super::super::{SourceColumn, TimeUnit};
        let naive = RivetType::Timestamp {
            unit: TimeUnit::Microsecond,
            timezone: None,
        };
        let cols: [(&str, RivetType); 6] = [
            ("id", RivetType::Int64),
            (
                "amount",
                RivetType::Decimal {
                    precision: 18,
                    scale: 2,
                },
            ),
            ("attrs", RivetType::Json),
            ("uid", RivetType::Uuid),
            ("created_at", naive),
            (
                "tags",
                RivetType::List {
                    inner: Box::new(RivetType::String),
                },
            ),
        ];
        let mappings: Vec<_> = cols
            .iter()
            .cloned()
            .map(|(n, rt)| TypeMapping::from_source(&SourceColumn::simple(n, "x", true), rt))
            .collect();
        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
        let sql = ExportTarget::BigQuery.recovery_sql(&specs, "t").unwrap();

        let body = sql
            .split("SELECT\n")
            .nth(1)
            .and_then(|s| s.split("\nFROM").next())
            .expect("recovery SQL has a SELECT … FROM body");
        assert_eq!(
            body.split(",\n").count(),
            cols.len(),
            "one projection per column, got:\n{body}"
        );
        for (name, _) in &cols {
            assert!(body.contains(name), "column {name} missing:\n{body}");
        }
        assert!(body.contains(" id,") && !body.contains("AS id"));
        assert!(body.contains(" amount,") && !body.contains("AS amount"));
        assert!(body.contains("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(attrs)) AS attrs"));
        assert!(body.contains("TO_HEX(uid) AS uid"));
        assert!(body.contains("DATETIME(created_at) AS created_at"));
        assert!(body.contains("UNNEST(tags) AS el) AS tags"));
    }

    // Snowflake autoload types and the casts that recover native types.
    #[test]
    fn snowflake_autoload_degradations_and_native_casts() {
        let j = sf(&RivetType::Json);
        assert_eq!(j.target_type, "VARIANT");
        assert_eq!(j.autoload_type, "TEXT");
        assert!(j.cast_sql.unwrap().starts_with("PARSE_JSON"));
        let u = sf(&RivetType::Uuid);
        assert_eq!(u.target_type, "TEXT");
        assert_eq!(u.autoload_type, "BINARY");
        assert!(u.cast_sql.unwrap().contains("HEX_ENCODE"));
        let naive = RivetType::Timestamp {
            unit: super::super::TimeUnit::Microsecond,
            timezone: None,
        };
        let t = sf(&naive);
        assert_eq!(t.target_type, "TIMESTAMP_NTZ");
        assert_eq!(t.autoload_type, "NUMBER(38,0)");
        assert!(t.cast_sql.unwrap().contains("TO_TIMESTAMP_NTZ"));
        let tm = sf(&RivetType::Time {
            unit: super::super::TimeUnit::Microsecond,
        });
        assert_eq!(tm.target_type, "TIME");
        assert!(tm.cast_sql.unwrap().contains("TIME_FROM_PARTS"));
        let d = sf(&RivetType::Decimal {
            precision: 18,
            scale: 2,
        });
        assert_eq!(d.target_type, "NUMBER(18,2)");
        assert!(d.cast_sql.is_none());
        let l = sf(&RivetType::List {
            inner: Box::new(RivetType::Int64),
        });
        assert_eq!(l.target_type, "ARRAY");
        assert_eq!(l.autoload_type, "VARIANT");
        assert!(l.cast_sql.unwrap().ends_with("::ARRAY"));
    }

    // Snowflake script reads quoted staging columns and includes staging steps.
    #[test]
    fn snowflake_recovery_sql_quotes_columns_and_casts() {
        use super::super::{SourceColumn, TimeUnit};
        let naive = RivetType::Timestamp {
            unit: TimeUnit::Microsecond,
            timezone: None,
        };
        let mappings = vec![
            TypeMapping::from_source(&SourceColumn::simple("id", "int8", true), RivetType::Int64),
            TypeMapping::from_source(
                &SourceColumn::simple("attrs", "jsonb", true),
                RivetType::Json,
            ),
            TypeMapping::from_source(&SourceColumn::simple("uid", "uuid", true), RivetType::Uuid),
            TypeMapping::from_source(
                &SourceColumn::simple("created_at", "timestamp", true),
                naive,
            ),
        ];
        let specs = ExportTarget::Snowflake.resolve_table(&mappings);
        let sql = ExportTarget::Snowflake.recovery_sql(&specs, "t").unwrap();
        assert!(sql.contains("\"id\" AS id"));
        assert!(sql.contains("PARSE_JSON(\"attrs\") AS attrs"));
        assert!(sql.contains("HEX_ENCODE(\"uid\")"));
        assert!(sql.contains("TO_TIMESTAMP_NTZ(\"created_at\", 6) AS created_at"));
        assert!(sql.contains("BINARY_AS_TEXT=FALSE"));
        assert!(sql.contains("MATCH_BY_COLUMN_NAME"));
        assert!(sql.contains("FROM t__staging"));
    }

    #[test]
    fn parse_accepts_snowflake() {
        assert_eq!(
            ExportTarget::parse("snowflake"),
            Some(ExportTarget::Snowflake)
        );
        assert_eq!(ExportTarget::parse("sf"), Some(ExportTarget::Snowflake));
    }

    #[test]
    fn parse_accepts_clickhouse() {
        assert_eq!(
            ExportTarget::parse("clickhouse"),
            Some(ExportTarget::ClickHouse)
        );
        assert_eq!(ExportTarget::parse("ch"), Some(ExportTarget::ClickHouse));
    }

    // Guards against `valid_target_names` drifting from `parse`.
    #[test]
    fn valid_target_names_lists_every_parseable_target() {
        let names = ExportTarget::valid_target_names();
        assert!(names.contains("snowflake"), "got: {names}");
        assert!(names.contains("bigquery"), "got: {names}");
        assert!(names.contains("duckdb"), "got: {names}");
        assert!(names.contains("clickhouse"), "got: {names}");
    }
}