databend_driver_core/value/
arrow_decoder.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use super::{Interval, NumberValue, Value};
18use crate::error::{ConvertError, Error};
19use crate::value::geo::convert_geometry;
20use arrow_array::{
21    Array as ArrowArray, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Decimal256Array,
22    Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
23    LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StringViewArray,
24    StructArray, TimestampMicrosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
25};
26use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, TimeUnit};
27use databend_client::schema::{
28    DecimalSize, ARROW_EXT_TYPE_BITMAP, ARROW_EXT_TYPE_EMPTY_ARRAY, ARROW_EXT_TYPE_EMPTY_MAP,
29    ARROW_EXT_TYPE_GEOGRAPHY, ARROW_EXT_TYPE_GEOMETRY, ARROW_EXT_TYPE_INTERVAL,
30    ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE, ARROW_EXT_TYPE_VARIANT, ARROW_EXT_TYPE_VECTOR,
31    EXTENSION_KEY,
32};
33use databend_client::ResultFormatSettings;
34use ethnum::i256;
35use jiff::{tz, Timestamp};
36use jsonb::RawJsonb;
37
/// Raw 128-bit payload of an `Interval` value in month/day/microsecond form.
///
/// The accessors on this type read months from bits 96..128, days from
/// bits 64..96 and microseconds from the low 64 bits.
#[repr(C)]
// Type name is kept in snake_case; renaming it would touch every user of the type.
#[allow(non_camel_case_types)]
struct months_days_micros(pub i128);
42
/// Selects the low 64 bits of a packed interval (the microseconds part).
const MICROS_MASK: i128 = u64::MAX as i128;
/// Selects the low 32 bits of a shifted packed interval (the days or months part).
const DAYS_MONTHS_MASK: i128 = u32::MAX as i128;
47
// Upper bound, 9999-12-30T22:00:00Z, in microseconds since the Unix epoch.
// jiff itself accepts up to 253402207200999999; the bound stays at
// 253402207200000000 for compatibility with older databend-query versions.
const TIMESTAMP_MAX: i64 = 253_402_207_200_000_000;
// Lower bound, -009999-01-02T01:59:59Z, in microseconds since the Unix epoch.
const TIMESTAMP_MIN: i64 = -377_705_023_201_000_000;
53
54// required by jiff
55fn clamp_ts(ts: i64) -> i64 {
56    ts.clamp(TIMESTAMP_MIN, TIMESTAMP_MAX)
57}
58
59impl months_days_micros {
60    #[inline]
61    pub fn months(&self) -> i32 {
62        ((self.0 >> 96) & DAYS_MONTHS_MASK) as i32
63    }
64
65    #[inline]
66    pub fn days(&self) -> i32 {
67        ((self.0 >> 64) & DAYS_MONTHS_MASK) as i32
68    }
69
70    #[inline]
71    pub fn microseconds(&self) -> i64 {
72        (self.0 & MICROS_MASK) as i64
73    }
74}
75
76impl
77    TryFrom<(
78        &ArrowField,
79        &Arc<dyn ArrowArray>,
80        usize,
81        &ResultFormatSettings,
82    )> for Value
83{
84    type Error = Error;
85    fn try_from(
86        (field, array, seq, settings): (
87            &ArrowField,
88            &Arc<dyn ArrowArray>,
89            usize,
90            &ResultFormatSettings,
91        ),
92    ) -> std::result::Result<Self, Self::Error> {
93        if let Some(extend_type) = field.metadata().get(EXTENSION_KEY) {
94            return match extend_type.as_str() {
95                ARROW_EXT_TYPE_EMPTY_ARRAY => Ok(Value::EmptyArray),
96                ARROW_EXT_TYPE_EMPTY_MAP => Ok(Value::EmptyMap),
97                ARROW_EXT_TYPE_VARIANT => {
98                    if field.is_nullable() && array.is_null(seq) {
99                        return Ok(Value::Null);
100                    }
101                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
102                        Some(array) => {
103                            Ok(Value::Variant(RawJsonb::new(array.value(seq)).to_string()))
104                        }
105                        None => Err(ConvertError::new("variant", format!("{array:?}")).into()),
106                    }
107                }
108                ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE => {
109                    if field.is_nullable() && array.is_null(seq) {
110                        return Ok(Value::Null);
111                    }
112                    match array.as_any().downcast_ref::<Decimal128Array>() {
113                        Some(array) => {
114                            let v = array.value(seq);
115                            let unix_ts = clamp_ts(v as u64 as i64);
116                            let offset = (v >> 64) as i32;
117                            let offset = tz::Offset::from_seconds(offset).map_err(|e| {
118                                Error::Parsing(format!("invalid offset: {offset}, {e}"))
119                            })?;
120                            let time_zone = tz::TimeZone::fixed(offset);
121                            let timestamp = Timestamp::from_microsecond(unix_ts).map_err(|e| {
122                                Error::Parsing(format!("Invalid timestamp_micros {unix_ts}: {e}"))
123                            })?;
124                            Ok(Value::TimestampTz(timestamp.to_zoned(time_zone)))
125                        }
126                        None => Err(ConvertError::new("Interval", format!("{array:?}")).into()),
127                    }
128                }
129                ARROW_EXT_TYPE_INTERVAL => {
130                    if field.is_nullable() && array.is_null(seq) {
131                        return Ok(Value::Null);
132                    }
133                    match array.as_any().downcast_ref::<Decimal128Array>() {
134                        Some(array) => {
135                            let res = months_days_micros(array.value(seq));
136                            Ok(Value::Interval(
137                                Interval {
138                                    months: res.months(),
139                                    days: res.days(),
140                                    micros: res.microseconds(),
141                                }
142                                .to_string(),
143                            ))
144                        }
145                        None => Err(ConvertError::new("Interval", format!("{array:?}")).into()),
146                    }
147                }
148                ARROW_EXT_TYPE_BITMAP => {
149                    if field.is_nullable() && array.is_null(seq) {
150                        return Ok(Value::Null);
151                    }
152                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
153                        Some(array) => {
154                            let rb = roaring::RoaringTreemap::deserialize_from(array.value(seq))
155                                .expect("failed to deserialize bitmap");
156                            let raw = rb.into_iter().collect::<Vec<_>>();
157                            let s = itertools::join(raw.iter(), ",");
158                            Ok(Value::Bitmap(s))
159                        }
160                        None => Err(ConvertError::new("bitmap", format!("{array:?}")).into()),
161                    }
162                }
163                ARROW_EXT_TYPE_GEOMETRY => {
164                    if field.is_nullable() && array.is_null(seq) {
165                        return Ok(Value::Null);
166                    }
167                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
168                        Some(array) => {
169                            let value = convert_geometry(
170                                array.value(seq),
171                                settings.geometry_output_format,
172                            )?;
173                            Ok(Value::Geometry(value))
174                        }
175                        None => Err(ConvertError::new("geometry", format!("{array:?}")).into()),
176                    }
177                }
178                ARROW_EXT_TYPE_GEOGRAPHY => {
179                    if field.is_nullable() && array.is_null(seq) {
180                        return Ok(Value::Null);
181                    }
182                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
183                        Some(array) => {
184                            let value = convert_geometry(
185                                array.value(seq),
186                                settings.geometry_output_format,
187                            )?;
188                            Ok(Value::Geography(value))
189                        }
190                        None => Err(ConvertError::new("geography", format!("{array:?}")).into()),
191                    }
192                }
193                ARROW_EXT_TYPE_VECTOR => {
194                    if field.is_nullable() && array.is_null(seq) {
195                        return Ok(Value::Null);
196                    }
197                    match field.data_type() {
198                        ArrowDataType::FixedSizeList(_, dimension) => {
199                            match array
200                                .as_any()
201                                .downcast_ref::<arrow_array::FixedSizeListArray>()
202                            {
203                                Some(inner_array) => {
204                                    match inner_array
205                                        .value(seq)
206                                        .as_any()
207                                        .downcast_ref::<Float32Array>()
208                                    {
209                                        Some(inner_array) => {
210                                            let dimension = *dimension as usize;
211                                            let mut values = Vec::with_capacity(dimension);
212                                            for i in 0..dimension {
213                                                let value = inner_array.value(i);
214                                                values.push(value);
215                                            }
216                                            Ok(Value::Vector(values))
217                                        }
218                                        None => Err(ConvertError::new(
219                                            "vector float32",
220                                            format!("{inner_array:?}"),
221                                        )
222                                        .into()),
223                                    }
224                                }
225                                None => {
226                                    Err(ConvertError::new("vector", format!("{array:?}")).into())
227                                }
228                            }
229                        }
230                        arrow_type => Err(ConvertError::new(
231                            "vector",
232                            format!("Unsupported Arrow type: {arrow_type:?}"),
233                        )
234                        .into()),
235                    }
236                }
237                _ => Err(ConvertError::new(
238                    "extension",
239                    format!("Unsupported extension datatype for arrow field: {field:?}"),
240                )
241                .into()),
242            };
243        }
244
245        if field.is_nullable() && array.is_null(seq) {
246            return Ok(Value::Null);
247        }
248        match field.data_type() {
249            ArrowDataType::Null => Ok(Value::Null),
250            ArrowDataType::Boolean => match array.as_any().downcast_ref::<BooleanArray>() {
251                Some(array) => Ok(Value::Boolean(array.value(seq))),
252                None => Err(ConvertError::new("bool", format!("{array:?}")).into()),
253            },
254            ArrowDataType::Int8 => match array.as_any().downcast_ref::<Int8Array>() {
255                Some(array) => Ok(Value::Number(NumberValue::Int8(array.value(seq)))),
256                None => Err(ConvertError::new("int8", format!("{array:?}")).into()),
257            },
258            ArrowDataType::Int16 => match array.as_any().downcast_ref::<Int16Array>() {
259                Some(array) => Ok(Value::Number(NumberValue::Int16(array.value(seq)))),
260                None => Err(ConvertError::new("int16", format!("{array:?}")).into()),
261            },
262            ArrowDataType::Int32 => match array.as_any().downcast_ref::<Int32Array>() {
263                Some(array) => Ok(Value::Number(NumberValue::Int32(array.value(seq)))),
264                None => Err(ConvertError::new("int64", format!("{array:?}")).into()),
265            },
266            ArrowDataType::Int64 => match array.as_any().downcast_ref::<Int64Array>() {
267                Some(array) => Ok(Value::Number(NumberValue::Int64(array.value(seq)))),
268                None => Err(ConvertError::new("int64", format!("{array:?}")).into()),
269            },
270            ArrowDataType::UInt8 => match array.as_any().downcast_ref::<UInt8Array>() {
271                Some(array) => Ok(Value::Number(NumberValue::UInt8(array.value(seq)))),
272                None => Err(ConvertError::new("uint8", format!("{array:?}")).into()),
273            },
274            ArrowDataType::UInt16 => match array.as_any().downcast_ref::<UInt16Array>() {
275                Some(array) => Ok(Value::Number(NumberValue::UInt16(array.value(seq)))),
276                None => Err(ConvertError::new("uint16", format!("{array:?}")).into()),
277            },
278            ArrowDataType::UInt32 => match array.as_any().downcast_ref::<UInt32Array>() {
279                Some(array) => Ok(Value::Number(NumberValue::UInt32(array.value(seq)))),
280                None => Err(ConvertError::new("uint32", format!("{array:?}")).into()),
281            },
282            ArrowDataType::UInt64 => match array.as_any().downcast_ref::<UInt64Array>() {
283                Some(array) => Ok(Value::Number(NumberValue::UInt64(array.value(seq)))),
284                None => Err(ConvertError::new("uint64", format!("{array:?}")).into()),
285            },
286            ArrowDataType::Float32 => match array.as_any().downcast_ref::<Float32Array>() {
287                Some(array) => Ok(Value::Number(NumberValue::Float32(array.value(seq)))),
288                None => Err(ConvertError::new("float32", format!("{array:?}")).into()),
289            },
290            ArrowDataType::Float64 => match array.as_any().downcast_ref::<Float64Array>() {
291                Some(array) => Ok(Value::Number(NumberValue::Float64(array.value(seq)))),
292                None => Err(ConvertError::new("float64", format!("{array:?}")).into()),
293            },
294
295            ArrowDataType::Decimal128(p, s) => {
296                match array.as_any().downcast_ref::<Decimal128Array>() {
297                    Some(array) => Ok(Value::Number(NumberValue::Decimal128(
298                        array.value(seq),
299                        DecimalSize {
300                            precision: *p,
301                            scale: *s as u8,
302                        },
303                    ))),
304                    None => Err(ConvertError::new("Decimal128", format!("{array:?}")).into()),
305                }
306            }
307            ArrowDataType::Decimal256(p, s) => {
308                match array.as_any().downcast_ref::<Decimal256Array>() {
309                    Some(array) => {
310                        let v = array.value(seq);
311                        let v = i256::from_le_bytes(v.to_le_bytes());
312                        Ok(Value::Number(NumberValue::Decimal256(
313                            v,
314                            DecimalSize {
315                                precision: *p,
316                                scale: *s as u8,
317                            },
318                        )))
319                    }
320                    None => Err(ConvertError::new("Decimal256", format!("{array:?}")).into()),
321                }
322            }
323
324            ArrowDataType::Binary => match array.as_any().downcast_ref::<BinaryArray>() {
325                Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
326                None => Err(ConvertError::new("binary", format!("{array:?}")).into()),
327            },
328            ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
329                match array.as_any().downcast_ref::<LargeBinaryArray>() {
330                    Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
331                    None => Err(ConvertError::new("large binary", format!("{array:?}")).into()),
332                }
333            }
334            ArrowDataType::Utf8 => match array.as_any().downcast_ref::<StringArray>() {
335                Some(array) => Ok(Value::String(array.value(seq).to_string())),
336                None => Err(ConvertError::new("string", format!("{array:?}")).into()),
337            },
338            ArrowDataType::LargeUtf8 => match array.as_any().downcast_ref::<LargeStringArray>() {
339                Some(array) => Ok(Value::String(array.value(seq).to_string())),
340                None => Err(ConvertError::new("large string", format!("{array:?}")).into()),
341            },
342            ArrowDataType::Utf8View => match array.as_any().downcast_ref::<StringViewArray>() {
343                Some(array) => Ok(Value::String(array.value(seq).to_string())),
344                None => Err(ConvertError::new("string view", format!("{array:?}")).into()),
345            },
346            // we only support timestamp in microsecond in databend
347            ArrowDataType::Timestamp(unit, tz) => {
348                match array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
349                    Some(array) => {
350                        if unit != &TimeUnit::Microsecond {
351                            return Err(ConvertError::new("timestamp", format!("{array:?}"))
352                                .with_message(format!(
353                                    "unsupported timestamp unit: {unit:?}, only support microsecond"
354                                ))
355                                .into());
356                        }
357                        let ts = clamp_ts(array.value(seq));
358                        match tz {
359                            None => {
360                                let timestamp = Timestamp::from_microsecond(ts).map_err(|e| {
361                                    Error::Parsing(format!("Invalid timestamp_micros {ts}: {e}"))
362                                })?;
363                                let dt = timestamp.to_zoned(settings.timezone.clone());
364                                Ok(Value::Timestamp(dt))
365                            }
366                            Some(tz) => Err(ConvertError::new("timestamp", format!("{array:?}"))
367                                .with_message(format!("non-UTC timezone not supported: {tz:?}"))
368                                .into()),
369                        }
370                    }
371                    None => Err(ConvertError::new("timestamp", format!("{array:?}")).into()),
372                }
373            }
374            ArrowDataType::Date32 => match array.as_any().downcast_ref::<Date32Array>() {
375                Some(array) => Ok(Value::Date(array.value(seq))),
376                None => Err(ConvertError::new("date", format!("{array:?}")).into()),
377            },
378            ArrowDataType::List(f) => match array.as_any().downcast_ref::<ListArray>() {
379                Some(array) => {
380                    let inner_array = unsafe { array.value_unchecked(seq) };
381                    let mut values = Vec::with_capacity(inner_array.len());
382                    for i in 0..inner_array.len() {
383                        let value = Value::try_from((f.as_ref(), &inner_array, i, settings))?;
384                        values.push(value);
385                    }
386                    Ok(Value::Array(values))
387                }
388                None => Err(ConvertError::new("list", format!("{array:?}")).into()),
389            },
390            ArrowDataType::LargeList(f) => match array.as_any().downcast_ref::<LargeListArray>() {
391                Some(array) => {
392                    let inner_array = unsafe { array.value_unchecked(seq) };
393                    let mut values = Vec::with_capacity(inner_array.len());
394                    for i in 0..inner_array.len() {
395                        let value = Value::try_from((f.as_ref(), &inner_array, i, settings))?;
396                        values.push(value);
397                    }
398                    Ok(Value::Array(values))
399                }
400                None => Err(ConvertError::new("large list", format!("{array:?}")).into()),
401            },
402            ArrowDataType::Map(f, _) => match array.as_any().downcast_ref::<MapArray>() {
403                Some(array) => {
404                    if let ArrowDataType::Struct(fs) = f.data_type() {
405                        let inner_array = unsafe { array.value_unchecked(seq) };
406                        let mut values = Vec::with_capacity(inner_array.len());
407                        for i in 0..inner_array.len() {
408                            let key = Value::try_from((
409                                fs[0].as_ref(),
410                                inner_array.column(0),
411                                i,
412                                settings,
413                            ))?;
414                            let val = Value::try_from((
415                                fs[1].as_ref(),
416                                inner_array.column(1),
417                                i,
418                                settings,
419                            ))?;
420                            values.push((key, val));
421                        }
422                        Ok(Value::Map(values))
423                    } else {
424                        Err(
425                            ConvertError::new("invalid map inner type", format!("{array:?}"))
426                                .into(),
427                        )
428                    }
429                }
430                None => Err(ConvertError::new("map", format!("{array:?}")).into()),
431            },
432            ArrowDataType::Struct(fs) => match array.as_any().downcast_ref::<StructArray>() {
433                Some(array) => {
434                    let mut values = Vec::with_capacity(array.len());
435                    for (f, inner_array) in fs.iter().zip(array.columns().iter()) {
436                        let value = Value::try_from((f.as_ref(), inner_array, seq, settings))?;
437                        values.push(value);
438                    }
439                    Ok(Value::Tuple(values))
440                }
441                None => Err(ConvertError::new("struct", format!("{array:?}")).into()),
442            },
443            _ => Err(ConvertError::new("unsupported data type", format!("{array:?}")).into()),
444        }
445    }
446}