1use std::sync::Arc;
16
17use super::{Interval, NumberValue, Value};
18use crate::error::{ConvertError, Error};
19use crate::value::base::GeoValue;
20use arrow_array::{
21 Array as ArrowArray, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Decimal256Array,
22 Decimal64Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
23 LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, MapArray, StringArray,
24 StringViewArray, StructArray, TimestampMicrosecondArray, UInt16Array, UInt32Array, UInt64Array,
25 UInt8Array,
26};
27use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, TimeUnit};
28use databend_client::schema::{
29 DecimalSize, ARROW_EXT_TYPE_BITMAP, ARROW_EXT_TYPE_EMPTY_ARRAY, ARROW_EXT_TYPE_EMPTY_MAP,
30 ARROW_EXT_TYPE_GEOGRAPHY, ARROW_EXT_TYPE_GEOMETRY, ARROW_EXT_TYPE_INTERVAL,
31 ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE, ARROW_EXT_TYPE_VARIANT, ARROW_EXT_TYPE_VECTOR,
32 EXTENSION_KEY,
33};
34use databend_client::ResultFormatSettings;
35use ethnum::i256;
36use jiff::{tz, Timestamp};
37use jsonb::RawJsonb;
38
/// Raw 128-bit interval payload, as read from the `Decimal128` column of an
/// interval-extension field.
///
/// The accessors in the `impl` block split it into months (bits 96..128),
/// days (bits 64..96) and microseconds (bits 0..64).
// The lowercase type name is intentional, so the camel-case lint is silenced
// here instead of renaming the type.
#[repr(C)]
#[allow(non_camel_case_types)]
struct months_days_micros(pub i128);
43
/// Selects the low 64 bits of an interval payload (the microseconds field).
const MICROS_MASK: i128 = u64::MAX as i128;
/// Selects a single 32-bit field (months or days) once it has been shifted down.
const DAYS_MONTHS_MASK: i128 = u32::MAX as i128;

/// Upper clamp bound, in microseconds since the Unix epoch (9999-12-30T22:00:00Z).
const TIMESTAMP_MAX: i64 = 253_402_207_200_000_000;
/// Lower clamp bound, in microseconds since the Unix epoch (-009999-01-02T01:59:59Z).
const TIMESTAMP_MIN: i64 = -377_705_023_201_000_000;

/// Clamps a microsecond timestamp into `[TIMESTAMP_MIN, TIMESTAMP_MAX]`.
///
/// NOTE(review): the bounds appear to match the representable range of
/// `jiff::Timestamp`, so clamping keeps `Timestamp::from_microsecond` from
/// rejecting out-of-range server values. Confirm this against the jiff version in use.
fn clamp_ts(ts: i64) -> i64 {
    if ts < TIMESTAMP_MIN {
        TIMESTAMP_MIN
    } else if ts > TIMESTAMP_MAX {
        TIMESTAMP_MAX
    } else {
        ts
    }
}
59
60impl months_days_micros {
61 #[inline]
62 pub fn months(&self) -> i32 {
63 ((self.0 >> 96) & DAYS_MONTHS_MASK) as i32
64 }
65
66 #[inline]
67 pub fn days(&self) -> i32 {
68 ((self.0 >> 64) & DAYS_MONTHS_MASK) as i32
69 }
70
71 #[inline]
72 pub fn microseconds(&self) -> i64 {
73 (self.0 & MICROS_MASK) as i64
74 }
75}
76
77impl
78 TryFrom<(
79 &ArrowField,
80 &Arc<dyn ArrowArray>,
81 usize,
82 &ResultFormatSettings,
83 )> for Value
84{
85 type Error = Error;
86 fn try_from(
87 (field, array, seq, settings): (
88 &ArrowField,
89 &Arc<dyn ArrowArray>,
90 usize,
91 &ResultFormatSettings,
92 ),
93 ) -> std::result::Result<Self, Self::Error> {
94 if let Some(extend_type) = field.metadata().get(EXTENSION_KEY) {
95 return match extend_type.as_str() {
96 ARROW_EXT_TYPE_EMPTY_ARRAY => Ok(Value::EmptyArray),
97 ARROW_EXT_TYPE_EMPTY_MAP => Ok(Value::EmptyMap),
98 ARROW_EXT_TYPE_VARIANT => {
99 if field.is_nullable() && array.is_null(seq) {
100 return Ok(Value::Null);
101 }
102 match array.data_type() {
103 ArrowDataType::Utf8 => match array.as_any().downcast_ref::<StringArray>() {
104 Some(array) => Ok(Value::Variant(array.value(seq).to_string())),
105 None => Err(ConvertError::new("variant", format!("{array:?}")).into()),
106 },
107 ArrowDataType::LargeUtf8 => {
108 match array.as_any().downcast_ref::<LargeStringArray>() {
109 Some(array) => Ok(Value::Variant(array.value(seq).to_string())),
110 None => {
111 Err(ConvertError::new("variant", format!("{array:?}")).into())
112 }
113 }
114 }
115 ArrowDataType::LargeBinary => {
116 match array.as_any().downcast_ref::<LargeBinaryArray>() {
117 Some(array) => {
118 if settings.arrow_result_version.unwrap_or_default() > 1 {
119 Ok(Value::Variant(
120 String::from_utf8_lossy(array.value(seq)).into_owned(),
121 ))
122 } else {
123 Ok(Value::Variant(
124 RawJsonb::new(array.value(seq)).to_string(),
125 ))
126 }
127 }
128 None => {
129 Err(ConvertError::new("variant", format!("{array:?}")).into())
130 }
131 }
132 }
133 _ => Err(ConvertError::new("variant", format!("{array:?}")).into()),
134 }
135 }
136 ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE => {
137 if field.is_nullable() && array.is_null(seq) {
138 return Ok(Value::Null);
139 }
140 match array.as_any().downcast_ref::<Decimal128Array>() {
141 Some(array) => {
142 let v = array.value(seq);
143 let unix_ts = clamp_ts(v as u64 as i64);
144 let offset = (v >> 64) as i32;
145 let offset = tz::Offset::from_seconds(offset).map_err(|e| {
146 Error::Parsing(format!("invalid offset: {offset}, {e}"))
147 })?;
148 let time_zone = tz::TimeZone::fixed(offset);
149 let timestamp = Timestamp::from_microsecond(unix_ts).map_err(|e| {
150 Error::Parsing(format!("Invalid timestamp_micros {unix_ts}: {e}"))
151 })?;
152 Ok(Value::TimestampTz(timestamp.to_zoned(time_zone)))
153 }
154 None => Err(ConvertError::new("Interval", format!("{array:?}")).into()),
155 }
156 }
157 ARROW_EXT_TYPE_INTERVAL => {
158 if field.is_nullable() && array.is_null(seq) {
159 return Ok(Value::Null);
160 }
161 match array.as_any().downcast_ref::<Decimal128Array>() {
162 Some(array) => {
163 let res = months_days_micros(array.value(seq));
164 Ok(Value::Interval(
165 Interval {
166 months: res.months(),
167 days: res.days(),
168 micros: res.microseconds(),
169 }
170 .to_string(),
171 ))
172 }
173 None => Err(ConvertError::new("Interval", format!("{array:?}")).into()),
174 }
175 }
176 ARROW_EXT_TYPE_BITMAP => {
177 if field.is_nullable() && array.is_null(seq) {
178 return Ok(Value::Null);
179 }
180 match array.as_any().downcast_ref::<LargeBinaryArray>() {
181 Some(array) => {
182 let rb = roaring::RoaringTreemap::deserialize_from(array.value(seq))
183 .expect("failed to deserialize bitmap");
184 let raw = rb.into_iter().collect::<Vec<_>>();
185 let s = itertools::join(raw.iter(), ",");
186 Ok(Value::Bitmap(s))
187 }
188 None => Err(ConvertError::new("bitmap", format!("{array:?}")).into()),
189 }
190 }
191 ARROW_EXT_TYPE_GEOMETRY => {
192 if field.is_nullable() && array.is_null(seq) {
193 return Ok(Value::Null);
194 }
195 match array.as_any().downcast_ref::<LargeBinaryArray>() {
196 Some(array) => Ok(Value::Geometry(GeoValue::from_binary(
197 array.value(seq).to_vec(),
198 settings.geometry_output_format,
199 )?)),
200 None => Err(ConvertError::new("geometry", format!("{array:?}")).into()),
201 }
202 }
203 ARROW_EXT_TYPE_GEOGRAPHY => {
204 if field.is_nullable() && array.is_null(seq) {
205 return Ok(Value::Null);
206 }
207 match array.as_any().downcast_ref::<LargeBinaryArray>() {
208 Some(array) => Ok(Value::Geography(GeoValue::from_binary(
209 array.value(seq).to_vec(),
210 settings.geometry_output_format,
211 )?)),
212 None => Err(ConvertError::new("geography", format!("{array:?}")).into()),
213 }
214 }
215 ARROW_EXT_TYPE_VECTOR => {
216 if field.is_nullable() && array.is_null(seq) {
217 return Ok(Value::Null);
218 }
219 match field.data_type() {
220 ArrowDataType::FixedSizeList(_, dimension) => {
221 match array
222 .as_any()
223 .downcast_ref::<arrow_array::FixedSizeListArray>()
224 {
225 Some(inner_array) => {
226 match inner_array
227 .value(seq)
228 .as_any()
229 .downcast_ref::<Float32Array>()
230 {
231 Some(inner_array) => {
232 let dimension = *dimension as usize;
233 let mut values = Vec::with_capacity(dimension);
234 for i in 0..dimension {
235 let value = inner_array.value(i);
236 values.push(value);
237 }
238 Ok(Value::Vector(values))
239 }
240 None => Err(ConvertError::new(
241 "vector float32",
242 format!("{inner_array:?}"),
243 )
244 .into()),
245 }
246 }
247 None => {
248 Err(ConvertError::new("vector", format!("{array:?}")).into())
249 }
250 }
251 }
252 arrow_type => Err(ConvertError::new(
253 "vector",
254 format!("Unsupported Arrow type: {arrow_type:?}"),
255 )
256 .into()),
257 }
258 }
259 _ => Err(ConvertError::new(
260 "extension",
261 format!("Unsupported extension datatype for arrow field: {field:?}"),
262 )
263 .into()),
264 };
265 }
266
267 if field.is_nullable() && array.is_null(seq) {
268 return Ok(Value::Null);
269 }
270 match field.data_type() {
271 ArrowDataType::Null => Ok(Value::Null),
272 ArrowDataType::Boolean => match array.as_any().downcast_ref::<BooleanArray>() {
273 Some(array) => Ok(Value::Boolean(array.value(seq))),
274 None => Err(ConvertError::new("bool", format!("{array:?}")).into()),
275 },
276 ArrowDataType::Int8 => match array.as_any().downcast_ref::<Int8Array>() {
277 Some(array) => Ok(Value::Number(NumberValue::Int8(array.value(seq)))),
278 None => Err(ConvertError::new("int8", format!("{array:?}")).into()),
279 },
280 ArrowDataType::Int16 => match array.as_any().downcast_ref::<Int16Array>() {
281 Some(array) => Ok(Value::Number(NumberValue::Int16(array.value(seq)))),
282 None => Err(ConvertError::new("int16", format!("{array:?}")).into()),
283 },
284 ArrowDataType::Int32 => match array.as_any().downcast_ref::<Int32Array>() {
285 Some(array) => Ok(Value::Number(NumberValue::Int32(array.value(seq)))),
286 None => Err(ConvertError::new("int64", format!("{array:?}")).into()),
287 },
288 ArrowDataType::Int64 => match array.as_any().downcast_ref::<Int64Array>() {
289 Some(array) => Ok(Value::Number(NumberValue::Int64(array.value(seq)))),
290 None => Err(ConvertError::new("int64", format!("{array:?}")).into()),
291 },
292 ArrowDataType::UInt8 => match array.as_any().downcast_ref::<UInt8Array>() {
293 Some(array) => Ok(Value::Number(NumberValue::UInt8(array.value(seq)))),
294 None => Err(ConvertError::new("uint8", format!("{array:?}")).into()),
295 },
296 ArrowDataType::UInt16 => match array.as_any().downcast_ref::<UInt16Array>() {
297 Some(array) => Ok(Value::Number(NumberValue::UInt16(array.value(seq)))),
298 None => Err(ConvertError::new("uint16", format!("{array:?}")).into()),
299 },
300 ArrowDataType::UInt32 => match array.as_any().downcast_ref::<UInt32Array>() {
301 Some(array) => Ok(Value::Number(NumberValue::UInt32(array.value(seq)))),
302 None => Err(ConvertError::new("uint32", format!("{array:?}")).into()),
303 },
304 ArrowDataType::UInt64 => match array.as_any().downcast_ref::<UInt64Array>() {
305 Some(array) => Ok(Value::Number(NumberValue::UInt64(array.value(seq)))),
306 None => Err(ConvertError::new("uint64", format!("{array:?}")).into()),
307 },
308 ArrowDataType::Float32 => match array.as_any().downcast_ref::<Float32Array>() {
309 Some(array) => Ok(Value::Number(NumberValue::Float32(array.value(seq)))),
310 None => Err(ConvertError::new("float32", format!("{array:?}")).into()),
311 },
312 ArrowDataType::Float64 => match array.as_any().downcast_ref::<Float64Array>() {
313 Some(array) => Ok(Value::Number(NumberValue::Float64(array.value(seq)))),
314 None => Err(ConvertError::new("float64", format!("{array:?}")).into()),
315 },
316 ArrowDataType::Decimal64(p, s) => {
317 match array.as_any().downcast_ref::<Decimal64Array>() {
318 Some(array) => Ok(Value::Number(NumberValue::Decimal64(
319 array.value(seq),
320 DecimalSize {
321 precision: *p,
322 scale: *s as u8,
323 },
324 ))),
325 None => Err(ConvertError::new("Decimal64", format!("{array:?}")).into()),
326 }
327 }
328 ArrowDataType::Decimal128(p, s) => {
329 match array.as_any().downcast_ref::<Decimal128Array>() {
330 Some(array) => Ok(Value::Number(NumberValue::Decimal128(
331 array.value(seq),
332 DecimalSize {
333 precision: *p,
334 scale: *s as u8,
335 },
336 ))),
337 None => Err(ConvertError::new("Decimal128", format!("{array:?}")).into()),
338 }
339 }
340 ArrowDataType::Decimal256(p, s) => {
341 match array.as_any().downcast_ref::<Decimal256Array>() {
342 Some(array) => {
343 let v = array.value(seq);
344 let v = i256::from_le_bytes(v.to_le_bytes());
345 Ok(Value::Number(NumberValue::Decimal256(
346 v,
347 DecimalSize {
348 precision: *p,
349 scale: *s as u8,
350 },
351 )))
352 }
353 None => Err(ConvertError::new("Decimal256", format!("{array:?}")).into()),
354 }
355 }
356
357 ArrowDataType::Binary => match array.as_any().downcast_ref::<BinaryArray>() {
358 Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
359 None => Err(ConvertError::new("binary", format!("{array:?}")).into()),
360 },
361 ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
362 match array.as_any().downcast_ref::<LargeBinaryArray>() {
363 Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
364 None => Err(ConvertError::new("large binary", format!("{array:?}")).into()),
365 }
366 }
367 ArrowDataType::Utf8 => match array.as_any().downcast_ref::<StringArray>() {
368 Some(array) => Ok(Value::String(array.value(seq).to_string())),
369 None => Err(ConvertError::new("string", format!("{array:?}")).into()),
370 },
371 ArrowDataType::LargeUtf8 => match array.as_any().downcast_ref::<LargeStringArray>() {
372 Some(array) => Ok(Value::String(array.value(seq).to_string())),
373 None => Err(ConvertError::new("large string", format!("{array:?}")).into()),
374 },
375 ArrowDataType::Utf8View => match array.as_any().downcast_ref::<StringViewArray>() {
376 Some(array) => Ok(Value::String(array.value(seq).to_string())),
377 None => Err(ConvertError::new("string view", format!("{array:?}")).into()),
378 },
379 ArrowDataType::Timestamp(unit, tz) => {
381 match array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
382 Some(array) => {
383 if unit != &TimeUnit::Microsecond {
384 return Err(ConvertError::new("timestamp", format!("{array:?}"))
385 .with_message(format!(
386 "unsupported timestamp unit: {unit:?}, only support microsecond"
387 ))
388 .into());
389 }
390 let ts = clamp_ts(array.value(seq));
391 match tz {
392 None => {
393 let timestamp = Timestamp::from_microsecond(ts).map_err(|e| {
394 Error::Parsing(format!("Invalid timestamp_micros {ts}: {e}"))
395 })?;
396 let dt = timestamp.to_zoned(settings.timezone.clone());
397 Ok(Value::Timestamp(dt))
398 }
399 Some(tz) => Err(ConvertError::new("timestamp", format!("{array:?}"))
400 .with_message(format!("non-UTC timezone not supported: {tz:?}"))
401 .into()),
402 }
403 }
404 None => Err(ConvertError::new("timestamp", format!("{array:?}")).into()),
405 }
406 }
407 ArrowDataType::Date32 => match array.as_any().downcast_ref::<Date32Array>() {
408 Some(array) => Ok(Value::Date(array.value(seq))),
409 None => Err(ConvertError::new("date", format!("{array:?}")).into()),
410 },
411 ArrowDataType::List(f) => match array.as_any().downcast_ref::<ListArray>() {
412 Some(array) => {
413 let inner_array = unsafe { array.value_unchecked(seq) };
414 let mut values = Vec::with_capacity(inner_array.len());
415 for i in 0..inner_array.len() {
416 let value = Value::try_from((f.as_ref(), &inner_array, i, settings))?;
417 values.push(value);
418 }
419 Ok(Value::Array(values))
420 }
421 None => Err(ConvertError::new("list", format!("{array:?}")).into()),
422 },
423 ArrowDataType::LargeList(f) => match array.as_any().downcast_ref::<LargeListArray>() {
424 Some(array) => {
425 let inner_array = unsafe { array.value_unchecked(seq) };
426 let mut values = Vec::with_capacity(inner_array.len());
427 for i in 0..inner_array.len() {
428 let value = Value::try_from((f.as_ref(), &inner_array, i, settings))?;
429 values.push(value);
430 }
431 Ok(Value::Array(values))
432 }
433 None => Err(ConvertError::new("large list", format!("{array:?}")).into()),
434 },
435 ArrowDataType::Map(f, _) => match array.as_any().downcast_ref::<MapArray>() {
436 Some(array) => {
437 if let ArrowDataType::Struct(fs) = f.data_type() {
438 let inner_array = unsafe { array.value_unchecked(seq) };
439 let mut values = Vec::with_capacity(inner_array.len());
440 for i in 0..inner_array.len() {
441 let key = Value::try_from((
442 fs[0].as_ref(),
443 inner_array.column(0),
444 i,
445 settings,
446 ))?;
447 let val = Value::try_from((
448 fs[1].as_ref(),
449 inner_array.column(1),
450 i,
451 settings,
452 ))?;
453 values.push((key, val));
454 }
455 Ok(Value::Map(values))
456 } else {
457 Err(
458 ConvertError::new("invalid map inner type", format!("{array:?}"))
459 .into(),
460 )
461 }
462 }
463 None => Err(ConvertError::new("map", format!("{array:?}")).into()),
464 },
465 ArrowDataType::Struct(fs) => match array.as_any().downcast_ref::<StructArray>() {
466 Some(array) => {
467 let mut values = Vec::with_capacity(array.len());
468 for (f, inner_array) in fs.iter().zip(array.columns().iter()) {
469 let value = Value::try_from((f.as_ref(), inner_array, seq, settings))?;
470 values.push(value);
471 }
472 Ok(Value::Tuple(values))
473 }
474 None => Err(ConvertError::new("struct", format!("{array:?}")).into()),
475 },
476 _ => Err(ConvertError::new("unsupported data type", format!("{array:?}")).into()),
477 }
478 }
479}
480
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::ArrayRef;
    use std::collections::HashMap;

    /// Builds a non-nullable column named `v` tagged with the variant extension type.
    fn variant_field(data_type: ArrowDataType) -> ArrowField {
        let metadata = [(EXTENSION_KEY.to_string(), ARROW_EXT_TYPE_VARIANT.to_string())]
            .into_iter()
            .collect::<HashMap<_, _>>();
        ArrowField::new("v", data_type, false).with_metadata(metadata)
    }

    /// Converts row 0 of `array` under `field`, panicking if the conversion fails.
    fn first_value(field: &ArrowField, array: ArrayRef, settings: &ResultFormatSettings) -> Value {
        Value::try_from((field, &array, 0, settings)).unwrap()
    }

    #[test]
    fn decode_variant_from_utf8_array() {
        let field = variant_field(ArrowDataType::Utf8);
        let input: ArrayRef = Arc::new(StringArray::from(vec!["{\"a\":1}"]));
        let expected = Value::Variant("{\"a\":1}".to_string());

        assert_eq!(
            first_value(&field, input, &ResultFormatSettings::default()),
            expected
        );
    }

    #[test]
    fn decode_variant_from_large_utf8_array() {
        let field = variant_field(ArrowDataType::LargeUtf8);
        let input: ArrayRef = Arc::new(LargeStringArray::from(vec!["{\"b\":2}"]));
        let expected = Value::Variant("{\"b\":2}".to_string());

        assert_eq!(
            first_value(&field, input, &ResultFormatSettings::default()),
            expected
        );
    }

    #[test]
    fn decode_variant_from_large_binary_array_for_v2() {
        let field = variant_field(ArrowDataType::LargeBinary);
        let input: ArrayRef = Arc::new(LargeBinaryArray::from(vec![b"{\"c\":3}".as_slice()]));
        // Result-format version 2 makes the LargeBinary payload decode as UTF-8 text.
        let v2_settings = ResultFormatSettings {
            arrow_result_version: Some(2),
            ..ResultFormatSettings::default()
        };
        let expected = Value::Variant("{\"c\":3}".to_string());

        assert_eq!(first_value(&field, input, &v2_settings), expected);
    }
}