Skip to main content

rivet/types/
target.rs

1//! Target-type resolver (ADR-0014 L4, roadmap §16).
2//!
3//! Given a column's canonical [`RivetType`] and a runtime-chosen
4//! [`ExportTarget`], resolve what the column becomes in a downstream
5//! warehouse: the **native** target type (full fidelity), the **autoload**
6//! type a generic Parquet reader infers without a declared schema, a safety
7//! [`TargetStatus`], a note, and an optional materialization (`cast_sql` /
8//! load-schema hint).
9//!
10//! Design (locked in the type-support architecture review):
11//!
12//! - **Dispatch on `RivetType`, never the physical Arrow type.** The previous
13//!   `bq_compat` matched on `arrow::DataType` and so was blind to `json` /
14//!   `uuid` / `enum` (all `Utf8` / `FixedSizeBinary` by then) and hard-failed
15//!   UUID. The resolver keys off the semantic type; Arrow is consulted only
16//!   for decimal precision (and `RivetType::Decimal` already carries `p,s`).
17//! - **Total & infallible.** Every `(RivetType, ExportTarget)` pair yields a
18//!   populated [`TargetColumnSpec`]; an unmappable column is a `status: Fail`
19//!   row, never an `Err`. This keeps the type-report table and `--json` shape
20//!   stable.
21//! - **`autoload_type` tells the truth.** It encodes the *empirically
22//!   verified* behavior of each target's Parquet autoloader — notably that
23//!   BigQuery autoload degrades Parquet `JsonType`/`UUIDType` to `BYTES`,
24//!   `isAdjustedToUTC=false` timestamps to `TIMESTAMP` (not `DATETIME`), and
25//!   3-level lists to `REPEATED RECORD{item}`. DuckDB honors all of them.
26//!
27//! BigQuery numeric limits (as of 2025):
28//!   NUMERIC    — precision 1–29, scale 0–9
29//!   BIGNUMERIC — precision 1–76, scale 0–38
30
31use arrow::datatypes::DataType;
32use serde::Serialize;
33
34use super::{RivetType, TimeUnit, TypeFidelity, TypeMapping};
35
36/// A supported downstream warehouse target. Closed, in-tree, contract-tested
37/// set; chosen at runtime from `--target X`, one per run.
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub enum ExportTarget {
40    /// Reference consumer of Rivet Parquet — honors every native logical type
41    /// (JSON, UUID, decimal, list) on autoload.
42    DuckDb,
43    /// Cloud warehouse. Its Parquet autoloader is weaker than DuckDB's: see
44    /// the per-type `autoload_type` notes in [`bigquery`].
45    BigQuery,
46    /// Cloud warehouse. Like BigQuery, its Parquet autoload degrades JSON,
47    /// UUID, naive timestamps and TIME — see [`snowflake`]. Verified live.
48    Snowflake,
49}
50
51impl ExportTarget {
52    pub fn parse(s: &str) -> Option<Self> {
53        match s.to_lowercase().as_str() {
54            "bigquery" | "bq" => Some(Self::BigQuery),
55            "duckdb" | "duck" => Some(Self::DuckDb),
56            "snowflake" | "sf" => Some(Self::Snowflake),
57            _ => None,
58        }
59    }
60
61    /// Human-readable list of every spelling [`parse`](Self::parse) accepts,
62    /// for "unknown target" error messages. Single source so the message can't
63    /// drift from `parse` (it once said "bigquery, duckdb" and missed
64    /// snowflake). Aliases are shown in parens after each canonical name.
65    pub fn valid_target_names() -> &'static str {
66        "bigquery (bq), duckdb (duck), snowflake (sf)"
67    }
68
69    pub fn label(self) -> &'static str {
70        match self {
71            Self::BigQuery => "bigquery",
72            Self::DuckDb => "duckdb",
73            Self::Snowflake => "snowflake",
74        }
75    }
76
77    /// Resolve one already-mapped column against this target.
78    pub fn resolve_column(self, input: TargetInput<'_>) -> TargetColumnSpec {
79        let mut spec = match self {
80            ExportTarget::BigQuery => bigquery::resolve(&input),
81            ExportTarget::DuckDb => duckdb::resolve(&input),
82            ExportTarget::Snowflake => snowflake::resolve(&input),
83        };
84        // Fidelity floor (ADR-0014 T6): the target status must not be rosier
85        // than the source fidelity warrants — a lossy/unsupported source column
86        // can never resolve to a clean `Ok`.
87        if input.fidelity.is_unsafe_for_strict_mode() && spec.status == TargetStatus::Ok {
88            spec.status = TargetStatus::Warn;
89        }
90        spec
91    }
92
93    /// Resolve a whole table's worth of columns, one spec per column in order.
94    /// Consumed today by the type-report's recovery-SQL emission
95    /// (`preflight::type_report`); `plan-load` (ADR-0014 Phase B) would be a
96    /// second consumer.
97    pub fn resolve_table(self, mappings: &[TypeMapping]) -> Vec<TargetColumnSpec> {
98        mappings
99            .iter()
100            .map(|m| self.resolve_column(TargetInput::from(m)))
101            .collect()
102    }
103
104    /// SQL that recovers target-native types after a bare autoload degraded
105    /// them (ADR-0014 L5). For BigQuery this is a `CREATE TABLE … AS SELECT`
106    /// over the autoloaded staging table applying per-column casts — BigQuery's
107    /// Parquet loader will NOT coerce a *declared* native type on load (verified
108    /// against live BQ: BYTES→JSON, TIMESTAMP→DATETIME loads are rejected), so
109    /// the recovery has to be a post-load transform, not a load schema. `None`
110    /// when the target reads the interchange Parquet faithfully (DuckDB).
111    pub fn recovery_sql(self, specs: &[TargetColumnSpec], table: &str) -> Option<String> {
112        match self {
113            ExportTarget::BigQuery => Some(bigquery_recovery_sql(specs, table)),
114            ExportTarget::Snowflake => Some(snowflake_recovery_sql(specs, table)),
115            ExportTarget::DuckDb => None,
116        }
117    }
118}
119
120/// Status of a column's resolution against a specific target.
121#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
122#[serde(rename_all = "snake_case")]
123pub enum TargetStatus {
124    Ok,
125    Warn,
126    Fail,
127}
128
129impl TargetStatus {
130    pub fn label(&self) -> &'static str {
131        match self {
132            Self::Ok => "ok",
133            Self::Warn => "warn",
134            Self::Fail => "fail",
135        }
136    }
137}
138
139/// The borrowed subset a resolver is allowed to read. Built from a
140/// [`TypeMapping`] via [`From`]. Dispatch is on `rivet_type`; `arrow_type` is
141/// retained for callers that want it but the resolver reads precision from
142/// `RivetType::Decimal` directly.
143#[derive(Debug, Clone, Copy)]
144pub struct TargetInput<'a> {
145    pub column_name: &'a str,
146    pub rivet_type: &'a RivetType,
147    /// Retained for callers and future precision-sensitive targets; the
148    /// resolver reads precision from `RivetType::Decimal` directly today.
149    #[allow(dead_code)]
150    pub arrow_type: Option<&'a DataType>,
151    pub fidelity: TypeFidelity,
152}
153
154impl<'a> From<&'a TypeMapping> for TargetInput<'a> {
155    fn from(m: &'a TypeMapping) -> Self {
156        TargetInput {
157            column_name: &m.column_name,
158            rivet_type: &m.rivet_type,
159            arrow_type: m.arrow_type.as_ref(),
160            fidelity: m.fidelity,
161        }
162    }
163}
164
165/// One column's per-target materialization spec (ADR-0014 L4). Uniform across
166/// targets so the type-report table and `--json` stay stable; an unmappable
167/// column is a `status: Fail` row, not an error.
168#[derive(Debug, Clone, Serialize)]
169pub struct TargetColumnSpec {
170    /// Name copied through so a `Vec<TargetColumnSpec>` is self-describing.
171    pub column_name: String,
172    /// Native warehouse type for full fidelity, e.g. "JSON", "UBIGINT", "NUMERIC".
173    pub target_type: String,
174    /// Type a generic Parquet reader infers without a declared schema. May
175    /// differ from `target_type` (e.g. BigQuery autoloads JSON as "BYTES").
176    pub autoload_type: String,
177    pub status: TargetStatus,
178    #[serde(skip_serializing_if = "Option::is_none")]
179    pub note: Option<String>,
180    /// Materialization snippet / load-schema hint (L5) to recover the native
181    /// type when autoload diverges. `None` when `autoload_type == target_type`.
182    #[serde(skip_serializing_if = "Option::is_none")]
183    pub cast_sql: Option<String>,
184}
185
186/// Internal per-type resolution result, before the column name and cast
187/// substitution are applied.
188struct Resolved {
189    target_type: String,
190    autoload_type: String,
191    status: TargetStatus,
192    note: Option<String>,
193    /// `cast_sql` template with a `{col}` placeholder, or `None`.
194    cast: Option<String>,
195}
196
197impl Resolved {
198    fn ok(t: impl Into<String>) -> Self {
199        let t = t.into();
200        Self {
201            autoload_type: t.clone(),
202            target_type: t,
203            status: TargetStatus::Ok,
204            note: None,
205            cast: None,
206        }
207    }
208    /// Native type that *autoloads as something else* — the divergence the
209    /// resolver exists to surface.
210    fn diverge(
211        native: impl Into<String>,
212        autoload: impl Into<String>,
213        note: impl Into<String>,
214        cast: Option<&str>,
215    ) -> Self {
216        Self {
217            target_type: native.into(),
218            autoload_type: autoload.into(),
219            status: TargetStatus::Warn,
220            note: Some(note.into()),
221            cast: cast.map(str::to_string),
222        }
223    }
224    fn warn(t: impl Into<String>, note: impl Into<String>) -> Self {
225        let t = t.into();
226        Self {
227            autoload_type: t.clone(),
228            target_type: t,
229            status: TargetStatus::Warn,
230            note: Some(note.into()),
231            cast: None,
232        }
233    }
234    fn fail(note: impl Into<String>) -> Self {
235        Self {
236            target_type: "-".into(),
237            autoload_type: "-".into(),
238            status: TargetStatus::Fail,
239            note: Some(note.into()),
240            cast: None,
241        }
242    }
243    fn into_spec(self, input: &TargetInput<'_>) -> TargetColumnSpec {
244        TargetColumnSpec {
245            column_name: input.column_name.to_string(),
246            target_type: self.target_type,
247            autoload_type: self.autoload_type,
248            status: self.status,
249            note: self.note,
250            cast_sql: self.cast.map(|t| t.replace("{col}", input.column_name)),
251        }
252    }
253}
254
255fn unsupported_reason(t: &RivetType) -> String {
256    match t {
257        RivetType::Unsupported { reason, .. } => reason.clone(),
258        _ => "no target mapping".into(),
259    }
260}
261
262/// Emit a BigQuery type-recovery statement (ADR-0014 L5): load the interchange
263/// Parquet with `--autodetect` into `<table>__staging`, then run this CTAS to
264/// materialise the native types that bare autoload degrades (JSON/UUID→BYTES,
265/// naive timestamp→TIMESTAMP). Columns with a `cast_sql` get that cast; the
266/// rest pass through unchanged.
267///
268/// Verified against live BigQuery: a load schema that *declares* native types
269/// is rejected (the Parquet loader won't coerce a column's type on load), so
270/// the recovery must be this post-load transform.
271/// The recovery `SELECT` body, shared by every degrading target. The cast
272/// branch *is* the materialization contract (ADR-0014 L5) and is identical
273/// across targets — only the passthrough form (identifier quoting, alias)
274/// differs, so each target supplies that as `passthrough`. Deleting this and
275/// inlining the fold would re-scatter the cast logic across N targets.
276fn recovery_projection(specs: &[TargetColumnSpec], passthrough: impl Fn(&str) -> String) -> String {
277    specs
278        .iter()
279        .map(|s| match &s.cast_sql {
280            Some(cast) => format!("  {cast} AS {name}", name = s.column_name),
281            None => passthrough(&s.column_name),
282        })
283        .collect::<Vec<_>>()
284        .join(",\n")
285}
286
287fn bigquery_recovery_sql(specs: &[TargetColumnSpec], table: &str) -> String {
288    let cols = recovery_projection(specs, |name| format!("  {name}"));
289    format!(
290        "-- 1) bq load --autodetect --parquet_enable_list_inference \
291         --source_format=PARQUET {table}__staging <parquet>\n\
292         -- 2) recover native types:\n\
293         CREATE OR REPLACE TABLE `{table}` AS\n\
294         SELECT\n{cols}\n\
295         FROM `{table}__staging`;"
296    )
297}
298
299/// Emit a Snowflake type-recovery script (ADR-0014 L5). Snowflake's Parquet
300/// autoload degrades JSON→TEXT, UUID→BINARY, naive timestamp→NUMBER (µs) and
301/// TIME→NUMBER, so the recovery is a post-load CTAS over a `MATCH_BY_COLUMN_NAME`
302/// staging table. INFER_SCHEMA emits lowercase, case-sensitive names, so every
303/// reference is double-quoted. Verified live (2026-06-01) via the `snow` CLI.
304fn snowflake_recovery_sql(specs: &[TargetColumnSpec], table: &str) -> String {
305    let cols = recovery_projection(specs, |name| format!("  \"{name}\" AS {name}"));
306    format!(
307        "-- 1) ALTER SESSION SET TIMEZONE='UTC';\n\
308         -- 2) CREATE OR REPLACE FILE FORMAT rivet_pq TYPE=PARQUET BINARY_AS_TEXT=FALSE;\n\
309         -- 3) PUT file://<parquet> @<stage> AUTO_COMPRESS=FALSE;\n\
310         -- 4) CREATE OR REPLACE TABLE {table}__staging USING TEMPLATE (SELECT ARRAY_AGG(\n\
311         --      OBJECT_CONSTRUCT(*)) FROM TABLE(INFER_SCHEMA(LOCATION=>'@<stage>', FILE_FORMAT=>'rivet_pq')));\n\
312         --    COPY INTO {table}__staging FROM @<stage> FILE_FORMAT=(FORMAT_NAME='rivet_pq') MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE;\n\
313         -- 5) recover native types:\n\
314         CREATE OR REPLACE TABLE {table} AS\n\
315         SELECT\n{cols}\n\
316         FROM {table}__staging;"
317    )
318}
319
320// ── BigQuery ─────────────────────────────────────────────────────────────────
321
322mod bigquery {
323    use super::*;
324
325    /// BigQuery NUMERIC precision/scale limits.
326    const NUMERIC_MAX_P: u8 = 29;
327    const NUMERIC_MAX_S: i8 = 9;
328    /// BigQuery BIGNUMERIC precision/scale limits.
329    const BIGNUMERIC_MAX_P: u8 = 76;
330    const BIGNUMERIC_MAX_S: i8 = 38;
331
332    pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
333        native(input.rivet_type).into_spec(input)
334    }
335
336    fn native(t: &RivetType) -> Resolved {
337        match t {
338            RivetType::Bool => Resolved::ok("BOOL"),
339            RivetType::Int16 | RivetType::Int32 | RivetType::Int64 => Resolved::ok("INT64"),
340            // u64 > i64::MAX overflows the INT64 autoload and cannot be
341            // recovered post-load (the bits are already wrong). The only fix is
342            // source-side: map the column to decimal(20,0) with a column
343            // override so it rides as Parquet DECIMAL → BigQuery NUMERIC.
344            RivetType::UInt64 => Resolved::diverge(
345                "NUMERIC",
346                "INT64",
347                "UINT64 > INT64_MAX overflows the INT64 autoload and cannot be recovered after \
348                 load — map the column to decimal(20,0) with a source column override",
349                None,
350            ),
351            RivetType::Float32 | RivetType::Float64 => Resolved::ok("FLOAT64"),
352            RivetType::Decimal { precision, scale } => decimal(*precision, *scale),
353            RivetType::Date => Resolved::ok("DATE"),
354            RivetType::Time { .. } => Resolved::ok("TIME"),
355            // tz-aware timestamp → instant → TIMESTAMP, autoloads cleanly.
356            RivetType::Timestamp {
357                timezone: Some(_), ..
358            } => Resolved::ok("TIMESTAMP"),
359            // Nanosecond naive timestamp (the `timestamp_ns` override, e.g. SQL
360            // Server datetime2(7)) has no BigQuery native temporal type: the loader
361            // does not recognise the Parquet TIMESTAMP(NANOS) logical type and
362            // autoloads the raw INT64 nanoseconds-since-epoch (verified live
363            // 2026-06-07 — lossless as an integer). A native TIMESTAMP needs
364            // TIMESTAMP_MICROS(DIV(col,1000)), which drops the sub-µs digits
365            // (BigQuery TIMESTAMP is microsecond), so no lossless temporal cast
366            // exists — keep INT64, or use the default `timestamp` for BigQuery.
367            RivetType::Timestamp {
368                unit: TimeUnit::Nanosecond,
369                timezone: None,
370            } => Resolved::diverge(
371                "INT64",
372                "INT64",
373                "nanosecond timestamp has no BigQuery native type — autoloads as INT64 (raw \
374                 nanos, lossless); a native TIMESTAMP via TIMESTAMP_MICROS(DIV(col,1000)) drops \
375                 sub-µs precision. Prefer `timestamp` (microsecond) for BigQuery targets.",
376                None,
377            ),
378            // naive timestamp → wall-clock → DATETIME, but BigQuery autoload
379            // ignores Parquet isAdjustedToUTC=false and yields TIMESTAMP
380            // (verified). `DATETIME(ts)` recovers the wall-clock after load.
381            RivetType::Timestamp { timezone: None, .. } => Resolved::diverge(
382                "DATETIME",
383                "TIMESTAMP",
384                "naive timestamp autoloads as TIMESTAMP (an instant); recover wall-clock with \
385                 DATETIME(col) after load",
386                Some("DATETIME({col})"),
387            ),
388            RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("STRING"),
389            RivetType::Binary => Resolved::ok("BYTES"),
390            // Parquet JSON logical type autoloads as BYTES in BigQuery
391            // (verified). Declare JSON in the load schema for native JSON.
392            RivetType::Json => Resolved::diverge(
393                "JSON",
394                "BYTES",
395                "Parquet JSON logical type autoloads as BYTES in BigQuery; recover native JSON \
396                 with PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(col)) after load",
397                Some("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING({col}))"),
398            ),
399            // UUID rides as FixedSizeBinary(16) + UUIDType; BigQuery has no UUID
400            // type and autoloads it as 16-byte BYTES (verified). Native is
401            // STRING (canonical text).
402            RivetType::Uuid => Resolved::diverge(
403                "STRING",
404                "BYTES",
405                "UUID autoloads as 16-byte BYTES in BigQuery; recover hex text with TO_HEX(col) \
406                 after load (or keep BYTES)",
407                Some("TO_HEX({col})"),
408            ),
409            RivetType::Interval => Resolved::ok("STRING"),
410            RivetType::List { inner } => list(inner),
411            RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
412        }
413    }
414
415    fn decimal(p: u8, s: i8) -> Resolved {
416        if s < 0 {
417            return Resolved::fail(format!(
418                "BigQuery has no negative scale; decimal({p},{s}) needs a STRING/INT64 cast"
419            ));
420        }
421        let native = if p <= NUMERIC_MAX_P && s <= NUMERIC_MAX_S {
422            "NUMERIC"
423        } else if p <= BIGNUMERIC_MAX_P && s <= BIGNUMERIC_MAX_S {
424            "BIGNUMERIC"
425        } else {
426            return Resolved::fail(format!(
427                "decimal({p},{s}) exceeds BigQuery BIGNUMERIC limits (max 76,38)"
428            ));
429        };
430        Resolved::ok(native)
431    }
432
433    fn list(inner: &RivetType) -> Resolved {
434        let inner_r = native(inner);
435        if inner_r.status == TargetStatus::Fail {
436            return Resolved::fail(format!(
437                "REPEATED of unsupported element: {}",
438                inner_r.target_type
439            ));
440        }
441        // Rivet writes the Parquet list element as `item` (arrow-rs default,
442        // not the spec's `element`), so BigQuery loads arrays as
443        // REPEATED RECORD{item} even with `--parquet_enable_list_inference`
444        // (without the flag they nest one level deeper as RECORD{list:...}).
445        // Verified live: flattening with `UNNEST(col)` + `el.item` recovers a
446        // clean REPEATED scalar.
447        Resolved::diverge(
448            format!("REPEATED {}", inner_r.target_type),
449            format!("REPEATED RECORD{{item {}}}", inner_r.autoload_type),
450            "arrays load as REPEATED RECORD{item}; load the staging table with \
451             --parquet_enable_list_inference, then flatten with UNNEST after load",
452            Some("ARRAY(SELECT el.item FROM UNNEST({col}) AS el)"),
453        )
454    }
455}
456
457// ── DuckDB ───────────────────────────────────────────────────────────────────
458
459mod duckdb {
460    use super::*;
461
462    pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
463        native(input.rivet_type).into_spec(input)
464    }
465
466    /// DuckDB honors every native Parquet logical type Rivet writes, so
467    /// `autoload_type == target_type` for all supported variants (verified).
468    fn native(t: &RivetType) -> Resolved {
469        match t {
470            RivetType::Bool => Resolved::ok("BOOLEAN"),
471            RivetType::Int16 => Resolved::ok("SMALLINT"),
472            RivetType::Int32 => Resolved::ok("INTEGER"),
473            RivetType::Int64 => Resolved::ok("BIGINT"),
474            RivetType::UInt64 => Resolved::ok("UBIGINT"),
475            RivetType::Float32 => Resolved::ok("FLOAT"),
476            RivetType::Float64 => Resolved::ok("DOUBLE"),
477            RivetType::Decimal { precision, scale } => {
478                if *scale < 0 {
479                    Resolved::warn(
480                        "DECIMAL",
481                        format!(
482                            "DuckDB has no negative scale; decimal({precision},{scale}) loads via cast"
483                        ),
484                    )
485                } else if *precision <= 38 {
486                    Resolved::ok(format!("DECIMAL({precision},{scale})"))
487                } else {
488                    // DuckDB DECIMAL maxes at precision 38; wider rides as HUGEINT/DOUBLE.
489                    Resolved::warn(
490                        "DECIMAL(38,*)",
491                        format!("decimal({precision},{scale}) exceeds DuckDB DECIMAL(38); widens"),
492                    )
493                }
494            }
495            RivetType::Date => Resolved::ok("DATE"),
496            RivetType::Time { .. } => Resolved::ok("TIME"),
497            RivetType::Timestamp {
498                timezone: Some(_), ..
499            } => Resolved::ok("TIMESTAMPTZ"),
500            // DuckDB has a native nanosecond timestamp; the `timestamp_ns` override
501            // round-trips losslessly (verified live 2026-06-07).
502            RivetType::Timestamp {
503                unit: TimeUnit::Nanosecond,
504                timezone: None,
505            } => Resolved::ok("TIMESTAMP_NS"),
506            RivetType::Timestamp { timezone: None, .. } => Resolved::ok("TIMESTAMP"),
507            RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("VARCHAR"),
508            RivetType::Binary => Resolved::ok("BLOB"),
509            RivetType::Json => Resolved::ok("JSON"),
510            RivetType::Uuid => Resolved::ok("UUID"),
511            RivetType::Interval => Resolved::ok("INTERVAL"),
512            RivetType::List { inner } => {
513                let inner_r = native(inner);
514                if inner_r.status == TargetStatus::Fail {
515                    Resolved::fail(format!(
516                        "LIST of unsupported element: {}",
517                        inner_r.target_type
518                    ))
519                } else {
520                    Resolved::ok(format!("{}[]", inner_r.target_type))
521                }
522            }
523            RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
524        }
525    }
526}
527
528// ── Snowflake ────────────────────────────────────────────────────────────────
529
530mod snowflake {
531    use super::*;
532
533    pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
534        native(input.rivet_type).into_spec(input)
535    }
536
537    /// Snowflake autoload (INFER_SCHEMA / COPY) + recovery casts — verified live
538    /// (2026-06-01). Needs `BINARY_AS_TEXT=FALSE` in the file format; cast column
539    /// refs are double-quoted because INFER_SCHEMA names are lowercase and
540    /// case-sensitive.
541    fn native(t: &RivetType) -> Resolved {
542        match t {
543            RivetType::Bool => Resolved::ok("BOOLEAN"),
544            RivetType::Int16 | RivetType::Int32 | RivetType::Int64 => Resolved::ok("NUMBER(38,0)"),
545            // u64 > INT64_MAX overflows the Parquet read; fix at source.
546            RivetType::UInt64 => Resolved::diverge(
547                "NUMBER(20,0)",
548                "NUMBER(38,0)",
549                "UINT64 > INT64_MAX overflows the Parquet read; map to decimal(20,0) at source",
550                None,
551            ),
552            RivetType::Float32 | RivetType::Float64 => Resolved::ok("FLOAT"),
553            RivetType::Decimal { precision, scale } => {
554                if *scale < 0 {
555                    Resolved::warn(
556                        "NUMBER",
557                        format!(
558                            "Snowflake NUMBER has no negative scale; decimal({precision},{scale}) loads via cast"
559                        ),
560                    )
561                } else {
562                    Resolved::ok(format!("NUMBER({precision},{scale})"))
563                }
564            }
565            RivetType::Date => Resolved::ok("DATE"),
566            // TIME autoloads as NUMBER (µs of day); rebuild with TIME_FROM_PARTS.
567            RivetType::Time { .. } => Resolved::diverge(
568                "TIME",
569                "NUMBER(38,0)",
570                "TIME autoloads as NUMBER (µs of day); recover with TIME_FROM_PARTS after load",
571                Some(r#"TIME_FROM_PARTS(0,0,FLOOR("{col}"/1000000),MOD("{col}",1000000)*1000)"#),
572            ),
573            // tz timestamp lands as TIMESTAMP_NTZ holding the UTC instant.
574            RivetType::Timestamp {
575                timezone: Some(_), ..
576            } => Resolved::diverge(
577                "TIMESTAMP_TZ",
578                "TIMESTAMP_NTZ",
579                "tz timestamp autoloads as TIMESTAMP_NTZ — ALTER SESSION SET TIMEZONE='UTC' before COPY so the instant matches",
580                None,
581            ),
582            // Nanosecond naive timestamp autoloads as NUMBER (ns since epoch),
583            // like the µs case but at scale 9. Snowflake TIMESTAMP_NTZ holds full
584            // 9-digit precision, so TO_TIMESTAMP_NTZ(col, 9) recovers it
585            // losslessly — verified live 2026-06-07 (NUMBER 1717761600123456700 →
586            // 2024-06-07 12:00:00.123456700, the 7th digit intact).
587            RivetType::Timestamp {
588                unit: TimeUnit::Nanosecond,
589                timezone: None,
590            } => Resolved::diverge(
591                "TIMESTAMP_NTZ",
592                "NUMBER(38,0)",
593                "nanosecond timestamp autoloads as NUMBER (ns since epoch); recover with \
594                 TO_TIMESTAMP_NTZ(col, 9) after load — Snowflake TIMESTAMP_NTZ holds full ns precision",
595                Some(r#"TO_TIMESTAMP_NTZ("{col}", 9)"#),
596            ),
597            // naive timestamp autoloads as NUMBER (µs since epoch).
598            RivetType::Timestamp { timezone: None, .. } => Resolved::diverge(
599                "TIMESTAMP_NTZ",
600                "NUMBER(38,0)",
601                "naive timestamp autoloads as NUMBER (µs since epoch); recover with TO_TIMESTAMP_NTZ after load",
602                Some(r#"TO_TIMESTAMP_NTZ("{col}", 6)"#),
603            ),
604            RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("TEXT"),
605            // bytea/blob needs BINARY_AS_TEXT=FALSE or non-UTF8 bytes fail.
606            RivetType::Binary => Resolved::warn(
607                "BINARY",
608                "set BINARY_AS_TEXT=FALSE in the Parquet FILE FORMAT or non-UTF8 bytes fail to load",
609            ),
610            // JSON autoloads as TEXT; PARSE_JSON recovers native VARIANT.
611            RivetType::Json => Resolved::diverge(
612                "VARIANT",
613                "TEXT",
614                "JSON autoloads as TEXT; recover native VARIANT with PARSE_JSON after load",
615                Some(r#"PARSE_JSON("{col}")"#),
616            ),
617            // UUID (FixedSizeBinary 16) autoloads as 16-byte BINARY.
618            RivetType::Uuid => Resolved::diverge(
619                "TEXT",
620                "BINARY",
621                "UUID autoloads as 16-byte BINARY; recover canonical text with HEX_ENCODE + REGEXP after load",
622                Some(
623                    r#"REGEXP_REPLACE(LOWER(HEX_ENCODE("{col}")),'^(.{8})(.{4})(.{4})(.{4})(.{12})$','\\1-\\2-\\3-\\4-\\5')"#,
624                ),
625            ),
626            RivetType::Interval => Resolved::ok("TEXT"),
627            // A Parquet list autoloads as VARIANT (holding the JSON array), not
628            // native ARRAY — verified live 2026-06-01: INFER_SCHEMA reports
629            // VARIANT for both `tags` (text[]) and `nums` (int[]). Recover the
630            // native ARRAY with `::ARRAY` after load.
631            RivetType::List { inner } => {
632                let inner_r = native(inner);
633                if inner_r.status == TargetStatus::Fail {
634                    Resolved::fail(format!(
635                        "ARRAY of unsupported element: {}",
636                        inner_r.target_type
637                    ))
638                } else {
639                    Resolved::diverge(
640                        "ARRAY",
641                        "VARIANT",
642                        "list autoloads as VARIANT (the JSON array); recover native ARRAY with ::ARRAY after load",
643                        Some(r#""{col}"::ARRAY"#),
644                    )
645                }
646            }
647            RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
648        }
649    }
650}
651
652#[cfg(test)]
653mod tests {
654    use super::*;
655
656    fn input<'a>(rt: &'a RivetType) -> TargetInput<'a> {
657        TargetInput {
658            column_name: "c",
659            rivet_type: rt,
660            arrow_type: None,
661            fidelity: TypeFidelity::Exact,
662        }
663    }
664
665    fn bq(rt: &RivetType) -> TargetColumnSpec {
666        ExportTarget::BigQuery.resolve_column(input(rt))
667    }
668    fn duck(rt: &RivetType) -> TargetColumnSpec {
669        ExportTarget::DuckDb.resolve_column(input(rt))
670    }
671    fn sf(rt: &RivetType) -> TargetColumnSpec {
672        ExportTarget::Snowflake.resolve_column(input(rt))
673    }
674
675    // ── nanosecond timestamp (`timestamp_ns` override) per-target autoload ────
676    // The default `timestamp` is microsecond; ns is opt-in (datetime2(7)). These
677    // pin the autoload truth verified live on 2026-06-07 so the preflight report
678    // doesn't claim the microsecond behaviour for a ns column.
679
680    #[test]
681    fn bq_nanosecond_timestamp_autoloads_as_int64() {
682        let ns = RivetType::Timestamp {
683            unit: super::super::TimeUnit::Nanosecond,
684            timezone: None,
685        };
686        let s = bq(&ns);
687        assert_eq!(s.target_type, "INT64");
688        assert_eq!(s.autoload_type, "INT64");
689        assert_eq!(s.status, TargetStatus::Warn);
690        // No lossless temporal recovery (ns→µs is lossy) — like the UINT64 case.
691        assert!(s.cast_sql.is_none(), "ns→BQ has no lossless temporal cast");
692    }
693
694    #[test]
695    fn duckdb_nanosecond_timestamp_is_native_timestamp_ns() {
696        let ns = RivetType::Timestamp {
697            unit: super::super::TimeUnit::Nanosecond,
698            timezone: None,
699        };
700        let s = duck(&ns);
701        assert_eq!(s.target_type, "TIMESTAMP_NS");
702        assert_eq!(s.status, TargetStatus::Ok);
703    }
704
705    #[test]
706    fn snowflake_nanosecond_timestamp_recovers_losslessly_at_scale_9() {
707        let ns = RivetType::Timestamp {
708            unit: super::super::TimeUnit::Nanosecond,
709            timezone: None,
710        };
711        let s = sf(&ns);
712        assert_eq!(s.target_type, "TIMESTAMP_NTZ");
713        assert_eq!(s.autoload_type, "NUMBER(38,0)");
714        // Lossless recovery (TIMESTAMP_NTZ holds 9 digits) → cast_sql is Some at
715        // scale 9 (not the µs scale 6); `{col}` is substituted with the column.
716        assert_eq!(s.cast_sql.as_deref(), Some(r#"TO_TIMESTAMP_NTZ("c", 9)"#));
717    }
718
719    // ── dispatch on RivetType, not Arrow — the headline fix ──────────────────
720
721    #[test]
722    fn bq_uuid_resolves_not_fails() {
723        // The old arrow-dispatch `bq_compat` hard-failed UUID (FixedSizeBinary
724        // had no arm). Now it resolves on RivetType: native STRING, BYTES on
725        // autoload.
726        let s = bq(&RivetType::Uuid);
727        assert_eq!(s.target_type, "STRING");
728        assert_eq!(s.autoload_type, "BYTES");
729        assert_eq!(s.status, TargetStatus::Warn);
730        assert!(s.cast_sql.unwrap().contains("c"));
731    }
732
733    #[test]
734    fn bq_json_native_is_json_autoload_is_bytes() {
735        let s = bq(&RivetType::Json);
736        assert_eq!(s.target_type, "JSON");
737        assert_eq!(s.autoload_type, "BYTES");
738        assert_eq!(s.status, TargetStatus::Warn);
739        assert!(s.cast_sql.unwrap().starts_with("PARSE_JSON"));
740    }
741
742    #[test]
743    fn bq_naive_timestamp_is_datetime_native_timestamp_autoload() {
744        let naive = RivetType::Timestamp {
745            unit: super::super::TimeUnit::Microsecond,
746            timezone: None,
747        };
748        let s = bq(&naive);
749        assert_eq!(s.target_type, "DATETIME");
750        assert_eq!(s.autoload_type, "TIMESTAMP");
751        assert_eq!(s.status, TargetStatus::Warn);
752    }
753
754    #[test]
755    fn bq_tz_timestamp_is_timestamp_ok() {
756        let tz = RivetType::Timestamp {
757            unit: super::super::TimeUnit::Microsecond,
758            timezone: Some("UTC".into()),
759        };
760        let s = bq(&tz);
761        assert_eq!(s.target_type, "TIMESTAMP");
762        assert_eq!(s.autoload_type, "TIMESTAMP");
763        assert_eq!(s.status, TargetStatus::Ok);
764    }
765
766    #[test]
767    fn bq_decimal_within_numeric_is_numeric() {
768        let s = bq(&RivetType::Decimal {
769            precision: 18,
770            scale: 2,
771        });
772        assert_eq!(s.target_type, "NUMERIC");
773        assert_eq!(s.status, TargetStatus::Ok);
774    }
775
776    #[test]
777    fn bq_decimal_escalates_to_bignumeric() {
778        let s = bq(&RivetType::Decimal {
779            precision: 38,
780            scale: 9,
781        });
782        assert_eq!(s.target_type, "BIGNUMERIC");
783        assert_eq!(s.status, TargetStatus::Ok);
784    }
785
786    #[test]
787    fn bq_decimal_negative_scale_fails() {
788        let s = bq(&RivetType::Decimal {
789            precision: 5,
790            scale: -2,
791        });
792        assert_eq!(s.status, TargetStatus::Fail);
793    }
794
795    #[test]
796    fn bq_uint64_recommends_numeric_warns_overflow() {
797        let s = bq(&RivetType::UInt64);
798        assert_eq!(s.target_type, "NUMERIC");
799        assert_eq!(s.autoload_type, "INT64");
800        assert_eq!(s.status, TargetStatus::Warn);
801    }
802
803    #[test]
804    fn bq_list_is_repeated_native_record_autoload() {
805        let t = RivetType::List {
806            inner: Box::new(RivetType::String),
807        };
808        let s = bq(&t);
809        assert_eq!(s.target_type, "REPEATED STRING");
810        assert!(s.autoload_type.contains("REPEATED RECORD"));
811        assert_eq!(s.status, TargetStatus::Warn);
812    }
813
814    #[test]
815    fn bq_unsupported_is_fail_row_not_panic() {
816        let t = RivetType::Unsupported {
817            native_type: "geometry".into(),
818            reason: "no mapping".into(),
819        };
820        let s = bq(&t);
821        assert_eq!(s.status, TargetStatus::Fail);
822        assert_eq!(s.target_type, "-");
823    }
824
825    #[test]
826    fn bq_standard_scalars_ok() {
827        for (rt, native) in [
828            (RivetType::Bool, "BOOL"),
829            (RivetType::Int64, "INT64"),
830            (RivetType::Float64, "FLOAT64"),
831            (RivetType::Date, "DATE"),
832            (RivetType::String, "STRING"),
833            (RivetType::Binary, "BYTES"),
834            (RivetType::Enum, "STRING"),
835        ] {
836            let s = bq(&rt);
837            assert_eq!(s.target_type, native, "{rt:?}");
838            assert_eq!(s.autoload_type, native, "{rt:?}");
839            assert_eq!(s.status, TargetStatus::Ok, "{rt:?}");
840        }
841    }
842
843    // ── DuckDB honors every logical type — autoload == native ────────────────
844
845    #[test]
846    fn duckdb_reads_everything_natively() {
847        let naive = RivetType::Timestamp {
848            unit: super::super::TimeUnit::Microsecond,
849            timezone: None,
850        };
851        for rt in [
852            RivetType::Json,
853            RivetType::Uuid,
854            RivetType::UInt64,
855            naive,
856            RivetType::List {
857                inner: Box::new(RivetType::Int64),
858            },
859        ] {
860            let s = duck(&rt);
861            assert_eq!(
862                s.target_type, s.autoload_type,
863                "DuckDB autoload must equal native for {rt:?}"
864            );
865            assert_ne!(s.status, TargetStatus::Fail, "{rt:?}");
866        }
867    }
868
869    #[test]
870    fn duckdb_native_type_names() {
871        assert_eq!(duck(&RivetType::Json).target_type, "JSON");
872        assert_eq!(duck(&RivetType::Uuid).target_type, "UUID");
873        assert_eq!(duck(&RivetType::UInt64).target_type, "UBIGINT");
874        assert_eq!(
875            duck(&RivetType::Decimal {
876                precision: 18,
877                scale: 2
878            })
879            .target_type,
880            "DECIMAL(18,2)"
881        );
882        assert_eq!(
883            duck(&RivetType::List {
884                inner: Box::new(RivetType::Int64)
885            })
886            .target_type,
887            "BIGINT[]"
888        );
889    }
890
891    #[test]
892    fn parse_accepts_aliases() {
893        assert_eq!(ExportTarget::parse("bq"), Some(ExportTarget::BigQuery));
894        assert_eq!(
895            ExportTarget::parse("BigQuery"),
896            Some(ExportTarget::BigQuery)
897        );
898        assert_eq!(ExportTarget::parse("duckdb"), Some(ExportTarget::DuckDb));
899        assert_eq!(ExportTarget::parse("nope"), None);
900    }
901
902    #[test]
903    fn resolve_table_preserves_order_and_names() {
904        use super::super::SourceColumn;
905        let mappings = vec![
906            TypeMapping::from_source(&SourceColumn::simple("a", "int8", true), RivetType::Int64),
907            TypeMapping::from_source(&SourceColumn::simple("b", "jsonb", true), RivetType::Json),
908        ];
909        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
910        assert_eq!(specs.len(), 2);
911        assert_eq!(specs[0].column_name, "a");
912        assert_eq!(specs[1].column_name, "b");
913        assert_eq!(specs[1].target_type, "JSON");
914    }
915
916    // ── edge cases: remediation hints must recover from the DEGRADED state ────
917    // Regression guard for the bug class where the resolver proposes a post-load
918    // cast that cannot actually recover an already-lossy value (e.g. a UINT64
919    // that overflowed into INT64). See CLAUDE.md "Remediation hints must recover
920    // from the degraded state".
921
922    #[test]
923    fn cast_sql_is_none_when_post_load_recovery_is_impossible() {
924        // UINT64 > INT64_MAX has already overflowed by the time it autoloads as
925        // INT64; a SELECT-time cast would operate on corrupted bits. The only
926        // fix is source-side (a decimal override) — cast_sql MUST be None and
927        // the note must point there, not promise a post-load cast.
928        let u = bq(&RivetType::UInt64);
929        assert!(
930            u.cast_sql.is_none(),
931            "overflowed UINT64 has no lossless post-load recovery"
932        );
933        let note = u.note.unwrap().to_lowercase();
934        assert!(
935            note.contains("override"),
936            "UINT64 note must point to the source-side override, got: {note}"
937        );
938    }
939
940    #[test]
941    fn cast_sql_present_only_when_lossless_post_load() {
942        // JSON/UUID/naive-timestamp autoload to a degraded type but still hold
943        // the value losslessly, so a post-load cast genuinely recovers it.
944        assert!(
945            bq(&RivetType::Json)
946                .cast_sql
947                .unwrap()
948                .contains("PARSE_JSON")
949        );
950        assert!(bq(&RivetType::Uuid).cast_sql.unwrap().contains("TO_HEX"));
951        let naive = RivetType::Timestamp {
952            unit: super::super::TimeUnit::Microsecond,
953            timezone: None,
954        };
955        assert!(bq(&naive).cast_sql.unwrap().contains("DATETIME"));
956    }
957
958    #[test]
959    fn every_divergence_offers_a_recovery_path() {
960        // Invariant: whenever BigQuery autoload diverges from the native type the
961        // operator gets SOME recovery — a lossless post-load `cast_sql`, or a
962        // note describing the fix (post-load transform, or a source override).
963        // Never a silent no-op (the bug class).
964        let naive = RivetType::Timestamp {
965            unit: super::super::TimeUnit::Microsecond,
966            timezone: None,
967        };
968        let cases = [
969            RivetType::Json,
970            RivetType::Uuid,
971            RivetType::UInt64,
972            naive,
973            RivetType::List {
974                inner: Box::new(RivetType::String),
975            },
976        ];
977        for rt in cases {
978            let s = bq(&rt);
979            assert_ne!(s.autoload_type, s.target_type, "case must diverge: {rt:?}");
980            let has_cast = s.cast_sql.is_some();
981            let note = s.note.as_deref().unwrap_or("").to_lowercase();
982            let describes_recovery = note.contains("after load") || note.contains("override");
983            assert!(
984                has_cast || describes_recovery,
985                "divergent {rt:?} must offer a recovery (cast_sql or a recovery note)"
986            );
987        }
988    }
989
990    // ── edge cases: decimal precision/scale overflow at the target boundary ───
991
992    #[test]
993    fn bq_decimal_limit_boundaries() {
994        // Exact BIGNUMERIC ceiling is ok.
995        assert_eq!(
996            bq(&RivetType::Decimal {
997                precision: 76,
998                scale: 38
999            })
1000            .status,
1001            TargetStatus::Ok
1002        );
1003        // One past precision overflows BIGNUMERIC → Fail, never a silent clamp.
1004        assert_eq!(
1005            bq(&RivetType::Decimal {
1006                precision: 77,
1007                scale: 38
1008            })
1009            .status,
1010            TargetStatus::Fail
1011        );
1012        // One past scale → Fail.
1013        assert_eq!(
1014            bq(&RivetType::Decimal {
1015                precision: 76,
1016                scale: 39
1017            })
1018            .status,
1019            TargetStatus::Fail
1020        );
1021        // Between NUMERIC and BIGNUMERIC escalates rather than overflowing NUMERIC.
1022        assert_eq!(
1023            bq(&RivetType::Decimal {
1024                precision: 30,
1025                scale: 0
1026            })
1027            .target_type,
1028            "BIGNUMERIC"
1029        );
1030    }
1031
1032    #[test]
1033    fn duckdb_decimal_over_38_warns_not_silently_clamps() {
1034        let s = duck(&RivetType::Decimal {
1035            precision: 40,
1036            scale: 2,
1037        });
1038        assert_eq!(s.status, TargetStatus::Warn);
1039    }
1040
1041    // ── L5 recovery SQL (the post-load transform for BigQuery autoload) ───────
1042
1043    #[test]
1044    fn bq_recovery_sql_casts_native_types() {
1045        use super::super::{SourceColumn, TimeUnit};
1046        let naive = RivetType::Timestamp {
1047            unit: TimeUnit::Microsecond,
1048            timezone: None,
1049        };
1050        let mappings = vec![
1051            TypeMapping::from_source(&SourceColumn::simple("id", "int8", true), RivetType::Int64),
1052            TypeMapping::from_source(
1053                &SourceColumn::simple("attrs", "jsonb", true),
1054                RivetType::Json,
1055            ),
1056            TypeMapping::from_source(&SourceColumn::simple("uid", "uuid", true), RivetType::Uuid),
1057            TypeMapping::from_source(
1058                &SourceColumn::simple("created_at", "timestamp", true),
1059                naive,
1060            ),
1061            TypeMapping::from_source(
1062                &SourceColumn::simple("tags", "_text", true),
1063                RivetType::List {
1064                    inner: Box::new(RivetType::String),
1065                },
1066            ),
1067        ];
1068        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
1069        let sql = ExportTarget::BigQuery
1070            .recovery_sql(&specs, "payments")
1071            .expect("BigQuery has a recovery SQL");
1072        // The post-load casts that actually recover native types (verified live
1073        // against BigQuery — a declared-type load is rejected, a cast is not).
1074        assert!(sql.contains("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(attrs)) AS attrs"));
1075        assert!(sql.contains("TO_HEX(uid) AS uid"));
1076        assert!(sql.contains("DATETIME(created_at) AS created_at"));
1077        // Arrays flatten via UNNEST (verified live; staging loaded with
1078        // --parquet_enable_list_inference).
1079        assert!(sql.contains("ARRAY(SELECT el.item FROM UNNEST(tags) AS el) AS tags"));
1080        assert!(sql.contains("--parquet_enable_list_inference"));
1081        // OK columns pass through unchanged.
1082        assert!(sql.contains("SELECT\n  id"));
1083        // Reads the autoload staging table, writes the recovered table.
1084        assert!(sql.contains("CREATE OR REPLACE TABLE `payments`"));
1085        assert!(sql.contains("FROM `payments__staging`"));
1086    }
1087
1088    #[test]
1089    fn duckdb_needs_no_recovery() {
1090        let mappings = vec![TypeMapping::from_source(
1091            &super::super::SourceColumn::simple("attrs", "json", true),
1092            RivetType::Json,
1093        )];
1094        let specs = ExportTarget::DuckDb.resolve_table(&mappings);
1095        assert!(
1096            ExportTarget::DuckDb.recovery_sql(&specs, "t").is_none(),
1097            "DuckDB autoloads every logical type natively — no recovery needed"
1098        );
1099    }
1100
1101    #[test]
1102    fn recovery_sql_projects_every_column_once_and_only_casts_divergent() {
1103        use super::super::{SourceColumn, TimeUnit};
1104        let naive = RivetType::Timestamp {
1105            unit: TimeUnit::Microsecond,
1106            timezone: None,
1107        };
1108        let cols: [(&str, RivetType); 6] = [
1109            ("id", RivetType::Int64), // ok → passthrough
1110            (
1111                "amount",
1112                RivetType::Decimal {
1113                    precision: 18,
1114                    scale: 2,
1115                },
1116            ), // ok → passthrough
1117            ("attrs", RivetType::Json), // divergent → cast
1118            ("uid", RivetType::Uuid), // divergent → cast
1119            ("created_at", naive),    // divergent → cast
1120            (
1121                "tags",
1122                RivetType::List {
1123                    inner: Box::new(RivetType::String),
1124                },
1125            ), // divergent → cast
1126        ];
1127        let mappings: Vec<_> = cols
1128            .iter()
1129            .cloned()
1130            .map(|(n, rt)| TypeMapping::from_source(&SourceColumn::simple(n, "x", true), rt))
1131            .collect();
1132        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
1133        let sql = ExportTarget::BigQuery.recovery_sql(&specs, "t").unwrap();
1134
1135        // The SELECT projects exactly one item per input column — nothing dropped,
1136        // nothing duplicated.
1137        let body = sql
1138            .split("SELECT\n")
1139            .nth(1)
1140            .and_then(|s| s.split("\nFROM").next())
1141            .expect("recovery SQL has a SELECT … FROM body");
1142        assert_eq!(
1143            body.split(",\n").count(),
1144            cols.len(),
1145            "one projection per column, got:\n{body}"
1146        );
1147        for (name, _) in &cols {
1148            assert!(body.contains(name), "column {name} missing:\n{body}");
1149        }
1150        // OK columns pass through unchanged (bare `  name,`); divergent ones
1151        // carry their cast (`… AS name`).
1152        assert!(body.contains("  id,") && !body.contains("AS id"));
1153        assert!(body.contains("  amount,") && !body.contains("AS amount"));
1154        assert!(body.contains("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(attrs)) AS attrs"));
1155        assert!(body.contains("TO_HEX(uid) AS uid"));
1156        assert!(body.contains("DATETIME(created_at) AS created_at"));
1157        assert!(body.contains("UNNEST(tags) AS el) AS tags"));
1158    }
1159
1160    // ── Snowflake (verified live 2026-06-01) ─────────────────────────────────
1161
1162    #[test]
1163    fn snowflake_autoload_degradations_and_native_casts() {
1164        // JSON → TEXT autoload / VARIANT native, recover PARSE_JSON.
1165        let j = sf(&RivetType::Json);
1166        assert_eq!(j.target_type, "VARIANT");
1167        assert_eq!(j.autoload_type, "TEXT");
1168        assert!(j.cast_sql.unwrap().starts_with("PARSE_JSON"));
1169        // UUID → BINARY autoload / TEXT native, recover via HEX_ENCODE + REGEXP.
1170        let u = sf(&RivetType::Uuid);
1171        assert_eq!(u.target_type, "TEXT");
1172        assert_eq!(u.autoload_type, "BINARY");
1173        assert!(u.cast_sql.unwrap().contains("HEX_ENCODE"));
1174        // naive timestamp → NUMBER autoload / TIMESTAMP_NTZ native.
1175        let naive = RivetType::Timestamp {
1176            unit: super::super::TimeUnit::Microsecond,
1177            timezone: None,
1178        };
1179        let t = sf(&naive);
1180        assert_eq!(t.target_type, "TIMESTAMP_NTZ");
1181        assert_eq!(t.autoload_type, "NUMBER(38,0)");
1182        assert!(t.cast_sql.unwrap().contains("TO_TIMESTAMP_NTZ"));
1183        // TIME → NUMBER autoload, recover TIME_FROM_PARTS.
1184        let tm = sf(&RivetType::Time {
1185            unit: super::super::TimeUnit::Microsecond,
1186        });
1187        assert_eq!(tm.target_type, "TIME");
1188        assert!(tm.cast_sql.unwrap().contains("TIME_FROM_PARTS"));
1189        // decimal is native NUMBER(p,s) — no cast.
1190        let d = sf(&RivetType::Decimal {
1191            precision: 18,
1192            scale: 2,
1193        });
1194        assert_eq!(d.target_type, "NUMBER(18,2)");
1195        assert!(d.cast_sql.is_none());
1196        // list autoloads as VARIANT (verified live), recover native ARRAY with ::ARRAY.
1197        let l = sf(&RivetType::List {
1198            inner: Box::new(RivetType::Int64),
1199        });
1200        assert_eq!(l.target_type, "ARRAY");
1201        assert_eq!(l.autoload_type, "VARIANT");
1202        assert!(l.cast_sql.unwrap().ends_with("::ARRAY"));
1203    }
1204
1205    #[test]
1206    fn snowflake_recovery_sql_quotes_columns_and_casts() {
1207        use super::super::{SourceColumn, TimeUnit};
1208        let naive = RivetType::Timestamp {
1209            unit: TimeUnit::Microsecond,
1210            timezone: None,
1211        };
1212        let mappings = vec![
1213            TypeMapping::from_source(&SourceColumn::simple("id", "int8", true), RivetType::Int64),
1214            TypeMapping::from_source(
1215                &SourceColumn::simple("attrs", "jsonb", true),
1216                RivetType::Json,
1217            ),
1218            TypeMapping::from_source(&SourceColumn::simple("uid", "uuid", true), RivetType::Uuid),
1219            TypeMapping::from_source(
1220                &SourceColumn::simple("created_at", "timestamp", true),
1221                naive,
1222            ),
1223        ];
1224        let specs = ExportTarget::Snowflake.resolve_table(&mappings);
1225        let sql = ExportTarget::Snowflake.recovery_sql(&specs, "t").unwrap();
1226        // Staging columns are lowercase + quoted; passthrough quotes the source.
1227        assert!(sql.contains("\"id\" AS id"));
1228        assert!(sql.contains("PARSE_JSON(\"attrs\") AS attrs"));
1229        assert!(sql.contains("HEX_ENCODE(\"uid\")"));
1230        assert!(sql.contains("TO_TIMESTAMP_NTZ(\"created_at\", 6) AS created_at"));
1231        // The load preamble the recovery depends on.
1232        assert!(sql.contains("BINARY_AS_TEXT=FALSE"));
1233        assert!(sql.contains("MATCH_BY_COLUMN_NAME"));
1234        assert!(sql.contains("FROM t__staging"));
1235    }
1236
1237    #[test]
1238    fn parse_accepts_snowflake() {
1239        assert_eq!(
1240            ExportTarget::parse("snowflake"),
1241            Some(ExportTarget::Snowflake)
1242        );
1243        assert_eq!(ExportTarget::parse("sf"), Some(ExportTarget::Snowflake));
1244    }
1245
1246    #[test]
1247    fn valid_target_names_lists_every_parseable_target() {
1248        // The "unknown target" error message reads from this; it must name
1249        // every target `parse` accepts, or the hint sends operators wrong.
1250        // snowflake regression: the old message hard-coded "bigquery, duckdb".
1251        let names = ExportTarget::valid_target_names();
1252        assert!(names.contains("snowflake"), "got: {names}");
1253        assert!(names.contains("bigquery"), "got: {names}");
1254        assert!(names.contains("duckdb"), "got: {names}");
1255    }
1256}