Skip to main content

rivet/types/
target.rs

1//! Target-type resolver (ADR-0014 L4, roadmap §16).
2//!
3//! Given a column's canonical [`RivetType`] and a runtime-chosen
4//! [`ExportTarget`], resolve what the column becomes in a downstream
5//! warehouse: the **native** target type (full fidelity), the **autoload**
6//! type a generic Parquet reader infers without a declared schema, a safety
7//! [`TargetStatus`], a note, and an optional materialization (`cast_sql` /
8//! load-schema hint).
9//!
10//! Design (locked in the type-support architecture review):
11//!
12//! - **Dispatch on `RivetType`, never the physical Arrow type.** The previous
13//!   `bq_compat` matched on `arrow::DataType` and so was blind to `json` /
14//!   `uuid` / `enum` (all `Utf8` / `FixedSizeBinary` by then) and hard-failed
15//!   UUID. The resolver keys off the semantic type; Arrow is consulted only
16//!   for decimal precision (and `RivetType::Decimal` already carries `p,s`).
17//! - **Total & infallible.** Every `(RivetType, ExportTarget)` pair yields a
18//!   populated [`TargetColumnSpec`]; an unmappable column is a `status: Fail`
19//!   row, never an `Err`. This keeps the type-report table and `--json` shape
20//!   stable.
21//! - **`autoload_type` tells the truth.** It encodes the *empirically
22//!   verified* behavior of each target's Parquet autoloader — notably that
23//!   BigQuery autoload degrades Parquet `JsonType`/`UUIDType` to `BYTES`,
24//!   `isAdjustedToUTC=false` timestamps to `TIMESTAMP` (not `DATETIME`), and
25//!   3-level lists to `REPEATED RECORD{item}`. DuckDB honors all of them.
26//!
//! BigQuery numeric limits (as of 2025):
//!   NUMERIC    — up to 38 digits: at most 29 integer digits, scale 0–9
//!   BIGNUMERIC — up to 76 full digits: at most 38 integer digits, scale 0–38
30
31use arrow::datatypes::DataType;
32use serde::Serialize;
33
34use super::{RivetType, TimeUnit, TypeFidelity, TypeMapping};
35
/// The downstream warehouses Rivet can resolve column types for. The set is
/// closed, lives in-tree and is contract-tested; exactly one is picked per
/// run, at runtime, from `--target X`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportTarget {
    /// The reference consumer of Rivet Parquet: every native logical type
    /// (JSON, UUID, decimal, list) survives autoload.
    DuckDb,
    /// Cloud warehouse whose Parquet autoloader is weaker than DuckDB's; the
    /// per-type `autoload_type` notes live in [`bigquery`].
    BigQuery,
    /// Cloud warehouse whose Parquet autoload, like BigQuery's, degrades
    /// JSON, UUID, naive timestamps and TIME — see [`snowflake`]. Verified
    /// live.
    Snowflake,
}
50
51impl ExportTarget {
52    pub fn parse(s: &str) -> Option<Self> {
53        match s.to_lowercase().as_str() {
54            "bigquery" | "bq" => Some(Self::BigQuery),
55            "duckdb" | "duck" => Some(Self::DuckDb),
56            "snowflake" | "sf" => Some(Self::Snowflake),
57            _ => None,
58        }
59    }
60
61    pub fn label(self) -> &'static str {
62        match self {
63            Self::BigQuery => "bigquery",
64            Self::DuckDb => "duckdb",
65            Self::Snowflake => "snowflake",
66        }
67    }
68
69    /// Resolve one already-mapped column against this target.
70    pub fn resolve_column(self, input: TargetInput<'_>) -> TargetColumnSpec {
71        let mut spec = match self {
72            ExportTarget::BigQuery => bigquery::resolve(&input),
73            ExportTarget::DuckDb => duckdb::resolve(&input),
74            ExportTarget::Snowflake => snowflake::resolve(&input),
75        };
76        // Fidelity floor (ADR-0014 T6): the target status must not be rosier
77        // than the source fidelity warrants — a lossy/unsupported source column
78        // can never resolve to a clean `Ok`.
79        if input.fidelity.is_unsafe_for_strict_mode() && spec.status == TargetStatus::Ok {
80            spec.status = TargetStatus::Warn;
81        }
82        spec
83    }
84
85    /// Resolve a whole table's worth of columns, one spec per column in order.
86    /// Consumed today by the type-report's recovery-SQL emission
87    /// (`preflight::type_report`); `plan-load` (ADR-0014 Phase B) would be a
88    /// second consumer.
89    pub fn resolve_table(self, mappings: &[TypeMapping]) -> Vec<TargetColumnSpec> {
90        mappings
91            .iter()
92            .map(|m| self.resolve_column(TargetInput::from(m)))
93            .collect()
94    }
95
96    /// SQL that recovers target-native types after a bare autoload degraded
97    /// them (ADR-0014 L5). For BigQuery this is a `CREATE TABLE … AS SELECT`
98    /// over the autoloaded staging table applying per-column casts — BigQuery's
99    /// Parquet loader will NOT coerce a *declared* native type on load (verified
100    /// against live BQ: BYTES→JSON, TIMESTAMP→DATETIME loads are rejected), so
101    /// the recovery has to be a post-load transform, not a load schema. `None`
102    /// when the target reads the interchange Parquet faithfully (DuckDB).
103    pub fn recovery_sql(self, specs: &[TargetColumnSpec], table: &str) -> Option<String> {
104        match self {
105            ExportTarget::BigQuery => Some(bigquery_recovery_sql(specs, table)),
106            ExportTarget::Snowflake => Some(snowflake_recovery_sql(specs, table)),
107            ExportTarget::DuckDb => None,
108        }
109    }
110}
111
112/// Status of a column's resolution against a specific target.
113#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
114#[serde(rename_all = "snake_case")]
115pub enum TargetStatus {
116    Ok,
117    Warn,
118    Fail,
119}
120
121impl TargetStatus {
122    pub fn label(&self) -> &'static str {
123        match self {
124            Self::Ok => "ok",
125            Self::Warn => "warn",
126            Self::Fail => "fail",
127        }
128    }
129}
130
131/// The borrowed subset a resolver is allowed to read. Built from a
132/// [`TypeMapping`] via [`From`]. Dispatch is on `rivet_type`; `arrow_type` is
133/// retained for callers that want it but the resolver reads precision from
134/// `RivetType::Decimal` directly.
135#[derive(Debug, Clone, Copy)]
136pub struct TargetInput<'a> {
137    pub column_name: &'a str,
138    pub rivet_type: &'a RivetType,
139    /// Retained for callers and future precision-sensitive targets; the
140    /// resolver reads precision from `RivetType::Decimal` directly today.
141    #[allow(dead_code)]
142    pub arrow_type: Option<&'a DataType>,
143    pub fidelity: TypeFidelity,
144}
145
146impl<'a> From<&'a TypeMapping> for TargetInput<'a> {
147    fn from(m: &'a TypeMapping) -> Self {
148        TargetInput {
149            column_name: &m.column_name,
150            rivet_type: &m.rivet_type,
151            arrow_type: m.arrow_type.as_ref(),
152            fidelity: m.fidelity,
153        }
154    }
155}
156
157/// One column's per-target materialization spec (ADR-0014 L4). Uniform across
158/// targets so the type-report table and `--json` stay stable; an unmappable
159/// column is a `status: Fail` row, not an error.
160#[derive(Debug, Clone, Serialize)]
161pub struct TargetColumnSpec {
162    /// Name copied through so a `Vec<TargetColumnSpec>` is self-describing.
163    pub column_name: String,
164    /// Native warehouse type for full fidelity, e.g. "JSON", "UBIGINT", "NUMERIC".
165    pub target_type: String,
166    /// Type a generic Parquet reader infers without a declared schema. May
167    /// differ from `target_type` (e.g. BigQuery autoloads JSON as "BYTES").
168    pub autoload_type: String,
169    pub status: TargetStatus,
170    #[serde(skip_serializing_if = "Option::is_none")]
171    pub note: Option<String>,
172    /// Materialization snippet / load-schema hint (L5) to recover the native
173    /// type when autoload diverges. `None` when `autoload_type == target_type`.
174    #[serde(skip_serializing_if = "Option::is_none")]
175    pub cast_sql: Option<String>,
176}
177
178/// Internal per-type resolution result, before the column name and cast
179/// substitution are applied.
180struct Resolved {
181    target_type: String,
182    autoload_type: String,
183    status: TargetStatus,
184    note: Option<String>,
185    /// `cast_sql` template with a `{col}` placeholder, or `None`.
186    cast: Option<String>,
187}
188
189impl Resolved {
190    fn ok(t: impl Into<String>) -> Self {
191        let t = t.into();
192        Self {
193            autoload_type: t.clone(),
194            target_type: t,
195            status: TargetStatus::Ok,
196            note: None,
197            cast: None,
198        }
199    }
200    /// Native type that *autoloads as something else* — the divergence the
201    /// resolver exists to surface.
202    fn diverge(
203        native: impl Into<String>,
204        autoload: impl Into<String>,
205        note: impl Into<String>,
206        cast: Option<&str>,
207    ) -> Self {
208        Self {
209            target_type: native.into(),
210            autoload_type: autoload.into(),
211            status: TargetStatus::Warn,
212            note: Some(note.into()),
213            cast: cast.map(str::to_string),
214        }
215    }
216    fn warn(t: impl Into<String>, note: impl Into<String>) -> Self {
217        let t = t.into();
218        Self {
219            autoload_type: t.clone(),
220            target_type: t,
221            status: TargetStatus::Warn,
222            note: Some(note.into()),
223            cast: None,
224        }
225    }
226    fn fail(note: impl Into<String>) -> Self {
227        Self {
228            target_type: "-".into(),
229            autoload_type: "-".into(),
230            status: TargetStatus::Fail,
231            note: Some(note.into()),
232            cast: None,
233        }
234    }
235    fn into_spec(self, input: &TargetInput<'_>) -> TargetColumnSpec {
236        TargetColumnSpec {
237            column_name: input.column_name.to_string(),
238            target_type: self.target_type,
239            autoload_type: self.autoload_type,
240            status: self.status,
241            note: self.note,
242            cast_sql: self.cast.map(|t| t.replace("{col}", input.column_name)),
243        }
244    }
245}
246
247fn unsupported_reason(t: &RivetType) -> String {
248    match t {
249        RivetType::Unsupported { reason, .. } => reason.clone(),
250        _ => "no target mapping".into(),
251    }
252}
253
254/// Emit a BigQuery type-recovery statement (ADR-0014 L5): load the interchange
255/// Parquet with `--autodetect` into `<table>__staging`, then run this CTAS to
256/// materialise the native types that bare autoload degrades (JSON/UUID→BYTES,
257/// naive timestamp→TIMESTAMP). Columns with a `cast_sql` get that cast; the
258/// rest pass through unchanged.
259///
260/// Verified against live BigQuery: a load schema that *declares* native types
261/// is rejected (the Parquet loader won't coerce a column's type on load), so
262/// the recovery must be this post-load transform.
263/// The recovery `SELECT` body, shared by every degrading target. The cast
264/// branch *is* the materialization contract (ADR-0014 L5) and is identical
265/// across targets — only the passthrough form (identifier quoting, alias)
266/// differs, so each target supplies that as `passthrough`. Deleting this and
267/// inlining the fold would re-scatter the cast logic across N targets.
268fn recovery_projection(specs: &[TargetColumnSpec], passthrough: impl Fn(&str) -> String) -> String {
269    specs
270        .iter()
271        .map(|s| match &s.cast_sql {
272            Some(cast) => format!("  {cast} AS {name}", name = s.column_name),
273            None => passthrough(&s.column_name),
274        })
275        .collect::<Vec<_>>()
276        .join(",\n")
277}
278
279fn bigquery_recovery_sql(specs: &[TargetColumnSpec], table: &str) -> String {
280    let cols = recovery_projection(specs, |name| format!("  {name}"));
281    format!(
282        "-- 1) bq load --autodetect --parquet_enable_list_inference \
283         --source_format=PARQUET {table}__staging <parquet>\n\
284         -- 2) recover native types:\n\
285         CREATE OR REPLACE TABLE `{table}` AS\n\
286         SELECT\n{cols}\n\
287         FROM `{table}__staging`;"
288    )
289}
290
291/// Emit a Snowflake type-recovery script (ADR-0014 L5). Snowflake's Parquet
292/// autoload degrades JSON→TEXT, UUID→BINARY, naive timestamp→NUMBER (µs) and
293/// TIME→NUMBER, so the recovery is a post-load CTAS over a `MATCH_BY_COLUMN_NAME`
294/// staging table. INFER_SCHEMA emits lowercase, case-sensitive names, so every
295/// reference is double-quoted. Verified live (2026-06-01) via the `snow` CLI.
296fn snowflake_recovery_sql(specs: &[TargetColumnSpec], table: &str) -> String {
297    let cols = recovery_projection(specs, |name| format!("  \"{name}\" AS {name}"));
298    format!(
299        "-- 1) ALTER SESSION SET TIMEZONE='UTC';\n\
300         -- 2) CREATE OR REPLACE FILE FORMAT rivet_pq TYPE=PARQUET BINARY_AS_TEXT=FALSE;\n\
301         -- 3) PUT file://<parquet> @<stage> AUTO_COMPRESS=FALSE;\n\
302         -- 4) CREATE OR REPLACE TABLE {table}__staging USING TEMPLATE (SELECT ARRAY_AGG(\n\
303         --      OBJECT_CONSTRUCT(*)) FROM TABLE(INFER_SCHEMA(LOCATION=>'@<stage>', FILE_FORMAT=>'rivet_pq')));\n\
304         --    COPY INTO {table}__staging FROM @<stage> FILE_FORMAT=(FORMAT_NAME='rivet_pq') MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE;\n\
305         -- 5) recover native types:\n\
306         CREATE OR REPLACE TABLE {table} AS\n\
307         SELECT\n{cols}\n\
308         FROM {table}__staging;"
309    )
310}
311
312// ── BigQuery ─────────────────────────────────────────────────────────────────
313
314mod bigquery {
315    use super::*;
316
317    /// BigQuery NUMERIC precision/scale limits.
318    const NUMERIC_MAX_P: u8 = 29;
319    const NUMERIC_MAX_S: i8 = 9;
320    /// BigQuery BIGNUMERIC precision/scale limits.
321    const BIGNUMERIC_MAX_P: u8 = 76;
322    const BIGNUMERIC_MAX_S: i8 = 38;
323
324    pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
325        native(input.rivet_type).into_spec(input)
326    }
327
328    fn native(t: &RivetType) -> Resolved {
329        match t {
330            RivetType::Bool => Resolved::ok("BOOL"),
331            RivetType::Int16 | RivetType::Int32 | RivetType::Int64 => Resolved::ok("INT64"),
332            // u64 > i64::MAX overflows the INT64 autoload and cannot be
333            // recovered post-load (the bits are already wrong). The only fix is
334            // source-side: map the column to decimal(20,0) with a column
335            // override so it rides as Parquet DECIMAL → BigQuery NUMERIC.
336            RivetType::UInt64 => Resolved::diverge(
337                "NUMERIC",
338                "INT64",
339                "UINT64 > INT64_MAX overflows the INT64 autoload and cannot be recovered after \
340                 load — map the column to decimal(20,0) with a source column override",
341                None,
342            ),
343            RivetType::Float32 | RivetType::Float64 => Resolved::ok("FLOAT64"),
344            RivetType::Decimal { precision, scale } => decimal(*precision, *scale),
345            RivetType::Date => Resolved::ok("DATE"),
346            RivetType::Time { .. } => Resolved::ok("TIME"),
347            // tz-aware timestamp → instant → TIMESTAMP, autoloads cleanly.
348            RivetType::Timestamp {
349                timezone: Some(_), ..
350            } => Resolved::ok("TIMESTAMP"),
351            // Nanosecond naive timestamp (the `timestamp_ns` override, e.g. SQL
352            // Server datetime2(7)) has no BigQuery native temporal type: the loader
353            // does not recognise the Parquet TIMESTAMP(NANOS) logical type and
354            // autoloads the raw INT64 nanoseconds-since-epoch (verified live
355            // 2026-06-07 — lossless as an integer). A native TIMESTAMP needs
356            // TIMESTAMP_MICROS(DIV(col,1000)), which drops the sub-µs digits
357            // (BigQuery TIMESTAMP is microsecond), so no lossless temporal cast
358            // exists — keep INT64, or use the default `timestamp` for BigQuery.
359            RivetType::Timestamp {
360                unit: TimeUnit::Nanosecond,
361                timezone: None,
362            } => Resolved::diverge(
363                "INT64",
364                "INT64",
365                "nanosecond timestamp has no BigQuery native type — autoloads as INT64 (raw \
366                 nanos, lossless); a native TIMESTAMP via TIMESTAMP_MICROS(DIV(col,1000)) drops \
367                 sub-µs precision. Prefer `timestamp` (microsecond) for BigQuery targets.",
368                None,
369            ),
370            // naive timestamp → wall-clock → DATETIME, but BigQuery autoload
371            // ignores Parquet isAdjustedToUTC=false and yields TIMESTAMP
372            // (verified). `DATETIME(ts)` recovers the wall-clock after load.
373            RivetType::Timestamp { timezone: None, .. } => Resolved::diverge(
374                "DATETIME",
375                "TIMESTAMP",
376                "naive timestamp autoloads as TIMESTAMP (an instant); recover wall-clock with \
377                 DATETIME(col) after load",
378                Some("DATETIME({col})"),
379            ),
380            RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("STRING"),
381            RivetType::Binary => Resolved::ok("BYTES"),
382            // Parquet JSON logical type autoloads as BYTES in BigQuery
383            // (verified). Declare JSON in the load schema for native JSON.
384            RivetType::Json => Resolved::diverge(
385                "JSON",
386                "BYTES",
387                "Parquet JSON logical type autoloads as BYTES in BigQuery; recover native JSON \
388                 with PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(col)) after load",
389                Some("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING({col}))"),
390            ),
391            // UUID rides as FixedSizeBinary(16) + UUIDType; BigQuery has no UUID
392            // type and autoloads it as 16-byte BYTES (verified). Native is
393            // STRING (canonical text).
394            RivetType::Uuid => Resolved::diverge(
395                "STRING",
396                "BYTES",
397                "UUID autoloads as 16-byte BYTES in BigQuery; recover hex text with TO_HEX(col) \
398                 after load (or keep BYTES)",
399                Some("TO_HEX({col})"),
400            ),
401            RivetType::Interval => Resolved::ok("STRING"),
402            RivetType::List { inner } => list(inner),
403            RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
404        }
405    }
406
407    fn decimal(p: u8, s: i8) -> Resolved {
408        if s < 0 {
409            return Resolved::fail(format!(
410                "BigQuery has no negative scale; decimal({p},{s}) needs a STRING/INT64 cast"
411            ));
412        }
413        let native = if p <= NUMERIC_MAX_P && s <= NUMERIC_MAX_S {
414            "NUMERIC"
415        } else if p <= BIGNUMERIC_MAX_P && s <= BIGNUMERIC_MAX_S {
416            "BIGNUMERIC"
417        } else {
418            return Resolved::fail(format!(
419                "decimal({p},{s}) exceeds BigQuery BIGNUMERIC limits (max 76,38)"
420            ));
421        };
422        Resolved::ok(native)
423    }
424
425    fn list(inner: &RivetType) -> Resolved {
426        let inner_r = native(inner);
427        if inner_r.status == TargetStatus::Fail {
428            return Resolved::fail(format!(
429                "REPEATED of unsupported element: {}",
430                inner_r.target_type
431            ));
432        }
433        // Rivet writes the Parquet list element as `item` (arrow-rs default,
434        // not the spec's `element`), so BigQuery loads arrays as
435        // REPEATED RECORD{item} even with `--parquet_enable_list_inference`
436        // (without the flag they nest one level deeper as RECORD{list:...}).
437        // Verified live: flattening with `UNNEST(col)` + `el.item` recovers a
438        // clean REPEATED scalar.
439        Resolved::diverge(
440            format!("REPEATED {}", inner_r.target_type),
441            format!("REPEATED RECORD{{item {}}}", inner_r.autoload_type),
442            "arrays load as REPEATED RECORD{item}; load the staging table with \
443             --parquet_enable_list_inference, then flatten with UNNEST after load",
444            Some("ARRAY(SELECT el.item FROM UNNEST({col}) AS el)"),
445        )
446    }
447}
448
449// ── DuckDB ───────────────────────────────────────────────────────────────────
450
451mod duckdb {
452    use super::*;
453
454    pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
455        native(input.rivet_type).into_spec(input)
456    }
457
458    /// DuckDB honors every native Parquet logical type Rivet writes, so
459    /// `autoload_type == target_type` for all supported variants (verified).
460    fn native(t: &RivetType) -> Resolved {
461        match t {
462            RivetType::Bool => Resolved::ok("BOOLEAN"),
463            RivetType::Int16 => Resolved::ok("SMALLINT"),
464            RivetType::Int32 => Resolved::ok("INTEGER"),
465            RivetType::Int64 => Resolved::ok("BIGINT"),
466            RivetType::UInt64 => Resolved::ok("UBIGINT"),
467            RivetType::Float32 => Resolved::ok("FLOAT"),
468            RivetType::Float64 => Resolved::ok("DOUBLE"),
469            RivetType::Decimal { precision, scale } => {
470                if *scale < 0 {
471                    Resolved::warn(
472                        "DECIMAL",
473                        format!(
474                            "DuckDB has no negative scale; decimal({precision},{scale}) loads via cast"
475                        ),
476                    )
477                } else if *precision <= 38 {
478                    Resolved::ok(format!("DECIMAL({precision},{scale})"))
479                } else {
480                    // DuckDB DECIMAL maxes at precision 38; wider rides as HUGEINT/DOUBLE.
481                    Resolved::warn(
482                        "DECIMAL(38,*)",
483                        format!("decimal({precision},{scale}) exceeds DuckDB DECIMAL(38); widens"),
484                    )
485                }
486            }
487            RivetType::Date => Resolved::ok("DATE"),
488            RivetType::Time { .. } => Resolved::ok("TIME"),
489            RivetType::Timestamp {
490                timezone: Some(_), ..
491            } => Resolved::ok("TIMESTAMPTZ"),
492            // DuckDB has a native nanosecond timestamp; the `timestamp_ns` override
493            // round-trips losslessly (verified live 2026-06-07).
494            RivetType::Timestamp {
495                unit: TimeUnit::Nanosecond,
496                timezone: None,
497            } => Resolved::ok("TIMESTAMP_NS"),
498            RivetType::Timestamp { timezone: None, .. } => Resolved::ok("TIMESTAMP"),
499            RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("VARCHAR"),
500            RivetType::Binary => Resolved::ok("BLOB"),
501            RivetType::Json => Resolved::ok("JSON"),
502            RivetType::Uuid => Resolved::ok("UUID"),
503            RivetType::Interval => Resolved::ok("INTERVAL"),
504            RivetType::List { inner } => {
505                let inner_r = native(inner);
506                if inner_r.status == TargetStatus::Fail {
507                    Resolved::fail(format!(
508                        "LIST of unsupported element: {}",
509                        inner_r.target_type
510                    ))
511                } else {
512                    Resolved::ok(format!("{}[]", inner_r.target_type))
513                }
514            }
515            RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
516        }
517    }
518}
519
520// ── Snowflake ────────────────────────────────────────────────────────────────
521
522mod snowflake {
523    use super::*;
524
525    pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
526        native(input.rivet_type).into_spec(input)
527    }
528
529    /// Snowflake autoload (INFER_SCHEMA / COPY) + recovery casts — verified live
530    /// (2026-06-01). Needs `BINARY_AS_TEXT=FALSE` in the file format; cast column
531    /// refs are double-quoted because INFER_SCHEMA names are lowercase and
532    /// case-sensitive.
533    fn native(t: &RivetType) -> Resolved {
534        match t {
535            RivetType::Bool => Resolved::ok("BOOLEAN"),
536            RivetType::Int16 | RivetType::Int32 | RivetType::Int64 => Resolved::ok("NUMBER(38,0)"),
537            // u64 > INT64_MAX overflows the Parquet read; fix at source.
538            RivetType::UInt64 => Resolved::diverge(
539                "NUMBER(20,0)",
540                "NUMBER(38,0)",
541                "UINT64 > INT64_MAX overflows the Parquet read; map to decimal(20,0) at source",
542                None,
543            ),
544            RivetType::Float32 | RivetType::Float64 => Resolved::ok("FLOAT"),
545            RivetType::Decimal { precision, scale } => {
546                if *scale < 0 {
547                    Resolved::warn(
548                        "NUMBER",
549                        format!(
550                            "Snowflake NUMBER has no negative scale; decimal({precision},{scale}) loads via cast"
551                        ),
552                    )
553                } else {
554                    Resolved::ok(format!("NUMBER({precision},{scale})"))
555                }
556            }
557            RivetType::Date => Resolved::ok("DATE"),
558            // TIME autoloads as NUMBER (µs of day); rebuild with TIME_FROM_PARTS.
559            RivetType::Time { .. } => Resolved::diverge(
560                "TIME",
561                "NUMBER(38,0)",
562                "TIME autoloads as NUMBER (µs of day); recover with TIME_FROM_PARTS after load",
563                Some(r#"TIME_FROM_PARTS(0,0,FLOOR("{col}"/1000000),MOD("{col}",1000000)*1000)"#),
564            ),
565            // tz timestamp lands as TIMESTAMP_NTZ holding the UTC instant.
566            RivetType::Timestamp {
567                timezone: Some(_), ..
568            } => Resolved::diverge(
569                "TIMESTAMP_TZ",
570                "TIMESTAMP_NTZ",
571                "tz timestamp autoloads as TIMESTAMP_NTZ — ALTER SESSION SET TIMEZONE='UTC' before COPY so the instant matches",
572                None,
573            ),
574            // Nanosecond naive timestamp autoloads as NUMBER (ns since epoch),
575            // like the µs case but at scale 9. Snowflake TIMESTAMP_NTZ holds full
576            // 9-digit precision, so TO_TIMESTAMP_NTZ(col, 9) recovers it
577            // losslessly — verified live 2026-06-07 (NUMBER 1717761600123456700 →
578            // 2024-06-07 12:00:00.123456700, the 7th digit intact).
579            RivetType::Timestamp {
580                unit: TimeUnit::Nanosecond,
581                timezone: None,
582            } => Resolved::diverge(
583                "TIMESTAMP_NTZ",
584                "NUMBER(38,0)",
585                "nanosecond timestamp autoloads as NUMBER (ns since epoch); recover with \
586                 TO_TIMESTAMP_NTZ(col, 9) after load — Snowflake TIMESTAMP_NTZ holds full ns precision",
587                Some(r#"TO_TIMESTAMP_NTZ("{col}", 9)"#),
588            ),
589            // naive timestamp autoloads as NUMBER (µs since epoch).
590            RivetType::Timestamp { timezone: None, .. } => Resolved::diverge(
591                "TIMESTAMP_NTZ",
592                "NUMBER(38,0)",
593                "naive timestamp autoloads as NUMBER (µs since epoch); recover with TO_TIMESTAMP_NTZ after load",
594                Some(r#"TO_TIMESTAMP_NTZ("{col}", 6)"#),
595            ),
596            RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("TEXT"),
597            // bytea/blob needs BINARY_AS_TEXT=FALSE or non-UTF8 bytes fail.
598            RivetType::Binary => Resolved::warn(
599                "BINARY",
600                "set BINARY_AS_TEXT=FALSE in the Parquet FILE FORMAT or non-UTF8 bytes fail to load",
601            ),
602            // JSON autoloads as TEXT; PARSE_JSON recovers native VARIANT.
603            RivetType::Json => Resolved::diverge(
604                "VARIANT",
605                "TEXT",
606                "JSON autoloads as TEXT; recover native VARIANT with PARSE_JSON after load",
607                Some(r#"PARSE_JSON("{col}")"#),
608            ),
609            // UUID (FixedSizeBinary 16) autoloads as 16-byte BINARY.
610            RivetType::Uuid => Resolved::diverge(
611                "TEXT",
612                "BINARY",
613                "UUID autoloads as 16-byte BINARY; recover canonical text with HEX_ENCODE + REGEXP after load",
614                Some(
615                    r#"REGEXP_REPLACE(LOWER(HEX_ENCODE("{col}")),'^(.{8})(.{4})(.{4})(.{4})(.{12})$','\\1-\\2-\\3-\\4-\\5')"#,
616                ),
617            ),
618            RivetType::Interval => Resolved::ok("TEXT"),
619            // A Parquet list autoloads as VARIANT (holding the JSON array), not
620            // native ARRAY — verified live 2026-06-01: INFER_SCHEMA reports
621            // VARIANT for both `tags` (text[]) and `nums` (int[]). Recover the
622            // native ARRAY with `::ARRAY` after load.
623            RivetType::List { inner } => {
624                let inner_r = native(inner);
625                if inner_r.status == TargetStatus::Fail {
626                    Resolved::fail(format!(
627                        "ARRAY of unsupported element: {}",
628                        inner_r.target_type
629                    ))
630                } else {
631                    Resolved::diverge(
632                        "ARRAY",
633                        "VARIANT",
634                        "list autoloads as VARIANT (the JSON array); recover native ARRAY with ::ARRAY after load",
635                        Some(r#""{col}"::ARRAY"#),
636                    )
637                }
638            }
639            RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
640        }
641    }
642}
643
#[cfg(test)]
mod tests {
    //! Contract tests for the target-type resolver.
    //!
    //! Each test resolves a [`RivetType`] (for a column named `c`, or a small
    //! table of named columns) against one [`ExportTarget`] and pins the
    //! resulting native type, autoload type, status, note and cast / recovery
    //! SQL.

    use super::super::{SourceColumn, TimeUnit};
    use super::*;

    // ── fixtures ─────────────────────────────────────────────────────────────

    /// Resolver input for a column named `c` of type `rt`, with no Arrow type
    /// attached and exact fidelity.
    fn input<'a>(rt: &'a RivetType) -> TargetInput<'a> {
        TargetInput {
            column_name: "c",
            rivet_type: rt,
            arrow_type: None,
            fidelity: TypeFidelity::Exact,
        }
    }

    /// Resolve `rt` for BigQuery.
    fn bq(rt: &RivetType) -> TargetColumnSpec {
        ExportTarget::BigQuery.resolve_column(input(rt))
    }

    /// Resolve `rt` for DuckDB.
    fn duck(rt: &RivetType) -> TargetColumnSpec {
        ExportTarget::DuckDb.resolve_column(input(rt))
    }

    /// Resolve `rt` for Snowflake.
    fn sf(rt: &RivetType) -> TargetColumnSpec {
        ExportTarget::Snowflake.resolve_column(input(rt))
    }

    /// A timestamp without a timezone at the given unit.
    fn naive_timestamp(unit: TimeUnit) -> RivetType {
        RivetType::Timestamp {
            unit,
            timezone: None,
        }
    }

    /// A list whose element type is `inner`.
    fn list_of(inner: RivetType) -> RivetType {
        RivetType::List {
            inner: Box::new(inner),
        }
    }

    /// A mapping for a nullable source column `name` of source type `native`,
    /// mapped to the canonical type `rt`.
    fn mapping(name: &'static str, native: &'static str, rt: RivetType) -> TypeMapping {
        TypeMapping::from_source(&SourceColumn::simple(name, native, true), rt)
    }

    // ── nanosecond timestamp (`timestamp_ns` override) per-target autoload ────
    // The default `timestamp` is microsecond; ns is opt-in (datetime2(7)). These
    // pin the autoload truth verified live on 2026-06-07 so the preflight report
    // doesn't claim the microsecond behaviour for a ns column.

    /// BigQuery: a ns timestamp degrades to INT64 with no recovery cast.
    #[test]
    fn bq_nanosecond_timestamp_autoloads_as_int64() {
        let s = bq(&naive_timestamp(TimeUnit::Nanosecond));
        assert_eq!(s.target_type, "INT64");
        assert_eq!(s.autoload_type, "INT64");
        assert_eq!(s.status, TargetStatus::Warn);
        // No lossless temporal recovery (ns→µs is lossy) — like the UINT64 case.
        assert!(s.cast_sql.is_none(), "ns→BQ has no lossless temporal cast");
    }

    /// DuckDB: a ns timestamp is the native TIMESTAMP_NS.
    #[test]
    fn duckdb_nanosecond_timestamp_is_native_timestamp_ns() {
        let s = duck(&naive_timestamp(TimeUnit::Nanosecond));
        assert_eq!(s.target_type, "TIMESTAMP_NS");
        assert_eq!(s.status, TargetStatus::Ok);
    }

    /// Snowflake: a ns timestamp is recovered with a scale-9 cast.
    #[test]
    fn snowflake_nanosecond_timestamp_recovers_losslessly_at_scale_9() {
        let s = sf(&naive_timestamp(TimeUnit::Nanosecond));
        assert_eq!(s.target_type, "TIMESTAMP_NTZ");
        assert_eq!(s.autoload_type, "NUMBER(38,0)");
        // Lossless recovery (TIMESTAMP_NTZ holds 9 digits) → cast_sql is Some at
        // scale 9 (not the µs scale 6); `{col}` is substituted with the column.
        assert_eq!(s.cast_sql.as_deref(), Some(r#"TO_TIMESTAMP_NTZ("c", 9)"#));
    }

    // ── dispatch on RivetType, not Arrow — the headline fix ──────────────────

    #[test]
    fn bq_uuid_resolves_not_fails() {
        // The old arrow-dispatch `bq_compat` hard-failed UUID (FixedSizeBinary
        // had no arm). Now it resolves on RivetType: native STRING, BYTES on
        // autoload.
        let s = bq(&RivetType::Uuid);
        assert_eq!(s.target_type, "STRING");
        assert_eq!(s.autoload_type, "BYTES");
        assert_eq!(s.status, TargetStatus::Warn);
        // Pin the cast applied to the *substituted* column name. A bare
        // `contains("c")` would also match the raw `{col}` template, so it
        // could not catch a missing substitution.
        let cast = s.cast_sql.expect("UUID must carry a post-load cast");
        assert!(
            cast.contains("TO_HEX(c)"),
            "UUID cast must apply TO_HEX to the substituted column, got: {cast}"
        );
        assert!(
            !cast.contains("{col}"),
            "cast_sql still carries the unsubstituted placeholder: {cast}"
        );
    }

    /// BigQuery: JSON is native JSON, BYTES on autoload, recovered by PARSE_JSON.
    #[test]
    fn bq_json_native_is_json_autoload_is_bytes() {
        let s = bq(&RivetType::Json);
        assert_eq!(s.target_type, "JSON");
        assert_eq!(s.autoload_type, "BYTES");
        assert_eq!(s.status, TargetStatus::Warn);
        assert!(s.cast_sql.unwrap().starts_with("PARSE_JSON"));
    }

    /// BigQuery: a naive µs timestamp is DATETIME natively, TIMESTAMP on autoload.
    #[test]
    fn bq_naive_timestamp_is_datetime_native_timestamp_autoload() {
        let s = bq(&naive_timestamp(TimeUnit::Microsecond));
        assert_eq!(s.target_type, "DATETIME");
        assert_eq!(s.autoload_type, "TIMESTAMP");
        assert_eq!(s.status, TargetStatus::Warn);
    }

    /// BigQuery: a timezone-aware µs timestamp is TIMESTAMP on both paths.
    #[test]
    fn bq_tz_timestamp_is_timestamp_ok() {
        let tz = RivetType::Timestamp {
            unit: TimeUnit::Microsecond,
            timezone: Some("UTC".into()),
        };
        let s = bq(&tz);
        assert_eq!(s.target_type, "TIMESTAMP");
        assert_eq!(s.autoload_type, "TIMESTAMP");
        assert_eq!(s.status, TargetStatus::Ok);
    }

    #[test]
    fn bq_decimal_within_numeric_is_numeric() {
        let s = bq(&RivetType::Decimal {
            precision: 18,
            scale: 2,
        });
        assert_eq!(s.target_type, "NUMERIC");
        assert_eq!(s.status, TargetStatus::Ok);
    }

    #[test]
    fn bq_decimal_escalates_to_bignumeric() {
        let s = bq(&RivetType::Decimal {
            precision: 38,
            scale: 9,
        });
        assert_eq!(s.target_type, "BIGNUMERIC");
        assert_eq!(s.status, TargetStatus::Ok);
    }

    #[test]
    fn bq_decimal_negative_scale_fails() {
        let s = bq(&RivetType::Decimal {
            precision: 5,
            scale: -2,
        });
        assert_eq!(s.status, TargetStatus::Fail);
    }

    #[test]
    fn bq_uint64_recommends_numeric_warns_overflow() {
        let s = bq(&RivetType::UInt64);
        assert_eq!(s.target_type, "NUMERIC");
        assert_eq!(s.autoload_type, "INT64");
        assert_eq!(s.status, TargetStatus::Warn);
    }

    #[test]
    fn bq_list_is_repeated_native_record_autoload() {
        let s = bq(&list_of(RivetType::String));
        assert_eq!(s.target_type, "REPEATED STRING");
        assert!(s.autoload_type.contains("REPEATED RECORD"));
        assert_eq!(s.status, TargetStatus::Warn);
    }

    /// An unmappable column is reported as a `Fail` row with a `-` type.
    #[test]
    fn bq_unsupported_is_fail_row_not_panic() {
        let t = RivetType::Unsupported {
            native_type: "geometry".into(),
            reason: "no mapping".into(),
        };
        let s = bq(&t);
        assert_eq!(s.status, TargetStatus::Fail);
        assert_eq!(s.target_type, "-");
    }

    /// Plain scalars resolve to the same BigQuery type natively and on autoload.
    #[test]
    fn bq_standard_scalars_ok() {
        for (rt, native) in [
            (RivetType::Bool, "BOOL"),
            (RivetType::Int64, "INT64"),
            (RivetType::Float64, "FLOAT64"),
            (RivetType::Date, "DATE"),
            (RivetType::String, "STRING"),
            (RivetType::Binary, "BYTES"),
            (RivetType::Enum, "STRING"),
        ] {
            let s = bq(&rt);
            assert_eq!(s.target_type, native, "{rt:?}");
            assert_eq!(s.autoload_type, native, "{rt:?}");
            assert_eq!(s.status, TargetStatus::Ok, "{rt:?}");
        }
    }

    // ── DuckDB honors every logical type — autoload == native ────────────────

    #[test]
    fn duckdb_reads_everything_natively() {
        for rt in [
            RivetType::Json,
            RivetType::Uuid,
            RivetType::UInt64,
            naive_timestamp(TimeUnit::Microsecond),
            list_of(RivetType::Int64),
        ] {
            let s = duck(&rt);
            assert_eq!(
                s.target_type, s.autoload_type,
                "DuckDB autoload must equal native for {rt:?}"
            );
            assert_ne!(s.status, TargetStatus::Fail, "{rt:?}");
        }
    }

    #[test]
    fn duckdb_native_type_names() {
        assert_eq!(duck(&RivetType::Json).target_type, "JSON");
        assert_eq!(duck(&RivetType::Uuid).target_type, "UUID");
        assert_eq!(duck(&RivetType::UInt64).target_type, "UBIGINT");
        assert_eq!(
            duck(&RivetType::Decimal {
                precision: 18,
                scale: 2
            })
            .target_type,
            "DECIMAL(18,2)"
        );
        assert_eq!(duck(&list_of(RivetType::Int64)).target_type, "BIGINT[]");
    }

    #[test]
    fn parse_accepts_aliases() {
        assert_eq!(ExportTarget::parse("bq"), Some(ExportTarget::BigQuery));
        assert_eq!(
            ExportTarget::parse("BigQuery"),
            Some(ExportTarget::BigQuery)
        );
        assert_eq!(ExportTarget::parse("duckdb"), Some(ExportTarget::DuckDb));
        assert_eq!(ExportTarget::parse("nope"), None);
    }

    #[test]
    fn resolve_table_preserves_order_and_names() {
        let mappings = vec![
            mapping("a", "int8", RivetType::Int64),
            mapping("b", "jsonb", RivetType::Json),
        ];
        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
        assert_eq!(specs.len(), 2);
        assert_eq!(specs[0].column_name, "a");
        assert_eq!(specs[1].column_name, "b");
        assert_eq!(specs[1].target_type, "JSON");
    }

    // ── edge cases: remediation hints must recover from the DEGRADED state ────
    // Regression guard for the bug class where the resolver proposes a post-load
    // cast that cannot actually recover an already-lossy value (e.g. a UINT64
    // that overflowed into INT64). See CLAUDE.md "Remediation hints must recover
    // from the degraded state".

    #[test]
    fn cast_sql_is_none_when_post_load_recovery_is_impossible() {
        // UINT64 > INT64_MAX has already overflowed by the time it autoloads as
        // INT64; a SELECT-time cast would operate on corrupted bits. The only
        // fix is source-side (a decimal override) — cast_sql MUST be None and
        // the note must point there, not promise a post-load cast.
        let u = bq(&RivetType::UInt64);
        assert!(
            u.cast_sql.is_none(),
            "overflowed UINT64 has no lossless post-load recovery"
        );
        let note = u.note.unwrap().to_lowercase();
        assert!(
            note.contains("override"),
            "UINT64 note must point to the source-side override, got: {note}"
        );
    }

    #[test]
    fn cast_sql_present_only_when_lossless_post_load() {
        // JSON/UUID/naive-timestamp autoload to a degraded type but still hold
        // the value losslessly, so a post-load cast genuinely recovers it.
        assert!(
            bq(&RivetType::Json)
                .cast_sql
                .unwrap()
                .contains("PARSE_JSON")
        );
        assert!(bq(&RivetType::Uuid).cast_sql.unwrap().contains("TO_HEX"));
        let naive = naive_timestamp(TimeUnit::Microsecond);
        assert!(bq(&naive).cast_sql.unwrap().contains("DATETIME"));
    }

    #[test]
    fn every_divergence_offers_a_recovery_path() {
        // Invariant: whenever BigQuery autoload diverges from the native type the
        // operator gets SOME recovery — a lossless post-load `cast_sql`, or a
        // note describing the fix (post-load transform, or a source override).
        // Never a silent no-op (the bug class).
        let cases = [
            RivetType::Json,
            RivetType::Uuid,
            RivetType::UInt64,
            naive_timestamp(TimeUnit::Microsecond),
            list_of(RivetType::String),
        ];
        for rt in cases {
            let s = bq(&rt);
            assert_ne!(s.autoload_type, s.target_type, "case must diverge: {rt:?}");
            let has_cast = s.cast_sql.is_some();
            let note = s.note.as_deref().unwrap_or("").to_lowercase();
            let describes_recovery = note.contains("after load") || note.contains("override");
            assert!(
                has_cast || describes_recovery,
                "divergent {rt:?} must offer a recovery (cast_sql or a recovery note)"
            );
        }
    }

    // ── edge cases: decimal precision/scale overflow at the target boundary ───

    #[test]
    fn bq_decimal_limit_boundaries() {
        // Exact BIGNUMERIC ceiling is ok.
        assert_eq!(
            bq(&RivetType::Decimal {
                precision: 76,
                scale: 38
            })
            .status,
            TargetStatus::Ok
        );
        // One past precision overflows BIGNUMERIC → Fail, never a silent clamp.
        assert_eq!(
            bq(&RivetType::Decimal {
                precision: 77,
                scale: 38
            })
            .status,
            TargetStatus::Fail
        );
        // One past scale → Fail.
        assert_eq!(
            bq(&RivetType::Decimal {
                precision: 76,
                scale: 39
            })
            .status,
            TargetStatus::Fail
        );
        // Between NUMERIC and BIGNUMERIC escalates rather than overflowing NUMERIC.
        assert_eq!(
            bq(&RivetType::Decimal {
                precision: 30,
                scale: 0
            })
            .target_type,
            "BIGNUMERIC"
        );
    }

    #[test]
    fn duckdb_decimal_over_38_warns_not_silently_clamps() {
        let s = duck(&RivetType::Decimal {
            precision: 40,
            scale: 2,
        });
        assert_eq!(s.status, TargetStatus::Warn);
    }

    // ── L5 recovery SQL (the post-load transform for BigQuery autoload) ───────

    #[test]
    fn bq_recovery_sql_casts_native_types() {
        let mappings = vec![
            mapping("id", "int8", RivetType::Int64),
            mapping("attrs", "jsonb", RivetType::Json),
            mapping("uid", "uuid", RivetType::Uuid),
            mapping(
                "created_at",
                "timestamp",
                naive_timestamp(TimeUnit::Microsecond),
            ),
            mapping("tags", "_text", list_of(RivetType::String)),
        ];
        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
        let sql = ExportTarget::BigQuery
            .recovery_sql(&specs, "payments")
            .expect("BigQuery has a recovery SQL");
        // The post-load casts that actually recover native types (verified live
        // against BigQuery — a declared-type load is rejected, a cast is not).
        assert!(sql.contains("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(attrs)) AS attrs"));
        assert!(sql.contains("TO_HEX(uid) AS uid"));
        assert!(sql.contains("DATETIME(created_at) AS created_at"));
        // Arrays flatten via UNNEST (verified live; staging loaded with
        // --parquet_enable_list_inference).
        assert!(sql.contains("ARRAY(SELECT el.item FROM UNNEST(tags) AS el) AS tags"));
        assert!(sql.contains("--parquet_enable_list_inference"));
        // OK columns pass through unchanged.
        assert!(sql.contains("SELECT\n  id"));
        // Reads the autoload staging table, writes the recovered table.
        assert!(sql.contains("CREATE OR REPLACE TABLE `payments`"));
        assert!(sql.contains("FROM `payments__staging`"));
    }

    #[test]
    fn duckdb_needs_no_recovery() {
        let mappings = vec![mapping("attrs", "json", RivetType::Json)];
        let specs = ExportTarget::DuckDb.resolve_table(&mappings);
        assert!(
            ExportTarget::DuckDb.recovery_sql(&specs, "t").is_none(),
            "DuckDB autoloads every logical type natively — no recovery needed"
        );
    }

    #[test]
    fn recovery_sql_projects_every_column_once_and_only_casts_divergent() {
        let cols: [(&str, RivetType); 6] = [
            ("id", RivetType::Int64), // ok → passthrough
            (
                "amount",
                RivetType::Decimal {
                    precision: 18,
                    scale: 2,
                },
            ), // ok → passthrough
            ("attrs", RivetType::Json), // divergent → cast
            ("uid", RivetType::Uuid), // divergent → cast
            ("created_at", naive_timestamp(TimeUnit::Microsecond)), // divergent → cast
            ("tags", list_of(RivetType::String)), // divergent → cast
        ];
        let mappings: Vec<_> = cols
            .iter()
            .cloned()
            .map(|(n, rt)| mapping(n, "x", rt))
            .collect();
        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
        let sql = ExportTarget::BigQuery.recovery_sql(&specs, "t").unwrap();

        // The SELECT projects exactly one item per input column — nothing dropped,
        // nothing duplicated.
        let body = sql
            .split("SELECT\n")
            .nth(1)
            .and_then(|s| s.split("\nFROM").next())
            .expect("recovery SQL has a SELECT … FROM body");
        assert_eq!(
            body.split(",\n").count(),
            cols.len(),
            "one projection per column, got:\n{body}"
        );
        for (name, _) in &cols {
            assert!(body.contains(name), "column {name} missing:\n{body}");
        }
        // OK columns pass through unchanged (bare `  name,`); divergent ones
        // carry their cast (`… AS name`).
        assert!(body.contains("  id,") && !body.contains("AS id"));
        assert!(body.contains("  amount,") && !body.contains("AS amount"));
        assert!(body.contains("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(attrs)) AS attrs"));
        assert!(body.contains("TO_HEX(uid) AS uid"));
        assert!(body.contains("DATETIME(created_at) AS created_at"));
        assert!(body.contains("UNNEST(tags) AS el) AS tags"));
    }

    // ── Snowflake (verified live 2026-06-01) ─────────────────────────────────

    #[test]
    fn snowflake_autoload_degradations_and_native_casts() {
        // JSON → TEXT autoload / VARIANT native, recover PARSE_JSON.
        let j = sf(&RivetType::Json);
        assert_eq!(j.target_type, "VARIANT");
        assert_eq!(j.autoload_type, "TEXT");
        assert!(j.cast_sql.unwrap().starts_with("PARSE_JSON"));
        // UUID → BINARY autoload / TEXT native, recover via HEX_ENCODE + REGEXP.
        let u = sf(&RivetType::Uuid);
        assert_eq!(u.target_type, "TEXT");
        assert_eq!(u.autoload_type, "BINARY");
        assert!(u.cast_sql.unwrap().contains("HEX_ENCODE"));
        // naive timestamp → NUMBER autoload / TIMESTAMP_NTZ native.
        let t = sf(&naive_timestamp(TimeUnit::Microsecond));
        assert_eq!(t.target_type, "TIMESTAMP_NTZ");
        assert_eq!(t.autoload_type, "NUMBER(38,0)");
        assert!(t.cast_sql.unwrap().contains("TO_TIMESTAMP_NTZ"));
        // TIME → NUMBER autoload, recover TIME_FROM_PARTS.
        let tm = sf(&RivetType::Time {
            unit: TimeUnit::Microsecond,
        });
        assert_eq!(tm.target_type, "TIME");
        assert!(tm.cast_sql.unwrap().contains("TIME_FROM_PARTS"));
        // decimal is native NUMBER(p,s) — no cast.
        let d = sf(&RivetType::Decimal {
            precision: 18,
            scale: 2,
        });
        assert_eq!(d.target_type, "NUMBER(18,2)");
        assert!(d.cast_sql.is_none());
        // list autoloads as VARIANT (verified live), recover native ARRAY with ::ARRAY.
        let l = sf(&list_of(RivetType::Int64));
        assert_eq!(l.target_type, "ARRAY");
        assert_eq!(l.autoload_type, "VARIANT");
        assert!(l.cast_sql.unwrap().ends_with("::ARRAY"));
    }

    #[test]
    fn snowflake_recovery_sql_quotes_columns_and_casts() {
        let mappings = vec![
            mapping("id", "int8", RivetType::Int64),
            mapping("attrs", "jsonb", RivetType::Json),
            mapping("uid", "uuid", RivetType::Uuid),
            mapping(
                "created_at",
                "timestamp",
                naive_timestamp(TimeUnit::Microsecond),
            ),
        ];
        let specs = ExportTarget::Snowflake.resolve_table(&mappings);
        let sql = ExportTarget::Snowflake.recovery_sql(&specs, "t").unwrap();
        // Staging columns are lowercase + quoted; passthrough quotes the source.
        assert!(sql.contains(r#""id" AS id"#));
        assert!(sql.contains(r#"PARSE_JSON("attrs") AS attrs"#));
        assert!(sql.contains(r#"HEX_ENCODE("uid")"#));
        assert!(sql.contains(r#"TO_TIMESTAMP_NTZ("created_at", 6) AS created_at"#));
        // The load preamble the recovery depends on.
        assert!(sql.contains("BINARY_AS_TEXT=FALSE"));
        assert!(sql.contains("MATCH_BY_COLUMN_NAME"));
        assert!(sql.contains("FROM t__staging"));
    }

    #[test]
    fn parse_accepts_snowflake() {
        assert_eq!(
            ExportTarget::parse("snowflake"),
            Some(ExportTarget::Snowflake)
        );
        assert_eq!(ExportTarget::parse("sf"), Some(ExportTarget::Snowflake));
    }
}