databend_driver_core/
value.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::hash::Hash;
17use std::io::BufRead;
18use std::io::Cursor;
19
20use arrow::datatypes::{i256, ArrowNativeTypeOp};
21use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime};
22
23use crate::cursor_ext::{
24    collect_binary_number, collect_number, BufferReadStringExt, ReadBytesExt, ReadCheckPointExt,
25    ReadNumberExt,
26};
27
28use crate::{
29    error::{ConvertError, Error, Result},
30    schema::{DecimalDataType, DecimalSize},
31};
32
33use geozero::wkb::FromWkb;
34use geozero::wkb::WkbDialect;
35use geozero::wkt::Ewkt;
36use std::fmt::{Display, Formatter, Write};
37
38// Thu 1970-01-01 is R.D. 719163
39const DAYS_FROM_CE: i32 = 719_163;
40const NULL_VALUE: &str = "NULL";
41const TRUE_VALUE: &str = "1";
42const FALSE_VALUE: &str = "0";
43const TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.6f";
44
45#[cfg(feature = "flight-sql")]
46use {
47    crate::schema::{
48        ARROW_EXT_TYPE_BITMAP, ARROW_EXT_TYPE_EMPTY_ARRAY, ARROW_EXT_TYPE_EMPTY_MAP,
49        ARROW_EXT_TYPE_GEOGRAPHY, ARROW_EXT_TYPE_GEOMETRY, ARROW_EXT_TYPE_INTERVAL,
50        ARROW_EXT_TYPE_VARIANT, EXTENSION_KEY,
51    },
52    arrow_array::{
53        Array as ArrowArray, BinaryArray, BooleanArray, Date32Array, Decimal128Array,
54        Decimal256Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
55        LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, MapArray, StringArray,
56        StringViewArray, StructArray, TimestampMicrosecondArray, UInt16Array, UInt32Array,
57        UInt64Array, UInt8Array,
58    },
59    arrow_schema::{DataType as ArrowDataType, Field as ArrowField, TimeUnit},
60    std::sync::Arc,
61};
62
63use crate::schema::{DataType, NumberDataType};
64
/// A numeric scalar tagged with its concrete representation.
///
/// Integer and float variants wrap the matching Rust primitive. The decimal
/// variants carry a raw integer together with the [`DecimalSize`] of the
/// column; string conversion renders the raw integer using the size's `scale`.
#[derive(Clone, Debug, PartialEq)]
pub enum NumberValue {
    Int8(i8),
    Int16(i16),
    Int32(i32),
    Int64(i64),
    UInt8(u8),
    UInt16(u16),
    UInt32(u32),
    UInt64(u64),
    Float32(f32),
    Float64(f64),
    /// Decimal backed by an `i128`, plus its declared size.
    Decimal128(i128, DecimalSize),
    /// Decimal backed by an arrow `i256`, plus its declared size.
    Decimal256(i256, DecimalSize),
}
80
/// A single decoded cell of a query result.
///
/// Each variant corresponds to one [`DataType`]; see [`Value::get_type`] for
/// the exact mapping.
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
    /// SQL `NULL`.
    Null,
    /// An array whose type is the dedicated "empty array" type.
    EmptyArray,
    /// A map whose type is the dedicated "empty map" type.
    EmptyMap,
    Boolean(bool),
    /// Raw bytes (hex-decoded when parsed from text).
    Binary(Vec<u8>),
    String(String),
    Number(NumberValue),
    /// Microseconds from 1970-01-01 00:00:00 UTC
    Timestamp(i64),
    /// Days from 1970-01-01 (the text parser subtracts `DAYS_FROM_CE` from
    /// the day count since the common era).
    Date(i32),
    Array(Vec<Value>),
    /// Key/value pairs in the order they were decoded.
    Map(Vec<(Value, Value)>),
    Tuple(Vec<Value>),
    /// Textual form of a bitmap; the Arrow decoder produces the members
    /// joined with `,`.
    Bitmap(String),
    /// Textual form of a variant; the Arrow decoder renders JSONB to a string.
    Variant(String),
    /// Textual form of a geometry value.
    Geometry(String),
    /// Textual form of a geography value.
    Geography(String),
    /// Textual form of an interval.
    Interval(String),
}
102
103impl Value {
104    pub fn get_type(&self) -> DataType {
105        match self {
106            Self::Null => DataType::Null,
107            Self::EmptyArray => DataType::EmptyArray,
108            Self::EmptyMap => DataType::EmptyMap,
109            Self::Boolean(_) => DataType::Boolean,
110            Self::Binary(_) => DataType::Binary,
111            Self::String(_) => DataType::String,
112            Self::Number(n) => match n {
113                NumberValue::Int8(_) => DataType::Number(NumberDataType::Int8),
114                NumberValue::Int16(_) => DataType::Number(NumberDataType::Int16),
115                NumberValue::Int32(_) => DataType::Number(NumberDataType::Int32),
116                NumberValue::Int64(_) => DataType::Number(NumberDataType::Int64),
117                NumberValue::UInt8(_) => DataType::Number(NumberDataType::UInt8),
118                NumberValue::UInt16(_) => DataType::Number(NumberDataType::UInt16),
119                NumberValue::UInt32(_) => DataType::Number(NumberDataType::UInt32),
120                NumberValue::UInt64(_) => DataType::Number(NumberDataType::UInt64),
121                NumberValue::Float32(_) => DataType::Number(NumberDataType::Float32),
122                NumberValue::Float64(_) => DataType::Number(NumberDataType::Float64),
123                NumberValue::Decimal128(_, s) => DataType::Decimal(DecimalDataType::Decimal128(*s)),
124                NumberValue::Decimal256(_, s) => DataType::Decimal(DecimalDataType::Decimal256(*s)),
125            },
126            Self::Timestamp(_) => DataType::Timestamp,
127
128            Self::Date(_) => DataType::Date,
129            Self::Interval(_) => DataType::Interval,
130            Self::Array(vals) => {
131                if vals.is_empty() {
132                    DataType::EmptyArray
133                } else {
134                    DataType::Array(Box::new(vals[0].get_type()))
135                }
136            }
137            Self::Map(kvs) => {
138                if kvs.is_empty() {
139                    DataType::EmptyMap
140                } else {
141                    let inner_ty = DataType::Tuple(vec![kvs[0].0.get_type(), kvs[0].1.get_type()]);
142                    DataType::Map(Box::new(inner_ty))
143                }
144            }
145            Self::Tuple(vals) => {
146                let inner_tys = vals.iter().map(|v| v.get_type()).collect::<Vec<_>>();
147                DataType::Tuple(inner_tys)
148            }
149            Self::Bitmap(_) => DataType::Bitmap,
150            Self::Variant(_) => DataType::Variant,
151            Self::Geometry(_) => DataType::Geometry,
152            Self::Geography(_) => DataType::Geography,
153        }
154    }
155}
156
157impl TryFrom<(&DataType, Option<&str>)> for Value {
158    type Error = Error;
159
160    fn try_from((t, v): (&DataType, Option<&str>)) -> Result<Self> {
161        match v {
162            Some(v) => Self::try_from((t, v)),
163            None => match t {
164                DataType::Null => Ok(Self::Null),
165                DataType::Nullable(_) => Ok(Self::Null),
166                _ => Err(Error::InvalidResponse(
167                    "NULL value for non-nullable field".to_string(),
168                )),
169            },
170        }
171    }
172}
173
174impl TryFrom<(&DataType, &str)> for Value {
175    type Error = Error;
176
177    fn try_from((t, v): (&DataType, &str)) -> Result<Self> {
178        match t {
179            DataType::Null => Ok(Self::Null),
180            DataType::EmptyArray => Ok(Self::EmptyArray),
181            DataType::EmptyMap => Ok(Self::EmptyMap),
182            DataType::Boolean => Ok(Self::Boolean(v == "1")),
183            DataType::Binary => Ok(Self::Binary(hex::decode(v)?)),
184            DataType::String => Ok(Self::String(v.to_string())),
185            DataType::Number(NumberDataType::Int8) => {
186                Ok(Self::Number(NumberValue::Int8(v.parse()?)))
187            }
188            DataType::Number(NumberDataType::Int16) => {
189                Ok(Self::Number(NumberValue::Int16(v.parse()?)))
190            }
191            DataType::Number(NumberDataType::Int32) => {
192                Ok(Self::Number(NumberValue::Int32(v.parse()?)))
193            }
194            DataType::Number(NumberDataType::Int64) => {
195                Ok(Self::Number(NumberValue::Int64(v.parse()?)))
196            }
197            DataType::Number(NumberDataType::UInt8) => {
198                Ok(Self::Number(NumberValue::UInt8(v.parse()?)))
199            }
200            DataType::Number(NumberDataType::UInt16) => {
201                Ok(Self::Number(NumberValue::UInt16(v.parse()?)))
202            }
203            DataType::Number(NumberDataType::UInt32) => {
204                Ok(Self::Number(NumberValue::UInt32(v.parse()?)))
205            }
206            DataType::Number(NumberDataType::UInt64) => {
207                Ok(Self::Number(NumberValue::UInt64(v.parse()?)))
208            }
209            DataType::Number(NumberDataType::Float32) => {
210                Ok(Self::Number(NumberValue::Float32(v.parse()?)))
211            }
212            DataType::Number(NumberDataType::Float64) => {
213                Ok(Self::Number(NumberValue::Float64(v.parse()?)))
214            }
215            DataType::Decimal(DecimalDataType::Decimal128(size)) => {
216                let d = parse_decimal(v, *size)?;
217                Ok(Self::Number(d))
218            }
219            DataType::Decimal(DecimalDataType::Decimal256(size)) => {
220                let d = parse_decimal(v, *size)?;
221                Ok(Self::Number(d))
222            }
223            DataType::Timestamp => Ok(Self::Timestamp(
224                NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.6f")?
225                    .and_utc()
226                    .timestamp_micros(),
227            )),
228            DataType::Date => Ok(Self::Date(
229                NaiveDate::parse_from_str(v, "%Y-%m-%d")?.num_days_from_ce() - DAYS_FROM_CE,
230            )),
231            DataType::Bitmap => Ok(Self::Bitmap(v.to_string())),
232            DataType::Variant => Ok(Self::Variant(v.to_string())),
233            DataType::Geometry => Ok(Self::Geometry(v.to_string())),
234            DataType::Geography => Ok(Self::Geography(v.to_string())),
235            DataType::Interval => Ok(Self::Interval(v.to_string())),
236            DataType::Array(_) | DataType::Map(_) | DataType::Tuple(_) => {
237                let mut reader = Cursor::new(v);
238                let decoder = ValueDecoder {};
239                decoder.read_field(t, &mut reader)
240            }
241            DataType::Nullable(inner) => match inner.as_ref() {
242                DataType::String => Ok(Self::String(v.to_string())),
243                _ => {
244                    // not string type, try to check if it is NULL
245                    // for compatible with old version server
246                    if v == NULL_VALUE {
247                        Ok(Self::Null)
248                    } else {
249                        Self::try_from((inner.as_ref(), v))
250                    }
251                }
252            },
253        }
254    }
255}
256
/// Downcasts a dynamically typed Arrow array to the concrete array type `T`.
///
/// `target` names the kind of value being decoded; it becomes the label of
/// the [`ConvertError`] returned when `array` is not a `T`.
#[cfg(feature = "flight-sql")]
fn downcast_arrow_array<'a, T: 'static>(
    array: &'a Arc<dyn ArrowArray>,
    target: &'static str,
) -> Result<&'a T> {
    array
        .as_any()
        .downcast_ref::<T>()
        .ok_or_else(|| ConvertError::new(target, format!("{:?}", array)).into())
}

/// Decodes the element at row `seq` of an Arrow column into a [`Value`].
///
/// The field's extension metadata (looked up under `EXTENSION_KEY`) takes
/// precedence over its Arrow data type. A null slot in a nullable field
/// decodes to [`Value::Null`], except for the empty-array / empty-map
/// extensions, which always decode to their dedicated variants.
///
/// # Errors
///
/// Returns a [`ConvertError`] (as [`Error`]) when the array does not have the
/// concrete type implied by the field, when the extension or data type is not
/// supported, when a bitmap payload cannot be deserialized, or when a
/// timestamp carries a timezone or a non-microsecond unit.
#[cfg(feature = "flight-sql")]
impl TryFrom<(&ArrowField, &Arc<dyn ArrowArray>, usize)> for Value {
    type Error = Error;
    fn try_from(
        (field, array, seq): (&ArrowField, &Arc<dyn ArrowArray>, usize),
    ) -> std::result::Result<Self, Self::Error> {
        // Evaluated lazily so the arms that never look at the slot do not
        // touch the array.
        let is_null = || field.is_nullable() && array.is_null(seq);

        if let Some(extend_type) = field.metadata().get(EXTENSION_KEY) {
            return match extend_type.as_str() {
                ARROW_EXT_TYPE_EMPTY_ARRAY => Ok(Value::EmptyArray),
                ARROW_EXT_TYPE_EMPTY_MAP => Ok(Value::EmptyMap),
                ARROW_EXT_TYPE_VARIANT
                | ARROW_EXT_TYPE_INTERVAL
                | ARROW_EXT_TYPE_BITMAP
                | ARROW_EXT_TYPE_GEOMETRY
                | ARROW_EXT_TYPE_GEOGRAPHY
                    if is_null() =>
                {
                    Ok(Value::Null)
                }
                ARROW_EXT_TYPE_VARIANT => {
                    downcast_arrow_array::<LargeBinaryArray>(array, "variant")
                        .map(|a| Value::Variant(jsonb::to_string(a.value(seq))))
                }
                ARROW_EXT_TYPE_INTERVAL => {
                    let a = downcast_arrow_array::<Decimal128Array>(array, "Interval")?;
                    let res = months_days_micros(a.value(seq));
                    Ok(Value::Interval(
                        Interval {
                            months: res.months(),
                            days: res.days(),
                            micros: res.microseconds(),
                        }
                        .to_string(),
                    ))
                }
                ARROW_EXT_TYPE_BITMAP => {
                    let a = downcast_arrow_array::<LargeBinaryArray>(array, "bitmap")?;
                    // The payload comes from the server: report a malformed
                    // bitmap as an error instead of panicking.
                    let rb = roaring::RoaringTreemap::deserialize_from(a.value(seq)).map_err(
                        |e| -> Error {
                            ConvertError::new("bitmap", format!("{:?}", a.value(seq)))
                                .with_message(format!("failed to deserialize bitmap: {}", e))
                                .into()
                        },
                    )?;
                    Ok(Value::Bitmap(itertools::join(rb, ",")))
                }
                ARROW_EXT_TYPE_GEOMETRY => {
                    let a = downcast_arrow_array::<LargeBinaryArray>(array, "geometry")?;
                    Ok(Value::Geometry(parse_geometry(a.value(seq))?))
                }
                ARROW_EXT_TYPE_GEOGRAPHY => {
                    let a = downcast_arrow_array::<LargeBinaryArray>(array, "geography")?;
                    Ok(Value::Geography(parse_geometry(a.value(seq))?))
                }
                _ => Err(ConvertError::new(
                    "extension",
                    format!(
                        "Unsupported extension datatype for arrow field: {:?}",
                        field
                    ),
                )
                .into()),
            };
        }

        if is_null() {
            return Ok(Value::Null);
        }
        match field.data_type() {
            ArrowDataType::Null => Ok(Value::Null),
            ArrowDataType::Boolean => downcast_arrow_array::<BooleanArray>(array, "bool")
                .map(|a| Value::Boolean(a.value(seq))),
            ArrowDataType::Int8 => downcast_arrow_array::<Int8Array>(array, "int8")
                .map(|a| Value::Number(NumberValue::Int8(a.value(seq)))),
            ArrowDataType::Int16 => downcast_arrow_array::<Int16Array>(array, "int16")
                .map(|a| Value::Number(NumberValue::Int16(a.value(seq)))),
            ArrowDataType::Int32 => downcast_arrow_array::<Int32Array>(array, "int32")
                .map(|a| Value::Number(NumberValue::Int32(a.value(seq)))),
            ArrowDataType::Int64 => downcast_arrow_array::<Int64Array>(array, "int64")
                .map(|a| Value::Number(NumberValue::Int64(a.value(seq)))),
            ArrowDataType::UInt8 => downcast_arrow_array::<UInt8Array>(array, "uint8")
                .map(|a| Value::Number(NumberValue::UInt8(a.value(seq)))),
            ArrowDataType::UInt16 => downcast_arrow_array::<UInt16Array>(array, "uint16")
                .map(|a| Value::Number(NumberValue::UInt16(a.value(seq)))),
            ArrowDataType::UInt32 => downcast_arrow_array::<UInt32Array>(array, "uint32")
                .map(|a| Value::Number(NumberValue::UInt32(a.value(seq)))),
            ArrowDataType::UInt64 => downcast_arrow_array::<UInt64Array>(array, "uint64")
                .map(|a| Value::Number(NumberValue::UInt64(a.value(seq)))),
            ArrowDataType::Float32 => downcast_arrow_array::<Float32Array>(array, "float32")
                .map(|a| Value::Number(NumberValue::Float32(a.value(seq)))),
            ArrowDataType::Float64 => downcast_arrow_array::<Float64Array>(array, "float64")
                .map(|a| Value::Number(NumberValue::Float64(a.value(seq)))),

            ArrowDataType::Decimal128(p, s) => {
                downcast_arrow_array::<Decimal128Array>(array, "Decimal128").map(|a| {
                    Value::Number(NumberValue::Decimal128(
                        a.value(seq),
                        DecimalSize {
                            precision: *p,
                            scale: *s as u8,
                        },
                    ))
                })
            }
            ArrowDataType::Decimal256(p, s) => {
                downcast_arrow_array::<Decimal256Array>(array, "Decimal256").map(|a| {
                    Value::Number(NumberValue::Decimal256(
                        a.value(seq),
                        DecimalSize {
                            precision: *p,
                            scale: *s as u8,
                        },
                    ))
                })
            }

            ArrowDataType::Binary => downcast_arrow_array::<BinaryArray>(array, "binary")
                .map(|a| Value::Binary(a.value(seq).to_vec())),
            // NOTE(review): fixed-size binary columns are also downcast to
            // `LargeBinaryArray` here — confirm that is the intended array type.
            ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
                downcast_arrow_array::<LargeBinaryArray>(array, "large binary")
                    .map(|a| Value::Binary(a.value(seq).to_vec()))
            }
            ArrowDataType::Utf8 => downcast_arrow_array::<StringArray>(array, "string")
                .map(|a| Value::String(a.value(seq).to_string())),
            ArrowDataType::LargeUtf8 => {
                downcast_arrow_array::<LargeStringArray>(array, "large string")
                    .map(|a| Value::String(a.value(seq).to_string()))
            }
            ArrowDataType::Utf8View => {
                downcast_arrow_array::<StringViewArray>(array, "string view")
                    .map(|a| Value::String(a.value(seq).to_string()))
            }
            // we only support timestamp in microsecond in databend
            ArrowDataType::Timestamp(unit, tz) => {
                let a = downcast_arrow_array::<TimestampMicrosecondArray>(array, "timestamp")?;
                if unit != &TimeUnit::Microsecond {
                    return Err(ConvertError::new("timestamp", format!("{:?}", a))
                        .with_message(format!(
                            "unsupported timestamp unit: {:?}, only support microsecond",
                            unit
                        ))
                        .into());
                }
                let ts = a.value(seq);
                match tz {
                    None => Ok(Value::Timestamp(ts)),
                    Some(tz) => Err(ConvertError::new("timestamp", format!("{:?}", a))
                        .with_message(format!("non-UTC timezone not supported: {:?}", tz))
                        .into()),
                }
            }
            ArrowDataType::Date32 => downcast_arrow_array::<Date32Array>(array, "date")
                .map(|a| Value::Date(a.value(seq))),
            ArrowDataType::List(f) => {
                let a = downcast_arrow_array::<ListArray>(array, "list")?;
                let inner_array = a.value(seq);
                (0..inner_array.len())
                    .map(|i| Value::try_from((f.as_ref(), &inner_array, i)))
                    .collect::<Result<Vec<_>>>()
                    .map(Value::Array)
            }
            ArrowDataType::LargeList(f) => {
                let a = downcast_arrow_array::<LargeListArray>(array, "large list")?;
                let inner_array = a.value(seq);
                (0..inner_array.len())
                    .map(|i| Value::try_from((f.as_ref(), &inner_array, i)))
                    .collect::<Result<Vec<_>>>()
                    .map(Value::Array)
            }
            ArrowDataType::Map(f, _) => {
                let a = downcast_arrow_array::<MapArray>(array, "map")?;
                match f.data_type() {
                    // The entries of one map slot are a struct array whose
                    // column 0 holds the keys and column 1 the values.
                    ArrowDataType::Struct(fs) => {
                        let inner_array = a.value(seq);
                        (0..inner_array.len())
                            .map(|i| -> Result<(Value, Value)> {
                                let key =
                                    Value::try_from((fs[0].as_ref(), inner_array.column(0), i))?;
                                let val =
                                    Value::try_from((fs[1].as_ref(), inner_array.column(1), i))?;
                                Ok((key, val))
                            })
                            .collect::<Result<Vec<_>>>()
                            .map(Value::Map)
                    }
                    _ => Err(
                        ConvertError::new("invalid map inner type", format!("{:?}", a)).into(),
                    ),
                }
            }
            ArrowDataType::Struct(fs) => {
                let a = downcast_arrow_array::<StructArray>(array, "struct")?;
                // One tuple element per struct field, all taken from row `seq`.
                fs.iter()
                    .zip(a.columns().iter())
                    .map(|(f, column)| Value::try_from((f.as_ref(), column, seq)))
                    .collect::<Result<Vec<_>>>()
                    .map(Value::Tuple)
            }
            _ => Err(ConvertError::new("unsupported data type", format!("{:?}", array)).into()),
        }
    }
}
528
529impl TryFrom<Value> for String {
530    type Error = Error;
531    fn try_from(val: Value) -> Result<Self> {
532        match val {
533            Value::String(s) => Ok(s),
534            Value::Bitmap(s) => Ok(s),
535            Value::Number(NumberValue::Decimal128(v, s)) => Ok(display_decimal_128(v, s.scale)),
536            Value::Number(NumberValue::Decimal256(v, s)) => Ok(display_decimal_256(v, s.scale)),
537            Value::Geometry(s) => Ok(s),
538            Value::Geography(s) => Ok(s),
539            Value::Interval(s) => Ok(s),
540            Value::Variant(s) => Ok(s),
541            _ => Err(ConvertError::new("string", format!("{:?}", val)).into()),
542        }
543    }
544}
545
546impl TryFrom<Value> for bool {
547    type Error = Error;
548    fn try_from(val: Value) -> Result<Self> {
549        match val {
550            Value::Boolean(b) => Ok(b),
551            Value::Number(n) => Ok(n != NumberValue::Int8(0)),
552            _ => Err(ConvertError::new("bool", format!("{:?}", val)).into()),
553        }
554    }
555}
556
// Generates `TryFrom<Value>` for each listed primitive numeric type.
//
// Integer/float `Number` payloads, `Date` and `Timestamp` are converted with
// an `as` cast (so out-of-range values are not rejected); decimals and all
// other variants produce a "number" conversion error.
macro_rules! impl_try_from_number_value {
    ($($t:ty),*) => {
        $(
            impl TryFrom<Value> for $t {
                type Error = Error;
                fn try_from(val: Value) -> Result<Self> {
                    let converted = match val {
                        Value::Number(NumberValue::Int8(i)) => i as $t,
                        Value::Number(NumberValue::Int16(i)) => i as $t,
                        Value::Number(NumberValue::Int32(i)) => i as $t,
                        Value::Number(NumberValue::Int64(i)) => i as $t,
                        Value::Number(NumberValue::UInt8(i)) => i as $t,
                        Value::Number(NumberValue::UInt16(i)) => i as $t,
                        Value::Number(NumberValue::UInt32(i)) => i as $t,
                        Value::Number(NumberValue::UInt64(i)) => i as $t,
                        Value::Number(NumberValue::Float32(i)) => i as $t,
                        Value::Number(NumberValue::Float64(i)) => i as $t,
                        Value::Date(i) => i as $t,
                        Value::Timestamp(i) => i as $t,
                        other => {
                            return Err(
                                ConvertError::new("number", format!("{:?}", other)).into()
                            );
                        }
                    };
                    Ok(converted)
                }
            }
        )*
    };
}
584
585impl_try_from_number_value!(u8);
586impl_try_from_number_value!(u16);
587impl_try_from_number_value!(u32);
588impl_try_from_number_value!(u64);
589impl_try_from_number_value!(i8);
590impl_try_from_number_value!(i16);
591impl_try_from_number_value!(i32);
592impl_try_from_number_value!(i64);
593impl_try_from_number_value!(f32);
594impl_try_from_number_value!(f64);
595
596impl TryFrom<Value> for NaiveDateTime {
597    type Error = Error;
598    fn try_from(val: Value) -> Result<Self> {
599        match val {
600            Value::Timestamp(i) => {
601                let secs = i / 1_000_000;
602                let nanos = ((i % 1_000_000) * 1000) as u32;
603                match DateTime::from_timestamp(secs, nanos) {
604                    Some(t) => Ok(t.naive_utc()),
605                    None => Err(ConvertError::new("NaiveDateTime", "".to_string()).into()),
606                }
607            }
608            _ => Err(ConvertError::new("NaiveDateTime", format!("{}", val)).into()),
609        }
610    }
611}
612
613impl TryFrom<Value> for NaiveDate {
614    type Error = Error;
615    fn try_from(val: Value) -> Result<Self> {
616        match val {
617            Value::Date(i) => {
618                let days = i + DAYS_FROM_CE;
619                match NaiveDate::from_num_days_from_ce_opt(days) {
620                    Some(d) => Ok(d),
621                    None => Err(ConvertError::new("NaiveDate", "".to_string()).into()),
622                }
623            }
624            _ => Err(ConvertError::new("NaiveDate", format!("{}", val)).into()),
625        }
626    }
627}
628
629impl<V> TryFrom<Value> for Vec<V>
630where
631    V: TryFrom<Value, Error = Error>,
632{
633    type Error = Error;
634    fn try_from(val: Value) -> Result<Self> {
635        match val {
636            Value::Binary(vals) => vals
637                .into_iter()
638                .map(|v| V::try_from(Value::Number(NumberValue::UInt8(v))))
639                .collect(),
640            Value::Array(vals) => vals.into_iter().map(V::try_from).collect(),
641            Value::EmptyArray => Ok(vec![]),
642            _ => Err(ConvertError::new("Vec", format!("{}", val)).into()),
643        }
644    }
645}
646
647impl<K, V> TryFrom<Value> for HashMap<K, V>
648where
649    K: TryFrom<Value, Error = Error> + Eq + Hash,
650    V: TryFrom<Value, Error = Error>,
651{
652    type Error = Error;
653    fn try_from(val: Value) -> Result<Self> {
654        match val {
655            Value::Map(kvs) => {
656                let mut map = HashMap::new();
657                for (k, v) in kvs {
658                    let k = K::try_from(k)?;
659                    let v = V::try_from(v)?;
660                    map.insert(k, v);
661                }
662                Ok(map)
663            }
664            Value::EmptyMap => Ok(HashMap::new()),
665            _ => Err(ConvertError::new("HashMap", format!("{}", val)).into()),
666        }
667    }
668}
669
// Expands to `$sub`, discarding the leading token tree `$_t`.
// Lets a repetition emit one fixed expression per repeated element,
// e.g. to build an array whose length equals the number of repetitions.
macro_rules! replace_expr {
    ($_t:tt $sub:expr) => {
        $sub
    };
}
675
// This macro implements `TryFrom<Value>` for a Rust tuple of the given type
// parameters. Each type parameter must itself implement `TryFrom<Value>`.
//
// The generated conversion accepts only `Value::Tuple` with exactly as many
// elements as the Rust tuple has fields, and converts the elements in order.
// The error type is `String`: the element's own conversion error is discarded
// and replaced by a message naming the column index, its `DataType` and the
// target Rust type.
macro_rules! impl_tuple_from_value {
    ( $($Ti:tt),+ ) => {
        impl<$($Ti),+> TryFrom<Value> for ($($Ti,)+)
        where
            $($Ti: TryFrom<Value>),+
        {
            type Error = String;
            fn try_from(val: Value) -> Result<Self, String> {
                // It is not possible yet to get the number of metavariable repetitions
                // ref: https://github.com/rust-lang/lang-team/issues/28#issue-644523674
                // This is a workaround: build an array with one `()` per type
                // parameter and take its length.
                let expected_len = <[()]>::len(&[$(replace_expr!(($Ti) ())),*]);

                match val {
                    Value::Tuple(vals) => {
                        // Reject arity mismatches up front so the per-element
                        // `next().unwrap()` below cannot fail.
                        if expected_len != vals.len() {
                            return Err(format!("value tuple size mismatch: expected {} columns, got {}", expected_len, vals.len()));
                        }
                        let mut vals_iter = vals.into_iter().enumerate();

                        // One block per type parameter; each block pulls the
                        // next element from the shared iterator.
                        Ok((
                            $(
                                {
                                    let (col_ix, col_value) = vals_iter
                                        .next()
                                        .unwrap(); // vals_iter size is checked before this code is reached,
                                                   // so it is safe to unwrap
                                    // Capture the type before `col_value` is consumed,
                                    // for use in the error message.
                                    let t = col_value.get_type();
                                    $Ti::try_from(col_value)
                                        .map_err(|_| format!("failed converting column {} from type({:?}) to type({})", col_ix, t, std::any::type_name::<$Ti>()))?
                                }
                            ,)+
                        ))
                    }
                    _ => Err(format!("expected tuple, got {:?}", val)),
                }
            }
        }
    }
}
717
// Implement TryFrom<Value> for tuples of 1 up to 22 elements
impl_tuple_from_value!(T1);
impl_tuple_from_value!(T1, T2);
impl_tuple_from_value!(T1, T2, T3);
impl_tuple_from_value!(T1, T2, T3, T4);
impl_tuple_from_value!(T1, T2, T3, T4, T5);
impl_tuple_from_value!(T1, T2, T3, T4, T5, T6);
impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7);
impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8);
impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9);
impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17);
impl_tuple_from_value!(
    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18
);
impl_tuple_from_value!(
    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19
);
impl_tuple_from_value!(
    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20
);
impl_tuple_from_value!(
    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21
);
impl_tuple_from_value!(
    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21,
    T22
);
752
// Generates `TryFrom<Value> for Option<T>` for each listed type, for use with nullable columns:
// `Value::Null` becomes `None`, anything else is converted to `T` and wrapped in `Some`.
macro_rules! impl_try_from_to_option {
    ($($t:ty),*) => {
        $(
            impl TryFrom<Value> for Option<$t> {
                type Error = Error;
                fn try_from(val: Value) -> Result<Self> {
                    match val {
                        Value::Null => Ok(None),
                        other => Ok(Some(<$t>::try_from(other)?)),
                    }
                }
            }
        )*
    };
}
773
774impl_try_from_to_option!(String);
775impl_try_from_to_option!(bool);
776impl_try_from_to_option!(u8);
777impl_try_from_to_option!(u16);
778impl_try_from_to_option!(u32);
779impl_try_from_to_option!(u64);
780impl_try_from_to_option!(i8);
781impl_try_from_to_option!(i16);
782impl_try_from_to_option!(i32);
783impl_try_from_to_option!(i64);
784impl_try_from_to_option!(f32);
785impl_try_from_to_option!(f64);
786impl_try_from_to_option!(NaiveDateTime);
787impl_try_from_to_option!(NaiveDate);
788
789impl std::fmt::Display for NumberValue {
790    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
791        match self {
792            NumberValue::Int8(i) => write!(f, "{}", i),
793            NumberValue::Int16(i) => write!(f, "{}", i),
794            NumberValue::Int32(i) => write!(f, "{}", i),
795            NumberValue::Int64(i) => write!(f, "{}", i),
796            NumberValue::UInt8(i) => write!(f, "{}", i),
797            NumberValue::UInt16(i) => write!(f, "{}", i),
798            NumberValue::UInt32(i) => write!(f, "{}", i),
799            NumberValue::UInt64(i) => write!(f, "{}", i),
800            NumberValue::Float32(i) => write!(f, "{}", i),
801            NumberValue::Float64(i) => write!(f, "{}", i),
802            NumberValue::Decimal128(v, s) => write!(f, "{}", display_decimal_128(*v, s.scale)),
803            NumberValue::Decimal256(v, s) => write!(f, "{}", display_decimal_256(*v, s.scale)),
804        }
805    }
806}
807
808impl std::fmt::Display for Value {
809    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
810        encode_value(f, self, true)
811    }
812}
813
814// Compatible with Databend, inner values of nested types are quoted.
815fn encode_value(f: &mut std::fmt::Formatter<'_>, val: &Value, raw: bool) -> std::fmt::Result {
816    match val {
817        Value::Null => write!(f, "NULL"),
818        Value::EmptyArray => write!(f, "[]"),
819        Value::EmptyMap => write!(f, "{{}}"),
820        Value::Boolean(b) => {
821            if *b {
822                write!(f, "true")
823            } else {
824                write!(f, "false")
825            }
826        }
827        Value::Number(n) => write!(f, "{}", n),
828        Value::Binary(s) => write!(f, "{}", hex::encode_upper(s)),
829        Value::String(s)
830        | Value::Bitmap(s)
831        | Value::Variant(s)
832        | Value::Interval(s)
833        | Value::Geometry(s)
834        | Value::Geography(s) => {
835            if raw {
836                write!(f, "{}", s)
837            } else {
838                write!(f, "'{}'", s)
839            }
840        }
841        Value::Timestamp(micros) => {
842            let (mut secs, mut nanos) = (*micros / 1_000_000, (*micros % 1_000_000) * 1_000);
843            if nanos < 0 {
844                secs -= 1;
845                nanos += 1_000_000_000;
846            }
847            let t = DateTime::from_timestamp(secs, nanos as _).unwrap_or_default();
848            let t = t.naive_utc();
849            if raw {
850                write!(f, "{}", t.format(TIMESTAMP_FORMAT))
851            } else {
852                write!(f, "'{}'", t.format(TIMESTAMP_FORMAT))
853            }
854        }
855        Value::Date(i) => {
856            let days = i + DAYS_FROM_CE;
857            let d = NaiveDate::from_num_days_from_ce_opt(days).unwrap_or_default();
858            if raw {
859                write!(f, "{}", d)
860            } else {
861                write!(f, "'{}'", d)
862            }
863        }
864        Value::Array(vals) => {
865            write!(f, "[")?;
866            for (i, val) in vals.iter().enumerate() {
867                if i > 0 {
868                    write!(f, ",")?;
869                }
870                encode_value(f, val, false)?;
871            }
872            write!(f, "]")?;
873            Ok(())
874        }
875        Value::Map(kvs) => {
876            write!(f, "{{")?;
877            for (i, (key, val)) in kvs.iter().enumerate() {
878                if i > 0 {
879                    write!(f, ",")?;
880                }
881                encode_value(f, key, false)?;
882                write!(f, ":")?;
883                encode_value(f, val, false)?;
884            }
885            write!(f, "}}")?;
886            Ok(())
887        }
888        Value::Tuple(vals) => {
889            write!(f, "(")?;
890            for (i, val) in vals.iter().enumerate() {
891                if i > 0 {
892                    write!(f, ",")?;
893                }
894                encode_value(f, val, false)?;
895            }
896            write!(f, ")")?;
897            Ok(())
898        }
899    }
900}
901
/// Renders an unscaled 128-bit decimal as text with exactly `scale` fractional digits.
///
/// `num` is the unscaled integer, so the represented value is `num / 10^scale`.
/// With `scale == 0` the integer is printed as is; otherwise the output has the form
/// `[-]<integer part>.<fraction>`, with the fraction zero-padded to `scale` digits.
///
/// The magnitude is computed with `unsigned_abs`, so `i128::MIN` is formatted instead of
/// overflowing on negation. When `10^scale` does not fit in a `u128` (scale of 39 or more)
/// the whole magnitude is necessarily fractional and the integer part is `0`.
pub fn display_decimal_128(num: i128, scale: u8) -> String {
    let mut buf = String::new();
    if scale == 0 {
        write!(buf, "{}", num).unwrap();
        return buf;
    }

    let sign = if num < 0 { "-" } else { "" };
    let magnitude = num.unsigned_abs();
    let (int_part, frac_part) = match 10_u128.checked_pow(u32::from(scale)) {
        Some(pow_scale) => (magnitude / pow_scale, magnitude % pow_scale),
        // 10^scale exceeds every possible magnitude: nothing is left of the point.
        None => (0, magnitude),
    };
    write!(
        buf,
        "{}{}.{:0>width$}",
        sign,
        int_part,
        frac_part,
        width = scale as usize
    )
    .unwrap();
    buf
}
930
931pub fn display_decimal_256(num: i256, scale: u8) -> String {
932    let mut buf = String::new();
933    if scale == 0 {
934        write!(buf, "{}", num).unwrap();
935    } else {
936        let pow_scale = i256::from_i128(10i128).pow_wrapping(scale as u32);
937        let width = scale as usize;
938        // -1/10 = 0
939        let (int_part, neg) = if num >= i256::ZERO {
940            (num / pow_scale, "")
941        } else {
942            (-num / pow_scale, "-")
943        };
944        let frac_part = (num % pow_scale).wrapping_abs();
945
946        match frac_part.to_i128() {
947            Some(frac_part) => {
948                write!(
949                    buf,
950                    "{}{}.{:0>width$}",
951                    neg,
952                    int_part,
953                    frac_part,
954                    width = width
955                )
956                .unwrap();
957            }
958            None => {
959                // fractional part is too big for display,
960                // split it into two parts.
961                let pow = i256::from_i128(10i128).pow_wrapping(38);
962                let frac_high_part = frac_part / pow;
963                let frac_low_part = frac_part % pow;
964                let frac_width = (scale - 38) as usize;
965
966                write!(
967                    buf,
968                    "{}{}.{:0>width$}{}",
969                    neg,
970                    int_part,
971                    frac_high_part.to_i128().unwrap(),
972                    frac_low_part.to_i128().unwrap(),
973                    width = frac_width
974                )
975                .unwrap();
976            }
977        }
978    }
979    buf
980}
981
982/// assume text is from
983/// used only for expr, so put more weight on readability
984pub fn parse_decimal(text: &str, size: DecimalSize) -> Result<NumberValue> {
985    let mut start = 0;
986    let bytes = text.as_bytes();
987    let mut is_negative = false;
988
989    // Check if the number is negative
990    if bytes[start] == b'-' {
991        is_negative = true;
992        start += 1;
993    }
994
995    while start < text.len() && bytes[start] == b'0' {
996        start += 1
997    }
998    let text = &text[start..];
999    let point_pos = text.find('.');
1000    let e_pos = text.find(|c| ['E', 'e'].contains(&c));
1001    let (i_part, f_part, e_part) = match (point_pos, e_pos) {
1002        (Some(p1), Some(p2)) => (&text[..p1], &text[(p1 + 1)..p2], Some(&text[(p2 + 1)..])),
1003        (Some(p), None) => (&text[..p], &text[(p + 1)..], None),
1004        (None, Some(p)) => (&text[..p], "", Some(&text[(p + 1)..])),
1005        (None, None) => (text, "", None),
1006    };
1007    let exp = match e_part {
1008        Some(s) => s.parse::<i32>()?,
1009        None => 0,
1010    };
1011    if i_part.len() as i32 + exp > 76 {
1012        Err(ConvertError::new("decimal", format!("{:?}", text)).into())
1013    } else {
1014        let mut digits = Vec::with_capacity(76);
1015        digits.extend_from_slice(i_part.as_bytes());
1016        digits.extend_from_slice(f_part.as_bytes());
1017        if digits.is_empty() {
1018            digits.push(b'0')
1019        }
1020        let scale = f_part.len() as i32 - exp;
1021        if scale < 0 {
1022            // e.g 123.1e3
1023            for _ in 0..(-scale) {
1024                digits.push(b'0')
1025            }
1026        };
1027
1028        let precision = std::cmp::min(digits.len(), 76);
1029        let digits = unsafe { std::str::from_utf8_unchecked(&digits[..precision]) };
1030
1031        let result = if size.precision > 38 {
1032            NumberValue::Decimal256(i256::from_string(digits).unwrap(), size)
1033        } else {
1034            NumberValue::Decimal128(digits.parse::<i128>()?, size)
1035        };
1036
1037        // If the number was negative, negate the result
1038        if is_negative {
1039            match result {
1040                NumberValue::Decimal256(val, size) => Ok(NumberValue::Decimal256(-val, size)),
1041                NumberValue::Decimal128(val, size) => Ok(NumberValue::Decimal128(-val, size)),
1042                _ => Ok(result),
1043            }
1044        } else {
1045            Ok(result)
1046        }
1047    }
1048}
1049
/// A time span stored as three independent signed components.
///
/// When displayed, `months` is split into years and months, `days` is printed as is, and
/// `micros` is rendered as `H:MM:SS[.ffffff]`.
#[derive(Debug, Copy, Clone, PartialEq, Default)]
pub struct Interval {
    /// Number of months (12 months are displayed as one year).
    pub months: i32,
    /// Number of days.
    pub days: i32,
    /// Number of microseconds.
    pub micros: i64,
}
1056
1057impl Display for Interval {
1058    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1059        let mut buffer = [0u8; 70];
1060        let len = IntervalToStringCast::format(*self, &mut buffer);
1061        write!(f, "{}", String::from_utf8_lossy(&buffer[..len]))
1062    }
1063}
1064
1065struct IntervalToStringCast;
1066
1067impl IntervalToStringCast {
1068    fn format_signed_number(value: i64, buffer: &mut [u8], length: &mut usize) {
1069        let s = value.to_string();
1070        let bytes = s.as_bytes();
1071        buffer[*length..*length + bytes.len()].copy_from_slice(bytes);
1072        *length += bytes.len();
1073    }
1074
1075    fn format_two_digits(value: i64, buffer: &mut [u8], length: &mut usize) {
1076        let s = format!("{:02}", value.abs());
1077        let bytes = s.as_bytes();
1078        buffer[*length..*length + bytes.len()].copy_from_slice(bytes);
1079        *length += bytes.len();
1080    }
1081
1082    fn format_interval_value(value: i32, buffer: &mut [u8], length: &mut usize, name: &str) {
1083        if value == 0 {
1084            return;
1085        }
1086        if *length != 0 {
1087            buffer[*length] = b' ';
1088            *length += 1;
1089        }
1090        Self::format_signed_number(value as i64, buffer, length);
1091        let name_bytes = name.as_bytes();
1092        buffer[*length..*length + name_bytes.len()].copy_from_slice(name_bytes);
1093        *length += name_bytes.len();
1094        if value != 1 && value != -1 {
1095            buffer[*length] = b's';
1096            *length += 1;
1097        }
1098    }
1099
1100    fn format_micros(mut micros: i64, buffer: &mut [u8], length: &mut usize) {
1101        if micros < 0 {
1102            micros = -micros;
1103        }
1104        let s = format!("{:06}", micros);
1105        let bytes = s.as_bytes();
1106        buffer[*length..*length + bytes.len()].copy_from_slice(bytes);
1107        *length += bytes.len();
1108
1109        while *length > 0 && buffer[*length - 1] == b'0' {
1110            *length -= 1;
1111        }
1112    }
1113
1114    pub fn format(interval: Interval, buffer: &mut [u8]) -> usize {
1115        let mut length = 0;
1116        if interval.months != 0 {
1117            let years = interval.months / 12;
1118            let months = interval.months - years * 12;
1119            Self::format_interval_value(years, buffer, &mut length, " year");
1120            Self::format_interval_value(months, buffer, &mut length, " month");
1121        }
1122        if interval.days != 0 {
1123            Self::format_interval_value(interval.days, buffer, &mut length, " day");
1124        }
1125        if interval.micros != 0 {
1126            if length != 0 {
1127                buffer[length] = b' ';
1128                length += 1;
1129            }
1130            let mut micros = interval.micros;
1131            if micros < 0 {
1132                buffer[length] = b'-';
1133                length += 1;
1134                micros = -micros;
1135            }
1136            let hour = micros / MINROS_PER_HOUR;
1137            micros -= hour * MINROS_PER_HOUR;
1138            let min = micros / MICROS_PER_MINUTE;
1139            micros -= min * MICROS_PER_MINUTE;
1140            let sec = micros / MICROS_PER_SEC;
1141            micros -= sec * MICROS_PER_SEC;
1142
1143            Self::format_signed_number(hour, buffer, &mut length);
1144            buffer[length] = b':';
1145            length += 1;
1146            Self::format_two_digits(min, buffer, &mut length);
1147            buffer[length] = b':';
1148            length += 1;
1149            Self::format_two_digits(sec, buffer, &mut length);
1150            if micros != 0 {
1151                buffer[length] = b'.';
1152                length += 1;
1153                Self::format_micros(micros, buffer, &mut length);
1154            }
1155        } else if length == 0 {
1156            buffer[..8].copy_from_slice(b"00:00:00");
1157            return 8;
1158        }
1159        length
1160    }
1161}
1162
1163impl Interval {
1164    pub fn from_string(str: &str) -> Result<Self> {
1165        Self::from_cstring(str.as_bytes())
1166    }
1167
1168    pub fn from_cstring(str: &[u8]) -> Result<Self> {
1169        let mut result = Interval::default();
1170        let mut pos = 0;
1171        let len = str.len();
1172        let mut found_any = false;
1173
1174        if len == 0 {
1175            return Err(Error::BadArgument("Empty string".to_string()));
1176        }
1177        match str[pos] {
1178            b'@' => {
1179                pos += 1;
1180            }
1181            b'P' | b'p' => {
1182                return Err(Error::BadArgument(
1183                    "Posix intervals not supported yet".to_string(),
1184                ));
1185            }
1186            _ => {}
1187        }
1188
1189        while pos < len {
1190            match str[pos] {
1191                b' ' | b'\t' | b'\n' => {
1192                    pos += 1;
1193                    continue;
1194                }
1195                b'0'..=b'9' => {
1196                    let (number, fraction, next_pos) = parse_number(&str[pos..])?;
1197                    pos += next_pos;
1198                    let (specifier, next_pos) = parse_identifier(&str[pos..]);
1199
1200                    pos += next_pos;
1201                    let _ = apply_specifier(&mut result, number, fraction, &specifier);
1202                    found_any = true;
1203                }
1204                b'-' => {
1205                    pos += 1;
1206                    let (number, fraction, next_pos) = parse_number(&str[pos..])?;
1207                    let number = -number;
1208                    let fraction = -fraction;
1209
1210                    pos += next_pos;
1211
1212                    let (specifier, next_pos) = parse_identifier(&str[pos..]);
1213
1214                    pos += next_pos;
1215                    let _ = apply_specifier(&mut result, number, fraction, &specifier);
1216                    found_any = true;
1217                }
1218                b'a' | b'A' => {
1219                    if len - pos < 3
1220                        || str[pos + 1] != b'g' && str[pos + 1] != b'G'
1221                        || str[pos + 2] != b'o' && str[pos + 2] != b'O'
1222                    {
1223                        return Err(Error::BadArgument("Invalid 'ago' specifier".to_string()));
1224                    }
1225                    pos += 3;
1226                    while pos < len {
1227                        match str[pos] {
1228                            b' ' | b'\t' | b'\n' => {
1229                                pos += 1;
1230                            }
1231                            _ => {
1232                                return Err(Error::BadArgument(
1233                                    "Trailing characters after 'ago'".to_string(),
1234                                ));
1235                            }
1236                        }
1237                    }
1238                    result.months = -result.months;
1239                    result.days = -result.days;
1240                    result.micros = -result.micros;
1241                    return Ok(result);
1242                }
1243                _ => {
1244                    return Err(Error::BadArgument(format!(
1245                        "Unexpected character at position {}",
1246                        pos
1247                    )));
1248                }
1249            }
1250        }
1251
1252        if !found_any {
1253            return Err(Error::BadArgument(
1254                "No interval specifiers found".to_string(),
1255            ));
1256        }
1257        Ok(result)
1258    }
1259}
1260
1261fn parse_number(bytes: &[u8]) -> Result<(i64, i64, usize)> {
1262    let mut number: i64 = 0;
1263    let mut fraction: i64 = 0;
1264    let mut pos = 0;
1265
1266    while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1267        number = number
1268            .checked_mul(10)
1269            .ok_or(Error::BadArgument("Number too large".to_string()))?
1270            + (bytes[pos] - b'0') as i64;
1271        pos += 1;
1272    }
1273
1274    if pos < bytes.len() && bytes[pos] == b'.' {
1275        pos += 1;
1276        let mut mult: i64 = 100000;
1277        while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1278            if mult > 0 {
1279                fraction += (bytes[pos] - b'0') as i64 * mult;
1280            }
1281            mult /= 10;
1282            pos += 1;
1283        }
1284    }
1285    if pos < bytes.len() && bytes[pos] == b':' {
1286        // parse time format HH:MM:SS[.FFFFFF]
1287        let time_bytes = &bytes[pos..];
1288        let mut time_pos = 0;
1289        let mut total_micros: i64 = number * 60 * 60 * MICROS_PER_SEC;
1290        let mut colon_count = 0;
1291
1292        while colon_count < 2 && time_bytes.len() > time_pos {
1293            let (minute, _, next_pos) = parse_time_part(&time_bytes[time_pos..])?;
1294            let minute_micros = minute * 60 * MICROS_PER_SEC;
1295            total_micros += minute_micros;
1296            time_pos += next_pos;
1297
1298            if time_bytes.len() > time_pos && time_bytes[time_pos] == b':' {
1299                time_pos += 1;
1300                colon_count += 1;
1301            } else {
1302                break;
1303            }
1304        }
1305        if time_bytes.len() > time_pos {
1306            let (seconds, nanos, next_pos) = parse_time_part_with_nanos(&time_bytes[time_pos..])?;
1307            total_micros += seconds * MICROS_PER_SEC + nanos;
1308            time_pos += next_pos;
1309        }
1310        return Ok((total_micros, 0, pos + time_pos));
1311    }
1312
1313    if pos == 0 {
1314        return Err(Error::BadArgument("Expected number".to_string()));
1315    }
1316
1317    Ok((number, fraction, pos))
1318}
1319
1320fn parse_time_part(bytes: &[u8]) -> Result<(i64, i64, usize)> {
1321    let mut number: i64 = 0;
1322    let mut pos = 0;
1323    while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1324        number = number
1325            .checked_mul(10)
1326            .ok_or(Error::BadArgument("Number too large".to_string()))?
1327            + (bytes[pos] - b'0') as i64;
1328        pos += 1;
1329    }
1330    Ok((number, 0, pos))
1331}
1332
1333fn parse_time_part_with_nanos(bytes: &[u8]) -> Result<(i64, i64, usize)> {
1334    let mut number: i64 = 0;
1335    let mut fraction: i64 = 0;
1336    let mut pos = 0;
1337
1338    while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1339        number = number
1340            .checked_mul(10)
1341            .ok_or(Error::BadArgument("Number too large".to_string()))?
1342            + (bytes[pos] - b'0') as i64;
1343        pos += 1;
1344    }
1345
1346    if pos < bytes.len() && bytes[pos] == b'.' {
1347        pos += 1;
1348        let mut mult: i64 = 100000000;
1349        while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1350            if mult > 0 {
1351                fraction += (bytes[pos] - b'0') as i64 * mult;
1352            }
1353            mult /= 10;
1354            pos += 1;
1355        }
1356    }
1357
1358    Ok((number, fraction, pos))
1359}
1360
/// Reads an alphabetic word from the start of `s`, skipping leading spaces, tabs and newlines.
///
/// Returns the word together with the number of bytes consumed (whitespace included).
/// When no ASCII letter follows the whitespace, the word is empty and the count covers only
/// the skipped whitespace.
fn parse_identifier(s: &[u8]) -> (String, usize) {
    let word_start = s
        .iter()
        .take_while(|byte| matches!(**byte, b' ' | b'\t' | b'\n'))
        .count();
    let word_len = s[word_start..]
        .iter()
        .take_while(|byte| byte.is_ascii_alphabetic())
        .count();
    let word_end = word_start + word_len;

    let word = String::from_utf8_lossy(&s[word_start..word_end]).into_owned();
    (word, word_end)
}
1378
/// Unit keyword that may follow a number in an interval string.
///
/// Produced from text by `try_get_date_part_specifier`, which lists the accepted spellings.
#[derive(Debug, PartialEq, Eq)]
enum DatePartSpecifier {
    /// `millennium` / `millennia`
    Millennium,
    /// `century` / `centuries`
    Century,
    /// `decade` / `decades`
    Decade,
    /// `year` / `years` / `y`
    Year,
    /// `quarter` / `quarters`
    Quarter,
    /// `month` / `months` / `mon`
    Month,
    /// `day` / `days` / `d`
    Day,
    /// `week` / `weeks` / `w`
    Week,
    /// `microsecond` / `microseconds` / `us`
    Microseconds,
    /// `millisecond` / `milliseconds` / `ms`
    Milliseconds,
    /// `second` / `seconds` / `s`
    Second,
    /// `minute` / `minutes` / `m`
    Minute,
    /// `hour` / `hours` / `h`
    Hour,
}
1395
1396fn try_get_date_part_specifier(specifier_str: &str) -> Result<DatePartSpecifier> {
1397    match specifier_str.to_lowercase().as_str() {
1398        "millennium" | "millennia" => Ok(DatePartSpecifier::Millennium),
1399        "century" | "centuries" => Ok(DatePartSpecifier::Century),
1400        "decade" | "decades" => Ok(DatePartSpecifier::Decade),
1401        "year" | "years" | "y" => Ok(DatePartSpecifier::Year),
1402        "quarter" | "quarters" => Ok(DatePartSpecifier::Quarter),
1403        "month" | "months" | "mon" => Ok(DatePartSpecifier::Month),
1404        "day" | "days" | "d" => Ok(DatePartSpecifier::Day),
1405        "week" | "weeks" | "w" => Ok(DatePartSpecifier::Week),
1406        "microsecond" | "microseconds" | "us" => Ok(DatePartSpecifier::Microseconds),
1407        "millisecond" | "milliseconds" | "ms" => Ok(DatePartSpecifier::Milliseconds),
1408        "second" | "seconds" | "s" => Ok(DatePartSpecifier::Second),
1409        "minute" | "minutes" | "m" => Ok(DatePartSpecifier::Minute),
1410        "hour" | "hours" | "h" => Ok(DatePartSpecifier::Hour),
1411        _ => Err(Error::BadArgument(format!(
1412            "Invalid date part specifier: {}",
1413            specifier_str
1414        ))),
1415    }
1416}
1417
// Conversion factors used when parsing and formatting intervals.

// Microseconds in one second.
const MICROS_PER_SEC: i64 = 1_000_000;
// Microseconds in one millisecond.
const MICROS_PER_MSEC: i64 = 1_000;
// Microseconds in one minute.
const MICROS_PER_MINUTE: i64 = 60 * MICROS_PER_SEC;
// Microseconds in one hour. NOTE(review): the name is spelled "MINROS" (sic); it is kept
// because other code refers to it under this name.
const MINROS_PER_HOUR: i64 = 60 * MICROS_PER_MINUTE;
// Days in one week.
const DAYS_PER_WEEK: i32 = 7;
// Months in one quarter.
const MONTHS_PER_QUARTER: i32 = 3;
// Months in one year.
const MONTHS_PER_YEAR: i32 = 12;
// Months in one decade.
const MONTHS_PER_DECADE: i32 = 120;
// Months in one century.
const MONTHS_PER_CENTURY: i32 = 1200;
// Months in one millennium.
const MONTHS_PER_MILLENNIUM: i32 = 12000;
1428
1429fn apply_specifier(
1430    result: &mut Interval,
1431    number: i64,
1432    fraction: i64,
1433    specifier_str: &str,
1434) -> Result<()> {
1435    if specifier_str.is_empty() {
1436        result.micros = result
1437            .micros
1438            .checked_add(number)
1439            .ok_or(Error::BadArgument("Overflow".to_string()))?;
1440        result.micros = result
1441            .micros
1442            .checked_add(fraction)
1443            .ok_or(Error::BadArgument("Overflow".to_string()))?;
1444        return Ok(());
1445    }
1446
1447    let specifier = try_get_date_part_specifier(specifier_str)?;
1448    match specifier {
1449        DatePartSpecifier::Millennium => {
1450            result.months = result
1451                .months
1452                .checked_add(
1453                    number
1454                        .checked_mul(MONTHS_PER_MILLENNIUM as i64)
1455                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1456                        .try_into()
1457                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1458                )
1459                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1460        }
1461        DatePartSpecifier::Century => {
1462            result.months = result
1463                .months
1464                .checked_add(
1465                    number
1466                        .checked_mul(MONTHS_PER_CENTURY as i64)
1467                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1468                        .try_into()
1469                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1470                )
1471                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1472        }
1473        DatePartSpecifier::Decade => {
1474            result.months = result
1475                .months
1476                .checked_add(
1477                    number
1478                        .checked_mul(MONTHS_PER_DECADE as i64)
1479                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1480                        .try_into()
1481                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1482                )
1483                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1484        }
1485        DatePartSpecifier::Year => {
1486            result.months = result
1487                .months
1488                .checked_add(
1489                    number
1490                        .checked_mul(MONTHS_PER_YEAR as i64)
1491                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1492                        .try_into()
1493                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1494                )
1495                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1496        }
1497        DatePartSpecifier::Quarter => {
1498            result.months = result
1499                .months
1500                .checked_add(
1501                    number
1502                        .checked_mul(MONTHS_PER_QUARTER as i64)
1503                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1504                        .try_into()
1505                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1506                )
1507                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1508        }
1509        DatePartSpecifier::Month => {
1510            result.months = result
1511                .months
1512                .checked_add(
1513                    number
1514                        .try_into()
1515                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1516                )
1517                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1518        }
1519        DatePartSpecifier::Day => {
1520            result.days = result
1521                .days
1522                .checked_add(
1523                    number
1524                        .try_into()
1525                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1526                )
1527                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1528        }
1529        DatePartSpecifier::Week => {
1530            result.days = result
1531                .days
1532                .checked_add(
1533                    number
1534                        .checked_mul(DAYS_PER_WEEK as i64)
1535                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1536                        .try_into()
1537                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1538                )
1539                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1540        }
1541        DatePartSpecifier::Microseconds => {
1542            result.micros = result
1543                .micros
1544                .checked_add(number)
1545                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1546        }
1547        DatePartSpecifier::Milliseconds => {
1548            result.micros = result
1549                .micros
1550                .checked_add(
1551                    number
1552                        .checked_mul(MICROS_PER_MSEC)
1553                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1554                )
1555                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1556        }
1557        DatePartSpecifier::Second => {
1558            result.micros = result
1559                .micros
1560                .checked_add(
1561                    number
1562                        .checked_mul(MICROS_PER_SEC)
1563                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1564                )
1565                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1566        }
1567        DatePartSpecifier::Minute => {
1568            result.micros = result
1569                .micros
1570                .checked_add(
1571                    number
1572                        .checked_mul(MICROS_PER_MINUTE)
1573                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1574                )
1575                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1576        }
1577        DatePartSpecifier::Hour => {
1578            result.micros = result
1579                .micros
1580                .checked_add(
1581                    number
1582                        .checked_mul(MINROS_PER_HOUR)
1583                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1584                )
1585                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1586        }
1587    }
1588    Ok(())
1589}
1590
1591pub fn parse_geometry(raw_data: &[u8]) -> Result<String> {
1592    let mut data = Cursor::new(raw_data);
1593    let wkt = Ewkt::from_wkb(&mut data, WkbDialect::Ewkb)?;
1594    Ok(wkt.0)
1595}
1596
1597struct ValueDecoder {}
1598
1599impl ValueDecoder {
1600    fn read_field<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
1601        match ty {
1602            DataType::Null => self.read_null(reader),
1603            DataType::EmptyArray => self.read_empty_array(reader),
1604            DataType::EmptyMap => self.read_empty_map(reader),
1605            DataType::Boolean => self.read_bool(reader),
1606            DataType::Number(NumberDataType::Int8) => self.read_int8(reader),
1607            DataType::Number(NumberDataType::Int16) => self.read_int16(reader),
1608            DataType::Number(NumberDataType::Int32) => self.read_int32(reader),
1609            DataType::Number(NumberDataType::Int64) => self.read_int64(reader),
1610            DataType::Number(NumberDataType::UInt8) => self.read_uint8(reader),
1611            DataType::Number(NumberDataType::UInt16) => self.read_uint16(reader),
1612            DataType::Number(NumberDataType::UInt32) => self.read_uint32(reader),
1613            DataType::Number(NumberDataType::UInt64) => self.read_uint64(reader),
1614            DataType::Number(NumberDataType::Float32) => self.read_float32(reader),
1615            DataType::Number(NumberDataType::Float64) => self.read_float64(reader),
1616            DataType::Decimal(DecimalDataType::Decimal128(size)) => self.read_decimal(size, reader),
1617            DataType::Decimal(DecimalDataType::Decimal256(size)) => self.read_decimal(size, reader),
1618            DataType::String => self.read_string(reader),
1619            DataType::Binary => self.read_binary(reader),
1620            DataType::Timestamp => self.read_timestamp(reader),
1621            DataType::Date => self.read_date(reader),
1622            DataType::Bitmap => self.read_bitmap(reader),
1623            DataType::Variant => self.read_variant(reader),
1624            DataType::Geometry => self.read_geometry(reader),
1625            DataType::Interval => self.read_interval(reader),
1626            DataType::Geography => self.read_geography(reader),
1627            DataType::Array(inner_ty) => self.read_array(inner_ty.as_ref(), reader),
1628            DataType::Map(inner_ty) => self.read_map(inner_ty.as_ref(), reader),
1629            DataType::Tuple(inner_tys) => self.read_tuple(inner_tys.as_ref(), reader),
1630            DataType::Nullable(inner_ty) => self.read_nullable(inner_ty.as_ref(), reader),
1631        }
1632    }
1633
1634    fn match_bytes<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>, bs: &[u8]) -> bool {
1635        let pos = reader.checkpoint();
1636        if reader.ignore_bytes(bs) {
1637            true
1638        } else {
1639            reader.rollback(pos);
1640            false
1641        }
1642    }
1643
1644    fn read_null<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1645        if self.match_bytes(reader, NULL_VALUE.as_bytes()) {
1646            Ok(Value::Null)
1647        } else {
1648            let buf = reader.fill_buf()?;
1649            Err(ConvertError::new("null", String::from_utf8_lossy(buf).to_string()).into())
1650        }
1651    }
1652
1653    fn read_bool<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1654        if self.match_bytes(reader, TRUE_VALUE.as_bytes()) {
1655            Ok(Value::Boolean(true))
1656        } else if self.match_bytes(reader, FALSE_VALUE.as_bytes()) {
1657            Ok(Value::Boolean(false))
1658        } else {
1659            let buf = reader.fill_buf()?;
1660            Err(ConvertError::new("boolean", String::from_utf8_lossy(buf).to_string()).into())
1661        }
1662    }
1663
1664    fn read_int8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1665        let v: i8 = reader.read_int_text()?;
1666        Ok(Value::Number(NumberValue::Int8(v)))
1667    }
1668
1669    fn read_int16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1670        let v: i16 = reader.read_int_text()?;
1671        Ok(Value::Number(NumberValue::Int16(v)))
1672    }
1673
1674    fn read_int32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1675        let v: i32 = reader.read_int_text()?;
1676        Ok(Value::Number(NumberValue::Int32(v)))
1677    }
1678
1679    fn read_int64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1680        let v: i64 = reader.read_int_text()?;
1681        Ok(Value::Number(NumberValue::Int64(v)))
1682    }
1683
1684    fn read_uint8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1685        let v: u8 = reader.read_int_text()?;
1686        Ok(Value::Number(NumberValue::UInt8(v)))
1687    }
1688
1689    fn read_uint16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1690        let v: u16 = reader.read_int_text()?;
1691        Ok(Value::Number(NumberValue::UInt16(v)))
1692    }
1693
1694    fn read_uint32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1695        let v: u32 = reader.read_int_text()?;
1696        Ok(Value::Number(NumberValue::UInt32(v)))
1697    }
1698
1699    fn read_uint64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1700        let v: u64 = reader.read_int_text()?;
1701        Ok(Value::Number(NumberValue::UInt64(v)))
1702    }
1703
1704    fn read_float32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1705        let v: f32 = reader.read_float_text()?;
1706        Ok(Value::Number(NumberValue::Float32(v)))
1707    }
1708
1709    fn read_float64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1710        let v: f64 = reader.read_float_text()?;
1711        Ok(Value::Number(NumberValue::Float64(v)))
1712    }
1713
1714    fn read_decimal<R: AsRef<[u8]>>(
1715        &self,
1716        size: &DecimalSize,
1717        reader: &mut Cursor<R>,
1718    ) -> Result<Value> {
1719        let buf = reader.fill_buf()?;
1720        // parser decimal need fractional part.
1721        // 10.00 and 10 is different value.
1722        let (n_in, _) = collect_number(buf);
1723        let v = unsafe { std::str::from_utf8_unchecked(&buf[..n_in]) };
1724        let d = parse_decimal(v, *size)?;
1725        reader.consume(n_in);
1726        Ok(Value::Number(d))
1727    }
1728
1729    fn read_string<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1730        let mut buf = Vec::new();
1731        reader.read_quoted_text(&mut buf, b'\'')?;
1732        Ok(Value::String(unsafe { String::from_utf8_unchecked(buf) }))
1733    }
1734
1735    fn read_binary<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1736        let buf = reader.fill_buf()?;
1737        let n = collect_binary_number(buf);
1738        let v = buf[..n].to_vec();
1739        reader.consume(n);
1740        Ok(Value::Binary(hex::decode(v)?))
1741    }
1742
1743    fn read_date<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1744        let mut buf = Vec::new();
1745        reader.read_quoted_text(&mut buf, b'\'')?;
1746        let v = unsafe { std::str::from_utf8_unchecked(&buf) };
1747        let days = NaiveDate::parse_from_str(v, "%Y-%m-%d")?.num_days_from_ce() - DAYS_FROM_CE;
1748        Ok(Value::Date(days))
1749    }
1750
1751    fn read_timestamp<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1752        let mut buf = Vec::new();
1753        reader.read_quoted_text(&mut buf, b'\'')?;
1754        let v = unsafe { std::str::from_utf8_unchecked(&buf) };
1755        let ts = NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.6f")?
1756            .and_utc()
1757            .timestamp_micros();
1758        Ok(Value::Timestamp(ts))
1759    }
1760
1761    fn read_interval<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1762        let mut buf = Vec::new();
1763        reader.read_quoted_text(&mut buf, b'\'')?;
1764        Ok(Value::Interval(unsafe { String::from_utf8_unchecked(buf) }))
1765    }
1766
1767    fn read_bitmap<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1768        let mut buf = Vec::new();
1769        reader.read_quoted_text(&mut buf, b'\'')?;
1770        Ok(Value::Bitmap(unsafe { String::from_utf8_unchecked(buf) }))
1771    }
1772
1773    fn read_variant<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1774        let mut buf = Vec::new();
1775        reader.read_quoted_text(&mut buf, b'\'')?;
1776        Ok(Value::Variant(unsafe { String::from_utf8_unchecked(buf) }))
1777    }
1778
1779    fn read_geometry<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1780        let mut buf = Vec::new();
1781        reader.read_quoted_text(&mut buf, b'\'')?;
1782        Ok(Value::Geometry(unsafe { String::from_utf8_unchecked(buf) }))
1783    }
1784
1785    fn read_geography<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1786        let mut buf = Vec::new();
1787        reader.read_quoted_text(&mut buf, b'\'')?;
1788        Ok(Value::Geography(unsafe {
1789            String::from_utf8_unchecked(buf)
1790        }))
1791    }
1792
1793    fn read_nullable<R: AsRef<[u8]>>(
1794        &self,
1795        ty: &DataType,
1796        reader: &mut Cursor<R>,
1797    ) -> Result<Value> {
1798        match self.read_null(reader) {
1799            Ok(val) => Ok(val),
1800            Err(_) => self.read_field(ty, reader),
1801        }
1802    }
1803
1804    fn read_empty_array<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1805        reader.must_ignore_byte(b'[')?;
1806        reader.must_ignore_byte(b']')?;
1807        Ok(Value::EmptyArray)
1808    }
1809
1810    fn read_empty_map<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1811        reader.must_ignore_byte(b'{')?;
1812        reader.must_ignore_byte(b'}')?;
1813        Ok(Value::EmptyArray)
1814    }
1815
1816    fn read_array<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
1817        let mut vals = Vec::new();
1818        reader.must_ignore_byte(b'[')?;
1819        for idx in 0.. {
1820            let _ = reader.ignore_white_spaces();
1821            if reader.ignore_byte(b']') {
1822                break;
1823            }
1824            if idx != 0 {
1825                reader.must_ignore_byte(b',')?;
1826            }
1827            let _ = reader.ignore_white_spaces();
1828            let val = self.read_field(ty, reader)?;
1829            vals.push(val);
1830        }
1831        Ok(Value::Array(vals))
1832    }
1833
1834    fn read_map<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
1835        const KEY: usize = 0;
1836        const VALUE: usize = 1;
1837        let mut kvs = Vec::new();
1838        reader.must_ignore_byte(b'{')?;
1839        match ty {
1840            DataType::Tuple(inner_tys) => {
1841                for idx in 0.. {
1842                    let _ = reader.ignore_white_spaces();
1843                    if reader.ignore_byte(b'}') {
1844                        break;
1845                    }
1846                    if idx != 0 {
1847                        reader.must_ignore_byte(b',')?;
1848                    }
1849                    let _ = reader.ignore_white_spaces();
1850                    let key = self.read_field(&inner_tys[KEY], reader)?;
1851                    let _ = reader.ignore_white_spaces();
1852                    reader.must_ignore_byte(b':')?;
1853                    let _ = reader.ignore_white_spaces();
1854                    let val = self.read_field(&inner_tys[VALUE], reader)?;
1855                    kvs.push((key, val));
1856                }
1857                Ok(Value::Map(kvs))
1858            }
1859            _ => unreachable!(),
1860        }
1861    }
1862
1863    fn read_tuple<R: AsRef<[u8]>>(
1864        &self,
1865        tys: &[DataType],
1866        reader: &mut Cursor<R>,
1867    ) -> Result<Value> {
1868        let mut vals = Vec::new();
1869        reader.must_ignore_byte(b'(')?;
1870        for (idx, ty) in tys.iter().enumerate() {
1871            let _ = reader.ignore_white_spaces();
1872            if idx != 0 {
1873                reader.must_ignore_byte(b',')?;
1874            }
1875            let _ = reader.ignore_white_spaces();
1876            let val = self.read_field(ty, reader)?;
1877            vals.push(val);
1878        }
1879        reader.must_ignore_byte(b')')?;
1880        Ok(Value::Tuple(vals))
1881    }
1882}
1883
/// An interval packed into one 128-bit integer as months, days and microseconds.
///
/// Bit layout of the wrapped `i128`, from most to least significant:
/// 32 bits of months, 32 bits of days, 64 bits of microseconds.
#[derive(Debug, Copy, Clone, Default, PartialEq, PartialOrd, Ord, Eq, Hash)]
#[allow(non_camel_case_types)]
#[repr(C)]
pub struct months_days_micros(pub i128);

/// Bit mask selecting the low 64 bits, which hold the microseconds.
pub const MICROS_MASK: i128 = 0xFFFF_FFFF_FFFF_FFFF;
/// Bit mask selecting one 32-bit field (months or days) after it has been shifted down to bit 0.
pub const DAYS_MONTHS_MASK: i128 = 0xFFFF_FFFF;

impl months_days_micros {
    /// Packs the three components into a [`months_days_micros`].
    pub fn new(months: i32, days: i32, microseconds: i64) -> Self {
        // `days` and `microseconds` are widened through the unsigned type of the same
        // width so that only their raw bit pattern is kept; a negative value would
        // otherwise sign-extend into the fields stored above it. `months` occupies the
        // topmost field, so the shift alone pushes its sign-extension bits out.
        let packed = (i128::from(months) << 96)
            | (i128::from(days as u32) << 64)
            | i128::from(microseconds as u64);
        Self(packed)
    }

    /// Shifts the packed value right by `shift` bits and keeps the bits selected by `mask`.
    #[inline]
    fn bits_at(&self, shift: u32, mask: i128) -> i128 {
        (self.0 >> shift) & mask
    }

    /// The months component (bits 96..128).
    #[inline]
    pub fn months(&self) -> i32 {
        self.bits_at(96, DAYS_MONTHS_MASK) as i32
    }

    /// The days component (bits 64..96).
    #[inline]
    pub fn days(&self) -> i32 {
        self.bits_at(64, DAYS_MONTHS_MASK) as i32
    }

    /// The microseconds component (bits 0..64).
    #[inline]
    pub fn microseconds(&self) -> i64 {
        self.bits_at(0, MICROS_MASK) as i64
    }
}