Skip to main content

databend_driver_core/value/
arrow_decoder.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use super::{Interval, NumberValue, Value};
18use crate::error::{ConvertError, Error};
19use crate::value::base::GeoValue;
20use arrow_array::{
21    Array as ArrowArray, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Decimal256Array,
22    Decimal64Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
23    LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, MapArray, StringArray,
24    StringViewArray, StructArray, TimestampMicrosecondArray, UInt16Array, UInt32Array, UInt64Array,
25    UInt8Array,
26};
27use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, TimeUnit};
28use databend_client::schema::{
29    DecimalSize, ARROW_EXT_TYPE_BITMAP, ARROW_EXT_TYPE_EMPTY_ARRAY, ARROW_EXT_TYPE_EMPTY_MAP,
30    ARROW_EXT_TYPE_GEOGRAPHY, ARROW_EXT_TYPE_GEOMETRY, ARROW_EXT_TYPE_INTERVAL,
31    ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE, ARROW_EXT_TYPE_VARIANT, ARROW_EXT_TYPE_VECTOR,
32    EXTENSION_KEY,
33};
34use databend_client::ResultFormatSettings;
35use ethnum::i256;
36use jiff::{tz, Timestamp};
37use jsonb::RawJsonb;
38
/// The in-memory representation of the MonthDayMicros variant of the "Interval" logical type.
///
/// The three components are packed into one `i128`: months in bits 96..128,
/// days in bits 64..96 and microseconds in bits 0..64.
#[allow(non_camel_case_types)]
#[repr(C)]
struct months_days_micros(pub i128);

/// Mask for extracting the lower 64 bits (microseconds).
const MICROS_MASK: i128 = 0xFFFF_FFFF_FFFF_FFFF;
/// Mask for extracting a 32-bit component (days or months) after shifting.
const DAYS_MONTHS_MASK: i128 = 0xFFFF_FFFF;

// Upper bound: 9999-12-30T22:00:00Z.
// Note: jiff's maximum is 253402207200999999; 253402207200000000 is used
// for compatibility with old databend-query.
const TIMESTAMP_MAX: i64 = 253_402_207_200_000_000;
// Lower bound: -009999-01-02T01:59:59Z.
const TIMESTAMP_MIN: i64 = -377_705_023_201_000_000;

/// Restricts a microsecond timestamp to `[TIMESTAMP_MIN, TIMESTAMP_MAX]` (required by jiff).
fn clamp_ts(ts: i64) -> i64 {
    ts.max(TIMESTAMP_MIN).min(TIMESTAMP_MAX)
}

impl months_days_micros {
    /// Extracts the signed 32-bit component that starts at bit `shift`.
    #[inline]
    fn component32(&self, shift: u32) -> i32 {
        let shifted = self.0 >> shift;
        (shifted & DAYS_MONTHS_MASK) as i32
    }

    /// Returns the months component (bits 96..128).
    #[inline]
    pub fn months(&self) -> i32 {
        self.component32(96)
    }

    /// Returns the days component (bits 64..96).
    #[inline]
    pub fn days(&self) -> i32 {
        self.component32(64)
    }

    /// Returns the microseconds component (bits 0..64).
    #[inline]
    pub fn microseconds(&self) -> i64 {
        let low = self.0 & MICROS_MASK;
        low as i64
    }
}
76
77impl
78    TryFrom<(
79        &ArrowField,
80        &Arc<dyn ArrowArray>,
81        usize,
82        &ResultFormatSettings,
83    )> for Value
84{
85    type Error = Error;
86    fn try_from(
87        (field, array, seq, settings): (
88            &ArrowField,
89            &Arc<dyn ArrowArray>,
90            usize,
91            &ResultFormatSettings,
92        ),
93    ) -> std::result::Result<Self, Self::Error> {
94        if let Some(extend_type) = field.metadata().get(EXTENSION_KEY) {
95            return match extend_type.as_str() {
96                ARROW_EXT_TYPE_EMPTY_ARRAY => Ok(Value::EmptyArray),
97                ARROW_EXT_TYPE_EMPTY_MAP => Ok(Value::EmptyMap),
98                ARROW_EXT_TYPE_VARIANT => {
99                    if field.is_nullable() && array.is_null(seq) {
100                        return Ok(Value::Null);
101                    }
102                    match array.data_type() {
103                        ArrowDataType::Utf8 => match array.as_any().downcast_ref::<StringArray>() {
104                            Some(array) => Ok(Value::Variant(array.value(seq).to_string())),
105                            None => Err(ConvertError::new("variant", format!("{array:?}")).into()),
106                        },
107                        ArrowDataType::LargeUtf8 => {
108                            match array.as_any().downcast_ref::<LargeStringArray>() {
109                                Some(array) => Ok(Value::Variant(array.value(seq).to_string())),
110                                None => {
111                                    Err(ConvertError::new("variant", format!("{array:?}")).into())
112                                }
113                            }
114                        }
115                        ArrowDataType::LargeBinary => {
116                            match array.as_any().downcast_ref::<LargeBinaryArray>() {
117                                Some(array) => {
118                                    if settings.arrow_result_version.unwrap_or_default() > 1 {
119                                        Ok(Value::Variant(
120                                            String::from_utf8_lossy(array.value(seq)).into_owned(),
121                                        ))
122                                    } else {
123                                        Ok(Value::Variant(
124                                            RawJsonb::new(array.value(seq)).to_string(),
125                                        ))
126                                    }
127                                }
128                                None => {
129                                    Err(ConvertError::new("variant", format!("{array:?}")).into())
130                                }
131                            }
132                        }
133                        _ => Err(ConvertError::new("variant", format!("{array:?}")).into()),
134                    }
135                }
136                ARROW_EXT_TYPE_TIMESTAMP_TIMEZONE => {
137                    if field.is_nullable() && array.is_null(seq) {
138                        return Ok(Value::Null);
139                    }
140                    match array.as_any().downcast_ref::<Decimal128Array>() {
141                        Some(array) => {
142                            let v = array.value(seq);
143                            let unix_ts = clamp_ts(v as u64 as i64);
144                            let offset = (v >> 64) as i32;
145                            let offset = tz::Offset::from_seconds(offset).map_err(|e| {
146                                Error::Parsing(format!("invalid offset: {offset}, {e}"))
147                            })?;
148                            let time_zone = tz::TimeZone::fixed(offset);
149                            let timestamp = Timestamp::from_microsecond(unix_ts).map_err(|e| {
150                                Error::Parsing(format!("Invalid timestamp_micros {unix_ts}: {e}"))
151                            })?;
152                            Ok(Value::TimestampTz(timestamp.to_zoned(time_zone)))
153                        }
154                        None => Err(ConvertError::new("Interval", format!("{array:?}")).into()),
155                    }
156                }
157                ARROW_EXT_TYPE_INTERVAL => {
158                    if field.is_nullable() && array.is_null(seq) {
159                        return Ok(Value::Null);
160                    }
161                    match array.as_any().downcast_ref::<Decimal128Array>() {
162                        Some(array) => {
163                            let res = months_days_micros(array.value(seq));
164                            Ok(Value::Interval(
165                                Interval {
166                                    months: res.months(),
167                                    days: res.days(),
168                                    micros: res.microseconds(),
169                                }
170                                .to_string(),
171                            ))
172                        }
173                        None => Err(ConvertError::new("Interval", format!("{array:?}")).into()),
174                    }
175                }
176                ARROW_EXT_TYPE_BITMAP => {
177                    if field.is_nullable() && array.is_null(seq) {
178                        return Ok(Value::Null);
179                    }
180                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
181                        Some(array) => {
182                            let rb = roaring::RoaringTreemap::deserialize_from(array.value(seq))
183                                .expect("failed to deserialize bitmap");
184                            let raw = rb.into_iter().collect::<Vec<_>>();
185                            let s = itertools::join(raw.iter(), ",");
186                            Ok(Value::Bitmap(s))
187                        }
188                        None => Err(ConvertError::new("bitmap", format!("{array:?}")).into()),
189                    }
190                }
191                ARROW_EXT_TYPE_GEOMETRY => {
192                    if field.is_nullable() && array.is_null(seq) {
193                        return Ok(Value::Null);
194                    }
195                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
196                        Some(array) => Ok(Value::Geometry(GeoValue::from_binary(
197                            array.value(seq).to_vec(),
198                            settings.geometry_output_format,
199                        )?)),
200                        None => Err(ConvertError::new("geometry", format!("{array:?}")).into()),
201                    }
202                }
203                ARROW_EXT_TYPE_GEOGRAPHY => {
204                    if field.is_nullable() && array.is_null(seq) {
205                        return Ok(Value::Null);
206                    }
207                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
208                        Some(array) => Ok(Value::Geography(GeoValue::from_binary(
209                            array.value(seq).to_vec(),
210                            settings.geometry_output_format,
211                        )?)),
212                        None => Err(ConvertError::new("geography", format!("{array:?}")).into()),
213                    }
214                }
215                ARROW_EXT_TYPE_VECTOR => {
216                    if field.is_nullable() && array.is_null(seq) {
217                        return Ok(Value::Null);
218                    }
219                    match field.data_type() {
220                        ArrowDataType::FixedSizeList(_, dimension) => {
221                            match array
222                                .as_any()
223                                .downcast_ref::<arrow_array::FixedSizeListArray>()
224                            {
225                                Some(inner_array) => {
226                                    match inner_array
227                                        .value(seq)
228                                        .as_any()
229                                        .downcast_ref::<Float32Array>()
230                                    {
231                                        Some(inner_array) => {
232                                            let dimension = *dimension as usize;
233                                            let mut values = Vec::with_capacity(dimension);
234                                            for i in 0..dimension {
235                                                let value = inner_array.value(i);
236                                                values.push(value);
237                                            }
238                                            Ok(Value::Vector(values))
239                                        }
240                                        None => Err(ConvertError::new(
241                                            "vector float32",
242                                            format!("{inner_array:?}"),
243                                        )
244                                        .into()),
245                                    }
246                                }
247                                None => {
248                                    Err(ConvertError::new("vector", format!("{array:?}")).into())
249                                }
250                            }
251                        }
252                        arrow_type => Err(ConvertError::new(
253                            "vector",
254                            format!("Unsupported Arrow type: {arrow_type:?}"),
255                        )
256                        .into()),
257                    }
258                }
259                _ => Err(ConvertError::new(
260                    "extension",
261                    format!("Unsupported extension datatype for arrow field: {field:?}"),
262                )
263                .into()),
264            };
265        }
266
267        if field.is_nullable() && array.is_null(seq) {
268            return Ok(Value::Null);
269        }
270        match field.data_type() {
271            ArrowDataType::Null => Ok(Value::Null),
272            ArrowDataType::Boolean => match array.as_any().downcast_ref::<BooleanArray>() {
273                Some(array) => Ok(Value::Boolean(array.value(seq))),
274                None => Err(ConvertError::new("bool", format!("{array:?}")).into()),
275            },
276            ArrowDataType::Int8 => match array.as_any().downcast_ref::<Int8Array>() {
277                Some(array) => Ok(Value::Number(NumberValue::Int8(array.value(seq)))),
278                None => Err(ConvertError::new("int8", format!("{array:?}")).into()),
279            },
280            ArrowDataType::Int16 => match array.as_any().downcast_ref::<Int16Array>() {
281                Some(array) => Ok(Value::Number(NumberValue::Int16(array.value(seq)))),
282                None => Err(ConvertError::new("int16", format!("{array:?}")).into()),
283            },
284            ArrowDataType::Int32 => match array.as_any().downcast_ref::<Int32Array>() {
285                Some(array) => Ok(Value::Number(NumberValue::Int32(array.value(seq)))),
286                None => Err(ConvertError::new("int64", format!("{array:?}")).into()),
287            },
288            ArrowDataType::Int64 => match array.as_any().downcast_ref::<Int64Array>() {
289                Some(array) => Ok(Value::Number(NumberValue::Int64(array.value(seq)))),
290                None => Err(ConvertError::new("int64", format!("{array:?}")).into()),
291            },
292            ArrowDataType::UInt8 => match array.as_any().downcast_ref::<UInt8Array>() {
293                Some(array) => Ok(Value::Number(NumberValue::UInt8(array.value(seq)))),
294                None => Err(ConvertError::new("uint8", format!("{array:?}")).into()),
295            },
296            ArrowDataType::UInt16 => match array.as_any().downcast_ref::<UInt16Array>() {
297                Some(array) => Ok(Value::Number(NumberValue::UInt16(array.value(seq)))),
298                None => Err(ConvertError::new("uint16", format!("{array:?}")).into()),
299            },
300            ArrowDataType::UInt32 => match array.as_any().downcast_ref::<UInt32Array>() {
301                Some(array) => Ok(Value::Number(NumberValue::UInt32(array.value(seq)))),
302                None => Err(ConvertError::new("uint32", format!("{array:?}")).into()),
303            },
304            ArrowDataType::UInt64 => match array.as_any().downcast_ref::<UInt64Array>() {
305                Some(array) => Ok(Value::Number(NumberValue::UInt64(array.value(seq)))),
306                None => Err(ConvertError::new("uint64", format!("{array:?}")).into()),
307            },
308            ArrowDataType::Float32 => match array.as_any().downcast_ref::<Float32Array>() {
309                Some(array) => Ok(Value::Number(NumberValue::Float32(array.value(seq)))),
310                None => Err(ConvertError::new("float32", format!("{array:?}")).into()),
311            },
312            ArrowDataType::Float64 => match array.as_any().downcast_ref::<Float64Array>() {
313                Some(array) => Ok(Value::Number(NumberValue::Float64(array.value(seq)))),
314                None => Err(ConvertError::new("float64", format!("{array:?}")).into()),
315            },
316            ArrowDataType::Decimal64(p, s) => {
317                match array.as_any().downcast_ref::<Decimal64Array>() {
318                    Some(array) => Ok(Value::Number(NumberValue::Decimal64(
319                        array.value(seq),
320                        DecimalSize {
321                            precision: *p,
322                            scale: *s as u8,
323                        },
324                    ))),
325                    None => Err(ConvertError::new("Decimal64", format!("{array:?}")).into()),
326                }
327            }
328            ArrowDataType::Decimal128(p, s) => {
329                match array.as_any().downcast_ref::<Decimal128Array>() {
330                    Some(array) => Ok(Value::Number(NumberValue::Decimal128(
331                        array.value(seq),
332                        DecimalSize {
333                            precision: *p,
334                            scale: *s as u8,
335                        },
336                    ))),
337                    None => Err(ConvertError::new("Decimal128", format!("{array:?}")).into()),
338                }
339            }
340            ArrowDataType::Decimal256(p, s) => {
341                match array.as_any().downcast_ref::<Decimal256Array>() {
342                    Some(array) => {
343                        let v = array.value(seq);
344                        let v = i256::from_le_bytes(v.to_le_bytes());
345                        Ok(Value::Number(NumberValue::Decimal256(
346                            v,
347                            DecimalSize {
348                                precision: *p,
349                                scale: *s as u8,
350                            },
351                        )))
352                    }
353                    None => Err(ConvertError::new("Decimal256", format!("{array:?}")).into()),
354                }
355            }
356
357            ArrowDataType::Binary => match array.as_any().downcast_ref::<BinaryArray>() {
358                Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
359                None => Err(ConvertError::new("binary", format!("{array:?}")).into()),
360            },
361            ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
362                match array.as_any().downcast_ref::<LargeBinaryArray>() {
363                    Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
364                    None => Err(ConvertError::new("large binary", format!("{array:?}")).into()),
365                }
366            }
367            ArrowDataType::Utf8 => match array.as_any().downcast_ref::<StringArray>() {
368                Some(array) => Ok(Value::String(array.value(seq).to_string())),
369                None => Err(ConvertError::new("string", format!("{array:?}")).into()),
370            },
371            ArrowDataType::LargeUtf8 => match array.as_any().downcast_ref::<LargeStringArray>() {
372                Some(array) => Ok(Value::String(array.value(seq).to_string())),
373                None => Err(ConvertError::new("large string", format!("{array:?}")).into()),
374            },
375            ArrowDataType::Utf8View => match array.as_any().downcast_ref::<StringViewArray>() {
376                Some(array) => Ok(Value::String(array.value(seq).to_string())),
377                None => Err(ConvertError::new("string view", format!("{array:?}")).into()),
378            },
379            // we only support timestamp in microsecond in databend
380            ArrowDataType::Timestamp(unit, tz) => {
381                match array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
382                    Some(array) => {
383                        if unit != &TimeUnit::Microsecond {
384                            return Err(ConvertError::new("timestamp", format!("{array:?}"))
385                                .with_message(format!(
386                                    "unsupported timestamp unit: {unit:?}, only support microsecond"
387                                ))
388                                .into());
389                        }
390                        let ts = clamp_ts(array.value(seq));
391                        match tz {
392                            None => {
393                                let timestamp = Timestamp::from_microsecond(ts).map_err(|e| {
394                                    Error::Parsing(format!("Invalid timestamp_micros {ts}: {e}"))
395                                })?;
396                                let dt = timestamp.to_zoned(settings.timezone.clone());
397                                Ok(Value::Timestamp(dt))
398                            }
399                            Some(tz) => Err(ConvertError::new("timestamp", format!("{array:?}"))
400                                .with_message(format!("non-UTC timezone not supported: {tz:?}"))
401                                .into()),
402                        }
403                    }
404                    None => Err(ConvertError::new("timestamp", format!("{array:?}")).into()),
405                }
406            }
407            ArrowDataType::Date32 => match array.as_any().downcast_ref::<Date32Array>() {
408                Some(array) => Ok(Value::Date(array.value(seq))),
409                None => Err(ConvertError::new("date", format!("{array:?}")).into()),
410            },
411            ArrowDataType::List(f) => match array.as_any().downcast_ref::<ListArray>() {
412                Some(array) => {
413                    let inner_array = unsafe { array.value_unchecked(seq) };
414                    let mut values = Vec::with_capacity(inner_array.len());
415                    for i in 0..inner_array.len() {
416                        let value = Value::try_from((f.as_ref(), &inner_array, i, settings))?;
417                        values.push(value);
418                    }
419                    Ok(Value::Array(values))
420                }
421                None => Err(ConvertError::new("list", format!("{array:?}")).into()),
422            },
423            ArrowDataType::LargeList(f) => match array.as_any().downcast_ref::<LargeListArray>() {
424                Some(array) => {
425                    let inner_array = unsafe { array.value_unchecked(seq) };
426                    let mut values = Vec::with_capacity(inner_array.len());
427                    for i in 0..inner_array.len() {
428                        let value = Value::try_from((f.as_ref(), &inner_array, i, settings))?;
429                        values.push(value);
430                    }
431                    Ok(Value::Array(values))
432                }
433                None => Err(ConvertError::new("large list", format!("{array:?}")).into()),
434            },
435            ArrowDataType::Map(f, _) => match array.as_any().downcast_ref::<MapArray>() {
436                Some(array) => {
437                    if let ArrowDataType::Struct(fs) = f.data_type() {
438                        let inner_array = unsafe { array.value_unchecked(seq) };
439                        let mut values = Vec::with_capacity(inner_array.len());
440                        for i in 0..inner_array.len() {
441                            let key = Value::try_from((
442                                fs[0].as_ref(),
443                                inner_array.column(0),
444                                i,
445                                settings,
446                            ))?;
447                            let val = Value::try_from((
448                                fs[1].as_ref(),
449                                inner_array.column(1),
450                                i,
451                                settings,
452                            ))?;
453                            values.push((key, val));
454                        }
455                        Ok(Value::Map(values))
456                    } else {
457                        Err(
458                            ConvertError::new("invalid map inner type", format!("{array:?}"))
459                                .into(),
460                        )
461                    }
462                }
463                None => Err(ConvertError::new("map", format!("{array:?}")).into()),
464            },
465            ArrowDataType::Struct(fs) => match array.as_any().downcast_ref::<StructArray>() {
466                Some(array) => {
467                    let mut values = Vec::with_capacity(array.len());
468                    for (f, inner_array) in fs.iter().zip(array.columns().iter()) {
469                        let value = Value::try_from((f.as_ref(), inner_array, seq, settings))?;
470                        values.push(value);
471                    }
472                    Ok(Value::Tuple(values))
473                }
474                None => Err(ConvertError::new("struct", format!("{array:?}")).into()),
475            },
476            _ => Err(ConvertError::new("unsupported data type", format!("{array:?}")).into()),
477        }
478    }
479}
480
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::ArrayRef;
    use std::collections::HashMap;

    /// Builds a non-nullable field named "v" tagged with the variant extension type.
    fn variant_field(data_type: ArrowDataType) -> ArrowField {
        let metadata = HashMap::from([(
            EXTENSION_KEY.to_string(),
            ARROW_EXT_TYPE_VARIANT.to_string(),
        )]);
        ArrowField::new("v", data_type, false).with_metadata(metadata)
    }

    /// Decodes the first row of `column`, panicking if decoding fails.
    fn decode_first(
        field: &ArrowField,
        column: &ArrayRef,
        settings: &ResultFormatSettings,
    ) -> Value {
        Value::try_from((field, column, 0, settings)).unwrap()
    }

    #[test]
    fn decode_variant_from_utf8_array() {
        let column: ArrayRef = Arc::new(StringArray::from(vec!["{\"a\":1}"]));
        let field = variant_field(ArrowDataType::Utf8);

        let decoded = decode_first(&field, &column, &ResultFormatSettings::default());

        assert_eq!(decoded, Value::Variant("{\"a\":1}".to_string()));
    }

    #[test]
    fn decode_variant_from_large_utf8_array() {
        let column: ArrayRef = Arc::new(LargeStringArray::from(vec!["{\"b\":2}"]));
        let field = variant_field(ArrowDataType::LargeUtf8);

        let decoded = decode_first(&field, &column, &ResultFormatSettings::default());

        assert_eq!(decoded, Value::Variant("{\"b\":2}".to_string()));
    }

    #[test]
    fn decode_variant_from_large_binary_array_for_v2() {
        let column: ArrayRef = Arc::new(LargeBinaryArray::from(vec![b"{\"c\":3}".as_slice()]));
        let field = variant_field(ArrowDataType::LargeBinary);
        // Version 2 results carry the variant as text bytes.
        let v2_settings = ResultFormatSettings {
            arrow_result_version: Some(2),
            ..ResultFormatSettings::default()
        };

        let decoded = decode_first(&field, &column, &v2_settings);

        assert_eq!(decoded, Value::Variant("{\"c\":3}".to_string()));
    }
}