Skip to main content

rivet/types/
target.rs

1//! Target-type resolver (ADR-0014 L4, roadmap §16).
2//!
3//! Given a column's canonical [`RivetType`] and a runtime-chosen
4//! [`ExportTarget`], resolve what the column becomes in a downstream
5//! warehouse: the **native** target type (full fidelity), the **autoload**
6//! type a generic Parquet reader infers without a declared schema, a safety
7//! [`TargetStatus`], a note, and an optional materialization (`cast_sql` /
8//! load-schema hint).
9//!
10//! Design (locked in the type-support architecture review):
11//!
12//! - **Dispatch on `RivetType`, never the physical Arrow type.** The previous
13//!   `bq_compat` matched on `arrow::DataType` and so was blind to `json` /
14//!   `uuid` / `enum` (all `Utf8` / `FixedSizeBinary` by then) and hard-failed
15//!   UUID. The resolver keys off the semantic type; Arrow is consulted only
16//!   for decimal precision (and `RivetType::Decimal` already carries `p,s`).
17//! - **Total & infallible.** Every `(RivetType, ExportTarget)` pair yields a
18//!   populated [`TargetColumnSpec`]; an unmappable column is a `status: Fail`
19//!   row, never an `Err`. This keeps the type-report table and `--json` shape
20//!   stable.
21//! - **`autoload_type` tells the truth.** It encodes the *empirically
22//!   verified* behavior of each target's Parquet autoloader — notably that
23//!   BigQuery autoload degrades Parquet `JsonType`/`UUIDType` to `BYTES`,
24//!   `isAdjustedToUTC=false` timestamps to `TIMESTAMP` (not `DATETIME`), and
25//!   3-level lists to `REPEATED RECORD{item}`. DuckDB honors all of them.
26//!
27//! BigQuery numeric limits (as of 2025):
28//!   NUMERIC    — precision 1–29, scale 0–9
29//!   BIGNUMERIC — precision 1–76, scale 0–38
30
31use arrow::datatypes::DataType;
32use serde::Serialize;
33
34use super::{RivetType, TimeUnit, TypeFidelity, TypeMapping};
35
36/// A supported downstream warehouse target. Closed, in-tree, contract-tested
37/// set; chosen at runtime from `--target X`, one per run.
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub enum ExportTarget {
40    /// Reference consumer of Rivet Parquet — honors every native logical type
41    /// (JSON, UUID, decimal, list) on autoload.
42    DuckDb,
43    /// Cloud warehouse. Its Parquet autoloader is weaker than DuckDB's: see
44    /// the per-type `autoload_type` notes in [`bigquery`].
45    BigQuery,
46    /// Cloud warehouse. Like BigQuery, its Parquet autoload degrades JSON,
47    /// UUID, naive timestamps and TIME — see [`snowflake`]. Verified live.
48    Snowflake,
49    /// Columnar warehouse with the most faithful Parquet autoload of the
50    /// warehouse targets: native `UInt64`, `Decimal`, `DateTime64` (naive *and*
51    /// tz), and `Array`, so most types autoload exactly. Diverges only on `UUID`
52    /// (lands as `FixedString(16)`), `JSON` (lands as `String`) and `TIME` (no
53    /// native type) — see [`clickhouse`]. Verified live against `clickhouse_load`.
54    ClickHouse,
55}
56
57impl ExportTarget {
58    pub fn parse(s: &str) -> Option<Self> {
59        match s.to_lowercase().as_str() {
60            "bigquery" | "bq" => Some(Self::BigQuery),
61            "duckdb" | "duck" => Some(Self::DuckDb),
62            "snowflake" | "sf" => Some(Self::Snowflake),
63            "clickhouse" | "ch" => Some(Self::ClickHouse),
64            _ => None,
65        }
66    }
67
68    /// Human-readable list of every spelling [`parse`](Self::parse) accepts,
69    /// for "unknown target" error messages. Single source so the message can't
70    /// drift from `parse` (it once said "bigquery, duckdb" and missed
71    /// snowflake). Aliases are shown in parens after each canonical name.
72    pub fn valid_target_names() -> &'static str {
73        "bigquery (bq), duckdb (duck), snowflake (sf), clickhouse (ch)"
74    }
75
76    pub fn label(self) -> &'static str {
77        match self {
78            Self::BigQuery => "bigquery",
79            Self::DuckDb => "duckdb",
80            Self::Snowflake => "snowflake",
81            Self::ClickHouse => "clickhouse",
82        }
83    }
84
85    /// Resolve one already-mapped column against this target.
86    pub fn resolve_column(self, input: TargetInput<'_>) -> TargetColumnSpec {
87        let mut spec = match self {
88            ExportTarget::BigQuery => bigquery::resolve(&input),
89            ExportTarget::DuckDb => duckdb::resolve(&input),
90            ExportTarget::Snowflake => snowflake::resolve(&input),
91            ExportTarget::ClickHouse => clickhouse::resolve(&input),
92        };
93        // Fidelity floor (ADR-0014 T6): the target status must not be rosier
94        // than the source fidelity warrants — a lossy/unsupported source column
95        // can never resolve to a clean `Ok`.
96        if input.fidelity.is_unsafe_for_strict_mode() && spec.status == TargetStatus::Ok {
97            spec.status = TargetStatus::Warn;
98        }
99        spec
100    }
101
102    /// Resolve a whole table's worth of columns, one spec per column in order.
103    /// Consumed today by the type-report's recovery-SQL emission
104    /// (`preflight::type_report`); `plan-load` (ADR-0014 Phase B) would be a
105    /// second consumer.
106    pub fn resolve_table(self, mappings: &[TypeMapping]) -> Vec<TargetColumnSpec> {
107        mappings
108            .iter()
109            .map(|m| self.resolve_column(TargetInput::from(m)))
110            .collect()
111    }
112
113    /// SQL that recovers target-native types after a bare autoload degraded
114    /// them (ADR-0014 L5). For BigQuery this is a `CREATE TABLE … AS SELECT`
115    /// over the autoloaded staging table applying per-column casts — BigQuery's
116    /// Parquet loader will NOT coerce a *declared* native type on load (verified
117    /// against live BQ: BYTES→JSON, TIMESTAMP→DATETIME loads are rejected), so
118    /// the recovery has to be a post-load transform, not a load schema. `None`
119    /// when the target reads the interchange Parquet faithfully (DuckDB).
120    pub fn recovery_sql(self, specs: &[TargetColumnSpec], table: &str) -> Option<String> {
121        match self {
122            ExportTarget::BigQuery => Some(bigquery_recovery_sql(specs, table)),
123            ExportTarget::Snowflake => Some(snowflake_recovery_sql(specs, table)),
124            ExportTarget::ClickHouse => Some(clickhouse_recovery_sql(specs, table)),
125            ExportTarget::DuckDb => None,
126        }
127    }
128}
129
130/// Status of a column's resolution against a specific target.
131#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
132#[serde(rename_all = "snake_case")]
133pub enum TargetStatus {
134    Ok,
135    Warn,
136    Fail,
137}
138
139impl TargetStatus {
140    pub fn label(&self) -> &'static str {
141        match self {
142            Self::Ok => "ok",
143            Self::Warn => "warn",
144            Self::Fail => "fail",
145        }
146    }
147}
148
149/// The borrowed subset a resolver is allowed to read. Built from a
150/// [`TypeMapping`] via [`From`]. Dispatch is on `rivet_type`; `arrow_type` is
151/// retained for callers that want it but the resolver reads precision from
152/// `RivetType::Decimal` directly.
153#[derive(Debug, Clone, Copy)]
154pub struct TargetInput<'a> {
155    pub column_name: &'a str,
156    pub rivet_type: &'a RivetType,
157    /// Retained for callers and future precision-sensitive targets; the
158    /// resolver reads precision from `RivetType::Decimal` directly today.
159    #[allow(dead_code)]
160    pub arrow_type: Option<&'a DataType>,
161    pub fidelity: TypeFidelity,
162}
163
164impl<'a> From<&'a TypeMapping> for TargetInput<'a> {
165    fn from(m: &'a TypeMapping) -> Self {
166        TargetInput {
167            column_name: &m.column_name,
168            rivet_type: &m.rivet_type,
169            arrow_type: m.arrow_type.as_ref(),
170            fidelity: m.fidelity,
171        }
172    }
173}
174
175/// One column's per-target materialization spec (ADR-0014 L4). Uniform across
176/// targets so the type-report table and `--json` stay stable; an unmappable
177/// column is a `status: Fail` row, not an error.
178#[derive(Debug, Clone, Serialize)]
179pub struct TargetColumnSpec {
180    /// Name copied through so a `Vec<TargetColumnSpec>` is self-describing.
181    pub column_name: String,
182    /// Native warehouse type for full fidelity, e.g. "JSON", "UBIGINT", "NUMERIC".
183    pub target_type: String,
184    /// Type a generic Parquet reader infers without a declared schema. May
185    /// differ from `target_type` (e.g. BigQuery autoloads JSON as "BYTES").
186    pub autoload_type: String,
187    pub status: TargetStatus,
188    #[serde(skip_serializing_if = "Option::is_none")]
189    pub note: Option<String>,
190    /// Materialization snippet / load-schema hint (L5) to recover the native
191    /// type when autoload diverges. `None` when `autoload_type == target_type`.
192    #[serde(skip_serializing_if = "Option::is_none")]
193    pub cast_sql: Option<String>,
194}
195
196/// Internal per-type resolution result, before the column name and cast
197/// substitution are applied.
198struct Resolved {
199    target_type: String,
200    autoload_type: String,
201    status: TargetStatus,
202    note: Option<String>,
203    /// `cast_sql` template with a `{col}` placeholder, or `None`.
204    cast: Option<String>,
205}
206
207impl Resolved {
208    fn ok(t: impl Into<String>) -> Self {
209        let t = t.into();
210        Self {
211            autoload_type: t.clone(),
212            target_type: t,
213            status: TargetStatus::Ok,
214            note: None,
215            cast: None,
216        }
217    }
218    /// Native type that *autoloads as something else* — the divergence the
219    /// resolver exists to surface.
220    fn diverge(
221        native: impl Into<String>,
222        autoload: impl Into<String>,
223        note: impl Into<String>,
224        cast: Option<&str>,
225    ) -> Self {
226        Self {
227            target_type: native.into(),
228            autoload_type: autoload.into(),
229            status: TargetStatus::Warn,
230            note: Some(note.into()),
231            cast: cast.map(str::to_string),
232        }
233    }
234    fn warn(t: impl Into<String>, note: impl Into<String>) -> Self {
235        let t = t.into();
236        Self {
237            autoload_type: t.clone(),
238            target_type: t,
239            status: TargetStatus::Warn,
240            note: Some(note.into()),
241            cast: None,
242        }
243    }
244    fn fail(note: impl Into<String>) -> Self {
245        Self {
246            target_type: "-".into(),
247            autoload_type: "-".into(),
248            status: TargetStatus::Fail,
249            note: Some(note.into()),
250            cast: None,
251        }
252    }
253    fn into_spec(self, input: &TargetInput<'_>) -> TargetColumnSpec {
254        TargetColumnSpec {
255            column_name: input.column_name.to_string(),
256            target_type: self.target_type,
257            autoload_type: self.autoload_type,
258            status: self.status,
259            note: self.note,
260            cast_sql: self.cast.map(|t| t.replace("{col}", input.column_name)),
261        }
262    }
263}
264
265fn unsupported_reason(t: &RivetType) -> String {
266    match t {
267        RivetType::Unsupported { reason, .. } => reason.clone(),
268        _ => "no target mapping".into(),
269    }
270}
271
272/// Emit a BigQuery type-recovery statement (ADR-0014 L5): load the interchange
273/// Parquet with `--autodetect` into `<table>__staging`, then run this CTAS to
274/// materialise the native types that bare autoload degrades (JSON/UUID→BYTES,
275/// naive timestamp→TIMESTAMP). Columns with a `cast_sql` get that cast; the
276/// rest pass through unchanged.
277///
278/// Verified against live BigQuery: a load schema that *declares* native types
279/// is rejected (the Parquet loader won't coerce a column's type on load), so
280/// the recovery must be this post-load transform.
281/// The recovery `SELECT` body, shared by every degrading target. The cast
282/// branch *is* the materialization contract (ADR-0014 L5) and is identical
283/// across targets — only the passthrough form (identifier quoting, alias)
284/// differs, so each target supplies that as `passthrough`. Deleting this and
285/// inlining the fold would re-scatter the cast logic across N targets.
286fn recovery_projection(specs: &[TargetColumnSpec], passthrough: impl Fn(&str) -> String) -> String {
287    specs
288        .iter()
289        .map(|s| match &s.cast_sql {
290            Some(cast) => format!("  {cast} AS {name}", name = s.column_name),
291            None => passthrough(&s.column_name),
292        })
293        .collect::<Vec<_>>()
294        .join(",\n")
295}
296
297fn bigquery_recovery_sql(specs: &[TargetColumnSpec], table: &str) -> String {
298    let cols = recovery_projection(specs, |name| format!("  {name}"));
299    format!(
300        "-- 1) bq load --autodetect --parquet_enable_list_inference \
301         --source_format=PARQUET {table}__staging <parquet>\n\
302         -- 2) recover native types:\n\
303         CREATE OR REPLACE TABLE `{table}` AS\n\
304         SELECT\n{cols}\n\
305         FROM `{table}__staging`;"
306    )
307}
308
309/// Emit a Snowflake type-recovery script (ADR-0014 L5). Snowflake's Parquet
310/// autoload degrades JSON→TEXT, UUID→BINARY, naive timestamp→NUMBER (µs) and
311/// TIME→NUMBER, so the recovery is a post-load CTAS over a `MATCH_BY_COLUMN_NAME`
312/// staging table. INFER_SCHEMA emits lowercase, case-sensitive names, so every
313/// reference is double-quoted. Verified live (2026-06-01) via the `snow` CLI.
314fn snowflake_recovery_sql(specs: &[TargetColumnSpec], table: &str) -> String {
315    let cols = recovery_projection(specs, |name| format!("  \"{name}\" AS {name}"));
316    format!(
317        "-- 1) ALTER SESSION SET TIMEZONE='UTC';\n\
318         -- 2) CREATE OR REPLACE FILE FORMAT rivet_pq TYPE=PARQUET BINARY_AS_TEXT=FALSE;\n\
319         -- 3) PUT file://<parquet> @<stage> AUTO_COMPRESS=FALSE;\n\
320         -- 4) CREATE OR REPLACE TABLE {table}__staging USING TEMPLATE (SELECT ARRAY_AGG(\n\
321         --      OBJECT_CONSTRUCT(*)) FROM TABLE(INFER_SCHEMA(LOCATION=>'@<stage>', FILE_FORMAT=>'rivet_pq')));\n\
322         --    COPY INTO {table}__staging FROM @<stage> FILE_FORMAT=(FORMAT_NAME='rivet_pq') MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE;\n\
323         -- 5) recover native types:\n\
324         CREATE OR REPLACE TABLE {table} AS\n\
325         SELECT\n{cols}\n\
326         FROM {table}__staging;"
327    )
328}
329
330// ── BigQuery ─────────────────────────────────────────────────────────────────
331
332mod bigquery {
333    use super::*;
334
335    /// BigQuery NUMERIC precision/scale limits.
336    const NUMERIC_MAX_P: u8 = 29;
337    const NUMERIC_MAX_S: i8 = 9;
338    /// BigQuery BIGNUMERIC precision/scale limits.
339    const BIGNUMERIC_MAX_P: u8 = 76;
340    const BIGNUMERIC_MAX_S: i8 = 38;
341
342    pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
343        native(input.rivet_type).into_spec(input)
344    }
345
346    fn native(t: &RivetType) -> Resolved {
347        match t {
348            RivetType::Bool => Resolved::ok("BOOL"),
349            RivetType::Int16 | RivetType::Int32 | RivetType::Int64 => Resolved::ok("INT64"),
350            // u64 > i64::MAX overflows the INT64 autoload and cannot be
351            // recovered post-load (the bits are already wrong). The only fix is
352            // source-side: map the column to decimal(20,0) with a column
353            // override so it rides as Parquet DECIMAL → BigQuery NUMERIC.
354            RivetType::UInt64 => Resolved::diverge(
355                "NUMERIC",
356                "INT64",
357                "UINT64 > INT64_MAX overflows the INT64 autoload and cannot be recovered after \
358                 load — map the column to decimal(20,0) with a source column override",
359                None,
360            ),
361            RivetType::Float32 | RivetType::Float64 => Resolved::ok("FLOAT64"),
362            RivetType::Decimal { precision, scale } => decimal(*precision, *scale),
363            RivetType::Date => Resolved::ok("DATE"),
364            RivetType::Time { .. } => Resolved::ok("TIME"),
365            // tz-aware timestamp → instant → TIMESTAMP, autoloads cleanly.
366            RivetType::Timestamp {
367                timezone: Some(_), ..
368            } => Resolved::ok("TIMESTAMP"),
369            // Nanosecond naive timestamp (the `timestamp_ns` override, e.g. SQL
370            // Server datetime2(7)) has no BigQuery native temporal type: the loader
371            // does not recognise the Parquet TIMESTAMP(NANOS) logical type and
372            // autoloads the raw INT64 nanoseconds-since-epoch (verified live
373            // 2026-06-07 — lossless as an integer). A native TIMESTAMP needs
374            // TIMESTAMP_MICROS(DIV(col,1000)), which drops the sub-µs digits
375            // (BigQuery TIMESTAMP is microsecond), so no lossless temporal cast
376            // exists — keep INT64, or use the default `timestamp` for BigQuery.
377            RivetType::Timestamp {
378                unit: TimeUnit::Nanosecond,
379                timezone: None,
380            } => Resolved::diverge(
381                "INT64",
382                "INT64",
383                "nanosecond timestamp has no BigQuery native type — autoloads as INT64 (raw \
384                 nanos, lossless); a native TIMESTAMP via TIMESTAMP_MICROS(DIV(col,1000)) drops \
385                 sub-µs precision. Prefer `timestamp` (microsecond) for BigQuery targets.",
386                None,
387            ),
388            // naive timestamp → wall-clock → DATETIME, but BigQuery autoload
389            // ignores Parquet isAdjustedToUTC=false and yields TIMESTAMP
390            // (verified). `DATETIME(ts)` recovers the wall-clock after load.
391            RivetType::Timestamp { timezone: None, .. } => Resolved::diverge(
392                "DATETIME",
393                "TIMESTAMP",
394                "naive timestamp autoloads as TIMESTAMP (an instant); recover wall-clock with \
395                 DATETIME(col) after load",
396                Some("DATETIME({col})"),
397            ),
398            RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("STRING"),
399            RivetType::Binary => Resolved::ok("BYTES"),
400            // Parquet JSON logical type autoloads as BYTES in BigQuery
401            // (verified). Declare JSON in the load schema for native JSON.
402            RivetType::Json => Resolved::diverge(
403                "JSON",
404                "BYTES",
405                "Parquet JSON logical type autoloads as BYTES in BigQuery; recover native JSON \
406                 with PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(col)) after load",
407                Some("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING({col}))"),
408            ),
409            // UUID rides as FixedSizeBinary(16) + UUIDType; BigQuery has no UUID
410            // type and autoloads it as 16-byte BYTES (verified). Native is
411            // STRING (canonical text).
412            RivetType::Uuid => Resolved::diverge(
413                "STRING",
414                "BYTES",
415                "UUID autoloads as 16-byte BYTES in BigQuery; recover hex text with TO_HEX(col) \
416                 after load (or keep BYTES)",
417                Some("TO_HEX({col})"),
418            ),
419            RivetType::Interval => Resolved::ok("STRING"),
420            RivetType::List { inner } => list(inner),
421            RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
422        }
423    }
424
425    fn decimal(p: u8, s: i8) -> Resolved {
426        if s < 0 {
427            return Resolved::fail(format!(
428                "BigQuery has no negative scale; decimal({p},{s}) needs a STRING/INT64 cast"
429            ));
430        }
431        let native = if p <= NUMERIC_MAX_P && s <= NUMERIC_MAX_S {
432            "NUMERIC"
433        } else if p <= BIGNUMERIC_MAX_P && s <= BIGNUMERIC_MAX_S {
434            "BIGNUMERIC"
435        } else {
436            return Resolved::fail(format!(
437                "decimal({p},{s}) exceeds BigQuery BIGNUMERIC limits (max 76,38)"
438            ));
439        };
440        Resolved::ok(native)
441    }
442
443    fn list(inner: &RivetType) -> Resolved {
444        let inner_r = native(inner);
445        if inner_r.status == TargetStatus::Fail {
446            return Resolved::fail(format!(
447                "REPEATED of unsupported element: {}",
448                inner_r.target_type
449            ));
450        }
451        // Rivet writes the Parquet list element as `item` (arrow-rs default,
452        // not the spec's `element`), so BigQuery loads arrays as
453        // REPEATED RECORD{item} even with `--parquet_enable_list_inference`
454        // (without the flag they nest one level deeper as RECORD{list:...}).
455        // Verified live: flattening with `UNNEST(col)` + `el.item` recovers a
456        // clean REPEATED scalar.
457        Resolved::diverge(
458            format!("REPEATED {}", inner_r.target_type),
459            format!("REPEATED RECORD{{item {}}}", inner_r.autoload_type),
460            "arrays load as REPEATED RECORD{item}; load the staging table with \
461             --parquet_enable_list_inference, then flatten with UNNEST after load",
462            Some("ARRAY(SELECT el.item FROM UNNEST({col}) AS el)"),
463        )
464    }
465}
466
467// ── DuckDB ───────────────────────────────────────────────────────────────────
468
469mod duckdb {
470    use super::*;
471
472    pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
473        native(input.rivet_type).into_spec(input)
474    }
475
476    /// DuckDB honors every native Parquet logical type Rivet writes, so
477    /// `autoload_type == target_type` for all supported variants (verified).
478    fn native(t: &RivetType) -> Resolved {
479        match t {
480            RivetType::Bool => Resolved::ok("BOOLEAN"),
481            RivetType::Int16 => Resolved::ok("SMALLINT"),
482            RivetType::Int32 => Resolved::ok("INTEGER"),
483            RivetType::Int64 => Resolved::ok("BIGINT"),
484            RivetType::UInt64 => Resolved::ok("UBIGINT"),
485            RivetType::Float32 => Resolved::ok("FLOAT"),
486            RivetType::Float64 => Resolved::ok("DOUBLE"),
487            RivetType::Decimal { precision, scale } => {
488                if *scale < 0 {
489                    Resolved::warn(
490                        "DECIMAL",
491                        format!(
492                            "DuckDB has no negative scale; decimal({precision},{scale}) loads via cast"
493                        ),
494                    )
495                } else if *precision <= 38 {
496                    Resolved::ok(format!("DECIMAL({precision},{scale})"))
497                } else {
498                    // DuckDB DECIMAL maxes at precision 38; wider rides as HUGEINT/DOUBLE.
499                    Resolved::warn(
500                        "DECIMAL(38,*)",
501                        format!("decimal({precision},{scale}) exceeds DuckDB DECIMAL(38); widens"),
502                    )
503                }
504            }
505            RivetType::Date => Resolved::ok("DATE"),
506            RivetType::Time { .. } => Resolved::ok("TIME"),
507            RivetType::Timestamp {
508                timezone: Some(_), ..
509            } => Resolved::ok("TIMESTAMPTZ"),
510            // DuckDB has a native nanosecond timestamp; the `timestamp_ns` override
511            // round-trips losslessly (verified live 2026-06-07).
512            RivetType::Timestamp {
513                unit: TimeUnit::Nanosecond,
514                timezone: None,
515            } => Resolved::ok("TIMESTAMP_NS"),
516            RivetType::Timestamp { timezone: None, .. } => Resolved::ok("TIMESTAMP"),
517            RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("VARCHAR"),
518            RivetType::Binary => Resolved::ok("BLOB"),
519            RivetType::Json => Resolved::ok("JSON"),
520            RivetType::Uuid => Resolved::ok("UUID"),
521            RivetType::Interval => Resolved::ok("INTERVAL"),
522            RivetType::List { inner } => {
523                let inner_r = native(inner);
524                if inner_r.status == TargetStatus::Fail {
525                    Resolved::fail(format!(
526                        "LIST of unsupported element: {}",
527                        inner_r.target_type
528                    ))
529                } else {
530                    Resolved::ok(format!("{}[]", inner_r.target_type))
531                }
532            }
533            RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
534        }
535    }
536}
537
538// ── Snowflake ────────────────────────────────────────────────────────────────
539
540mod snowflake {
541    use super::*;
542
543    pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
544        native(input.rivet_type).into_spec(input)
545    }
546
547    /// Snowflake autoload (INFER_SCHEMA / COPY) + recovery casts — verified live
548    /// (2026-06-01). Needs `BINARY_AS_TEXT=FALSE` in the file format; cast column
549    /// refs are double-quoted because INFER_SCHEMA names are lowercase and
550    /// case-sensitive.
551    fn native(t: &RivetType) -> Resolved {
552        match t {
553            RivetType::Bool => Resolved::ok("BOOLEAN"),
554            RivetType::Int16 | RivetType::Int32 | RivetType::Int64 => Resolved::ok("NUMBER(38,0)"),
555            // u64 > INT64_MAX overflows the Parquet read; fix at source.
556            RivetType::UInt64 => Resolved::diverge(
557                "NUMBER(20,0)",
558                "NUMBER(38,0)",
559                "UINT64 > INT64_MAX overflows the Parquet read; map to decimal(20,0) at source",
560                None,
561            ),
562            RivetType::Float32 | RivetType::Float64 => Resolved::ok("FLOAT"),
563            RivetType::Decimal { precision, scale } => {
564                if *scale < 0 {
565                    Resolved::warn(
566                        "NUMBER",
567                        format!(
568                            "Snowflake NUMBER has no negative scale; decimal({precision},{scale}) loads via cast"
569                        ),
570                    )
571                } else {
572                    Resolved::ok(format!("NUMBER({precision},{scale})"))
573                }
574            }
575            RivetType::Date => Resolved::ok("DATE"),
576            // TIME autoloads as NUMBER (µs of day); rebuild with TIME_FROM_PARTS.
577            RivetType::Time { .. } => Resolved::diverge(
578                "TIME",
579                "NUMBER(38,0)",
580                "TIME autoloads as NUMBER (µs of day); recover with TIME_FROM_PARTS after load",
581                Some(r#"TIME_FROM_PARTS(0,0,FLOOR("{col}"/1000000),MOD("{col}",1000000)*1000)"#),
582            ),
583            // tz timestamp lands as TIMESTAMP_NTZ holding the UTC instant.
584            RivetType::Timestamp {
585                timezone: Some(_), ..
586            } => Resolved::diverge(
587                "TIMESTAMP_TZ",
588                "TIMESTAMP_NTZ",
589                "tz timestamp autoloads as TIMESTAMP_NTZ — ALTER SESSION SET TIMEZONE='UTC' before COPY so the instant matches",
590                None,
591            ),
592            // Nanosecond naive timestamp autoloads as NUMBER (ns since epoch),
593            // like the µs case but at scale 9. Snowflake TIMESTAMP_NTZ holds full
594            // 9-digit precision, so TO_TIMESTAMP_NTZ(col, 9) recovers it
595            // losslessly — verified live 2026-06-07 (NUMBER 1717761600123456700 →
596            // 2024-06-07 12:00:00.123456700, the 7th digit intact).
597            RivetType::Timestamp {
598                unit: TimeUnit::Nanosecond,
599                timezone: None,
600            } => Resolved::diverge(
601                "TIMESTAMP_NTZ",
602                "NUMBER(38,0)",
603                "nanosecond timestamp autoloads as NUMBER (ns since epoch); recover with \
604                 TO_TIMESTAMP_NTZ(col, 9) after load — Snowflake TIMESTAMP_NTZ holds full ns precision",
605                Some(r#"TO_TIMESTAMP_NTZ("{col}", 9)"#),
606            ),
607            // naive timestamp autoloads as NUMBER (µs since epoch).
608            RivetType::Timestamp { timezone: None, .. } => Resolved::diverge(
609                "TIMESTAMP_NTZ",
610                "NUMBER(38,0)",
611                "naive timestamp autoloads as NUMBER (µs since epoch); recover with TO_TIMESTAMP_NTZ after load",
612                Some(r#"TO_TIMESTAMP_NTZ("{col}", 6)"#),
613            ),
614            RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("TEXT"),
615            // bytea/blob needs BINARY_AS_TEXT=FALSE or non-UTF8 bytes fail.
616            RivetType::Binary => Resolved::warn(
617                "BINARY",
618                "set BINARY_AS_TEXT=FALSE in the Parquet FILE FORMAT or non-UTF8 bytes fail to load",
619            ),
620            // JSON autoloads as TEXT; PARSE_JSON recovers native VARIANT.
621            RivetType::Json => Resolved::diverge(
622                "VARIANT",
623                "TEXT",
624                "JSON autoloads as TEXT; recover native VARIANT with PARSE_JSON after load",
625                Some(r#"PARSE_JSON("{col}")"#),
626            ),
627            // UUID (FixedSizeBinary 16) autoloads as 16-byte BINARY.
628            RivetType::Uuid => Resolved::diverge(
629                "TEXT",
630                "BINARY",
631                "UUID autoloads as 16-byte BINARY; recover canonical text with HEX_ENCODE + REGEXP after load",
632                Some(
633                    r#"REGEXP_REPLACE(LOWER(HEX_ENCODE("{col}")),'^(.{8})(.{4})(.{4})(.{4})(.{12})$','\\1-\\2-\\3-\\4-\\5')"#,
634                ),
635            ),
636            RivetType::Interval => Resolved::ok("TEXT"),
637            // A Parquet list autoloads as VARIANT (holding the JSON array), not
638            // native ARRAY — verified live 2026-06-01: INFER_SCHEMA reports
639            // VARIANT for both `tags` (text[]) and `nums` (int[]). Recover the
640            // native ARRAY with `::ARRAY` after load.
641            RivetType::List { inner } => {
642                let inner_r = native(inner);
643                if inner_r.status == TargetStatus::Fail {
644                    Resolved::fail(format!(
645                        "ARRAY of unsupported element: {}",
646                        inner_r.target_type
647                    ))
648                } else {
649                    Resolved::diverge(
650                        "ARRAY",
651                        "VARIANT",
652                        "list autoloads as VARIANT (the JSON array); recover native ARRAY with ::ARRAY after load",
653                        Some(r#""{col}"::ARRAY"#),
654                    )
655                }
656            }
657            RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
658        }
659    }
660}
661
662// ── ClickHouse ───────────────────────────────────────────────────────────────
663
664fn clickhouse_recovery_sql(specs: &[TargetColumnSpec], table: &str) -> String {
665    let cols = recovery_projection(specs, |name| format!("  {name}"));
666    format!(
667        "-- 1) load the Parquet into a staging table, e.g.\n\
668         --    CREATE TABLE {table}__staging ENGINE = MergeTree ORDER BY tuple() AS\n\
669         --      SELECT * FROM file('<parquet>', 'Parquet');\n\
670         -- 2) recover native types:\n\
671         CREATE TABLE {table} ENGINE = MergeTree ORDER BY tuple() AS\n\
672         SELECT\n{cols}\n\
673         FROM {table}__staging;"
674    )
675}
676
677mod clickhouse {
678    use super::*;
679
680    pub(super) fn resolve(input: &TargetInput<'_>) -> TargetColumnSpec {
681        native(input.rivet_type).into_spec(input)
682    }
683
684    /// ClickHouse Parquet autoload (`file(..., 'Parquet')`) — pinned against the
685    /// live `clickhouse_load` matrix. The most faithful warehouse target: native
686    /// `UInt64`, `Decimal`, `DateTime64` (naive *and* tz) and `Array`, so most
687    /// types autoload exactly. Divergences: `UUID` -> `FixedString(16)`, `JSON`
688    /// -> `String`, and `TIME` (no native type -> `Int64`, µs of day).
689    fn native(t: &RivetType) -> Resolved {
690        match t {
691            RivetType::Bool => Resolved::ok("Bool"),
692            RivetType::Int16 => Resolved::ok("Int16"),
693            RivetType::Int32 => Resolved::ok("Int32"),
694            RivetType::Int64 => Resolved::ok("Int64"),
695            // Native unsigned 64-bit — ClickHouse holds the full UInt64 range, so
696            // the overflow that forces BigQuery/Snowflake to a load-schema note
697            // never happens here. The headline difference from the cloud warehouses.
698            RivetType::UInt64 => Resolved::ok("UInt64"),
699            RivetType::Float32 => Resolved::ok("Float32"),
700            RivetType::Float64 => Resolved::ok("Float64"),
701            RivetType::Decimal { precision, scale } => {
702                if *scale < 0 {
703                    Resolved::warn(
704                        "Decimal",
705                        format!(
706                            "ClickHouse Decimal has no negative scale; decimal({precision},{scale}) needs a declared schema"
707                        ),
708                    )
709                } else {
710                    Resolved::ok(format!("Decimal({precision}, {scale})"))
711                }
712            }
713            RivetType::Date => Resolved::ok("Date32"),
714            // No time-of-day type; Time64 autoloads as Int64 (µs of day). There is
715            // no native type to recover to — fix at source if a real time matters.
716            RivetType::Time { .. } => Resolved::warn(
717                "Int64",
718                "ClickHouse has no TIME type; time-of-day autoloads as Int64 (µs of day)",
719            ),
720            // DateTime64 holds both naive and tz timestamps natively (verified:
721            // naive -> DateTime64(6), tz -> DateTime64(6, 'UTC')).
722            RivetType::Timestamp { unit, timezone } => {
723                let p = match unit {
724                    TimeUnit::Second => 0,
725                    TimeUnit::Millisecond => 3,
726                    TimeUnit::Microsecond => 6,
727                    TimeUnit::Nanosecond => 9,
728                };
729                match timezone {
730                    Some(tz) => Resolved::ok(format!("DateTime64({p}, '{tz}')")),
731                    None => Resolved::ok(format!("DateTime64({p})")),
732                }
733            }
734            RivetType::String | RivetType::Text | RivetType::Enum => Resolved::ok("String"),
735            // ClickHouse String holds arbitrary bytes, so bytea/blob round-trips
736            // losslessly — no BINARY_AS_TEXT caveat like Snowflake.
737            RivetType::Binary => Resolved::ok("String"),
738            // ClickHouse's native JSON type is a declared column (still settling
739            // across versions); Parquet JSON autoloads as String holding the valid
740            // JSON text. Recover by declaring a JSON column at load — there is no
741            // safe SELECT-time cast, so the recovery is upstream (a note).
742            RivetType::Json => Resolved::diverge(
743                "JSON",
744                "String",
745                "JSON autoloads as String (valid JSON text); declare a JSON column at load for the native type",
746                None,
747            ),
748            // The 16-byte UUID field autoloads as FixedString(16) (verified live).
749            // The bytes are the canonical UUID; recover the native UUID with the
750            // hex -> dashed-text -> toUUID round-trip clickhouse_load pins.
751            RivetType::Uuid => Resolved::diverge(
752                "UUID",
753                "FixedString(16)",
754                "UUID autoloads as FixedString(16); recover the native UUID with toUUID after load",
755                Some(
756                    "toUUID(concat(substring(lower(hex({col})),1,8),'-',substring(lower(hex({col})),9,4),'-',substring(lower(hex({col})),13,4),'-',substring(lower(hex({col})),17,4),'-',substring(lower(hex({col})),21,12)))",
757                ),
758            ),
759            RivetType::Interval => Resolved::ok("String"),
760            // A Parquet list autoloads as a native Array (verified: tags ->
761            // Array(Nullable(String)), nums -> Array(Nullable(Int32))).
762            RivetType::List { inner } => {
763                let inner_r = native(inner);
764                if inner_r.status == TargetStatus::Fail {
765                    Resolved::fail(format!(
766                        "Array of unsupported element: {}",
767                        inner_r.target_type
768                    ))
769                } else {
770                    Resolved::ok(format!("Array(Nullable({}))", inner_r.target_type))
771                }
772            }
773            RivetType::Unsupported { .. } => Resolved::fail(unsupported_reason(t)),
774        }
775    }
776}
777
778#[cfg(test)]
779mod tests {
780    use super::*;
781
782    fn input<'a>(rt: &'a RivetType) -> TargetInput<'a> {
783        TargetInput {
784            column_name: "c",
785            rivet_type: rt,
786            arrow_type: None,
787            fidelity: TypeFidelity::Exact,
788        }
789    }
790
791    fn bq(rt: &RivetType) -> TargetColumnSpec {
792        ExportTarget::BigQuery.resolve_column(input(rt))
793    }
794    fn duck(rt: &RivetType) -> TargetColumnSpec {
795        ExportTarget::DuckDb.resolve_column(input(rt))
796    }
797    fn sf(rt: &RivetType) -> TargetColumnSpec {
798        ExportTarget::Snowflake.resolve_column(input(rt))
799    }
800
801    // ── nanosecond timestamp (`timestamp_ns` override) per-target autoload ────
802    // The default `timestamp` is microsecond; ns is opt-in (datetime2(7)). These
803    // pin the autoload truth verified live on 2026-06-07 so the preflight report
804    // doesn't claim the microsecond behaviour for a ns column.
805
806    #[test]
807    fn bq_nanosecond_timestamp_autoloads_as_int64() {
808        let ns = RivetType::Timestamp {
809            unit: super::super::TimeUnit::Nanosecond,
810            timezone: None,
811        };
812        let s = bq(&ns);
813        assert_eq!(s.target_type, "INT64");
814        assert_eq!(s.autoload_type, "INT64");
815        assert_eq!(s.status, TargetStatus::Warn);
816        // No lossless temporal recovery (ns→µs is lossy) — like the UINT64 case.
817        assert!(s.cast_sql.is_none(), "ns→BQ has no lossless temporal cast");
818    }
819
820    #[test]
821    fn duckdb_nanosecond_timestamp_is_native_timestamp_ns() {
822        let ns = RivetType::Timestamp {
823            unit: super::super::TimeUnit::Nanosecond,
824            timezone: None,
825        };
826        let s = duck(&ns);
827        assert_eq!(s.target_type, "TIMESTAMP_NS");
828        assert_eq!(s.status, TargetStatus::Ok);
829    }
830
831    #[test]
832    fn snowflake_nanosecond_timestamp_recovers_losslessly_at_scale_9() {
833        let ns = RivetType::Timestamp {
834            unit: super::super::TimeUnit::Nanosecond,
835            timezone: None,
836        };
837        let s = sf(&ns);
838        assert_eq!(s.target_type, "TIMESTAMP_NTZ");
839        assert_eq!(s.autoload_type, "NUMBER(38,0)");
840        // Lossless recovery (TIMESTAMP_NTZ holds 9 digits) → cast_sql is Some at
841        // scale 9 (not the µs scale 6); `{col}` is substituted with the column.
842        assert_eq!(s.cast_sql.as_deref(), Some(r#"TO_TIMESTAMP_NTZ("c", 9)"#));
843    }
844
845    // ── dispatch on RivetType, not Arrow — the headline fix ──────────────────
846
847    #[test]
848    fn bq_uuid_resolves_not_fails() {
849        // The old arrow-dispatch `bq_compat` hard-failed UUID (FixedSizeBinary
850        // had no arm). Now it resolves on RivetType: native STRING, BYTES on
851        // autoload.
852        let s = bq(&RivetType::Uuid);
853        assert_eq!(s.target_type, "STRING");
854        assert_eq!(s.autoload_type, "BYTES");
855        assert_eq!(s.status, TargetStatus::Warn);
856        assert!(s.cast_sql.unwrap().contains("c"));
857    }
858
859    #[test]
860    fn bq_json_native_is_json_autoload_is_bytes() {
861        let s = bq(&RivetType::Json);
862        assert_eq!(s.target_type, "JSON");
863        assert_eq!(s.autoload_type, "BYTES");
864        assert_eq!(s.status, TargetStatus::Warn);
865        assert!(s.cast_sql.unwrap().starts_with("PARSE_JSON"));
866    }
867
868    #[test]
869    fn bq_naive_timestamp_is_datetime_native_timestamp_autoload() {
870        let naive = RivetType::Timestamp {
871            unit: super::super::TimeUnit::Microsecond,
872            timezone: None,
873        };
874        let s = bq(&naive);
875        assert_eq!(s.target_type, "DATETIME");
876        assert_eq!(s.autoload_type, "TIMESTAMP");
877        assert_eq!(s.status, TargetStatus::Warn);
878    }
879
880    #[test]
881    fn bq_tz_timestamp_is_timestamp_ok() {
882        let tz = RivetType::Timestamp {
883            unit: super::super::TimeUnit::Microsecond,
884            timezone: Some("UTC".into()),
885        };
886        let s = bq(&tz);
887        assert_eq!(s.target_type, "TIMESTAMP");
888        assert_eq!(s.autoload_type, "TIMESTAMP");
889        assert_eq!(s.status, TargetStatus::Ok);
890    }
891
892    #[test]
893    fn bq_decimal_within_numeric_is_numeric() {
894        let s = bq(&RivetType::Decimal {
895            precision: 18,
896            scale: 2,
897        });
898        assert_eq!(s.target_type, "NUMERIC");
899        assert_eq!(s.status, TargetStatus::Ok);
900    }
901
902    #[test]
903    fn bq_decimal_escalates_to_bignumeric() {
904        let s = bq(&RivetType::Decimal {
905            precision: 38,
906            scale: 9,
907        });
908        assert_eq!(s.target_type, "BIGNUMERIC");
909        assert_eq!(s.status, TargetStatus::Ok);
910    }
911
912    #[test]
913    fn bq_decimal_negative_scale_fails() {
914        let s = bq(&RivetType::Decimal {
915            precision: 5,
916            scale: -2,
917        });
918        assert_eq!(s.status, TargetStatus::Fail);
919    }
920
921    #[test]
922    fn bq_uint64_recommends_numeric_warns_overflow() {
923        let s = bq(&RivetType::UInt64);
924        assert_eq!(s.target_type, "NUMERIC");
925        assert_eq!(s.autoload_type, "INT64");
926        assert_eq!(s.status, TargetStatus::Warn);
927    }
928
929    #[test]
930    fn bq_list_is_repeated_native_record_autoload() {
931        let t = RivetType::List {
932            inner: Box::new(RivetType::String),
933        };
934        let s = bq(&t);
935        assert_eq!(s.target_type, "REPEATED STRING");
936        assert!(s.autoload_type.contains("REPEATED RECORD"));
937        assert_eq!(s.status, TargetStatus::Warn);
938    }
939
940    #[test]
941    fn bq_unsupported_is_fail_row_not_panic() {
942        let t = RivetType::Unsupported {
943            native_type: "geometry".into(),
944            reason: "no mapping".into(),
945        };
946        let s = bq(&t);
947        assert_eq!(s.status, TargetStatus::Fail);
948        assert_eq!(s.target_type, "-");
949    }
950
951    #[test]
952    fn bq_standard_scalars_ok() {
953        for (rt, native) in [
954            (RivetType::Bool, "BOOL"),
955            (RivetType::Int64, "INT64"),
956            (RivetType::Float64, "FLOAT64"),
957            (RivetType::Date, "DATE"),
958            (RivetType::String, "STRING"),
959            (RivetType::Binary, "BYTES"),
960            (RivetType::Enum, "STRING"),
961        ] {
962            let s = bq(&rt);
963            assert_eq!(s.target_type, native, "{rt:?}");
964            assert_eq!(s.autoload_type, native, "{rt:?}");
965            assert_eq!(s.status, TargetStatus::Ok, "{rt:?}");
966        }
967    }
968
969    // ── DuckDB honors every logical type — autoload == native ────────────────
970
971    #[test]
972    fn duckdb_reads_everything_natively() {
973        let naive = RivetType::Timestamp {
974            unit: super::super::TimeUnit::Microsecond,
975            timezone: None,
976        };
977        for rt in [
978            RivetType::Json,
979            RivetType::Uuid,
980            RivetType::UInt64,
981            naive,
982            RivetType::List {
983                inner: Box::new(RivetType::Int64),
984            },
985        ] {
986            let s = duck(&rt);
987            assert_eq!(
988                s.target_type, s.autoload_type,
989                "DuckDB autoload must equal native for {rt:?}"
990            );
991            assert_ne!(s.status, TargetStatus::Fail, "{rt:?}");
992        }
993    }
994
995    #[test]
996    fn duckdb_native_type_names() {
997        assert_eq!(duck(&RivetType::Json).target_type, "JSON");
998        assert_eq!(duck(&RivetType::Uuid).target_type, "UUID");
999        assert_eq!(duck(&RivetType::UInt64).target_type, "UBIGINT");
1000        assert_eq!(
1001            duck(&RivetType::Decimal {
1002                precision: 18,
1003                scale: 2
1004            })
1005            .target_type,
1006            "DECIMAL(18,2)"
1007        );
1008        assert_eq!(
1009            duck(&RivetType::List {
1010                inner: Box::new(RivetType::Int64)
1011            })
1012            .target_type,
1013            "BIGINT[]"
1014        );
1015    }
1016
1017    #[test]
1018    fn parse_accepts_aliases() {
1019        assert_eq!(ExportTarget::parse("bq"), Some(ExportTarget::BigQuery));
1020        assert_eq!(
1021            ExportTarget::parse("BigQuery"),
1022            Some(ExportTarget::BigQuery)
1023        );
1024        assert_eq!(ExportTarget::parse("duckdb"), Some(ExportTarget::DuckDb));
1025        assert_eq!(ExportTarget::parse("nope"), None);
1026    }
1027
1028    #[test]
1029    fn resolve_table_preserves_order_and_names() {
1030        use super::super::SourceColumn;
1031        let mappings = vec![
1032            TypeMapping::from_source(&SourceColumn::simple("a", "int8", true), RivetType::Int64),
1033            TypeMapping::from_source(&SourceColumn::simple("b", "jsonb", true), RivetType::Json),
1034        ];
1035        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
1036        assert_eq!(specs.len(), 2);
1037        assert_eq!(specs[0].column_name, "a");
1038        assert_eq!(specs[1].column_name, "b");
1039        assert_eq!(specs[1].target_type, "JSON");
1040    }
1041
1042    // ── edge cases: remediation hints must recover from the DEGRADED state ────
1043    // Regression guard for the bug class where the resolver proposes a post-load
1044    // cast that cannot actually recover an already-lossy value (e.g. a UINT64
1045    // that overflowed into INT64). See CLAUDE.md "Remediation hints must recover
1046    // from the degraded state".
1047
1048    #[test]
1049    fn cast_sql_is_none_when_post_load_recovery_is_impossible() {
1050        // UINT64 > INT64_MAX has already overflowed by the time it autoloads as
1051        // INT64; a SELECT-time cast would operate on corrupted bits. The only
1052        // fix is source-side (a decimal override) — cast_sql MUST be None and
1053        // the note must point there, not promise a post-load cast.
1054        let u = bq(&RivetType::UInt64);
1055        assert!(
1056            u.cast_sql.is_none(),
1057            "overflowed UINT64 has no lossless post-load recovery"
1058        );
1059        let note = u.note.unwrap().to_lowercase();
1060        assert!(
1061            note.contains("override"),
1062            "UINT64 note must point to the source-side override, got: {note}"
1063        );
1064    }
1065
1066    #[test]
1067    fn cast_sql_present_only_when_lossless_post_load() {
1068        // JSON/UUID/naive-timestamp autoload to a degraded type but still hold
1069        // the value losslessly, so a post-load cast genuinely recovers it.
1070        assert!(
1071            bq(&RivetType::Json)
1072                .cast_sql
1073                .unwrap()
1074                .contains("PARSE_JSON")
1075        );
1076        assert!(bq(&RivetType::Uuid).cast_sql.unwrap().contains("TO_HEX"));
1077        let naive = RivetType::Timestamp {
1078            unit: super::super::TimeUnit::Microsecond,
1079            timezone: None,
1080        };
1081        assert!(bq(&naive).cast_sql.unwrap().contains("DATETIME"));
1082    }
1083
1084    #[test]
1085    fn every_divergence_offers_a_recovery_path() {
1086        // Invariant: whenever BigQuery autoload diverges from the native type the
1087        // operator gets SOME recovery — a lossless post-load `cast_sql`, or a
1088        // note describing the fix (post-load transform, or a source override).
1089        // Never a silent no-op (the bug class).
1090        let naive = RivetType::Timestamp {
1091            unit: super::super::TimeUnit::Microsecond,
1092            timezone: None,
1093        };
1094        let cases = [
1095            RivetType::Json,
1096            RivetType::Uuid,
1097            RivetType::UInt64,
1098            naive,
1099            RivetType::List {
1100                inner: Box::new(RivetType::String),
1101            },
1102        ];
1103        for rt in cases {
1104            let s = bq(&rt);
1105            assert_ne!(s.autoload_type, s.target_type, "case must diverge: {rt:?}");
1106            let has_cast = s.cast_sql.is_some();
1107            let note = s.note.as_deref().unwrap_or("").to_lowercase();
1108            let describes_recovery = note.contains("after load") || note.contains("override");
1109            assert!(
1110                has_cast || describes_recovery,
1111                "divergent {rt:?} must offer a recovery (cast_sql or a recovery note)"
1112            );
1113        }
1114    }
1115
1116    // ── edge cases: decimal precision/scale overflow at the target boundary ───
1117
1118    #[test]
1119    fn bq_decimal_limit_boundaries() {
1120        // Exact BIGNUMERIC ceiling is ok.
1121        assert_eq!(
1122            bq(&RivetType::Decimal {
1123                precision: 76,
1124                scale: 38
1125            })
1126            .status,
1127            TargetStatus::Ok
1128        );
1129        // One past precision overflows BIGNUMERIC → Fail, never a silent clamp.
1130        assert_eq!(
1131            bq(&RivetType::Decimal {
1132                precision: 77,
1133                scale: 38
1134            })
1135            .status,
1136            TargetStatus::Fail
1137        );
1138        // One past scale → Fail.
1139        assert_eq!(
1140            bq(&RivetType::Decimal {
1141                precision: 76,
1142                scale: 39
1143            })
1144            .status,
1145            TargetStatus::Fail
1146        );
1147        // Between NUMERIC and BIGNUMERIC escalates rather than overflowing NUMERIC.
1148        assert_eq!(
1149            bq(&RivetType::Decimal {
1150                precision: 30,
1151                scale: 0
1152            })
1153            .target_type,
1154            "BIGNUMERIC"
1155        );
1156    }
1157
1158    #[test]
1159    fn duckdb_decimal_over_38_warns_not_silently_clamps() {
1160        let s = duck(&RivetType::Decimal {
1161            precision: 40,
1162            scale: 2,
1163        });
1164        assert_eq!(s.status, TargetStatus::Warn);
1165    }
1166
1167    // ── L5 recovery SQL (the post-load transform for BigQuery autoload) ───────
1168
1169    #[test]
1170    fn bq_recovery_sql_casts_native_types() {
1171        use super::super::{SourceColumn, TimeUnit};
1172        let naive = RivetType::Timestamp {
1173            unit: TimeUnit::Microsecond,
1174            timezone: None,
1175        };
1176        let mappings = vec![
1177            TypeMapping::from_source(&SourceColumn::simple("id", "int8", true), RivetType::Int64),
1178            TypeMapping::from_source(
1179                &SourceColumn::simple("attrs", "jsonb", true),
1180                RivetType::Json,
1181            ),
1182            TypeMapping::from_source(&SourceColumn::simple("uid", "uuid", true), RivetType::Uuid),
1183            TypeMapping::from_source(
1184                &SourceColumn::simple("created_at", "timestamp", true),
1185                naive,
1186            ),
1187            TypeMapping::from_source(
1188                &SourceColumn::simple("tags", "_text", true),
1189                RivetType::List {
1190                    inner: Box::new(RivetType::String),
1191                },
1192            ),
1193        ];
1194        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
1195        let sql = ExportTarget::BigQuery
1196            .recovery_sql(&specs, "payments")
1197            .expect("BigQuery has a recovery SQL");
1198        // The post-load casts that actually recover native types (verified live
1199        // against BigQuery — a declared-type load is rejected, a cast is not).
1200        assert!(sql.contains("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(attrs)) AS attrs"));
1201        assert!(sql.contains("TO_HEX(uid) AS uid"));
1202        assert!(sql.contains("DATETIME(created_at) AS created_at"));
1203        // Arrays flatten via UNNEST (verified live; staging loaded with
1204        // --parquet_enable_list_inference).
1205        assert!(sql.contains("ARRAY(SELECT el.item FROM UNNEST(tags) AS el) AS tags"));
1206        assert!(sql.contains("--parquet_enable_list_inference"));
1207        // OK columns pass through unchanged.
1208        assert!(sql.contains("SELECT\n  id"));
1209        // Reads the autoload staging table, writes the recovered table.
1210        assert!(sql.contains("CREATE OR REPLACE TABLE `payments`"));
1211        assert!(sql.contains("FROM `payments__staging`"));
1212    }
1213
1214    #[test]
1215    fn duckdb_needs_no_recovery() {
1216        let mappings = vec![TypeMapping::from_source(
1217            &super::super::SourceColumn::simple("attrs", "json", true),
1218            RivetType::Json,
1219        )];
1220        let specs = ExportTarget::DuckDb.resolve_table(&mappings);
1221        assert!(
1222            ExportTarget::DuckDb.recovery_sql(&specs, "t").is_none(),
1223            "DuckDB autoloads every logical type natively — no recovery needed"
1224        );
1225    }
1226
1227    #[test]
1228    fn recovery_sql_projects_every_column_once_and_only_casts_divergent() {
1229        use super::super::{SourceColumn, TimeUnit};
1230        let naive = RivetType::Timestamp {
1231            unit: TimeUnit::Microsecond,
1232            timezone: None,
1233        };
1234        let cols: [(&str, RivetType); 6] = [
1235            ("id", RivetType::Int64), // ok → passthrough
1236            (
1237                "amount",
1238                RivetType::Decimal {
1239                    precision: 18,
1240                    scale: 2,
1241                },
1242            ), // ok → passthrough
1243            ("attrs", RivetType::Json), // divergent → cast
1244            ("uid", RivetType::Uuid), // divergent → cast
1245            ("created_at", naive),    // divergent → cast
1246            (
1247                "tags",
1248                RivetType::List {
1249                    inner: Box::new(RivetType::String),
1250                },
1251            ), // divergent → cast
1252        ];
1253        let mappings: Vec<_> = cols
1254            .iter()
1255            .cloned()
1256            .map(|(n, rt)| TypeMapping::from_source(&SourceColumn::simple(n, "x", true), rt))
1257            .collect();
1258        let specs = ExportTarget::BigQuery.resolve_table(&mappings);
1259        let sql = ExportTarget::BigQuery.recovery_sql(&specs, "t").unwrap();
1260
1261        // The SELECT projects exactly one item per input column — nothing dropped,
1262        // nothing duplicated.
1263        let body = sql
1264            .split("SELECT\n")
1265            .nth(1)
1266            .and_then(|s| s.split("\nFROM").next())
1267            .expect("recovery SQL has a SELECT … FROM body");
1268        assert_eq!(
1269            body.split(",\n").count(),
1270            cols.len(),
1271            "one projection per column, got:\n{body}"
1272        );
1273        for (name, _) in &cols {
1274            assert!(body.contains(name), "column {name} missing:\n{body}");
1275        }
1276        // OK columns pass through unchanged (bare `  name,`); divergent ones
1277        // carry their cast (`… AS name`).
1278        assert!(body.contains("  id,") && !body.contains("AS id"));
1279        assert!(body.contains("  amount,") && !body.contains("AS amount"));
1280        assert!(body.contains("PARSE_JSON(SAFE_CONVERT_BYTES_TO_STRING(attrs)) AS attrs"));
1281        assert!(body.contains("TO_HEX(uid) AS uid"));
1282        assert!(body.contains("DATETIME(created_at) AS created_at"));
1283        assert!(body.contains("UNNEST(tags) AS el) AS tags"));
1284    }
1285
1286    // ── Snowflake (verified live 2026-06-01) ─────────────────────────────────
1287
1288    #[test]
1289    fn snowflake_autoload_degradations_and_native_casts() {
1290        // JSON → TEXT autoload / VARIANT native, recover PARSE_JSON.
1291        let j = sf(&RivetType::Json);
1292        assert_eq!(j.target_type, "VARIANT");
1293        assert_eq!(j.autoload_type, "TEXT");
1294        assert!(j.cast_sql.unwrap().starts_with("PARSE_JSON"));
1295        // UUID → BINARY autoload / TEXT native, recover via HEX_ENCODE + REGEXP.
1296        let u = sf(&RivetType::Uuid);
1297        assert_eq!(u.target_type, "TEXT");
1298        assert_eq!(u.autoload_type, "BINARY");
1299        assert!(u.cast_sql.unwrap().contains("HEX_ENCODE"));
1300        // naive timestamp → NUMBER autoload / TIMESTAMP_NTZ native.
1301        let naive = RivetType::Timestamp {
1302            unit: super::super::TimeUnit::Microsecond,
1303            timezone: None,
1304        };
1305        let t = sf(&naive);
1306        assert_eq!(t.target_type, "TIMESTAMP_NTZ");
1307        assert_eq!(t.autoload_type, "NUMBER(38,0)");
1308        assert!(t.cast_sql.unwrap().contains("TO_TIMESTAMP_NTZ"));
1309        // TIME → NUMBER autoload, recover TIME_FROM_PARTS.
1310        let tm = sf(&RivetType::Time {
1311            unit: super::super::TimeUnit::Microsecond,
1312        });
1313        assert_eq!(tm.target_type, "TIME");
1314        assert!(tm.cast_sql.unwrap().contains("TIME_FROM_PARTS"));
1315        // decimal is native NUMBER(p,s) — no cast.
1316        let d = sf(&RivetType::Decimal {
1317            precision: 18,
1318            scale: 2,
1319        });
1320        assert_eq!(d.target_type, "NUMBER(18,2)");
1321        assert!(d.cast_sql.is_none());
1322        // list autoloads as VARIANT (verified live), recover native ARRAY with ::ARRAY.
1323        let l = sf(&RivetType::List {
1324            inner: Box::new(RivetType::Int64),
1325        });
1326        assert_eq!(l.target_type, "ARRAY");
1327        assert_eq!(l.autoload_type, "VARIANT");
1328        assert!(l.cast_sql.unwrap().ends_with("::ARRAY"));
1329    }
1330
1331    #[test]
1332    fn snowflake_recovery_sql_quotes_columns_and_casts() {
1333        use super::super::{SourceColumn, TimeUnit};
1334        let naive = RivetType::Timestamp {
1335            unit: TimeUnit::Microsecond,
1336            timezone: None,
1337        };
1338        let mappings = vec![
1339            TypeMapping::from_source(&SourceColumn::simple("id", "int8", true), RivetType::Int64),
1340            TypeMapping::from_source(
1341                &SourceColumn::simple("attrs", "jsonb", true),
1342                RivetType::Json,
1343            ),
1344            TypeMapping::from_source(&SourceColumn::simple("uid", "uuid", true), RivetType::Uuid),
1345            TypeMapping::from_source(
1346                &SourceColumn::simple("created_at", "timestamp", true),
1347                naive,
1348            ),
1349        ];
1350        let specs = ExportTarget::Snowflake.resolve_table(&mappings);
1351        let sql = ExportTarget::Snowflake.recovery_sql(&specs, "t").unwrap();
1352        // Staging columns are lowercase + quoted; passthrough quotes the source.
1353        assert!(sql.contains("\"id\" AS id"));
1354        assert!(sql.contains("PARSE_JSON(\"attrs\") AS attrs"));
1355        assert!(sql.contains("HEX_ENCODE(\"uid\")"));
1356        assert!(sql.contains("TO_TIMESTAMP_NTZ(\"created_at\", 6) AS created_at"));
1357        // The load preamble the recovery depends on.
1358        assert!(sql.contains("BINARY_AS_TEXT=FALSE"));
1359        assert!(sql.contains("MATCH_BY_COLUMN_NAME"));
1360        assert!(sql.contains("FROM t__staging"));
1361    }
1362
1363    #[test]
1364    fn parse_accepts_snowflake() {
1365        assert_eq!(
1366            ExportTarget::parse("snowflake"),
1367            Some(ExportTarget::Snowflake)
1368        );
1369        assert_eq!(ExportTarget::parse("sf"), Some(ExportTarget::Snowflake));
1370    }
1371
1372    #[test]
1373    fn parse_accepts_clickhouse() {
1374        assert_eq!(
1375            ExportTarget::parse("clickhouse"),
1376            Some(ExportTarget::ClickHouse)
1377        );
1378        assert_eq!(ExportTarget::parse("ch"), Some(ExportTarget::ClickHouse));
1379    }
1380
1381    #[test]
1382    fn valid_target_names_lists_every_parseable_target() {
1383        // The "unknown target" error message reads from this; it must name
1384        // every target `parse` accepts, or the hint sends operators wrong.
1385        // snowflake regression: the old message hard-coded "bigquery, duckdb".
1386        let names = ExportTarget::valid_target_names();
1387        assert!(names.contains("snowflake"), "got: {names}");
1388        assert!(names.contains("bigquery"), "got: {names}");
1389        assert!(names.contains("duckdb"), "got: {names}");
1390        assert!(names.contains("clickhouse"), "got: {names}");
1391    }
1392}