databend_driver_core/value/
arrow_decoder.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use super::{Interval, NumberValue, Value};
18use crate::error::{ConvertError, Error};
19use crate::value::geo::convert_geometry;
20use arrow_array::{
21    Array as ArrowArray, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Decimal256Array,
22    Decimal64Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
23    LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, MapArray, StringArray,
24    StringViewArray, StructArray, TimestampMicrosecondArray, UInt16Array, UInt32Array, UInt64Array,
25    UInt8Array,
26};
27use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, TimeUnit};
28use databend_client::schema::{
29    DecimalSize, ARROW_EXT_TYPE_BITMAP, ARROW_EXT_TYPE_EMPTY_ARRAY, ARROW_EXT_TYPE_EMPTY_MAP,
30    ARROW_EXT_TYPE_GEOGRAPHY, ARROW_EXT_TYPE_GEOMETRY, ARROW_EXT_TYPE_INTERVAL,
31    ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE, ARROW_EXT_TYPE_VARIANT, ARROW_EXT_TYPE_VECTOR,
32    EXTENSION_KEY,
33};
34use databend_client::ResultFormatSettings;
35use ethnum::i256;
36use jiff::{tz, Timestamp};
37use jsonb::RawJsonb;
38
/// The in-memory representation of the MonthDayMicros variant of the "Interval" logical type.
///
/// The three components are packed into a single `i128`; the accessors on this
/// type read them back as follows:
///
/// | bits      | component    | accessor         |
/// |-----------|--------------|------------------|
/// | 96..128   | months       | `months()`       |
/// | 64..96    | days         | `days()`         |
/// | 0..64     | microseconds | `microseconds()` |
// The name mirrors the lower-case Arrow-style type name, hence the lint override.
#[allow(non_camel_case_types)]
#[repr(C)]
struct months_days_micros(pub i128);
43
/// Keeps the low 64 bits of a packed interval, i.e. the microseconds component.
const MICROS_MASK: i128 = u64::MAX as i128;
/// Keeps the low 32 bits of an already right-shifted packed interval; used for
/// both the days and the months component.
const DAYS_MONTHS_MASK: i128 = u32::MAX as i128;
48
// Upper bound, in microseconds since the Unix epoch: 9999-12-30T22:00:00Z.
// jiff itself accepts up to 253402207200999999; the whole-second value is kept
// for compatibility with old databend-query.
const TIMESTAMP_MAX: i64 = 253_402_207_200_000_000;
// Lower bound, in microseconds since the Unix epoch: -009999-01-02T01:59:59Z.
const TIMESTAMP_MIN: i64 = -377_705_023_201_000_000;

/// Saturates `ts` (microseconds since the Unix epoch) into
/// `TIMESTAMP_MIN..=TIMESTAMP_MAX`, the range required by jiff.
fn clamp_ts(ts: i64) -> i64 {
    ts.max(TIMESTAMP_MIN).min(TIMESTAMP_MAX)
}
59
60impl months_days_micros {
61    #[inline]
62    pub fn months(&self) -> i32 {
63        ((self.0 >> 96) & DAYS_MONTHS_MASK) as i32
64    }
65
66    #[inline]
67    pub fn days(&self) -> i32 {
68        ((self.0 >> 64) & DAYS_MONTHS_MASK) as i32
69    }
70
71    #[inline]
72    pub fn microseconds(&self) -> i64 {
73        (self.0 & MICROS_MASK) as i64
74    }
75}
76
77impl
78    TryFrom<(
79        &ArrowField,
80        &Arc<dyn ArrowArray>,
81        usize,
82        &ResultFormatSettings,
83    )> for Value
84{
85    type Error = Error;
86    fn try_from(
87        (field, array, seq, settings): (
88            &ArrowField,
89            &Arc<dyn ArrowArray>,
90            usize,
91            &ResultFormatSettings,
92        ),
93    ) -> std::result::Result<Self, Self::Error> {
94        if let Some(extend_type) = field.metadata().get(EXTENSION_KEY) {
95            return match extend_type.as_str() {
96                ARROW_EXT_TYPE_EMPTY_ARRAY => Ok(Value::EmptyArray),
97                ARROW_EXT_TYPE_EMPTY_MAP => Ok(Value::EmptyMap),
98                ARROW_EXT_TYPE_VARIANT => {
99                    if field.is_nullable() && array.is_null(seq) {
100                        return Ok(Value::Null);
101                    }
102                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
103                        Some(array) => {
104                            Ok(Value::Variant(RawJsonb::new(array.value(seq)).to_string()))
105                        }
106                        None => Err(ConvertError::new("variant", format!("{array:?}")).into()),
107                    }
108                }
109                ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE => {
110                    if field.is_nullable() && array.is_null(seq) {
111                        return Ok(Value::Null);
112                    }
113                    match array.as_any().downcast_ref::<Decimal128Array>() {
114                        Some(array) => {
115                            let v = array.value(seq);
116                            let unix_ts = clamp_ts(v as u64 as i64);
117                            let offset = (v >> 64) as i32;
118                            let offset = tz::Offset::from_seconds(offset).map_err(|e| {
119                                Error::Parsing(format!("invalid offset: {offset}, {e}"))
120                            })?;
121                            let time_zone = tz::TimeZone::fixed(offset);
122                            let timestamp = Timestamp::from_microsecond(unix_ts).map_err(|e| {
123                                Error::Parsing(format!("Invalid timestamp_micros {unix_ts}: {e}"))
124                            })?;
125                            Ok(Value::TimestampTz(timestamp.to_zoned(time_zone)))
126                        }
127                        None => Err(ConvertError::new("Interval", format!("{array:?}")).into()),
128                    }
129                }
130                ARROW_EXT_TYPE_INTERVAL => {
131                    if field.is_nullable() && array.is_null(seq) {
132                        return Ok(Value::Null);
133                    }
134                    match array.as_any().downcast_ref::<Decimal128Array>() {
135                        Some(array) => {
136                            let res = months_days_micros(array.value(seq));
137                            Ok(Value::Interval(
138                                Interval {
139                                    months: res.months(),
140                                    days: res.days(),
141                                    micros: res.microseconds(),
142                                }
143                                .to_string(),
144                            ))
145                        }
146                        None => Err(ConvertError::new("Interval", format!("{array:?}")).into()),
147                    }
148                }
149                ARROW_EXT_TYPE_BITMAP => {
150                    if field.is_nullable() && array.is_null(seq) {
151                        return Ok(Value::Null);
152                    }
153                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
154                        Some(array) => {
155                            let rb = roaring::RoaringTreemap::deserialize_from(array.value(seq))
156                                .expect("failed to deserialize bitmap");
157                            let raw = rb.into_iter().collect::<Vec<_>>();
158                            let s = itertools::join(raw.iter(), ",");
159                            Ok(Value::Bitmap(s))
160                        }
161                        None => Err(ConvertError::new("bitmap", format!("{array:?}")).into()),
162                    }
163                }
164                ARROW_EXT_TYPE_GEOMETRY => {
165                    if field.is_nullable() && array.is_null(seq) {
166                        return Ok(Value::Null);
167                    }
168                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
169                        Some(array) => {
170                            let value = convert_geometry(
171                                array.value(seq),
172                                settings.geometry_output_format,
173                            )?;
174                            Ok(Value::Geometry(value))
175                        }
176                        None => Err(ConvertError::new("geometry", format!("{array:?}")).into()),
177                    }
178                }
179                ARROW_EXT_TYPE_GEOGRAPHY => {
180                    if field.is_nullable() && array.is_null(seq) {
181                        return Ok(Value::Null);
182                    }
183                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
184                        Some(array) => {
185                            let value = convert_geometry(
186                                array.value(seq),
187                                settings.geometry_output_format,
188                            )?;
189                            Ok(Value::Geography(value))
190                        }
191                        None => Err(ConvertError::new("geography", format!("{array:?}")).into()),
192                    }
193                }
194                ARROW_EXT_TYPE_VECTOR => {
195                    if field.is_nullable() && array.is_null(seq) {
196                        return Ok(Value::Null);
197                    }
198                    match field.data_type() {
199                        ArrowDataType::FixedSizeList(_, dimension) => {
200                            match array
201                                .as_any()
202                                .downcast_ref::<arrow_array::FixedSizeListArray>()
203                            {
204                                Some(inner_array) => {
205                                    match inner_array
206                                        .value(seq)
207                                        .as_any()
208                                        .downcast_ref::<Float32Array>()
209                                    {
210                                        Some(inner_array) => {
211                                            let dimension = *dimension as usize;
212                                            let mut values = Vec::with_capacity(dimension);
213                                            for i in 0..dimension {
214                                                let value = inner_array.value(i);
215                                                values.push(value);
216                                            }
217                                            Ok(Value::Vector(values))
218                                        }
219                                        None => Err(ConvertError::new(
220                                            "vector float32",
221                                            format!("{inner_array:?}"),
222                                        )
223                                        .into()),
224                                    }
225                                }
226                                None => {
227                                    Err(ConvertError::new("vector", format!("{array:?}")).into())
228                                }
229                            }
230                        }
231                        arrow_type => Err(ConvertError::new(
232                            "vector",
233                            format!("Unsupported Arrow type: {arrow_type:?}"),
234                        )
235                        .into()),
236                    }
237                }
238                _ => Err(ConvertError::new(
239                    "extension",
240                    format!("Unsupported extension datatype for arrow field: {field:?}"),
241                )
242                .into()),
243            };
244        }
245
246        if field.is_nullable() && array.is_null(seq) {
247            return Ok(Value::Null);
248        }
249        match field.data_type() {
250            ArrowDataType::Null => Ok(Value::Null),
251            ArrowDataType::Boolean => match array.as_any().downcast_ref::<BooleanArray>() {
252                Some(array) => Ok(Value::Boolean(array.value(seq))),
253                None => Err(ConvertError::new("bool", format!("{array:?}")).into()),
254            },
255            ArrowDataType::Int8 => match array.as_any().downcast_ref::<Int8Array>() {
256                Some(array) => Ok(Value::Number(NumberValue::Int8(array.value(seq)))),
257                None => Err(ConvertError::new("int8", format!("{array:?}")).into()),
258            },
259            ArrowDataType::Int16 => match array.as_any().downcast_ref::<Int16Array>() {
260                Some(array) => Ok(Value::Number(NumberValue::Int16(array.value(seq)))),
261                None => Err(ConvertError::new("int16", format!("{array:?}")).into()),
262            },
263            ArrowDataType::Int32 => match array.as_any().downcast_ref::<Int32Array>() {
264                Some(array) => Ok(Value::Number(NumberValue::Int32(array.value(seq)))),
265                None => Err(ConvertError::new("int64", format!("{array:?}")).into()),
266            },
267            ArrowDataType::Int64 => match array.as_any().downcast_ref::<Int64Array>() {
268                Some(array) => Ok(Value::Number(NumberValue::Int64(array.value(seq)))),
269                None => Err(ConvertError::new("int64", format!("{array:?}")).into()),
270            },
271            ArrowDataType::UInt8 => match array.as_any().downcast_ref::<UInt8Array>() {
272                Some(array) => Ok(Value::Number(NumberValue::UInt8(array.value(seq)))),
273                None => Err(ConvertError::new("uint8", format!("{array:?}")).into()),
274            },
275            ArrowDataType::UInt16 => match array.as_any().downcast_ref::<UInt16Array>() {
276                Some(array) => Ok(Value::Number(NumberValue::UInt16(array.value(seq)))),
277                None => Err(ConvertError::new("uint16", format!("{array:?}")).into()),
278            },
279            ArrowDataType::UInt32 => match array.as_any().downcast_ref::<UInt32Array>() {
280                Some(array) => Ok(Value::Number(NumberValue::UInt32(array.value(seq)))),
281                None => Err(ConvertError::new("uint32", format!("{array:?}")).into()),
282            },
283            ArrowDataType::UInt64 => match array.as_any().downcast_ref::<UInt64Array>() {
284                Some(array) => Ok(Value::Number(NumberValue::UInt64(array.value(seq)))),
285                None => Err(ConvertError::new("uint64", format!("{array:?}")).into()),
286            },
287            ArrowDataType::Float32 => match array.as_any().downcast_ref::<Float32Array>() {
288                Some(array) => Ok(Value::Number(NumberValue::Float32(array.value(seq)))),
289                None => Err(ConvertError::new("float32", format!("{array:?}")).into()),
290            },
291            ArrowDataType::Float64 => match array.as_any().downcast_ref::<Float64Array>() {
292                Some(array) => Ok(Value::Number(NumberValue::Float64(array.value(seq)))),
293                None => Err(ConvertError::new("float64", format!("{array:?}")).into()),
294            },
295            ArrowDataType::Decimal64(p, s) => {
296                match array.as_any().downcast_ref::<Decimal64Array>() {
297                    Some(array) => Ok(Value::Number(NumberValue::Decimal64(
298                        array.value(seq),
299                        DecimalSize {
300                            precision: *p,
301                            scale: *s as u8,
302                        },
303                    ))),
304                    None => Err(ConvertError::new("Decimal64", format!("{array:?}")).into()),
305                }
306            }
307            ArrowDataType::Decimal128(p, s) => {
308                match array.as_any().downcast_ref::<Decimal128Array>() {
309                    Some(array) => Ok(Value::Number(NumberValue::Decimal128(
310                        array.value(seq),
311                        DecimalSize {
312                            precision: *p,
313                            scale: *s as u8,
314                        },
315                    ))),
316                    None => Err(ConvertError::new("Decimal128", format!("{array:?}")).into()),
317                }
318            }
319            ArrowDataType::Decimal256(p, s) => {
320                match array.as_any().downcast_ref::<Decimal256Array>() {
321                    Some(array) => {
322                        let v = array.value(seq);
323                        let v = i256::from_le_bytes(v.to_le_bytes());
324                        Ok(Value::Number(NumberValue::Decimal256(
325                            v,
326                            DecimalSize {
327                                precision: *p,
328                                scale: *s as u8,
329                            },
330                        )))
331                    }
332                    None => Err(ConvertError::new("Decimal256", format!("{array:?}")).into()),
333                }
334            }
335
336            ArrowDataType::Binary => match array.as_any().downcast_ref::<BinaryArray>() {
337                Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
338                None => Err(ConvertError::new("binary", format!("{array:?}")).into()),
339            },
340            ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
341                match array.as_any().downcast_ref::<LargeBinaryArray>() {
342                    Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
343                    None => Err(ConvertError::new("large binary", format!("{array:?}")).into()),
344                }
345            }
346            ArrowDataType::Utf8 => match array.as_any().downcast_ref::<StringArray>() {
347                Some(array) => Ok(Value::String(array.value(seq).to_string())),
348                None => Err(ConvertError::new("string", format!("{array:?}")).into()),
349            },
350            ArrowDataType::LargeUtf8 => match array.as_any().downcast_ref::<LargeStringArray>() {
351                Some(array) => Ok(Value::String(array.value(seq).to_string())),
352                None => Err(ConvertError::new("large string", format!("{array:?}")).into()),
353            },
354            ArrowDataType::Utf8View => match array.as_any().downcast_ref::<StringViewArray>() {
355                Some(array) => Ok(Value::String(array.value(seq).to_string())),
356                None => Err(ConvertError::new("string view", format!("{array:?}")).into()),
357            },
358            // we only support timestamp in microsecond in databend
359            ArrowDataType::Timestamp(unit, tz) => {
360                match array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
361                    Some(array) => {
362                        if unit != &TimeUnit::Microsecond {
363                            return Err(ConvertError::new("timestamp", format!("{array:?}"))
364                                .with_message(format!(
365                                    "unsupported timestamp unit: {unit:?}, only support microsecond"
366                                ))
367                                .into());
368                        }
369                        let ts = clamp_ts(array.value(seq));
370                        match tz {
371                            None => {
372                                let timestamp = Timestamp::from_microsecond(ts).map_err(|e| {
373                                    Error::Parsing(format!("Invalid timestamp_micros {ts}: {e}"))
374                                })?;
375                                let dt = timestamp.to_zoned(settings.timezone.clone());
376                                Ok(Value::Timestamp(dt))
377                            }
378                            Some(tz) => Err(ConvertError::new("timestamp", format!("{array:?}"))
379                                .with_message(format!("non-UTC timezone not supported: {tz:?}"))
380                                .into()),
381                        }
382                    }
383                    None => Err(ConvertError::new("timestamp", format!("{array:?}")).into()),
384                }
385            }
386            ArrowDataType::Date32 => match array.as_any().downcast_ref::<Date32Array>() {
387                Some(array) => Ok(Value::Date(array.value(seq))),
388                None => Err(ConvertError::new("date", format!("{array:?}")).into()),
389            },
390            ArrowDataType::List(f) => match array.as_any().downcast_ref::<ListArray>() {
391                Some(array) => {
392                    let inner_array = unsafe { array.value_unchecked(seq) };
393                    let mut values = Vec::with_capacity(inner_array.len());
394                    for i in 0..inner_array.len() {
395                        let value = Value::try_from((f.as_ref(), &inner_array, i, settings))?;
396                        values.push(value);
397                    }
398                    Ok(Value::Array(values))
399                }
400                None => Err(ConvertError::new("list", format!("{array:?}")).into()),
401            },
402            ArrowDataType::LargeList(f) => match array.as_any().downcast_ref::<LargeListArray>() {
403                Some(array) => {
404                    let inner_array = unsafe { array.value_unchecked(seq) };
405                    let mut values = Vec::with_capacity(inner_array.len());
406                    for i in 0..inner_array.len() {
407                        let value = Value::try_from((f.as_ref(), &inner_array, i, settings))?;
408                        values.push(value);
409                    }
410                    Ok(Value::Array(values))
411                }
412                None => Err(ConvertError::new("large list", format!("{array:?}")).into()),
413            },
414            ArrowDataType::Map(f, _) => match array.as_any().downcast_ref::<MapArray>() {
415                Some(array) => {
416                    if let ArrowDataType::Struct(fs) = f.data_type() {
417                        let inner_array = unsafe { array.value_unchecked(seq) };
418                        let mut values = Vec::with_capacity(inner_array.len());
419                        for i in 0..inner_array.len() {
420                            let key = Value::try_from((
421                                fs[0].as_ref(),
422                                inner_array.column(0),
423                                i,
424                                settings,
425                            ))?;
426                            let val = Value::try_from((
427                                fs[1].as_ref(),
428                                inner_array.column(1),
429                                i,
430                                settings,
431                            ))?;
432                            values.push((key, val));
433                        }
434                        Ok(Value::Map(values))
435                    } else {
436                        Err(
437                            ConvertError::new("invalid map inner type", format!("{array:?}"))
438                                .into(),
439                        )
440                    }
441                }
442                None => Err(ConvertError::new("map", format!("{array:?}")).into()),
443            },
444            ArrowDataType::Struct(fs) => match array.as_any().downcast_ref::<StructArray>() {
445                Some(array) => {
446                    let mut values = Vec::with_capacity(array.len());
447                    for (f, inner_array) in fs.iter().zip(array.columns().iter()) {
448                        let value = Value::try_from((f.as_ref(), inner_array, seq, settings))?;
449                        values.push(value);
450                    }
451                    Ok(Value::Tuple(values))
452                }
453                None => Err(ConvertError::new("struct", format!("{array:?}")).into()),
454            },
455            _ => Err(ConvertError::new("unsupported data type", format!("{array:?}")).into()),
456        }
457    }
458}