bigbytes_driver_core/
value.rs

1// Copyright 2024 Digitrans Inc
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::hash::Hash;
17use std::io::BufRead;
18use std::io::Cursor;
19
20use arrow::datatypes::{i256, ArrowNativeTypeOp};
21use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime};
22
23use crate::cursor_ext::{
24    collect_binary_number, collect_number, BufferReadStringExt, ReadBytesExt, ReadCheckPointExt,
25    ReadNumberExt,
26};
27
28use crate::{
29    error::{ConvertError, Error, Result},
30    schema::{DecimalDataType, DecimalSize},
31};
32
33use geozero::wkb::FromWkb;
34use geozero::wkb::WkbDialect;
35use geozero::wkt::Ewkt;
36use std::fmt::{Display, Formatter, Write};
37
38// Thu 1970-01-01 is R.D. 719163
39const DAYS_FROM_CE: i32 = 719_163;
40const NULL_VALUE: &str = "NULL";
41const TRUE_VALUE: &str = "1";
42const FALSE_VALUE: &str = "0";
43
44#[cfg(feature = "flight-sql")]
45use {
46    crate::schema::{
47        ARROW_EXT_TYPE_BITMAP, ARROW_EXT_TYPE_EMPTY_ARRAY, ARROW_EXT_TYPE_EMPTY_MAP,
48        ARROW_EXT_TYPE_GEOGRAPHY, ARROW_EXT_TYPE_GEOMETRY, ARROW_EXT_TYPE_INTERVAL,
49        ARROW_EXT_TYPE_VARIANT, EXTENSION_KEY,
50    },
51    arrow_array::{
52        Array as ArrowArray, BinaryArray, BooleanArray, Date32Array, Decimal128Array,
53        Decimal256Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
54        LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, MapArray, StringArray,
55        StringViewArray, StructArray, TimestampMicrosecondArray, UInt16Array, UInt32Array,
56        UInt64Array, UInt8Array,
57    },
58    arrow_schema::{DataType as ArrowDataType, Field as ArrowField, TimeUnit},
59    std::sync::Arc,
60};
61
62use crate::schema::{DataType, NumberDataType};
63
64#[derive(Clone, Debug, PartialEq)]
65pub enum NumberValue {
66    Int8(i8),
67    Int16(i16),
68    Int32(i32),
69    Int64(i64),
70    UInt8(u8),
71    UInt16(u16),
72    UInt32(u32),
73    UInt64(u64),
74    Float32(f32),
75    Float64(f64),
76    Decimal128(i128, DecimalSize),
77    Decimal256(i256, DecimalSize),
78}
79
80#[derive(Clone, Debug, PartialEq)]
81pub enum Value {
82    Null,
83    EmptyArray,
84    EmptyMap,
85    Boolean(bool),
86    Binary(Vec<u8>),
87    String(String),
88    Number(NumberValue),
89    /// Microseconds from 1970-01-01 00:00:00 UTC
90    Timestamp(i64),
91    Date(i32),
92    Array(Vec<Value>),
93    Map(Vec<(Value, Value)>),
94    Tuple(Vec<Value>),
95    Bitmap(String),
96    Variant(String),
97    Geometry(String),
98    Geography(String),
99    Interval(String),
100}
101
102impl Value {
103    pub fn get_type(&self) -> DataType {
104        match self {
105            Self::Null => DataType::Null,
106            Self::EmptyArray => DataType::EmptyArray,
107            Self::EmptyMap => DataType::EmptyMap,
108            Self::Boolean(_) => DataType::Boolean,
109            Self::Binary(_) => DataType::Binary,
110            Self::String(_) => DataType::String,
111            Self::Number(n) => match n {
112                NumberValue::Int8(_) => DataType::Number(NumberDataType::Int8),
113                NumberValue::Int16(_) => DataType::Number(NumberDataType::Int16),
114                NumberValue::Int32(_) => DataType::Number(NumberDataType::Int32),
115                NumberValue::Int64(_) => DataType::Number(NumberDataType::Int64),
116                NumberValue::UInt8(_) => DataType::Number(NumberDataType::UInt8),
117                NumberValue::UInt16(_) => DataType::Number(NumberDataType::UInt16),
118                NumberValue::UInt32(_) => DataType::Number(NumberDataType::UInt32),
119                NumberValue::UInt64(_) => DataType::Number(NumberDataType::UInt64),
120                NumberValue::Float32(_) => DataType::Number(NumberDataType::Float32),
121                NumberValue::Float64(_) => DataType::Number(NumberDataType::Float64),
122                NumberValue::Decimal128(_, s) => DataType::Decimal(DecimalDataType::Decimal128(*s)),
123                NumberValue::Decimal256(_, s) => DataType::Decimal(DecimalDataType::Decimal256(*s)),
124            },
125            Self::Timestamp(_) => DataType::Timestamp,
126
127            Self::Date(_) => DataType::Date,
128            Self::Interval(_) => DataType::Interval,
129            Self::Array(vals) => {
130                if vals.is_empty() {
131                    DataType::EmptyArray
132                } else {
133                    DataType::Array(Box::new(vals[0].get_type()))
134                }
135            }
136            Self::Map(kvs) => {
137                if kvs.is_empty() {
138                    DataType::EmptyMap
139                } else {
140                    let inner_ty = DataType::Tuple(vec![kvs[0].0.get_type(), kvs[0].1.get_type()]);
141                    DataType::Map(Box::new(inner_ty))
142                }
143            }
144            Self::Tuple(vals) => {
145                let inner_tys = vals.iter().map(|v| v.get_type()).collect::<Vec<_>>();
146                DataType::Tuple(inner_tys)
147            }
148            Self::Bitmap(_) => DataType::Bitmap,
149            Self::Variant(_) => DataType::Variant,
150            Self::Geometry(_) => DataType::Geometry,
151            Self::Geography(_) => DataType::Geography,
152        }
153    }
154}
155
156impl TryFrom<(&DataType, Option<&str>)> for Value {
157    type Error = Error;
158
159    fn try_from((t, v): (&DataType, Option<&str>)) -> Result<Self> {
160        match v {
161            Some(v) => Self::try_from((t, v)),
162            None => match t {
163                DataType::Null => Ok(Self::Null),
164                DataType::Nullable(_) => Ok(Self::Null),
165                _ => Err(Error::InvalidResponse(
166                    "NULL value for non-nullable field".to_string(),
167                )),
168            },
169        }
170    }
171}
172
173impl TryFrom<(&DataType, &str)> for Value {
174    type Error = Error;
175
176    fn try_from((t, v): (&DataType, &str)) -> Result<Self> {
177        match t {
178            DataType::Null => Ok(Self::Null),
179            DataType::EmptyArray => Ok(Self::EmptyArray),
180            DataType::EmptyMap => Ok(Self::EmptyMap),
181            DataType::Boolean => Ok(Self::Boolean(v == "1")),
182            DataType::Binary => Ok(Self::Binary(hex::decode(v)?)),
183            DataType::String => Ok(Self::String(v.to_string())),
184            DataType::Number(NumberDataType::Int8) => {
185                Ok(Self::Number(NumberValue::Int8(v.parse()?)))
186            }
187            DataType::Number(NumberDataType::Int16) => {
188                Ok(Self::Number(NumberValue::Int16(v.parse()?)))
189            }
190            DataType::Number(NumberDataType::Int32) => {
191                Ok(Self::Number(NumberValue::Int32(v.parse()?)))
192            }
193            DataType::Number(NumberDataType::Int64) => {
194                Ok(Self::Number(NumberValue::Int64(v.parse()?)))
195            }
196            DataType::Number(NumberDataType::UInt8) => {
197                Ok(Self::Number(NumberValue::UInt8(v.parse()?)))
198            }
199            DataType::Number(NumberDataType::UInt16) => {
200                Ok(Self::Number(NumberValue::UInt16(v.parse()?)))
201            }
202            DataType::Number(NumberDataType::UInt32) => {
203                Ok(Self::Number(NumberValue::UInt32(v.parse()?)))
204            }
205            DataType::Number(NumberDataType::UInt64) => {
206                Ok(Self::Number(NumberValue::UInt64(v.parse()?)))
207            }
208            DataType::Number(NumberDataType::Float32) => {
209                Ok(Self::Number(NumberValue::Float32(v.parse()?)))
210            }
211            DataType::Number(NumberDataType::Float64) => {
212                Ok(Self::Number(NumberValue::Float64(v.parse()?)))
213            }
214            DataType::Decimal(DecimalDataType::Decimal128(size)) => {
215                let d = parse_decimal(v, *size)?;
216                Ok(Self::Number(d))
217            }
218            DataType::Decimal(DecimalDataType::Decimal256(size)) => {
219                let d = parse_decimal(v, *size)?;
220                Ok(Self::Number(d))
221            }
222            DataType::Timestamp => Ok(Self::Timestamp(
223                NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.6f")?
224                    .and_utc()
225                    .timestamp_micros(),
226            )),
227            DataType::Date => Ok(Self::Date(
228                NaiveDate::parse_from_str(v, "%Y-%m-%d")?.num_days_from_ce() - DAYS_FROM_CE,
229            )),
230            DataType::Bitmap => Ok(Self::Bitmap(v.to_string())),
231            DataType::Variant => Ok(Self::Variant(v.to_string())),
232            DataType::Geometry => Ok(Self::Geometry(v.to_string())),
233            DataType::Geography => Ok(Self::Geography(v.to_string())),
234            DataType::Interval => Ok(Self::Interval(v.to_string())),
235            DataType::Array(_) | DataType::Map(_) | DataType::Tuple(_) => {
236                let mut reader = Cursor::new(v);
237                let decoder = ValueDecoder {};
238                decoder.read_field(t, &mut reader)
239            }
240            DataType::Nullable(inner) => match inner.as_ref() {
241                DataType::String => Ok(Self::String(v.to_string())),
242                _ => {
243                    // not string type, try to check if it is NULL
244                    // for compatible with old version server
245                    if v == NULL_VALUE {
246                        Ok(Self::Null)
247                    } else {
248                        Self::try_from((inner.as_ref(), v))
249                    }
250                }
251            },
252        }
253    }
254}
255
256#[cfg(feature = "flight-sql")]
257impl TryFrom<(&ArrowField, &Arc<dyn ArrowArray>, usize)> for Value {
258    type Error = Error;
259    fn try_from(
260        (field, array, seq): (&ArrowField, &Arc<dyn ArrowArray>, usize),
261    ) -> std::result::Result<Self, Self::Error> {
262        if let Some(extend_type) = field.metadata().get(EXTENSION_KEY) {
263            return match extend_type.as_str() {
264                ARROW_EXT_TYPE_EMPTY_ARRAY => Ok(Value::EmptyArray),
265                ARROW_EXT_TYPE_EMPTY_MAP => Ok(Value::EmptyMap),
266                ARROW_EXT_TYPE_VARIANT => {
267                    if field.is_nullable() && array.is_null(seq) {
268                        return Ok(Value::Null);
269                    }
270                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
271                        Some(array) => Ok(Value::Variant(jsonb::to_string(array.value(seq)))),
272                        None => Err(ConvertError::new("variant", format!("{:?}", array)).into()),
273                    }
274                }
275                ARROW_EXT_TYPE_INTERVAL => {
276                    if field.is_nullable() && array.is_null(seq) {
277                        return Ok(Value::Null);
278                    }
279                    match array.as_any().downcast_ref::<Decimal128Array>() {
280                        Some(array) => {
281                            let res = months_days_micros(array.value(seq));
282                            Ok(Value::Interval(
283                                Interval {
284                                    months: res.months(),
285                                    days: res.days(),
286                                    micros: res.microseconds(),
287                                }
288                                .to_string(),
289                            ))
290                        }
291                        None => Err(ConvertError::new("Interval", format!("{:?}", array)).into()),
292                    }
293                }
294                ARROW_EXT_TYPE_BITMAP => {
295                    if field.is_nullable() && array.is_null(seq) {
296                        return Ok(Value::Null);
297                    }
298                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
299                        Some(array) => {
300                            let rb = roaring::RoaringTreemap::deserialize_from(array.value(seq))
301                                .expect("failed to deserialize bitmap");
302                            let raw = rb.into_iter().collect::<Vec<_>>();
303                            let s = itertools::join(raw.iter(), ",");
304                            Ok(Value::Bitmap(s))
305                        }
306                        None => Err(ConvertError::new("bitmap", format!("{:?}", array)).into()),
307                    }
308                }
309                ARROW_EXT_TYPE_GEOMETRY => {
310                    if field.is_nullable() && array.is_null(seq) {
311                        return Ok(Value::Null);
312                    }
313                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
314                        Some(array) => {
315                            let wkt = parse_geometry(array.value(seq))?;
316                            Ok(Value::Geometry(wkt))
317                        }
318                        None => Err(ConvertError::new("geometry", format!("{:?}", array)).into()),
319                    }
320                }
321                ARROW_EXT_TYPE_GEOGRAPHY => {
322                    if field.is_nullable() && array.is_null(seq) {
323                        return Ok(Value::Null);
324                    }
325                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
326                        Some(array) => {
327                            let wkt = parse_geometry(array.value(seq))?;
328                            Ok(Value::Geography(wkt))
329                        }
330                        None => Err(ConvertError::new("geography", format!("{:?}", array)).into()),
331                    }
332                }
333                _ => Err(ConvertError::new(
334                    "extension",
335                    format!(
336                        "Unsupported extension datatype for arrow field: {:?}",
337                        field
338                    ),
339                )
340                .into()),
341            };
342        }
343
344        if field.is_nullable() && array.is_null(seq) {
345            return Ok(Value::Null);
346        }
347        match field.data_type() {
348            ArrowDataType::Null => Ok(Value::Null),
349            ArrowDataType::Boolean => match array.as_any().downcast_ref::<BooleanArray>() {
350                Some(array) => Ok(Value::Boolean(array.value(seq))),
351                None => Err(ConvertError::new("bool", format!("{:?}", array)).into()),
352            },
353            ArrowDataType::Int8 => match array.as_any().downcast_ref::<Int8Array>() {
354                Some(array) => Ok(Value::Number(NumberValue::Int8(array.value(seq)))),
355                None => Err(ConvertError::new("int8", format!("{:?}", array)).into()),
356            },
357            ArrowDataType::Int16 => match array.as_any().downcast_ref::<Int16Array>() {
358                Some(array) => Ok(Value::Number(NumberValue::Int16(array.value(seq)))),
359                None => Err(ConvertError::new("int16", format!("{:?}", array)).into()),
360            },
361            ArrowDataType::Int32 => match array.as_any().downcast_ref::<Int32Array>() {
362                Some(array) => Ok(Value::Number(NumberValue::Int32(array.value(seq)))),
363                None => Err(ConvertError::new("int64", format!("{:?}", array)).into()),
364            },
365            ArrowDataType::Int64 => match array.as_any().downcast_ref::<Int64Array>() {
366                Some(array) => Ok(Value::Number(NumberValue::Int64(array.value(seq)))),
367                None => Err(ConvertError::new("int64", format!("{:?}", array)).into()),
368            },
369            ArrowDataType::UInt8 => match array.as_any().downcast_ref::<UInt8Array>() {
370                Some(array) => Ok(Value::Number(NumberValue::UInt8(array.value(seq)))),
371                None => Err(ConvertError::new("uint8", format!("{:?}", array)).into()),
372            },
373            ArrowDataType::UInt16 => match array.as_any().downcast_ref::<UInt16Array>() {
374                Some(array) => Ok(Value::Number(NumberValue::UInt16(array.value(seq)))),
375                None => Err(ConvertError::new("uint16", format!("{:?}", array)).into()),
376            },
377            ArrowDataType::UInt32 => match array.as_any().downcast_ref::<UInt32Array>() {
378                Some(array) => Ok(Value::Number(NumberValue::UInt32(array.value(seq)))),
379                None => Err(ConvertError::new("uint32", format!("{:?}", array)).into()),
380            },
381            ArrowDataType::UInt64 => match array.as_any().downcast_ref::<UInt64Array>() {
382                Some(array) => Ok(Value::Number(NumberValue::UInt64(array.value(seq)))),
383                None => Err(ConvertError::new("uint64", format!("{:?}", array)).into()),
384            },
385            ArrowDataType::Float32 => match array.as_any().downcast_ref::<Float32Array>() {
386                Some(array) => Ok(Value::Number(NumberValue::Float32(array.value(seq)))),
387                None => Err(ConvertError::new("float32", format!("{:?}", array)).into()),
388            },
389            ArrowDataType::Float64 => match array.as_any().downcast_ref::<Float64Array>() {
390                Some(array) => Ok(Value::Number(NumberValue::Float64(array.value(seq)))),
391                None => Err(ConvertError::new("float64", format!("{:?}", array)).into()),
392            },
393
394            ArrowDataType::Decimal128(p, s) => {
395                match array.as_any().downcast_ref::<Decimal128Array>() {
396                    Some(array) => Ok(Value::Number(NumberValue::Decimal128(
397                        array.value(seq),
398                        DecimalSize {
399                            precision: *p,
400                            scale: *s as u8,
401                        },
402                    ))),
403                    None => Err(ConvertError::new("Decimal128", format!("{:?}", array)).into()),
404                }
405            }
406            ArrowDataType::Decimal256(p, s) => {
407                match array.as_any().downcast_ref::<Decimal256Array>() {
408                    Some(array) => Ok(Value::Number(NumberValue::Decimal256(
409                        array.value(seq),
410                        DecimalSize {
411                            precision: *p,
412                            scale: *s as u8,
413                        },
414                    ))),
415                    None => Err(ConvertError::new("Decimal256", format!("{:?}", array)).into()),
416                }
417            }
418
419            ArrowDataType::Binary => match array.as_any().downcast_ref::<BinaryArray>() {
420                Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
421                None => Err(ConvertError::new("binary", format!("{:?}", array)).into()),
422            },
423            ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
424                match array.as_any().downcast_ref::<LargeBinaryArray>() {
425                    Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
426                    None => Err(ConvertError::new("large binary", format!("{:?}", array)).into()),
427                }
428            }
429            ArrowDataType::Utf8 => match array.as_any().downcast_ref::<StringArray>() {
430                Some(array) => Ok(Value::String(array.value(seq).to_string())),
431                None => Err(ConvertError::new("string", format!("{:?}", array)).into()),
432            },
433            ArrowDataType::LargeUtf8 => match array.as_any().downcast_ref::<LargeStringArray>() {
434                Some(array) => Ok(Value::String(array.value(seq).to_string())),
435                None => Err(ConvertError::new("large string", format!("{:?}", array)).into()),
436            },
437            ArrowDataType::Utf8View => match array.as_any().downcast_ref::<StringViewArray>() {
438                Some(array) => Ok(Value::String(array.value(seq).to_string())),
439                None => Err(ConvertError::new("string view", format!("{:?}", array)).into()),
440            },
441            // we only support timestamp in microsecond in databend
442            ArrowDataType::Timestamp(unit, tz) => {
443                match array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
444                    Some(array) => {
445                        if unit != &TimeUnit::Microsecond {
446                            return Err(ConvertError::new("timestamp", format!("{:?}", array))
447                                .with_message(format!(
448                                    "unsupported timestamp unit: {:?}, only support microsecond",
449                                    unit
450                                ))
451                                .into());
452                        }
453                        let ts = array.value(seq);
454                        match tz {
455                            None => Ok(Value::Timestamp(ts)),
456                            Some(tz) => Err(ConvertError::new("timestamp", format!("{:?}", array))
457                                .with_message(format!("non-UTC timezone not supported: {:?}", tz))
458                                .into()),
459                        }
460                    }
461                    None => Err(ConvertError::new("timestamp", format!("{:?}", array)).into()),
462                }
463            }
464            ArrowDataType::Date32 => match array.as_any().downcast_ref::<Date32Array>() {
465                Some(array) => Ok(Value::Date(array.value(seq))),
466                None => Err(ConvertError::new("date", format!("{:?}", array)).into()),
467            },
468            ArrowDataType::List(f) => match array.as_any().downcast_ref::<ListArray>() {
469                Some(array) => {
470                    let inner_array = unsafe { array.value_unchecked(seq) };
471                    let mut values = Vec::with_capacity(inner_array.len());
472                    for i in 0..inner_array.len() {
473                        let value = Value::try_from((f.as_ref(), &inner_array, i))?;
474                        values.push(value);
475                    }
476                    Ok(Value::Array(values))
477                }
478                None => Err(ConvertError::new("list", format!("{:?}", array)).into()),
479            },
480            ArrowDataType::LargeList(f) => match array.as_any().downcast_ref::<LargeListArray>() {
481                Some(array) => {
482                    let inner_array = unsafe { array.value_unchecked(seq) };
483                    let mut values = Vec::with_capacity(inner_array.len());
484                    for i in 0..inner_array.len() {
485                        let value = Value::try_from((f.as_ref(), &inner_array, i))?;
486                        values.push(value);
487                    }
488                    Ok(Value::Array(values))
489                }
490                None => Err(ConvertError::new("large list", format!("{:?}", array)).into()),
491            },
492            ArrowDataType::Map(f, _) => match array.as_any().downcast_ref::<MapArray>() {
493                Some(array) => {
494                    if let ArrowDataType::Struct(fs) = f.data_type() {
495                        let inner_array = unsafe { array.value_unchecked(seq) };
496                        let mut values = Vec::with_capacity(inner_array.len());
497                        for i in 0..inner_array.len() {
498                            let key = Value::try_from((fs[0].as_ref(), inner_array.column(0), i))?;
499                            let val = Value::try_from((fs[1].as_ref(), inner_array.column(1), i))?;
500                            values.push((key, val));
501                        }
502                        Ok(Value::Map(values))
503                    } else {
504                        Err(
505                            ConvertError::new("invalid map inner type", format!("{:?}", array))
506                                .into(),
507                        )
508                    }
509                }
510                None => Err(ConvertError::new("map", format!("{:?}", array)).into()),
511            },
512            ArrowDataType::Struct(fs) => match array.as_any().downcast_ref::<StructArray>() {
513                Some(array) => {
514                    let mut values = Vec::with_capacity(array.len());
515                    for (f, inner_array) in fs.iter().zip(array.columns().iter()) {
516                        let value = Value::try_from((f.as_ref(), inner_array, seq))?;
517                        values.push(value);
518                    }
519                    Ok(Value::Tuple(values))
520                }
521                None => Err(ConvertError::new("struct", format!("{:?}", array)).into()),
522            },
523            _ => Err(ConvertError::new("unsupported data type", format!("{:?}", array)).into()),
524        }
525    }
526}
527
528impl TryFrom<Value> for String {
529    type Error = Error;
530    fn try_from(val: Value) -> Result<Self> {
531        match val {
532            Value::String(s) => Ok(s),
533            Value::Bitmap(s) => Ok(s),
534            Value::Number(NumberValue::Decimal128(v, s)) => Ok(display_decimal_128(v, s.scale)),
535            Value::Number(NumberValue::Decimal256(v, s)) => Ok(display_decimal_256(v, s.scale)),
536            Value::Geometry(s) => Ok(s),
537            Value::Geography(s) => Ok(s),
538            Value::Interval(s) => Ok(s),
539            Value::Variant(s) => Ok(s),
540            _ => Err(ConvertError::new("string", format!("{:?}", val)).into()),
541        }
542    }
543}
544
545impl TryFrom<Value> for bool {
546    type Error = Error;
547    fn try_from(val: Value) -> Result<Self> {
548        match val {
549            Value::Boolean(b) => Ok(b),
550            Value::Number(n) => Ok(n != NumberValue::Int8(0)),
551            _ => Err(ConvertError::new("bool", format!("{:?}", val)).into()),
552        }
553    }
554}
555
556// This macro implements TryFrom for NumberValue
557macro_rules! impl_try_from_number_value {
558    ($($t:ty),*) => {
559        $(
560            impl TryFrom<Value> for $t {
561                type Error = Error;
562                fn try_from(val: Value) -> Result<Self> {
563                    match val {
564                        Value::Number(NumberValue::Int8(i)) => Ok(i as $t),
565                        Value::Number(NumberValue::Int16(i)) => Ok(i as $t),
566                        Value::Number(NumberValue::Int32(i)) => Ok(i as $t),
567                        Value::Number(NumberValue::Int64(i)) => Ok(i as $t),
568                        Value::Number(NumberValue::UInt8(i)) => Ok(i as $t),
569                        Value::Number(NumberValue::UInt16(i)) => Ok(i as $t),
570                        Value::Number(NumberValue::UInt32(i)) => Ok(i as $t),
571                        Value::Number(NumberValue::UInt64(i)) => Ok(i as $t),
572                        Value::Number(NumberValue::Float32(i)) => Ok(i as $t),
573                        Value::Number(NumberValue::Float64(i)) => Ok(i as $t),
574                        Value::Date(i) => Ok(i as $t),
575                        Value::Timestamp(i) => Ok(i as $t),
576                        _ => Err(ConvertError::new("number", format!("{:?}", val)).into()),
577                    }
578                }
579            }
580        )*
581    };
582}
583
584impl_try_from_number_value!(u8);
585impl_try_from_number_value!(u16);
586impl_try_from_number_value!(u32);
587impl_try_from_number_value!(u64);
588impl_try_from_number_value!(i8);
589impl_try_from_number_value!(i16);
590impl_try_from_number_value!(i32);
591impl_try_from_number_value!(i64);
592impl_try_from_number_value!(f32);
593impl_try_from_number_value!(f64);
594
595impl TryFrom<Value> for NaiveDateTime {
596    type Error = Error;
597    fn try_from(val: Value) -> Result<Self> {
598        match val {
599            Value::Timestamp(i) => {
600                let secs = i / 1_000_000;
601                let nanos = ((i % 1_000_000) * 1000) as u32;
602                match DateTime::from_timestamp(secs, nanos) {
603                    Some(t) => Ok(t.naive_utc()),
604                    None => Err(ConvertError::new("NaiveDateTime", "".to_string()).into()),
605                }
606            }
607            _ => Err(ConvertError::new("NaiveDateTime", format!("{}", val)).into()),
608        }
609    }
610}
611
612impl TryFrom<Value> for NaiveDate {
613    type Error = Error;
614    fn try_from(val: Value) -> Result<Self> {
615        match val {
616            Value::Date(i) => {
617                let days = i + DAYS_FROM_CE;
618                match NaiveDate::from_num_days_from_ce_opt(days) {
619                    Some(d) => Ok(d),
620                    None => Err(ConvertError::new("NaiveDate", "".to_string()).into()),
621                }
622            }
623            _ => Err(ConvertError::new("NaiveDate", format!("{}", val)).into()),
624        }
625    }
626}
627
628impl<V> TryFrom<Value> for Vec<V>
629where
630    V: TryFrom<Value, Error = Error>,
631{
632    type Error = Error;
633    fn try_from(val: Value) -> Result<Self> {
634        match val {
635            Value::Binary(vals) => vals
636                .into_iter()
637                .map(|v| V::try_from(Value::Number(NumberValue::UInt8(v))))
638                .collect(),
639            Value::Array(vals) => vals.into_iter().map(V::try_from).collect(),
640            Value::EmptyArray => Ok(vec![]),
641            _ => Err(ConvertError::new("Vec", format!("{}", val)).into()),
642        }
643    }
644}
645
646impl<K, V> TryFrom<Value> for HashMap<K, V>
647where
648    K: TryFrom<Value, Error = Error> + Eq + Hash,
649    V: TryFrom<Value, Error = Error>,
650{
651    type Error = Error;
652    fn try_from(val: Value) -> Result<Self> {
653        match val {
654            Value::Map(kvs) => {
655                let mut map = HashMap::new();
656                for (k, v) in kvs {
657                    let k = K::try_from(k)?;
658                    let v = V::try_from(v)?;
659                    map.insert(k, v);
660                }
661                Ok(map)
662            }
663            Value::EmptyMap => Ok(HashMap::new()),
664            _ => Err(ConvertError::new("HashMap", format!("{}", val)).into()),
665        }
666    }
667}
668
669macro_rules! replace_expr {
670    ($_t:tt $sub:expr) => {
671        $sub
672    };
673}
674
675// This macro implements TryFrom for tuple of types
676macro_rules! impl_tuple_from_value {
677    ( $($Ti:tt),+ ) => {
678        impl<$($Ti),+> TryFrom<Value> for ($($Ti,)+)
679        where
680            $($Ti: TryFrom<Value>),+
681        {
682            type Error = String;
683            fn try_from(val: Value) -> Result<Self, String> {
684                // It is not possible yet to get the number of metavariable repetitions
685                // ref: https://github.com/rust-lang/lang-team/issues/28#issue-644523674
686                // This is a workaround
687                let expected_len = <[()]>::len(&[$(replace_expr!(($Ti) ())),*]);
688
689                match val {
690                    Value::Tuple(vals) => {
691                        if expected_len != vals.len() {
692                            return Err(format!("value tuple size mismatch: expected {} columns, got {}", expected_len, vals.len()));
693                        }
694                        let mut vals_iter = vals.into_iter().enumerate();
695
696                        Ok((
697                            $(
698                                {
699                                    let (col_ix, col_value) = vals_iter
700                                        .next()
701                                        .unwrap(); // vals_iter size is checked before this code is reached,
702                                                   // so it is safe to unwrap
703                                    let t = col_value.get_type();
704                                    $Ti::try_from(col_value)
705                                        .map_err(|_| format!("failed converting column {} from type({:?}) to type({})", col_ix, t, std::any::type_name::<$Ti>()))?
706                                }
707                            ,)+
708                        ))
709                    }
710                    _ => Err(format!("expected tuple, got {:?}", val)),
711                }
712            }
713        }
714    }
715}
716
717// Implement From Value for tuples of size up to 16
718impl_tuple_from_value!(T1);
719impl_tuple_from_value!(T1, T2);
720impl_tuple_from_value!(T1, T2, T3);
721impl_tuple_from_value!(T1, T2, T3, T4);
722impl_tuple_from_value!(T1, T2, T3, T4, T5);
723impl_tuple_from_value!(T1, T2, T3, T4, T5, T6);
724impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7);
725impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8);
726impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9);
727impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
728impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
729impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
730impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
731impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
732impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
733impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
734impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17);
735impl_tuple_from_value!(
736    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18
737);
738impl_tuple_from_value!(
739    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19
740);
741impl_tuple_from_value!(
742    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20
743);
744impl_tuple_from_value!(
745    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21
746);
747impl_tuple_from_value!(
748    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21,
749    T22
750);
751
752// This macro implements TryFrom to Option for Nullable column
753macro_rules! impl_try_from_to_option {
754    ($($t:ty),*) => {
755        $(
756            impl TryFrom<Value> for Option<$t> {
757                type Error = Error;
758                fn try_from(val: Value) -> Result<Self> {
759                    match val {
760                        Value::Null => Ok(None),
761                        _ => {
762                            let inner: $t = val.try_into()?;
763                            Ok(Some(inner))
764                        },
765                    }
766
767                }
768            }
769        )*
770    };
771}
772
773impl_try_from_to_option!(String);
774impl_try_from_to_option!(bool);
775impl_try_from_to_option!(u8);
776impl_try_from_to_option!(u16);
777impl_try_from_to_option!(u32);
778impl_try_from_to_option!(u64);
779impl_try_from_to_option!(i8);
780impl_try_from_to_option!(i16);
781impl_try_from_to_option!(i32);
782impl_try_from_to_option!(i64);
783impl_try_from_to_option!(f32);
784impl_try_from_to_option!(f64);
785impl_try_from_to_option!(NaiveDateTime);
786impl_try_from_to_option!(NaiveDate);
787
788impl std::fmt::Display for NumberValue {
789    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
790        match self {
791            NumberValue::Int8(i) => write!(f, "{}", i),
792            NumberValue::Int16(i) => write!(f, "{}", i),
793            NumberValue::Int32(i) => write!(f, "{}", i),
794            NumberValue::Int64(i) => write!(f, "{}", i),
795            NumberValue::UInt8(i) => write!(f, "{}", i),
796            NumberValue::UInt16(i) => write!(f, "{}", i),
797            NumberValue::UInt32(i) => write!(f, "{}", i),
798            NumberValue::UInt64(i) => write!(f, "{}", i),
799            NumberValue::Float32(i) => write!(f, "{}", i),
800            NumberValue::Float64(i) => write!(f, "{}", i),
801            NumberValue::Decimal128(v, s) => write!(f, "{}", display_decimal_128(*v, s.scale)),
802            NumberValue::Decimal256(v, s) => write!(f, "{}", display_decimal_256(*v, s.scale)),
803        }
804    }
805}
806
807impl std::fmt::Display for Value {
808    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
809        encode_value(f, self, true)
810    }
811}
812
813// Compatible with Databend, inner values of nested types are quoted.
814fn encode_value(f: &mut std::fmt::Formatter<'_>, val: &Value, raw: bool) -> std::fmt::Result {
815    match val {
816        Value::Null => write!(f, "NULL"),
817        Value::EmptyArray => write!(f, "[]"),
818        Value::EmptyMap => write!(f, "{{}}"),
819        Value::Boolean(b) => {
820            if *b {
821                write!(f, "true")
822            } else {
823                write!(f, "false")
824            }
825        }
826        Value::Number(n) => write!(f, "{}", n),
827        Value::Binary(s) => write!(f, "{}", hex::encode_upper(s)),
828        Value::String(s)
829        | Value::Bitmap(s)
830        | Value::Variant(s)
831        | Value::Interval(s)
832        | Value::Geometry(s)
833        | Value::Geography(s) => {
834            if raw {
835                write!(f, "{}", s)
836            } else {
837                write!(f, "'{}'", s)
838            }
839        }
840        Value::Timestamp(i) => {
841            let secs = i / 1_000_000;
842            let nanos = ((i % 1_000_000) * 1000) as u32;
843            let t = DateTime::from_timestamp(secs, nanos).unwrap_or_default();
844            let t = t.naive_utc();
845            if raw {
846                write!(f, "{}", t)
847            } else {
848                write!(f, "'{}'", t)
849            }
850        }
851        Value::Date(i) => {
852            let days = i + DAYS_FROM_CE;
853            let d = NaiveDate::from_num_days_from_ce_opt(days).unwrap_or_default();
854            if raw {
855                write!(f, "{}", d)
856            } else {
857                write!(f, "'{}'", d)
858            }
859        }
860        Value::Array(vals) => {
861            write!(f, "[")?;
862            for (i, val) in vals.iter().enumerate() {
863                if i > 0 {
864                    write!(f, ",")?;
865                }
866                encode_value(f, val, false)?;
867            }
868            write!(f, "]")?;
869            Ok(())
870        }
871        Value::Map(kvs) => {
872            write!(f, "{{")?;
873            for (i, (key, val)) in kvs.iter().enumerate() {
874                if i > 0 {
875                    write!(f, ",")?;
876                }
877                encode_value(f, key, false)?;
878                write!(f, ":")?;
879                encode_value(f, val, false)?;
880            }
881            write!(f, "}}")?;
882            Ok(())
883        }
884        Value::Tuple(vals) => {
885            write!(f, "(")?;
886            for (i, val) in vals.iter().enumerate() {
887                if i > 0 {
888                    write!(f, ",")?;
889                }
890                encode_value(f, val, false)?;
891            }
892            write!(f, ")")?;
893            Ok(())
894        }
895    }
896}
897
898pub fn display_decimal_128(num: i128, scale: u8) -> String {
899    let mut buf = String::new();
900    if scale == 0 {
901        write!(buf, "{}", num).unwrap();
902    } else {
903        let pow_scale = 10_i128.pow(scale as u32);
904        if num >= 0 {
905            write!(
906                buf,
907                "{}.{:0>width$}",
908                num / pow_scale,
909                (num % pow_scale).abs(),
910                width = scale as usize
911            )
912            .unwrap();
913        } else {
914            write!(
915                buf,
916                "-{}.{:0>width$}",
917                -num / pow_scale,
918                (num % pow_scale).abs(),
919                width = scale as usize
920            )
921            .unwrap();
922        }
923    }
924    buf
925}
926
927pub fn display_decimal_256(num: i256, scale: u8) -> String {
928    let mut buf = String::new();
929    if scale == 0 {
930        write!(buf, "{}", num).unwrap();
931    } else {
932        let pow_scale = i256::from_i128(10i128).pow_wrapping(scale as u32);
933        let width = scale as usize;
934        // -1/10 = 0
935        let (int_part, neg) = if num >= i256::ZERO {
936            (num / pow_scale, "")
937        } else {
938            (-num / pow_scale, "-")
939        };
940        let frac_part = (num % pow_scale).wrapping_abs();
941
942        match frac_part.to_i128() {
943            Some(frac_part) => {
944                write!(
945                    buf,
946                    "{}{}.{:0>width$}",
947                    neg,
948                    int_part,
949                    frac_part,
950                    width = width
951                )
952                .unwrap();
953            }
954            None => {
955                // fractional part is too big for display,
956                // split it into two parts.
957                let pow = i256::from_i128(10i128).pow_wrapping(38);
958                let frac_high_part = frac_part / pow;
959                let frac_low_part = frac_part % pow;
960                let frac_width = (scale - 38) as usize;
961
962                write!(
963                    buf,
964                    "{}{}.{:0>width$}{}",
965                    neg,
966                    int_part,
967                    frac_high_part.to_i128().unwrap(),
968                    frac_low_part.to_i128().unwrap(),
969                    width = frac_width
970                )
971                .unwrap();
972            }
973        }
974    }
975    buf
976}
977
978/// assume text is from
979/// used only for expr, so put more weight on readability
980pub fn parse_decimal(text: &str, size: DecimalSize) -> Result<NumberValue> {
981    let mut start = 0;
982    let bytes = text.as_bytes();
983    let mut is_negative = false;
984
985    // Check if the number is negative
986    if bytes[start] == b'-' {
987        is_negative = true;
988        start += 1;
989    }
990
991    while start < text.len() && bytes[start] == b'0' {
992        start += 1
993    }
994    let text = &text[start..];
995    let point_pos = text.find('.');
996    let e_pos = text.find(|c| ['E', 'e'].contains(&c));
997    let (i_part, f_part, e_part) = match (point_pos, e_pos) {
998        (Some(p1), Some(p2)) => (&text[..p1], &text[(p1 + 1)..p2], Some(&text[(p2 + 1)..])),
999        (Some(p), None) => (&text[..p], &text[(p + 1)..], None),
1000        (None, Some(p)) => (&text[..p], "", Some(&text[(p + 1)..])),
1001        (None, None) => (text, "", None),
1002    };
1003    let exp = match e_part {
1004        Some(s) => s.parse::<i32>()?,
1005        None => 0,
1006    };
1007    if i_part.len() as i32 + exp > 76 {
1008        Err(ConvertError::new("decimal", format!("{:?}", text)).into())
1009    } else {
1010        let mut digits = Vec::with_capacity(76);
1011        digits.extend_from_slice(i_part.as_bytes());
1012        digits.extend_from_slice(f_part.as_bytes());
1013        if digits.is_empty() {
1014            digits.push(b'0')
1015        }
1016        let scale = f_part.len() as i32 - exp;
1017        if scale < 0 {
1018            // e.g 123.1e3
1019            for _ in 0..(-scale) {
1020                digits.push(b'0')
1021            }
1022        };
1023
1024        let precision = std::cmp::min(digits.len(), 76);
1025        let digits = unsafe { std::str::from_utf8_unchecked(&digits[..precision]) };
1026
1027        let result = if size.precision > 38 {
1028            NumberValue::Decimal256(i256::from_string(digits).unwrap(), size)
1029        } else {
1030            NumberValue::Decimal128(digits.parse::<i128>()?, size)
1031        };
1032
1033        // If the number was negative, negate the result
1034        if is_negative {
1035            match result {
1036                NumberValue::Decimal256(val, size) => Ok(NumberValue::Decimal256(-val, size)),
1037                NumberValue::Decimal128(val, size) => Ok(NumberValue::Decimal128(-val, size)),
1038                _ => Ok(result),
1039            }
1040        } else {
1041            Ok(result)
1042        }
1043    }
1044}
1045
1046#[derive(Debug, Copy, Clone, PartialEq, Default)]
1047pub struct Interval {
1048    pub months: i32,
1049    pub days: i32,
1050    pub micros: i64,
1051}
1052
1053impl Display for Interval {
1054    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1055        let mut buffer = [0u8; 70];
1056        let len = IntervalToStringCast::format(*self, &mut buffer);
1057        write!(f, "{}", String::from_utf8_lossy(&buffer[..len]))
1058    }
1059}
1060
1061struct IntervalToStringCast;
1062
1063impl IntervalToStringCast {
1064    fn format_signed_number(value: i64, buffer: &mut [u8], length: &mut usize) {
1065        let s = value.to_string();
1066        let bytes = s.as_bytes();
1067        buffer[*length..*length + bytes.len()].copy_from_slice(bytes);
1068        *length += bytes.len();
1069    }
1070
1071    fn format_two_digits(value: i64, buffer: &mut [u8], length: &mut usize) {
1072        let s = format!("{:02}", value.abs());
1073        let bytes = s.as_bytes();
1074        buffer[*length..*length + bytes.len()].copy_from_slice(bytes);
1075        *length += bytes.len();
1076    }
1077
1078    fn format_interval_value(value: i32, buffer: &mut [u8], length: &mut usize, name: &str) {
1079        if value == 0 {
1080            return;
1081        }
1082        if *length != 0 {
1083            buffer[*length] = b' ';
1084            *length += 1;
1085        }
1086        Self::format_signed_number(value as i64, buffer, length);
1087        let name_bytes = name.as_bytes();
1088        buffer[*length..*length + name_bytes.len()].copy_from_slice(name_bytes);
1089        *length += name_bytes.len();
1090        if value != 1 && value != -1 {
1091            buffer[*length] = b's';
1092            *length += 1;
1093        }
1094    }
1095
1096    fn format_micros(mut micros: i64, buffer: &mut [u8], length: &mut usize) {
1097        if micros < 0 {
1098            micros = -micros;
1099        }
1100        let s = format!("{:06}", micros);
1101        let bytes = s.as_bytes();
1102        buffer[*length..*length + bytes.len()].copy_from_slice(bytes);
1103        *length += bytes.len();
1104
1105        while *length > 0 && buffer[*length - 1] == b'0' {
1106            *length -= 1;
1107        }
1108    }
1109
1110    pub fn format(interval: Interval, buffer: &mut [u8]) -> usize {
1111        let mut length = 0;
1112        if interval.months != 0 {
1113            let years = interval.months / 12;
1114            let months = interval.months - years * 12;
1115            Self::format_interval_value(years, buffer, &mut length, " year");
1116            Self::format_interval_value(months, buffer, &mut length, " month");
1117        }
1118        if interval.days != 0 {
1119            Self::format_interval_value(interval.days, buffer, &mut length, " day");
1120        }
1121        if interval.micros != 0 {
1122            if length != 0 {
1123                buffer[length] = b' ';
1124                length += 1;
1125            }
1126            let mut micros = interval.micros;
1127            if micros < 0 {
1128                buffer[length] = b'-';
1129                length += 1;
1130                micros = -micros;
1131            }
1132            let hour = micros / MINROS_PER_HOUR;
1133            micros -= hour * MINROS_PER_HOUR;
1134            let min = micros / MICROS_PER_MINUTE;
1135            micros -= min * MICROS_PER_MINUTE;
1136            let sec = micros / MICROS_PER_SEC;
1137            micros -= sec * MICROS_PER_SEC;
1138
1139            Self::format_signed_number(hour, buffer, &mut length);
1140            buffer[length] = b':';
1141            length += 1;
1142            Self::format_two_digits(min, buffer, &mut length);
1143            buffer[length] = b':';
1144            length += 1;
1145            Self::format_two_digits(sec, buffer, &mut length);
1146            if micros != 0 {
1147                buffer[length] = b'.';
1148                length += 1;
1149                Self::format_micros(micros, buffer, &mut length);
1150            }
1151        } else if length == 0 {
1152            buffer[..8].copy_from_slice(b"00:00:00");
1153            return 8;
1154        }
1155        length
1156    }
1157}
1158
1159impl Interval {
1160    pub fn from_string(str: &str) -> Result<Self> {
1161        Self::from_cstring(str.as_bytes())
1162    }
1163
1164    pub fn from_cstring(str: &[u8]) -> Result<Self> {
1165        let mut result = Interval::default();
1166        let mut pos = 0;
1167        let len = str.len();
1168        let mut found_any = false;
1169
1170        if len == 0 {
1171            return Err(Error::BadArgument("Empty string".to_string()));
1172        }
1173        match str[pos] {
1174            b'@' => {
1175                pos += 1;
1176            }
1177            b'P' | b'p' => {
1178                return Err(Error::BadArgument(
1179                    "Posix intervals not supported yet".to_string(),
1180                ));
1181            }
1182            _ => {}
1183        }
1184
1185        while pos < len {
1186            match str[pos] {
1187                b' ' | b'\t' | b'\n' => {
1188                    pos += 1;
1189                    continue;
1190                }
1191                b'0'..=b'9' => {
1192                    let (number, fraction, next_pos) = parse_number(&str[pos..])?;
1193                    pos += next_pos;
1194                    let (specifier, next_pos) = parse_identifier(&str[pos..]);
1195
1196                    pos += next_pos;
1197                    let _ = apply_specifier(&mut result, number, fraction, &specifier);
1198                    found_any = true;
1199                }
1200                b'-' => {
1201                    pos += 1;
1202                    let (number, fraction, next_pos) = parse_number(&str[pos..])?;
1203                    let number = -number;
1204                    let fraction = -fraction;
1205
1206                    pos += next_pos;
1207
1208                    let (specifier, next_pos) = parse_identifier(&str[pos..]);
1209
1210                    pos += next_pos;
1211                    let _ = apply_specifier(&mut result, number, fraction, &specifier);
1212                    found_any = true;
1213                }
1214                b'a' | b'A' => {
1215                    if len - pos < 3
1216                        || str[pos + 1] != b'g' && str[pos + 1] != b'G'
1217                        || str[pos + 2] != b'o' && str[pos + 2] != b'O'
1218                    {
1219                        return Err(Error::BadArgument("Invalid 'ago' specifier".to_string()));
1220                    }
1221                    pos += 3;
1222                    while pos < len {
1223                        match str[pos] {
1224                            b' ' | b'\t' | b'\n' => {
1225                                pos += 1;
1226                            }
1227                            _ => {
1228                                return Err(Error::BadArgument(
1229                                    "Trailing characters after 'ago'".to_string(),
1230                                ));
1231                            }
1232                        }
1233                    }
1234                    result.months = -result.months;
1235                    result.days = -result.days;
1236                    result.micros = -result.micros;
1237                    return Ok(result);
1238                }
1239                _ => {
1240                    return Err(Error::BadArgument(format!(
1241                        "Unexpected character at position {}",
1242                        pos
1243                    )));
1244                }
1245            }
1246        }
1247
1248        if !found_any {
1249            return Err(Error::BadArgument(
1250                "No interval specifiers found".to_string(),
1251            ));
1252        }
1253        Ok(result)
1254    }
1255}
1256
1257fn parse_number(bytes: &[u8]) -> Result<(i64, i64, usize)> {
1258    let mut number: i64 = 0;
1259    let mut fraction: i64 = 0;
1260    let mut pos = 0;
1261
1262    while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1263        number = number
1264            .checked_mul(10)
1265            .ok_or(Error::BadArgument("Number too large".to_string()))?
1266            + (bytes[pos] - b'0') as i64;
1267        pos += 1;
1268    }
1269
1270    if pos < bytes.len() && bytes[pos] == b'.' {
1271        pos += 1;
1272        let mut mult: i64 = 100000;
1273        while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1274            if mult > 0 {
1275                fraction += (bytes[pos] - b'0') as i64 * mult;
1276            }
1277            mult /= 10;
1278            pos += 1;
1279        }
1280    }
1281    if pos < bytes.len() && bytes[pos] == b':' {
1282        // parse time format HH:MM:SS[.FFFFFF]
1283        let time_bytes = &bytes[pos..];
1284        let mut time_pos = 0;
1285        let mut total_micros: i64 = number * 60 * 60 * MICROS_PER_SEC;
1286        let mut colon_count = 0;
1287
1288        while colon_count < 2 && time_bytes.len() > time_pos {
1289            let (minute, _, next_pos) = parse_time_part(&time_bytes[time_pos..])?;
1290            let minute_micros = minute * 60 * MICROS_PER_SEC;
1291            total_micros += minute_micros;
1292            time_pos += next_pos;
1293
1294            if time_bytes.len() > time_pos && time_bytes[time_pos] == b':' {
1295                time_pos += 1;
1296                colon_count += 1;
1297            } else {
1298                break;
1299            }
1300        }
1301        if time_bytes.len() > time_pos {
1302            let (seconds, nanos, next_pos) = parse_time_part_with_nanos(&time_bytes[time_pos..])?;
1303            total_micros += seconds * MICROS_PER_SEC + nanos;
1304            time_pos += next_pos;
1305        }
1306        return Ok((total_micros, 0, pos + time_pos));
1307    }
1308
1309    if pos == 0 {
1310        return Err(Error::BadArgument("Expected number".to_string()));
1311    }
1312
1313    Ok((number, fraction, pos))
1314}
1315
1316fn parse_time_part(bytes: &[u8]) -> Result<(i64, i64, usize)> {
1317    let mut number: i64 = 0;
1318    let mut pos = 0;
1319    while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1320        number = number
1321            .checked_mul(10)
1322            .ok_or(Error::BadArgument("Number too large".to_string()))?
1323            + (bytes[pos] - b'0') as i64;
1324        pos += 1;
1325    }
1326    Ok((number, 0, pos))
1327}
1328
1329fn parse_time_part_with_nanos(bytes: &[u8]) -> Result<(i64, i64, usize)> {
1330    let mut number: i64 = 0;
1331    let mut fraction: i64 = 0;
1332    let mut pos = 0;
1333
1334    while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1335        number = number
1336            .checked_mul(10)
1337            .ok_or(Error::BadArgument("Number too large".to_string()))?
1338            + (bytes[pos] - b'0') as i64;
1339        pos += 1;
1340    }
1341
1342    if pos < bytes.len() && bytes[pos] == b'.' {
1343        pos += 1;
1344        let mut mult: i64 = 100000000;
1345        while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1346            if mult > 0 {
1347                fraction += (bytes[pos] - b'0') as i64 * mult;
1348            }
1349            mult /= 10;
1350            pos += 1;
1351        }
1352    }
1353
1354    Ok((number, fraction, pos))
1355}
1356
1357fn parse_identifier(s: &[u8]) -> (String, usize) {
1358    let mut pos = 0;
1359    while pos < s.len() && (s[pos] == b' ' || s[pos] == b'\t' || s[pos] == b'\n') {
1360        pos += 1;
1361    }
1362    let start_pos = pos;
1363    while pos < s.len() && (s[pos].is_ascii_alphabetic()) {
1364        pos += 1;
1365    }
1366
1367    if pos == start_pos {
1368        return ("".to_string(), pos);
1369    }
1370
1371    let identifier = String::from_utf8_lossy(&s[start_pos..pos]).to_string();
1372    (identifier, pos)
1373}
1374
1375#[derive(Debug, PartialEq, Eq)]
1376enum DatePartSpecifier {
1377    Millennium,
1378    Century,
1379    Decade,
1380    Year,
1381    Quarter,
1382    Month,
1383    Day,
1384    Week,
1385    Microseconds,
1386    Milliseconds,
1387    Second,
1388    Minute,
1389    Hour,
1390}
1391
1392fn try_get_date_part_specifier(specifier_str: &str) -> Result<DatePartSpecifier> {
1393    match specifier_str.to_lowercase().as_str() {
1394        "millennium" | "millennia" => Ok(DatePartSpecifier::Millennium),
1395        "century" | "centuries" => Ok(DatePartSpecifier::Century),
1396        "decade" | "decades" => Ok(DatePartSpecifier::Decade),
1397        "year" | "years" | "y" => Ok(DatePartSpecifier::Year),
1398        "quarter" | "quarters" => Ok(DatePartSpecifier::Quarter),
1399        "month" | "months" | "mon" => Ok(DatePartSpecifier::Month),
1400        "day" | "days" | "d" => Ok(DatePartSpecifier::Day),
1401        "week" | "weeks" | "w" => Ok(DatePartSpecifier::Week),
1402        "microsecond" | "microseconds" | "us" => Ok(DatePartSpecifier::Microseconds),
1403        "millisecond" | "milliseconds" | "ms" => Ok(DatePartSpecifier::Milliseconds),
1404        "second" | "seconds" | "s" => Ok(DatePartSpecifier::Second),
1405        "minute" | "minutes" | "m" => Ok(DatePartSpecifier::Minute),
1406        "hour" | "hours" | "h" => Ok(DatePartSpecifier::Hour),
1407        _ => Err(Error::BadArgument(format!(
1408            "Invalid date part specifier: {}",
1409            specifier_str
1410        ))),
1411    }
1412}
1413
1414const MICROS_PER_SEC: i64 = 1_000_000;
1415const MICROS_PER_MSEC: i64 = 1_000;
1416const MICROS_PER_MINUTE: i64 = 60 * MICROS_PER_SEC;
1417const MINROS_PER_HOUR: i64 = 60 * MICROS_PER_MINUTE;
1418const DAYS_PER_WEEK: i32 = 7;
1419const MONTHS_PER_QUARTER: i32 = 3;
1420const MONTHS_PER_YEAR: i32 = 12;
1421const MONTHS_PER_DECADE: i32 = 120;
1422const MONTHS_PER_CENTURY: i32 = 1200;
1423const MONTHS_PER_MILLENNIUM: i32 = 12000;
1424
1425fn apply_specifier(
1426    result: &mut Interval,
1427    number: i64,
1428    fraction: i64,
1429    specifier_str: &str,
1430) -> Result<()> {
1431    if specifier_str.is_empty() {
1432        result.micros = result
1433            .micros
1434            .checked_add(number)
1435            .ok_or(Error::BadArgument("Overflow".to_string()))?;
1436        result.micros = result
1437            .micros
1438            .checked_add(fraction)
1439            .ok_or(Error::BadArgument("Overflow".to_string()))?;
1440        return Ok(());
1441    }
1442
1443    let specifier = try_get_date_part_specifier(specifier_str)?;
1444    match specifier {
1445        DatePartSpecifier::Millennium => {
1446            result.months = result
1447                .months
1448                .checked_add(
1449                    number
1450                        .checked_mul(MONTHS_PER_MILLENNIUM as i64)
1451                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1452                        .try_into()
1453                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1454                )
1455                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1456        }
1457        DatePartSpecifier::Century => {
1458            result.months = result
1459                .months
1460                .checked_add(
1461                    number
1462                        .checked_mul(MONTHS_PER_CENTURY as i64)
1463                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1464                        .try_into()
1465                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1466                )
1467                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1468        }
1469        DatePartSpecifier::Decade => {
1470            result.months = result
1471                .months
1472                .checked_add(
1473                    number
1474                        .checked_mul(MONTHS_PER_DECADE as i64)
1475                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1476                        .try_into()
1477                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1478                )
1479                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1480        }
1481        DatePartSpecifier::Year => {
1482            result.months = result
1483                .months
1484                .checked_add(
1485                    number
1486                        .checked_mul(MONTHS_PER_YEAR as i64)
1487                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1488                        .try_into()
1489                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1490                )
1491                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1492        }
1493        DatePartSpecifier::Quarter => {
1494            result.months = result
1495                .months
1496                .checked_add(
1497                    number
1498                        .checked_mul(MONTHS_PER_QUARTER as i64)
1499                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1500                        .try_into()
1501                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1502                )
1503                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1504        }
1505        DatePartSpecifier::Month => {
1506            result.months = result
1507                .months
1508                .checked_add(
1509                    number
1510                        .try_into()
1511                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1512                )
1513                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1514        }
1515        DatePartSpecifier::Day => {
1516            result.days = result
1517                .days
1518                .checked_add(
1519                    number
1520                        .try_into()
1521                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1522                )
1523                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1524        }
1525        DatePartSpecifier::Week => {
1526            result.days = result
1527                .days
1528                .checked_add(
1529                    number
1530                        .checked_mul(DAYS_PER_WEEK as i64)
1531                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1532                        .try_into()
1533                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1534                )
1535                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1536        }
1537        DatePartSpecifier::Microseconds => {
1538            result.micros = result
1539                .micros
1540                .checked_add(number)
1541                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1542        }
1543        DatePartSpecifier::Milliseconds => {
1544            result.micros = result
1545                .micros
1546                .checked_add(
1547                    number
1548                        .checked_mul(MICROS_PER_MSEC)
1549                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1550                )
1551                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1552        }
1553        DatePartSpecifier::Second => {
1554            result.micros = result
1555                .micros
1556                .checked_add(
1557                    number
1558                        .checked_mul(MICROS_PER_SEC)
1559                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1560                )
1561                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1562        }
1563        DatePartSpecifier::Minute => {
1564            result.micros = result
1565                .micros
1566                .checked_add(
1567                    number
1568                        .checked_mul(MICROS_PER_MINUTE)
1569                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1570                )
1571                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1572        }
1573        DatePartSpecifier::Hour => {
1574            result.micros = result
1575                .micros
1576                .checked_add(
1577                    number
1578                        .checked_mul(MINROS_PER_HOUR)
1579                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1580                )
1581                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1582        }
1583    }
1584    Ok(())
1585}
1586
1587pub fn parse_geometry(raw_data: &[u8]) -> Result<String> {
1588    let mut data = Cursor::new(raw_data);
1589    let wkt = Ewkt::from_wkb(&mut data, WkbDialect::Ewkb)?;
1590    Ok(wkt.0)
1591}
1592
1593struct ValueDecoder {}
1594
1595impl ValueDecoder {
1596    fn read_field<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
1597        match ty {
1598            DataType::Null => self.read_null(reader),
1599            DataType::EmptyArray => self.read_empty_array(reader),
1600            DataType::EmptyMap => self.read_empty_map(reader),
1601            DataType::Boolean => self.read_bool(reader),
1602            DataType::Number(NumberDataType::Int8) => self.read_int8(reader),
1603            DataType::Number(NumberDataType::Int16) => self.read_int16(reader),
1604            DataType::Number(NumberDataType::Int32) => self.read_int32(reader),
1605            DataType::Number(NumberDataType::Int64) => self.read_int64(reader),
1606            DataType::Number(NumberDataType::UInt8) => self.read_uint8(reader),
1607            DataType::Number(NumberDataType::UInt16) => self.read_uint16(reader),
1608            DataType::Number(NumberDataType::UInt32) => self.read_uint32(reader),
1609            DataType::Number(NumberDataType::UInt64) => self.read_uint64(reader),
1610            DataType::Number(NumberDataType::Float32) => self.read_float32(reader),
1611            DataType::Number(NumberDataType::Float64) => self.read_float64(reader),
1612            DataType::Decimal(DecimalDataType::Decimal128(size)) => self.read_decimal(size, reader),
1613            DataType::Decimal(DecimalDataType::Decimal256(size)) => self.read_decimal(size, reader),
1614            DataType::String => self.read_string(reader),
1615            DataType::Binary => self.read_binary(reader),
1616            DataType::Timestamp => self.read_timestamp(reader),
1617            DataType::Date => self.read_date(reader),
1618            DataType::Bitmap => self.read_bitmap(reader),
1619            DataType::Variant => self.read_variant(reader),
1620            DataType::Geometry => self.read_geometry(reader),
1621            DataType::Interval => self.read_interval(reader),
1622            DataType::Geography => self.read_geography(reader),
1623            DataType::Array(inner_ty) => self.read_array(inner_ty.as_ref(), reader),
1624            DataType::Map(inner_ty) => self.read_map(inner_ty.as_ref(), reader),
1625            DataType::Tuple(inner_tys) => self.read_tuple(inner_tys.as_ref(), reader),
1626            DataType::Nullable(inner_ty) => self.read_nullable(inner_ty.as_ref(), reader),
1627        }
1628    }
1629
1630    fn match_bytes<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>, bs: &[u8]) -> bool {
1631        let pos = reader.checkpoint();
1632        if reader.ignore_bytes(bs) {
1633            true
1634        } else {
1635            reader.rollback(pos);
1636            false
1637        }
1638    }
1639
1640    fn read_null<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1641        if self.match_bytes(reader, NULL_VALUE.as_bytes()) {
1642            Ok(Value::Null)
1643        } else {
1644            let buf = reader.fill_buf()?;
1645            Err(ConvertError::new("null", String::from_utf8_lossy(buf).to_string()).into())
1646        }
1647    }
1648
1649    fn read_bool<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1650        if self.match_bytes(reader, TRUE_VALUE.as_bytes()) {
1651            Ok(Value::Boolean(true))
1652        } else if self.match_bytes(reader, FALSE_VALUE.as_bytes()) {
1653            Ok(Value::Boolean(false))
1654        } else {
1655            let buf = reader.fill_buf()?;
1656            Err(ConvertError::new("boolean", String::from_utf8_lossy(buf).to_string()).into())
1657        }
1658    }
1659
1660    fn read_int8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1661        let v: i8 = reader.read_int_text()?;
1662        Ok(Value::Number(NumberValue::Int8(v)))
1663    }
1664
1665    fn read_int16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1666        let v: i16 = reader.read_int_text()?;
1667        Ok(Value::Number(NumberValue::Int16(v)))
1668    }
1669
1670    fn read_int32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1671        let v: i32 = reader.read_int_text()?;
1672        Ok(Value::Number(NumberValue::Int32(v)))
1673    }
1674
1675    fn read_int64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1676        let v: i64 = reader.read_int_text()?;
1677        Ok(Value::Number(NumberValue::Int64(v)))
1678    }
1679
1680    fn read_uint8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1681        let v: u8 = reader.read_int_text()?;
1682        Ok(Value::Number(NumberValue::UInt8(v)))
1683    }
1684
1685    fn read_uint16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1686        let v: u16 = reader.read_int_text()?;
1687        Ok(Value::Number(NumberValue::UInt16(v)))
1688    }
1689
1690    fn read_uint32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1691        let v: u32 = reader.read_int_text()?;
1692        Ok(Value::Number(NumberValue::UInt32(v)))
1693    }
1694
1695    fn read_uint64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1696        let v: u64 = reader.read_int_text()?;
1697        Ok(Value::Number(NumberValue::UInt64(v)))
1698    }
1699
1700    fn read_float32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1701        let v: f32 = reader.read_float_text()?;
1702        Ok(Value::Number(NumberValue::Float32(v)))
1703    }
1704
1705    fn read_float64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1706        let v: f64 = reader.read_float_text()?;
1707        Ok(Value::Number(NumberValue::Float64(v)))
1708    }
1709
1710    fn read_decimal<R: AsRef<[u8]>>(
1711        &self,
1712        size: &DecimalSize,
1713        reader: &mut Cursor<R>,
1714    ) -> Result<Value> {
1715        let buf = reader.fill_buf()?;
1716        // parser decimal need fractional part.
1717        // 10.00 and 10 is different value.
1718        let (n_in, _) = collect_number(buf);
1719        let v = unsafe { std::str::from_utf8_unchecked(&buf[..n_in]) };
1720        let d = parse_decimal(v, *size)?;
1721        reader.consume(n_in);
1722        Ok(Value::Number(d))
1723    }
1724
1725    fn read_string<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1726        let mut buf = Vec::new();
1727        reader.read_quoted_text(&mut buf, b'\'')?;
1728        Ok(Value::String(unsafe { String::from_utf8_unchecked(buf) }))
1729    }
1730
1731    fn read_binary<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1732        let buf = reader.fill_buf()?;
1733        let n = collect_binary_number(buf);
1734        let v = buf[..n].to_vec();
1735        reader.consume(n);
1736        Ok(Value::Binary(hex::decode(v)?))
1737    }
1738
1739    fn read_date<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1740        let mut buf = Vec::new();
1741        reader.read_quoted_text(&mut buf, b'\'')?;
1742        let v = unsafe { std::str::from_utf8_unchecked(&buf) };
1743        let days = NaiveDate::parse_from_str(v, "%Y-%m-%d")?.num_days_from_ce() - DAYS_FROM_CE;
1744        Ok(Value::Date(days))
1745    }
1746
1747    fn read_timestamp<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1748        let mut buf = Vec::new();
1749        reader.read_quoted_text(&mut buf, b'\'')?;
1750        let v = unsafe { std::str::from_utf8_unchecked(&buf) };
1751        let ts = NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.6f")?
1752            .and_utc()
1753            .timestamp_micros();
1754        Ok(Value::Timestamp(ts))
1755    }
1756
1757    fn read_interval<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1758        let mut buf = Vec::new();
1759        reader.read_quoted_text(&mut buf, b'\'')?;
1760        Ok(Value::Interval(unsafe { String::from_utf8_unchecked(buf) }))
1761    }
1762
1763    fn read_bitmap<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1764        let mut buf = Vec::new();
1765        reader.read_quoted_text(&mut buf, b'\'')?;
1766        Ok(Value::Bitmap(unsafe { String::from_utf8_unchecked(buf) }))
1767    }
1768
1769    fn read_variant<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1770        let mut buf = Vec::new();
1771        reader.read_quoted_text(&mut buf, b'\'')?;
1772        Ok(Value::Variant(unsafe { String::from_utf8_unchecked(buf) }))
1773    }
1774
1775    fn read_geometry<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1776        let mut buf = Vec::new();
1777        reader.read_quoted_text(&mut buf, b'\'')?;
1778        Ok(Value::Geometry(unsafe { String::from_utf8_unchecked(buf) }))
1779    }
1780
1781    fn read_geography<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1782        let mut buf = Vec::new();
1783        reader.read_quoted_text(&mut buf, b'\'')?;
1784        Ok(Value::Geography(unsafe {
1785            String::from_utf8_unchecked(buf)
1786        }))
1787    }
1788
1789    fn read_nullable<R: AsRef<[u8]>>(
1790        &self,
1791        ty: &DataType,
1792        reader: &mut Cursor<R>,
1793    ) -> Result<Value> {
1794        match self.read_null(reader) {
1795            Ok(val) => Ok(val),
1796            Err(_) => self.read_field(ty, reader),
1797        }
1798    }
1799
1800    fn read_empty_array<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1801        reader.must_ignore_byte(b'[')?;
1802        reader.must_ignore_byte(b']')?;
1803        Ok(Value::EmptyArray)
1804    }
1805
1806    fn read_empty_map<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1807        reader.must_ignore_byte(b'{')?;
1808        reader.must_ignore_byte(b'}')?;
1809        Ok(Value::EmptyArray)
1810    }
1811
1812    fn read_array<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
1813        let mut vals = Vec::new();
1814        reader.must_ignore_byte(b'[')?;
1815        for idx in 0.. {
1816            let _ = reader.ignore_white_spaces();
1817            if reader.ignore_byte(b']') {
1818                break;
1819            }
1820            if idx != 0 {
1821                reader.must_ignore_byte(b',')?;
1822            }
1823            let _ = reader.ignore_white_spaces();
1824            let val = self.read_field(ty, reader)?;
1825            vals.push(val);
1826        }
1827        Ok(Value::Array(vals))
1828    }
1829
1830    fn read_map<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
1831        const KEY: usize = 0;
1832        const VALUE: usize = 1;
1833        let mut kvs = Vec::new();
1834        reader.must_ignore_byte(b'{')?;
1835        match ty {
1836            DataType::Tuple(inner_tys) => {
1837                for idx in 0.. {
1838                    let _ = reader.ignore_white_spaces();
1839                    if reader.ignore_byte(b'}') {
1840                        break;
1841                    }
1842                    if idx != 0 {
1843                        reader.must_ignore_byte(b',')?;
1844                    }
1845                    let _ = reader.ignore_white_spaces();
1846                    let key = self.read_field(&inner_tys[KEY], reader)?;
1847                    let _ = reader.ignore_white_spaces();
1848                    reader.must_ignore_byte(b':')?;
1849                    let _ = reader.ignore_white_spaces();
1850                    let val = self.read_field(&inner_tys[VALUE], reader)?;
1851                    kvs.push((key, val));
1852                }
1853                Ok(Value::Map(kvs))
1854            }
1855            _ => unreachable!(),
1856        }
1857    }
1858
1859    fn read_tuple<R: AsRef<[u8]>>(
1860        &self,
1861        tys: &[DataType],
1862        reader: &mut Cursor<R>,
1863    ) -> Result<Value> {
1864        let mut vals = Vec::new();
1865        reader.must_ignore_byte(b'(')?;
1866        for (idx, ty) in tys.iter().enumerate() {
1867            let _ = reader.ignore_white_spaces();
1868            if idx != 0 {
1869                reader.must_ignore_byte(b',')?;
1870            }
1871            let _ = reader.ignore_white_spaces();
1872            let val = self.read_field(ty, reader)?;
1873            vals.push(val);
1874        }
1875        reader.must_ignore_byte(b')')?;
1876        Ok(Value::Tuple(vals))
1877    }
1878}
1879
1880/// The in-memory representation of the MonthDayMicros variant of the "Interval" logical type.
1881#[derive(Debug, Copy, Clone, Default, PartialEq, PartialOrd, Ord, Eq, Hash)]
1882#[allow(non_camel_case_types)]
1883#[repr(C)]
1884pub struct months_days_micros(pub i128);
1885
1886/// Mask for extracting the lower 64 bits (microseconds).
1887pub const MICROS_MASK: i128 = 0xFFFFFFFFFFFFFFFF;
1888/// Mask for extracting the middle 32 bits (days or months).
1889pub const DAYS_MONTHS_MASK: i128 = 0xFFFFFFFF;
1890
1891impl months_days_micros {
1892    /// A new [`months_days_micros`].
1893    pub fn new(months: i32, days: i32, microseconds: i64) -> Self {
1894        let months_bits = (months as i128) << 96;
1895        // converting to u32 before i128 ensures we’re working with the raw, unsigned bit pattern of the i32 value,
1896        // preventing unwanted sign extension when that value is later used within the i128.
1897        let days_bits = ((days as u32) as i128) << 64;
1898        let micros_bits = (microseconds as u64) as i128;
1899
1900        Self(months_bits | days_bits | micros_bits)
1901    }
1902
1903    #[inline]
1904    pub fn months(&self) -> i32 {
1905        // Decoding logic
1906        ((self.0 >> 96) & DAYS_MONTHS_MASK) as i32
1907    }
1908
1909    #[inline]
1910    pub fn days(&self) -> i32 {
1911        ((self.0 >> 64) & DAYS_MONTHS_MASK) as i32
1912    }
1913
1914    #[inline]
1915    pub fn microseconds(&self) -> i64 {
1916        (self.0 & MICROS_MASK) as i64
1917    }
1918}