Skip to main content

rivet/types/
target.rs

1//! Export target compatibility checks (roadmap §16).
2//!
3//! Validates that an Arrow type produced by Rivet will load into the target
4//! warehouse as the expected type. Currently supports BigQuery.
5//!
6//! BigQuery numeric limits (as of 2025):
7//!   NUMERIC    — precision 1–29, scale 0–9
8//!   BIGNUMERIC — precision 1–76, scale 0–38
9
10use arrow::datatypes::DataType;
11use serde::Serialize;
12
/// A supported downstream warehouse target.
///
/// The type is `Copy`, so it is passed by value throughout this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportTarget {
    /// Google BigQuery (accepted spellings in [`ExportTarget::parse`]: `bigquery`, `bq`).
    BigQuery,
}

impl ExportTarget {
    /// Parses a target name, ignoring ASCII case.
    ///
    /// Returns `Some(ExportTarget::BigQuery)` for `"bigquery"` or `"bq"` in any
    /// letter case, and `None` for anything else. The input is not trimmed, so
    /// surrounding whitespace makes the lookup fail.
    // Not referenced inside this file; the allowance presumably silences the
    // unused warning until a caller is wired up — NOTE(review): confirm.
    #[allow(dead_code)]
    pub fn parse(s: &str) -> Option<Self> {
        // Compare in place instead of allocating a lowercased copy of the input.
        let is_bigquery = ["bigquery", "bq"]
            .iter()
            .any(|alias| s.eq_ignore_ascii_case(alias));
        is_bigquery.then_some(Self::BigQuery)
    }

    /// Returns the canonical lowercase name of the target.
    pub fn label(self) -> &'static str {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            Self::BigQuery => "bigquery",
        }
    }
}
34
35/// Status of a column's Arrow type against a specific target.
36#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
37#[serde(rename_all = "snake_case")]
38pub enum TargetStatus {
39    Ok,
40    Warn,
41    Fail,
42}
43
44impl TargetStatus {
45    pub fn label(&self) -> &'static str {
46        match self {
47            Self::Ok => "ok",
48            Self::Warn => "warn",
49            Self::Fail => "fail",
50        }
51    }
52}
53
54/// One column's compatibility result against a target.
55#[derive(Debug, Clone, Serialize)]
56pub struct TargetCompat {
57    /// Target type name (e.g. "NUMERIC", "BIGNUMERIC", "TIMESTAMP").
58    pub target_type: String,
59    pub status: TargetStatus,
60    /// Optional note explaining a warning or failure.
61    #[serde(skip_serializing_if = "Option::is_none")]
62    pub note: Option<String>,
63}
64
65impl TargetCompat {
66    fn ok(target_type: impl Into<String>) -> Self {
67        Self {
68            target_type: target_type.into(),
69            status: TargetStatus::Ok,
70            note: None,
71        }
72    }
73    fn warn(target_type: impl Into<String>, note: impl Into<String>) -> Self {
74        Self {
75            target_type: target_type.into(),
76            status: TargetStatus::Warn,
77            note: Some(note.into()),
78        }
79    }
80    fn fail(note: impl Into<String>) -> Self {
81        Self {
82            target_type: "-".into(),
83            status: TargetStatus::Fail,
84            note: Some(note.into()),
85        }
86    }
87}
88
89/// Check whether `arrow_type` (the type Rivet will write into Parquet) will
90/// produce the expected type when loaded into `target`.
91///
92/// Returns `None` if `arrow_type` is `None` (unresolved / unsupported column).
93pub fn check_target_compat(arrow_type: Option<&DataType>, target: ExportTarget) -> TargetCompat {
94    match target {
95        ExportTarget::BigQuery => bq_compat(arrow_type),
96    }
97}
98
99// ── BigQuery ─────────────────────────────────────────────────────────────────
100
/// Largest Arrow decimal precision this module reports as BigQuery NUMERIC.
///
/// NOTE(review): BigQuery documents NUMERIC as precision 38 / scale 9, i.e. up
/// to 29 digits *before* the decimal point. This value is compared against the
/// total precision — confirm the stricter bound is intended.
const BQ_NUMERIC_MAX_P: u8 = 29;
/// Largest scale this module reports as BigQuery NUMERIC.
const BQ_NUMERIC_MAX_S: i8 = 9;
/// Largest Arrow decimal precision this module reports as BigQuery BIGNUMERIC.
const BQ_BIGNUMERIC_MAX_P: u8 = 76;
/// Largest scale this module reports as BigQuery BIGNUMERIC.
const BQ_BIGNUMERIC_MAX_S: i8 = 38;
107
108fn bq_compat(arrow_type: Option<&DataType>) -> TargetCompat {
109    let Some(dt) = arrow_type else {
110        return TargetCompat::fail(
111            "no Arrow type — column is unsupported or needs a type override",
112        );
113    };
114
115    match dt {
116        DataType::Boolean => TargetCompat::ok("BOOL"),
117        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
118            TargetCompat::ok("INT64")
119        }
120        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 => TargetCompat::ok("INT64"),
121        DataType::UInt64 => TargetCompat::warn(
122            "INT64",
123            "UINT64 has no direct BigQuery type; values > INT64_MAX will overflow",
124        ),
125        DataType::Float32 => TargetCompat::ok("FLOAT64"),
126        DataType::Float64 => TargetCompat::ok("FLOAT64"),
127
128        DataType::Decimal128(p, s) => bq_decimal_compat(*p, *s),
129        DataType::Decimal256(p, s) => bq_decimal256_compat(*p, *s),
130
131        DataType::Date32 | DataType::Date64 => TargetCompat::ok("DATE"),
132        DataType::Time32(_) | DataType::Time64(_) => TargetCompat::ok("TIME"),
133
134        DataType::Timestamp(_, Some(_)) => TargetCompat::ok("TIMESTAMP"),
135        DataType::Timestamp(_, None) => TargetCompat::ok("DATETIME"),
136
137        DataType::Utf8 | DataType::LargeUtf8 => TargetCompat::ok("STRING"),
138        DataType::Binary | DataType::LargeBinary => TargetCompat::ok("BYTES"),
139
140        // BigQuery supports INTERVAL since 2022 but with limited arithmetic.
141        DataType::Interval(_) => TargetCompat::warn(
142            "INTERVAL",
143            "BigQuery INTERVAL has limited arithmetic support; verify downstream queries",
144        ),
145
146        // Arrow List maps to BigQuery REPEATED fields in Parquet loads.
147        DataType::List(field_ref) | DataType::LargeList(field_ref) => {
148            let inner_compat = bq_compat(Some(field_ref.data_type()));
149            match inner_compat.status {
150                TargetStatus::Fail => TargetCompat::fail(format!(
151                    "REPEATED {} (inner type unsupported: {})",
152                    inner_compat.target_type,
153                    inner_compat.note.unwrap_or_default()
154                )),
155                TargetStatus::Warn => TargetCompat::warn(
156                    format!("REPEATED {}", inner_compat.target_type),
157                    inner_compat.note.unwrap_or_default(),
158                ),
159                TargetStatus::Ok => {
160                    TargetCompat::ok(format!("REPEATED {}", inner_compat.target_type))
161                }
162            }
163        }
164
165        other => TargetCompat::fail(format!(
166            "Arrow type {other:?} has no supported BigQuery mapping"
167        )),
168    }
169}
170
171fn bq_decimal_compat(p: u8, s: i8) -> TargetCompat {
172    // Negative scale (e.g. numeric(5,-2)) is not supported by BigQuery NUMERIC/BIGNUMERIC.
173    if s < 0 {
174        return TargetCompat::fail(format!(
175            "BigQuery does not support negative scale; \
176             Decimal128({p},{s}) would need to be cast to STRING or INT64"
177        ));
178    }
179    if p <= BQ_NUMERIC_MAX_P && s <= BQ_NUMERIC_MAX_S {
180        return TargetCompat::ok("NUMERIC");
181    }
182    // Fits in BIGNUMERIC?
183    if p <= BQ_BIGNUMERIC_MAX_P && s <= BQ_BIGNUMERIC_MAX_S {
184        return TargetCompat::ok("BIGNUMERIC");
185    }
186    TargetCompat::fail(format!(
187        "Decimal128({p},{s}) exceeds BigQuery BIGNUMERIC limits \
188         (max precision 76, max scale 38)"
189    ))
190}
191
192fn bq_decimal256_compat(p: u8, s: i8) -> TargetCompat {
193    if s < 0 {
194        return TargetCompat::fail(format!(
195            "BigQuery does not support negative scale; \
196             Decimal256({p},{s}) cannot be loaded directly"
197        ));
198    }
199    if p <= BQ_BIGNUMERIC_MAX_P && s <= BQ_BIGNUMERIC_MAX_S {
200        // Close to the limit — warn if within 5 of the ceiling.
201        if p > BQ_BIGNUMERIC_MAX_P - 5 || s > BQ_BIGNUMERIC_MAX_S - 5 {
202            return TargetCompat::warn(
203                "BIGNUMERIC",
204                format!(
205                    "Decimal256({p},{s}) is near BigQuery BIGNUMERIC limits \
206                     (max 76,38); verify no overflow at load time"
207                ),
208            );
209        }
210        return TargetCompat::ok("BIGNUMERIC");
211    }
212    TargetCompat::fail(format!(
213        "Decimal256({p},{s}) exceeds BigQuery BIGNUMERIC limits \
214         (max precision 76, max scale 38)"
215    ))
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::TimeUnit as ArrowTimeUnit;

    /// Runs the BigQuery compatibility check for a concrete Arrow type.
    fn bq(dt: DataType) -> TargetCompat {
        check_target_compat(Some(&dt), ExportTarget::BigQuery)
    }

    #[test]
    fn numeric_within_limits_maps_to_numeric() {
        let c = bq(DataType::Decimal128(18, 2));
        assert_eq!(c.target_type, "NUMERIC");
        assert_eq!(c.status, TargetStatus::Ok);
    }

    #[test]
    fn numeric_above_29_precision_escalates_to_bignumeric() {
        let c = bq(DataType::Decimal128(38, 9));
        assert_eq!(c.target_type, "BIGNUMERIC");
        assert_eq!(c.status, TargetStatus::Ok);
    }

    #[test]
    fn numeric_scale_above_9_goes_bignumeric() {
        let c = bq(DataType::Decimal128(18, 12));
        assert_eq!(c.target_type, "BIGNUMERIC");
        assert_eq!(c.status, TargetStatus::Ok);
    }

    #[test]
    fn decimal256_normal_maps_to_bignumeric() {
        let c = bq(DataType::Decimal256(60, 20));
        assert_eq!(c.target_type, "BIGNUMERIC");
        assert_eq!(c.status, TargetStatus::Ok);
    }

    #[test]
    fn decimal256_near_limit_warns() {
        let c = bq(DataType::Decimal256(75, 35));
        assert_eq!(c.target_type, "BIGNUMERIC");
        assert_eq!(c.status, TargetStatus::Warn);
    }

    #[test]
    fn decimal_negative_scale_fails() {
        let c = bq(DataType::Decimal128(5, -2));
        assert_eq!(c.status, TargetStatus::Fail);
    }

    #[test]
    fn timestamptz_maps_to_timestamp() {
        let c = bq(DataType::Timestamp(
            ArrowTimeUnit::Microsecond,
            Some("UTC".into()),
        ));
        assert_eq!(c.target_type, "TIMESTAMP");
        assert_eq!(c.status, TargetStatus::Ok);
    }

    #[test]
    fn timestamp_no_tz_maps_to_datetime() {
        let c = bq(DataType::Timestamp(ArrowTimeUnit::Microsecond, None));
        assert_eq!(c.target_type, "DATETIME");
        assert_eq!(c.status, TargetStatus::Ok);
    }

    #[test]
    fn none_arrow_type_fails() {
        let c = check_target_compat(None, ExportTarget::BigQuery);
        assert_eq!(c.status, TargetStatus::Fail);
    }

    #[test]
    fn uint64_warns_about_overflow() {
        let c = bq(DataType::UInt64);
        assert_eq!(c.target_type, "INT64");
        assert_eq!(c.status, TargetStatus::Warn);
    }

    #[test]
    fn standard_types_are_ok() {
        for (dt, expected_bq) in [
            (DataType::Boolean, "BOOL"),
            (DataType::Int64, "INT64"),
            (DataType::Float64, "FLOAT64"),
            (DataType::Date32, "DATE"),
            (DataType::Utf8, "STRING"),
            (DataType::Binary, "BYTES"),
        ] {
            // `bq` consumes its argument; keep `dt` so a failure names the
            // Arrow type under test rather than the BigQuery type it produced.
            let c = bq(dt.clone());
            assert_eq!(c.status, TargetStatus::Ok, "Arrow {dt:?} should be ok");
            assert_eq!(
                c.target_type, expected_bq,
                "Arrow {dt:?} should map to {expected_bq}"
            );
        }
    }
}