1use arrow::datatypes::DataType;
11use serde::Serialize;
12
/// Destination system whose type system Arrow columns are checked against.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportTarget {
    /// Google BigQuery.
    BigQuery,
}

impl ExportTarget {
    /// Parses a target name case-insensitively.
    ///
    /// Accepts `"bigquery"` and its alias `"bq"`; every other input — including
    /// input with surrounding whitespace — yields `None`.
    #[allow(dead_code)]
    pub fn parse(s: &str) -> Option<Self> {
        let normalized = s.to_lowercase();
        if matches!(normalized.as_str(), "bigquery" | "bq") {
            Some(Self::BigQuery)
        } else {
            None
        }
    }

    /// Returns the canonical lowercase name of the target.
    pub fn label(self) -> &'static str {
        // Kept as an exhaustive match so a new variant fails to compile here
        // until it is given a label.
        match self {
            ExportTarget::BigQuery => "bigquery",
        }
    }
}
34
/// Severity of a compatibility check result.
///
/// Serialized with snake_case variant names (`ok`, `warn`, `fail`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TargetStatus {
    /// The type maps to the target without caveats.
    Ok,
    /// The type maps to the target, but with a caveat described in the result's note.
    Warn,
    /// The type has no usable mapping on the target.
    Fail,
}
43
44impl TargetStatus {
45 pub fn label(&self) -> &'static str {
46 match self {
47 Self::Ok => "ok",
48 Self::Warn => "warn",
49 Self::Fail => "fail",
50 }
51 }
52}
53
/// Result of checking a single Arrow type against an export target.
#[derive(Debug, Clone, Serialize)]
pub struct TargetCompat {
    /// Name of the type on the target side; `"-"` when the check failed.
    pub target_type: String,
    /// Outcome of the check.
    pub status: TargetStatus,
    /// Explanation attached to warnings and failures; left out of the
    /// serialized form entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub note: Option<String>,
}
64
65impl TargetCompat {
66 fn ok(target_type: impl Into<String>) -> Self {
67 Self {
68 target_type: target_type.into(),
69 status: TargetStatus::Ok,
70 note: None,
71 }
72 }
73 fn warn(target_type: impl Into<String>, note: impl Into<String>) -> Self {
74 Self {
75 target_type: target_type.into(),
76 status: TargetStatus::Warn,
77 note: Some(note.into()),
78 }
79 }
80 fn fail(note: impl Into<String>) -> Self {
81 Self {
82 target_type: "-".into(),
83 status: TargetStatus::Fail,
84 note: Some(note.into()),
85 }
86 }
87}
88
89pub fn check_target_compat(arrow_type: Option<&DataType>, target: ExportTarget) -> TargetCompat {
94 match target {
95 ExportTarget::BigQuery => bq_compat(arrow_type),
96 }
97}
98
/// Largest decimal precision that is mapped to BigQuery `NUMERIC`.
const BQ_NUMERIC_MAX_P: u8 = 29;
/// Largest decimal scale that is mapped to BigQuery `NUMERIC`.
const BQ_NUMERIC_MAX_S: i8 = 9;
/// Largest decimal precision that is mapped to BigQuery `BIGNUMERIC`.
// NOTE(review): BigQuery documents parameterized BIGNUMERIC(P, S) as requiring
// P <= S + 38; the checks using these constants bound P and S independently —
// confirm whether that is intended.
const BQ_BIGNUMERIC_MAX_P: u8 = 76;
/// Largest decimal scale that is mapped to BigQuery `BIGNUMERIC`.
const BQ_BIGNUMERIC_MAX_S: i8 = 38;
107
108fn bq_compat(arrow_type: Option<&DataType>) -> TargetCompat {
109 let Some(dt) = arrow_type else {
110 return TargetCompat::fail(
111 "no Arrow type — column is unsupported or needs a type override",
112 );
113 };
114
115 match dt {
116 DataType::Boolean => TargetCompat::ok("BOOL"),
117 DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
118 TargetCompat::ok("INT64")
119 }
120 DataType::UInt8 | DataType::UInt16 | DataType::UInt32 => TargetCompat::ok("INT64"),
121 DataType::UInt64 => TargetCompat::warn(
122 "INT64",
123 "UINT64 has no direct BigQuery type; values > INT64_MAX will overflow",
124 ),
125 DataType::Float32 => TargetCompat::ok("FLOAT64"),
126 DataType::Float64 => TargetCompat::ok("FLOAT64"),
127
128 DataType::Decimal128(p, s) => bq_decimal_compat(*p, *s),
129 DataType::Decimal256(p, s) => bq_decimal256_compat(*p, *s),
130
131 DataType::Date32 | DataType::Date64 => TargetCompat::ok("DATE"),
132 DataType::Time32(_) | DataType::Time64(_) => TargetCompat::ok("TIME"),
133
134 DataType::Timestamp(_, Some(_)) => TargetCompat::ok("TIMESTAMP"),
135 DataType::Timestamp(_, None) => TargetCompat::ok("DATETIME"),
136
137 DataType::Utf8 | DataType::LargeUtf8 => TargetCompat::ok("STRING"),
138 DataType::Binary | DataType::LargeBinary => TargetCompat::ok("BYTES"),
139
140 DataType::Interval(_) => TargetCompat::warn(
142 "INTERVAL",
143 "BigQuery INTERVAL has limited arithmetic support; verify downstream queries",
144 ),
145
146 DataType::List(field_ref) | DataType::LargeList(field_ref) => {
148 let inner_compat = bq_compat(Some(field_ref.data_type()));
149 match inner_compat.status {
150 TargetStatus::Fail => TargetCompat::fail(format!(
151 "REPEATED {} (inner type unsupported: {})",
152 inner_compat.target_type,
153 inner_compat.note.unwrap_or_default()
154 )),
155 TargetStatus::Warn => TargetCompat::warn(
156 format!("REPEATED {}", inner_compat.target_type),
157 inner_compat.note.unwrap_or_default(),
158 ),
159 TargetStatus::Ok => {
160 TargetCompat::ok(format!("REPEATED {}", inner_compat.target_type))
161 }
162 }
163 }
164
165 other => TargetCompat::fail(format!(
166 "Arrow type {other:?} has no supported BigQuery mapping"
167 )),
168 }
169}
170
171fn bq_decimal_compat(p: u8, s: i8) -> TargetCompat {
172 if s < 0 {
174 return TargetCompat::fail(format!(
175 "BigQuery does not support negative scale; \
176 Decimal128({p},{s}) would need to be cast to STRING or INT64"
177 ));
178 }
179 if p <= BQ_NUMERIC_MAX_P && s <= BQ_NUMERIC_MAX_S {
180 return TargetCompat::ok("NUMERIC");
181 }
182 if p <= BQ_BIGNUMERIC_MAX_P && s <= BQ_BIGNUMERIC_MAX_S {
184 return TargetCompat::ok("BIGNUMERIC");
185 }
186 TargetCompat::fail(format!(
187 "Decimal128({p},{s}) exceeds BigQuery BIGNUMERIC limits \
188 (max precision 76, max scale 38)"
189 ))
190}
191
192fn bq_decimal256_compat(p: u8, s: i8) -> TargetCompat {
193 if s < 0 {
194 return TargetCompat::fail(format!(
195 "BigQuery does not support negative scale; \
196 Decimal256({p},{s}) cannot be loaded directly"
197 ));
198 }
199 if p <= BQ_BIGNUMERIC_MAX_P && s <= BQ_BIGNUMERIC_MAX_S {
200 if p > BQ_BIGNUMERIC_MAX_P - 5 || s > BQ_BIGNUMERIC_MAX_S - 5 {
202 return TargetCompat::warn(
203 "BIGNUMERIC",
204 format!(
205 "Decimal256({p},{s}) is near BigQuery BIGNUMERIC limits \
206 (max 76,38); verify no overflow at load time"
207 ),
208 );
209 }
210 return TargetCompat::ok("BIGNUMERIC");
211 }
212 TargetCompat::fail(format!(
213 "Decimal256({p},{s}) exceeds BigQuery BIGNUMERIC limits \
214 (max precision 76, max scale 38)"
215 ))
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::TimeUnit as ArrowTimeUnit;

    /// Runs the BigQuery compatibility check for a single Arrow type.
    fn bq(dt: DataType) -> TargetCompat {
        check_target_compat(Some(&dt), ExportTarget::BigQuery)
    }

    #[test]
    fn export_target_parse_accepts_aliases_case_insensitively() {
        assert_eq!(ExportTarget::parse("bigquery"), Some(ExportTarget::BigQuery));
        assert_eq!(ExportTarget::parse("BQ"), Some(ExportTarget::BigQuery));
        assert_eq!(ExportTarget::parse("snowflake"), None);
        assert_eq!(ExportTarget::BigQuery.label(), "bigquery");
    }

    #[test]
    fn numeric_within_limits_maps_to_numeric() {
        let c = bq(DataType::Decimal128(18, 2));
        assert_eq!(c.target_type, "NUMERIC");
        assert_eq!(c.status, TargetStatus::Ok);
    }

    #[test]
    fn numeric_above_29_precision_escalates_to_bignumeric() {
        let c = bq(DataType::Decimal128(38, 9));
        assert_eq!(c.target_type, "BIGNUMERIC");
        assert_eq!(c.status, TargetStatus::Ok);
    }

    #[test]
    fn numeric_scale_above_9_goes_bignumeric() {
        let c = bq(DataType::Decimal128(18, 12));
        assert_eq!(c.target_type, "BIGNUMERIC");
        assert_eq!(c.status, TargetStatus::Ok);
    }

    #[test]
    fn decimal256_normal_maps_to_bignumeric() {
        let c = bq(DataType::Decimal256(60, 20));
        assert_eq!(c.target_type, "BIGNUMERIC");
        assert_eq!(c.status, TargetStatus::Ok);
    }

    #[test]
    fn decimal256_near_limit_warns() {
        let c = bq(DataType::Decimal256(75, 35));
        assert_eq!(c.target_type, "BIGNUMERIC");
        assert_eq!(c.status, TargetStatus::Warn);
    }

    #[test]
    fn decimal_negative_scale_fails() {
        let c = bq(DataType::Decimal128(5, -2));
        assert_eq!(c.status, TargetStatus::Fail);
    }

    #[test]
    fn timestamptz_maps_to_timestamp() {
        let c = bq(DataType::Timestamp(
            ArrowTimeUnit::Microsecond,
            Some("UTC".into()),
        ));
        assert_eq!(c.target_type, "TIMESTAMP");
        assert_eq!(c.status, TargetStatus::Ok);
    }

    #[test]
    fn timestamp_no_tz_maps_to_datetime() {
        let c = bq(DataType::Timestamp(ArrowTimeUnit::Microsecond, None));
        assert_eq!(c.target_type, "DATETIME");
        assert_eq!(c.status, TargetStatus::Ok);
    }

    #[test]
    fn none_arrow_type_fails() {
        let c = check_target_compat(None, ExportTarget::BigQuery);
        assert_eq!(c.status, TargetStatus::Fail);
    }

    #[test]
    fn unsupported_arrow_type_fails_with_placeholder_type() {
        let c = bq(DataType::Null);
        assert_eq!(c.status, TargetStatus::Fail);
        assert_eq!(c.target_type, "-");
        assert!(c.note.is_some());
    }

    #[test]
    fn uint64_warns_about_overflow() {
        let c = bq(DataType::UInt64);
        assert_eq!(c.target_type, "INT64");
        assert_eq!(c.status, TargetStatus::Warn);
    }

    #[test]
    fn standard_types_are_ok() {
        for (dt, expected_bq) in [
            (DataType::Boolean, "BOOL"),
            (DataType::Int64, "INT64"),
            (DataType::Float64, "FLOAT64"),
            (DataType::Date32, "DATE"),
            (DataType::Utf8, "STRING"),
            (DataType::Binary, "BYTES"),
        ] {
            // Borrow `dt` rather than moving it into `bq` so that the failure
            // message can name the Arrow type under test.
            let c = check_target_compat(Some(&dt), ExportTarget::BigQuery);
            assert_eq!(c.status, TargetStatus::Ok, "Arrow {dt:?} should be ok");
            assert_eq!(
                c.target_type, expected_bq,
                "Arrow {dt:?} mapped to the wrong BigQuery type"
            );
        }
    }
}