databend_driver_core/
value.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter, Write};
17use std::hash::Hash;
18use std::io::BufRead;
19use std::io::Cursor;
20
21use arrow_buffer::i256;
22use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime};
23use geozero::wkb::FromWkb;
24use geozero::wkb::WkbDialect;
25use geozero::wkt::Ewkt;
26
27use crate::cursor_ext::{
28    collect_binary_number, collect_number, BufferReadStringExt, ReadBytesExt, ReadCheckPointExt,
29    ReadNumberExt,
30};
31use crate::error::{ConvertError, Error, Result};
32use crate::schema::{DataType, DecimalDataType, DecimalSize, NumberDataType};
33
34#[cfg(feature = "flight-sql")]
35use {
36    crate::schema::{
37        ARROW_EXT_TYPE_BITMAP, ARROW_EXT_TYPE_EMPTY_ARRAY, ARROW_EXT_TYPE_EMPTY_MAP,
38        ARROW_EXT_TYPE_GEOGRAPHY, ARROW_EXT_TYPE_GEOMETRY, ARROW_EXT_TYPE_INTERVAL,
39        ARROW_EXT_TYPE_VARIANT, EXTENSION_KEY,
40    },
41    arrow_array::{
42        Array as ArrowArray, BinaryArray, BooleanArray, Date32Array, Decimal128Array,
43        Decimal256Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
44        LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, MapArray, StringArray,
45        StringViewArray, StructArray, TimestampMicrosecondArray, UInt16Array, UInt32Array,
46        UInt64Array, UInt8Array,
47    },
48    arrow_schema::{DataType as ArrowDataType, Field as ArrowField, TimeUnit},
49    jsonb::RawJsonb,
50    std::sync::Arc,
51};
52
/// Day number of the Unix epoch in the "days from Common Era" count:
/// Thu 1970-01-01 is R.D. 719163. Used to convert between that count and
/// days-since-epoch date values.
const DAYS_FROM_CE: i32 = 719_163;
/// Literal text that denotes a NULL when a value arrives as a string.
const NULL_VALUE: &str = "NULL";
/// NOTE(review): presumably the textual encoding of boolean `true` — confirm against the users of this constant.
const TRUE_VALUE: &str = "1";
/// NOTE(review): presumably the textual encoding of boolean `false` — confirm against the users of this constant.
const FALSE_VALUE: &str = "0";
/// chrono format string for timestamps: date, time, and a six-digit fraction.
const TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.6f";
59
/// A numeric value, tagged with its concrete width/signedness.
///
/// Note that the derived `PartialEq` compares the variant as well as the
/// payload, so `Int8(0) != Int32(0)`.
#[derive(Clone, Debug, PartialEq)]
pub enum NumberValue {
    Int8(i8),
    Int16(i16),
    Int32(i32),
    Int64(i64),
    UInt8(u8),
    UInt16(u16),
    UInt32(u32),
    UInt64(u64),
    Float32(f32),
    Float64(f64),
    /// Raw 128-bit decimal integer together with its precision/scale.
    Decimal128(i128, DecimalSize),
    /// Raw 256-bit decimal integer together with its precision/scale.
    Decimal256(i256, DecimalSize),
}
75
/// A single decoded cell value of any supported data type.
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
    /// SQL NULL.
    Null,
    /// An array value of the dedicated "empty array" type.
    EmptyArray,
    /// A map value of the dedicated "empty map" type.
    EmptyMap,
    Boolean(bool),
    Binary(Vec<u8>),
    String(String),
    Number(NumberValue),
    /// Microseconds from 1970-01-01 00:00:00 UTC
    Timestamp(i64),
    /// Days from 1970-01-01 (may be negative).
    Date(i32),
    Array(Vec<Value>),
    /// Key/value pairs in the order they were decoded.
    Map(Vec<(Value, Value)>),
    Tuple(Vec<Value>),
    /// Bitmap carried as text (the Arrow conversion produces a comma-separated list of members).
    Bitmap(String),
    /// Variant (semi-structured) value carried as text.
    Variant(String),
    /// Geometry carried as text. NOTE(review): presumably WKT/EWKT — confirm against `parse_geometry`.
    Geometry(String),
    /// Geography carried as text. NOTE(review): presumably WKT/EWKT — confirm against `parse_geometry`.
    Geography(String),
    /// Interval carried in its textual form.
    Interval(String),
}
97
98impl Value {
99    pub fn get_type(&self) -> DataType {
100        match self {
101            Self::Null => DataType::Null,
102            Self::EmptyArray => DataType::EmptyArray,
103            Self::EmptyMap => DataType::EmptyMap,
104            Self::Boolean(_) => DataType::Boolean,
105            Self::Binary(_) => DataType::Binary,
106            Self::String(_) => DataType::String,
107            Self::Number(n) => match n {
108                NumberValue::Int8(_) => DataType::Number(NumberDataType::Int8),
109                NumberValue::Int16(_) => DataType::Number(NumberDataType::Int16),
110                NumberValue::Int32(_) => DataType::Number(NumberDataType::Int32),
111                NumberValue::Int64(_) => DataType::Number(NumberDataType::Int64),
112                NumberValue::UInt8(_) => DataType::Number(NumberDataType::UInt8),
113                NumberValue::UInt16(_) => DataType::Number(NumberDataType::UInt16),
114                NumberValue::UInt32(_) => DataType::Number(NumberDataType::UInt32),
115                NumberValue::UInt64(_) => DataType::Number(NumberDataType::UInt64),
116                NumberValue::Float32(_) => DataType::Number(NumberDataType::Float32),
117                NumberValue::Float64(_) => DataType::Number(NumberDataType::Float64),
118                NumberValue::Decimal128(_, s) => DataType::Decimal(DecimalDataType::Decimal128(*s)),
119                NumberValue::Decimal256(_, s) => DataType::Decimal(DecimalDataType::Decimal256(*s)),
120            },
121            Self::Timestamp(_) => DataType::Timestamp,
122
123            Self::Date(_) => DataType::Date,
124            Self::Interval(_) => DataType::Interval,
125            Self::Array(vals) => {
126                if vals.is_empty() {
127                    DataType::EmptyArray
128                } else {
129                    DataType::Array(Box::new(vals[0].get_type()))
130                }
131            }
132            Self::Map(kvs) => {
133                if kvs.is_empty() {
134                    DataType::EmptyMap
135                } else {
136                    let inner_ty = DataType::Tuple(vec![kvs[0].0.get_type(), kvs[0].1.get_type()]);
137                    DataType::Map(Box::new(inner_ty))
138                }
139            }
140            Self::Tuple(vals) => {
141                let inner_tys = vals.iter().map(|v| v.get_type()).collect::<Vec<_>>();
142                DataType::Tuple(inner_tys)
143            }
144            Self::Bitmap(_) => DataType::Bitmap,
145            Self::Variant(_) => DataType::Variant,
146            Self::Geometry(_) => DataType::Geometry,
147            Self::Geography(_) => DataType::Geography,
148        }
149    }
150}
151
152impl TryFrom<(&DataType, Option<String>)> for Value {
153    type Error = Error;
154
155    fn try_from((t, v): (&DataType, Option<String>)) -> Result<Self> {
156        match v {
157            Some(v) => Self::try_from((t, v)),
158            None => match t {
159                DataType::Null => Ok(Self::Null),
160                DataType::Nullable(_) => Ok(Self::Null),
161                _ => Err(Error::InvalidResponse(
162                    "NULL value for non-nullable field".to_string(),
163                )),
164            },
165        }
166    }
167}
168
169impl TryFrom<(&DataType, String)> for Value {
170    type Error = Error;
171
172    fn try_from((t, v): (&DataType, String)) -> Result<Self> {
173        match t {
174            DataType::Null => Ok(Self::Null),
175            DataType::EmptyArray => Ok(Self::EmptyArray),
176            DataType::EmptyMap => Ok(Self::EmptyMap),
177            DataType::Boolean => Ok(Self::Boolean(v == "1")),
178            DataType::Binary => Ok(Self::Binary(hex::decode(v)?)),
179            DataType::String => Ok(Self::String(v)),
180            DataType::Number(NumberDataType::Int8) => {
181                Ok(Self::Number(NumberValue::Int8(v.parse()?)))
182            }
183            DataType::Number(NumberDataType::Int16) => {
184                Ok(Self::Number(NumberValue::Int16(v.parse()?)))
185            }
186            DataType::Number(NumberDataType::Int32) => {
187                Ok(Self::Number(NumberValue::Int32(v.parse()?)))
188            }
189            DataType::Number(NumberDataType::Int64) => {
190                Ok(Self::Number(NumberValue::Int64(v.parse()?)))
191            }
192            DataType::Number(NumberDataType::UInt8) => {
193                Ok(Self::Number(NumberValue::UInt8(v.parse()?)))
194            }
195            DataType::Number(NumberDataType::UInt16) => {
196                Ok(Self::Number(NumberValue::UInt16(v.parse()?)))
197            }
198            DataType::Number(NumberDataType::UInt32) => {
199                Ok(Self::Number(NumberValue::UInt32(v.parse()?)))
200            }
201            DataType::Number(NumberDataType::UInt64) => {
202                Ok(Self::Number(NumberValue::UInt64(v.parse()?)))
203            }
204            DataType::Number(NumberDataType::Float32) => {
205                Ok(Self::Number(NumberValue::Float32(v.parse()?)))
206            }
207            DataType::Number(NumberDataType::Float64) => {
208                Ok(Self::Number(NumberValue::Float64(v.parse()?)))
209            }
210            DataType::Decimal(DecimalDataType::Decimal128(size)) => {
211                let d = parse_decimal(v.as_str(), *size)?;
212                Ok(Self::Number(d))
213            }
214            DataType::Decimal(DecimalDataType::Decimal256(size)) => {
215                let d = parse_decimal(v.as_str(), *size)?;
216                Ok(Self::Number(d))
217            }
218            DataType::Timestamp => Ok(Self::Timestamp(
219                NaiveDateTime::parse_from_str(v.as_str(), "%Y-%m-%d %H:%M:%S%.6f")?
220                    .and_utc()
221                    .timestamp_micros(),
222            )),
223            DataType::Date => Ok(Self::Date(
224                NaiveDate::parse_from_str(v.as_str(), "%Y-%m-%d")?.num_days_from_ce()
225                    - DAYS_FROM_CE,
226            )),
227            DataType::Bitmap => Ok(Self::Bitmap(v)),
228            DataType::Variant => Ok(Self::Variant(v)),
229            DataType::Geometry => Ok(Self::Geometry(v)),
230            DataType::Geography => Ok(Self::Geography(v)),
231            DataType::Interval => Ok(Self::Interval(v)),
232            DataType::Array(_) | DataType::Map(_) | DataType::Tuple(_) => {
233                let mut reader = Cursor::new(v.as_str());
234                let decoder = ValueDecoder {};
235                decoder.read_field(t, &mut reader)
236            }
237            DataType::Nullable(inner) => match inner.as_ref() {
238                DataType::String => Ok(Self::String(v.to_string())),
239                _ => {
240                    // not string type, try to check if it is NULL
241                    // for compatible with old version server
242                    if v == NULL_VALUE {
243                        Ok(Self::Null)
244                    } else {
245                        Self::try_from((inner.as_ref(), v))
246                    }
247                }
248            },
249        }
250    }
251}
252
#[cfg(feature = "flight-sql")]
impl TryFrom<(&ArrowField, &Arc<dyn ArrowArray>, usize)> for Value {
    type Error = Error;

    /// Extracts row `seq` of the Arrow column `array` (described by `field`)
    /// as a [`Value`].
    ///
    /// Fields carrying the `EXTENSION_KEY` metadata entry are decoded
    /// according to the named extension type (empty array/map, variant,
    /// interval, bitmap, geometry, geography). All other fields are decoded
    /// from their Arrow data type; lists, maps and structs recurse into their
    /// children. A null slot in a nullable field yields `Value::Null`.
    ///
    /// # Errors
    /// Returns a `ConvertError`-based error when the array's concrete type
    /// does not match the field, when the extension or data type is
    /// unsupported, when a timestamp is not timezone-less microseconds, or
    /// when a bitmap/geometry payload cannot be decoded.
    ///
    /// # Panics
    /// Panics if `seq` is out of bounds for `array`.
    fn try_from(
        (field, array, seq): (&ArrowField, &Arc<dyn ArrowArray>, usize),
    ) -> std::result::Result<Self, Self::Error> {
        if let Some(extend_type) = field.metadata().get(EXTENSION_KEY) {
            return match extend_type.as_str() {
                ARROW_EXT_TYPE_EMPTY_ARRAY => Ok(Value::EmptyArray),
                ARROW_EXT_TYPE_EMPTY_MAP => Ok(Value::EmptyMap),
                ARROW_EXT_TYPE_VARIANT => {
                    if field.is_nullable() && array.is_null(seq) {
                        return Ok(Value::Null);
                    }
                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
                        Some(array) => {
                            Ok(Value::Variant(RawJsonb::new(array.value(seq)).to_string()))
                        }
                        None => Err(ConvertError::new("variant", format!("{array:?}")).into()),
                    }
                }
                ARROW_EXT_TYPE_INTERVAL => {
                    if field.is_nullable() && array.is_null(seq) {
                        return Ok(Value::Null);
                    }
                    match array.as_any().downcast_ref::<Decimal128Array>() {
                        Some(array) => {
                            // The interval travels packed inside a 128-bit integer.
                            let res = months_days_micros(array.value(seq));
                            Ok(Value::Interval(
                                Interval {
                                    months: res.months(),
                                    days: res.days(),
                                    micros: res.microseconds(),
                                }
                                .to_string(),
                            ))
                        }
                        None => Err(ConvertError::new("Interval", format!("{array:?}")).into()),
                    }
                }
                ARROW_EXT_TYPE_BITMAP => {
                    if field.is_nullable() && array.is_null(seq) {
                        return Ok(Value::Null);
                    }
                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
                        Some(array) => {
                            // A malformed payload is server data, not a broken invariant:
                            // report it as a conversion error instead of panicking.
                            let rb = roaring::RoaringTreemap::deserialize_from(array.value(seq))
                                .map_err(|e| {
                                    let err: Error =
                                        ConvertError::new("bitmap", format!("{array:?}"))
                                            .with_message(format!(
                                                "failed to deserialize bitmap: {e}"
                                            ))
                                            .into();
                                    err
                                })?;
                            let raw = rb.into_iter().collect::<Vec<_>>();
                            let s = itertools::join(raw.iter(), ",");
                            Ok(Value::Bitmap(s))
                        }
                        None => Err(ConvertError::new("bitmap", format!("{array:?}")).into()),
                    }
                }
                ARROW_EXT_TYPE_GEOMETRY => {
                    if field.is_nullable() && array.is_null(seq) {
                        return Ok(Value::Null);
                    }
                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
                        Some(array) => {
                            let wkt = parse_geometry(array.value(seq))?;
                            Ok(Value::Geometry(wkt))
                        }
                        None => Err(ConvertError::new("geometry", format!("{array:?}")).into()),
                    }
                }
                ARROW_EXT_TYPE_GEOGRAPHY => {
                    if field.is_nullable() && array.is_null(seq) {
                        return Ok(Value::Null);
                    }
                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
                        Some(array) => {
                            let wkt = parse_geometry(array.value(seq))?;
                            Ok(Value::Geography(wkt))
                        }
                        None => Err(ConvertError::new("geography", format!("{array:?}")).into()),
                    }
                }
                _ => Err(ConvertError::new(
                    "extension",
                    format!("Unsupported extension datatype for arrow field: {field:?}"),
                )
                .into()),
            };
        }

        if field.is_nullable() && array.is_null(seq) {
            return Ok(Value::Null);
        }
        match field.data_type() {
            ArrowDataType::Null => Ok(Value::Null),
            ArrowDataType::Boolean => match array.as_any().downcast_ref::<BooleanArray>() {
                Some(array) => Ok(Value::Boolean(array.value(seq))),
                None => Err(ConvertError::new("bool", format!("{array:?}")).into()),
            },
            ArrowDataType::Int8 => match array.as_any().downcast_ref::<Int8Array>() {
                Some(array) => Ok(Value::Number(NumberValue::Int8(array.value(seq)))),
                None => Err(ConvertError::new("int8", format!("{array:?}")).into()),
            },
            ArrowDataType::Int16 => match array.as_any().downcast_ref::<Int16Array>() {
                Some(array) => Ok(Value::Number(NumberValue::Int16(array.value(seq)))),
                None => Err(ConvertError::new("int16", format!("{array:?}")).into()),
            },
            ArrowDataType::Int32 => match array.as_any().downcast_ref::<Int32Array>() {
                Some(array) => Ok(Value::Number(NumberValue::Int32(array.value(seq)))),
                None => Err(ConvertError::new("int32", format!("{array:?}")).into()),
            },
            ArrowDataType::Int64 => match array.as_any().downcast_ref::<Int64Array>() {
                Some(array) => Ok(Value::Number(NumberValue::Int64(array.value(seq)))),
                None => Err(ConvertError::new("int64", format!("{array:?}")).into()),
            },
            ArrowDataType::UInt8 => match array.as_any().downcast_ref::<UInt8Array>() {
                Some(array) => Ok(Value::Number(NumberValue::UInt8(array.value(seq)))),
                None => Err(ConvertError::new("uint8", format!("{array:?}")).into()),
            },
            ArrowDataType::UInt16 => match array.as_any().downcast_ref::<UInt16Array>() {
                Some(array) => Ok(Value::Number(NumberValue::UInt16(array.value(seq)))),
                None => Err(ConvertError::new("uint16", format!("{array:?}")).into()),
            },
            ArrowDataType::UInt32 => match array.as_any().downcast_ref::<UInt32Array>() {
                Some(array) => Ok(Value::Number(NumberValue::UInt32(array.value(seq)))),
                None => Err(ConvertError::new("uint32", format!("{array:?}")).into()),
            },
            ArrowDataType::UInt64 => match array.as_any().downcast_ref::<UInt64Array>() {
                Some(array) => Ok(Value::Number(NumberValue::UInt64(array.value(seq)))),
                None => Err(ConvertError::new("uint64", format!("{array:?}")).into()),
            },
            ArrowDataType::Float32 => match array.as_any().downcast_ref::<Float32Array>() {
                Some(array) => Ok(Value::Number(NumberValue::Float32(array.value(seq)))),
                None => Err(ConvertError::new("float32", format!("{array:?}")).into()),
            },
            ArrowDataType::Float64 => match array.as_any().downcast_ref::<Float64Array>() {
                Some(array) => Ok(Value::Number(NumberValue::Float64(array.value(seq)))),
                None => Err(ConvertError::new("float64", format!("{array:?}")).into()),
            },

            ArrowDataType::Decimal128(p, s) => {
                match array.as_any().downcast_ref::<Decimal128Array>() {
                    Some(array) => Ok(Value::Number(NumberValue::Decimal128(
                        array.value(seq),
                        DecimalSize {
                            precision: *p,
                            scale: *s as u8,
                        },
                    ))),
                    None => Err(ConvertError::new("Decimal128", format!("{array:?}")).into()),
                }
            }
            ArrowDataType::Decimal256(p, s) => {
                match array.as_any().downcast_ref::<Decimal256Array>() {
                    Some(array) => Ok(Value::Number(NumberValue::Decimal256(
                        array.value(seq),
                        DecimalSize {
                            precision: *p,
                            scale: *s as u8,
                        },
                    ))),
                    None => Err(ConvertError::new("Decimal256", format!("{array:?}")).into()),
                }
            }

            ArrowDataType::Binary => match array.as_any().downcast_ref::<BinaryArray>() {
                Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
                None => Err(ConvertError::new("binary", format!("{array:?}")).into()),
            },
            ArrowDataType::LargeBinary => {
                match array.as_any().downcast_ref::<LargeBinaryArray>() {
                    Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
                    None => Err(ConvertError::new("large binary", format!("{array:?}")).into()),
                }
            }
            // Fixed-size binary columns have their own array type; downcasting them
            // to `LargeBinaryArray` can never succeed.
            ArrowDataType::FixedSizeBinary(_) => {
                match array
                    .as_any()
                    .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
                {
                    Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
                    None => {
                        Err(ConvertError::new("fixed size binary", format!("{array:?}")).into())
                    }
                }
            }
            ArrowDataType::Utf8 => match array.as_any().downcast_ref::<StringArray>() {
                Some(array) => Ok(Value::String(array.value(seq).to_string())),
                None => Err(ConvertError::new("string", format!("{array:?}")).into()),
            },
            ArrowDataType::LargeUtf8 => match array.as_any().downcast_ref::<LargeStringArray>() {
                Some(array) => Ok(Value::String(array.value(seq).to_string())),
                None => Err(ConvertError::new("large string", format!("{array:?}")).into()),
            },
            ArrowDataType::Utf8View => match array.as_any().downcast_ref::<StringViewArray>() {
                Some(array) => Ok(Value::String(array.value(seq).to_string())),
                None => Err(ConvertError::new("string view", format!("{array:?}")).into()),
            },
            // we only support timestamp in microsecond in databend
            ArrowDataType::Timestamp(unit, tz) => {
                match array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
                    Some(array) => {
                        if unit != &TimeUnit::Microsecond {
                            return Err(ConvertError::new("timestamp", format!("{array:?}"))
                                .with_message(format!(
                                    "unsupported timestamp unit: {unit:?}, only support microsecond"
                                ))
                                .into());
                        }
                        let ts = array.value(seq);
                        match tz {
                            None => Ok(Value::Timestamp(ts)),
                            Some(tz) => Err(ConvertError::new("timestamp", format!("{array:?}"))
                                .with_message(format!("non-UTC timezone not supported: {tz:?}"))
                                .into()),
                        }
                    }
                    None => Err(ConvertError::new("timestamp", format!("{array:?}")).into()),
                }
            }
            ArrowDataType::Date32 => match array.as_any().downcast_ref::<Date32Array>() {
                Some(array) => Ok(Value::Date(array.value(seq))),
                None => Err(ConvertError::new("date", format!("{array:?}")).into()),
            },
            ArrowDataType::List(f) => match array.as_any().downcast_ref::<ListArray>() {
                Some(array) => {
                    // Bounds-checked accessor: `seq` is caller-supplied and not validated here.
                    let inner_array = array.value(seq);
                    let mut values = Vec::with_capacity(inner_array.len());
                    for i in 0..inner_array.len() {
                        let value = Value::try_from((f.as_ref(), &inner_array, i))?;
                        values.push(value);
                    }
                    Ok(Value::Array(values))
                }
                None => Err(ConvertError::new("list", format!("{array:?}")).into()),
            },
            ArrowDataType::LargeList(f) => match array.as_any().downcast_ref::<LargeListArray>() {
                Some(array) => {
                    // Bounds-checked accessor: `seq` is caller-supplied and not validated here.
                    let inner_array = array.value(seq);
                    let mut values = Vec::with_capacity(inner_array.len());
                    for i in 0..inner_array.len() {
                        let value = Value::try_from((f.as_ref(), &inner_array, i))?;
                        values.push(value);
                    }
                    Ok(Value::Array(values))
                }
                None => Err(ConvertError::new("large list", format!("{array:?}")).into()),
            },
            ArrowDataType::Map(f, _) => match array.as_any().downcast_ref::<MapArray>() {
                Some(array) => {
                    if let ArrowDataType::Struct(fs) = f.data_type() {
                        // Bounds-checked accessor: `seq` is caller-supplied and not validated here.
                        let inner_array = array.value(seq);
                        let mut values = Vec::with_capacity(inner_array.len());
                        for i in 0..inner_array.len() {
                            // Entry struct layout: column 0 holds keys, column 1 holds values.
                            let key = Value::try_from((fs[0].as_ref(), inner_array.column(0), i))?;
                            let val = Value::try_from((fs[1].as_ref(), inner_array.column(1), i))?;
                            values.push((key, val));
                        }
                        Ok(Value::Map(values))
                    } else {
                        Err(
                            ConvertError::new("invalid map inner type", format!("{array:?}"))
                                .into(),
                        )
                    }
                }
                None => Err(ConvertError::new("map", format!("{array:?}")).into()),
            },
            ArrowDataType::Struct(fs) => match array.as_any().downcast_ref::<StructArray>() {
                Some(array) => {
                    // One output value per struct field (not per row of the column).
                    let mut values = Vec::with_capacity(fs.len());
                    for (f, inner_array) in fs.iter().zip(array.columns().iter()) {
                        let value = Value::try_from((f.as_ref(), inner_array, seq))?;
                        values.push(value);
                    }
                    Ok(Value::Tuple(values))
                }
                None => Err(ConvertError::new("struct", format!("{array:?}")).into()),
            },
            _ => Err(ConvertError::new("unsupported data type", format!("{array:?}")).into()),
        }
    }
}
522
523impl TryFrom<Value> for String {
524    type Error = Error;
525    fn try_from(val: Value) -> Result<Self> {
526        match val {
527            Value::String(s) => Ok(s),
528            Value::Bitmap(s) => Ok(s),
529            Value::Number(NumberValue::Decimal128(v, s)) => Ok(display_decimal_128(v, s.scale)),
530            Value::Number(NumberValue::Decimal256(v, s)) => Ok(display_decimal_256(v, s.scale)),
531            Value::Geometry(s) => Ok(s),
532            Value::Geography(s) => Ok(s),
533            Value::Interval(s) => Ok(s),
534            Value::Variant(s) => Ok(s),
535            _ => Err(ConvertError::new("string", format!("{val:?}")).into()),
536        }
537    }
538}
539
540impl TryFrom<Value> for bool {
541    type Error = Error;
542    fn try_from(val: Value) -> Result<Self> {
543        match val {
544            Value::Boolean(b) => Ok(b),
545            Value::Number(n) => Ok(n != NumberValue::Int8(0)),
546            _ => Err(ConvertError::new("bool", format!("{val:?}")).into()),
547        }
548    }
549}
550
// This macro implements `TryFrom<Value>` for each primitive numeric type
// passed to it (a comma-separated list of types).
//
// Every integer/float `NumberValue` variant, as well as `Date` and
// `Timestamp`, is converted with an `as` cast. The cast is lossy by design:
// integer-to-integer casts wrap/truncate, float-to-integer casts saturate, and
// no range check is performed. Decimals and all non-numeric values produce a
// `ConvertError`.
macro_rules! impl_try_from_number_value {
    ($($t:ty),*) => {
        $(
            impl TryFrom<Value> for $t {
                type Error = Error;
                fn try_from(val: Value) -> Result<Self> {
                    match val {
                        Value::Number(NumberValue::Int8(i)) => Ok(i as $t),
                        Value::Number(NumberValue::Int16(i)) => Ok(i as $t),
                        Value::Number(NumberValue::Int32(i)) => Ok(i as $t),
                        Value::Number(NumberValue::Int64(i)) => Ok(i as $t),
                        Value::Number(NumberValue::UInt8(i)) => Ok(i as $t),
                        Value::Number(NumberValue::UInt16(i)) => Ok(i as $t),
                        Value::Number(NumberValue::UInt32(i)) => Ok(i as $t),
                        Value::Number(NumberValue::UInt64(i)) => Ok(i as $t),
                        Value::Number(NumberValue::Float32(i)) => Ok(i as $t),
                        Value::Number(NumberValue::Float64(i)) => Ok(i as $t),
                        // Dates and timestamps expose their raw integer payload.
                        Value::Date(i) => Ok(i as $t),
                        Value::Timestamp(i) => Ok(i as $t),
                        // The matched payloads above are `Copy`, so `val` is still intact here.
                        _ => Err(ConvertError::new("number", format!("{:?}", val)).into()),
                    }
                }
            }
        )*
    };
}
578
579impl_try_from_number_value!(u8);
580impl_try_from_number_value!(u16);
581impl_try_from_number_value!(u32);
582impl_try_from_number_value!(u64);
583impl_try_from_number_value!(i8);
584impl_try_from_number_value!(i16);
585impl_try_from_number_value!(i32);
586impl_try_from_number_value!(i64);
587impl_try_from_number_value!(f32);
588impl_try_from_number_value!(f64);
589
590impl TryFrom<Value> for NaiveDateTime {
591    type Error = Error;
592    fn try_from(val: Value) -> Result<Self> {
593        match val {
594            Value::Timestamp(i) => {
595                let secs = i / 1_000_000;
596                let nanos = ((i % 1_000_000) * 1000) as u32;
597                match DateTime::from_timestamp(secs, nanos) {
598                    Some(t) => Ok(t.naive_utc()),
599                    None => Err(ConvertError::new("NaiveDateTime", "".to_string()).into()),
600                }
601            }
602            _ => Err(ConvertError::new("NaiveDateTime", format!("{val}")).into()),
603        }
604    }
605}
606
607impl TryFrom<Value> for NaiveDate {
608    type Error = Error;
609    fn try_from(val: Value) -> Result<Self> {
610        match val {
611            Value::Date(i) => {
612                let days = i + DAYS_FROM_CE;
613                match NaiveDate::from_num_days_from_ce_opt(days) {
614                    Some(d) => Ok(d),
615                    None => Err(ConvertError::new("NaiveDate", "".to_string()).into()),
616                }
617            }
618            _ => Err(ConvertError::new("NaiveDate", format!("{val}")).into()),
619        }
620    }
621}
622
623impl<V> TryFrom<Value> for Vec<V>
624where
625    V: TryFrom<Value, Error = Error>,
626{
627    type Error = Error;
628    fn try_from(val: Value) -> Result<Self> {
629        match val {
630            Value::Binary(vals) => vals
631                .into_iter()
632                .map(|v| V::try_from(Value::Number(NumberValue::UInt8(v))))
633                .collect(),
634            Value::Array(vals) => vals.into_iter().map(V::try_from).collect(),
635            Value::EmptyArray => Ok(vec![]),
636            _ => Err(ConvertError::new("Vec", format!("{val}")).into()),
637        }
638    }
639}
640
641impl<K, V> TryFrom<Value> for HashMap<K, V>
642where
643    K: TryFrom<Value, Error = Error> + Eq + Hash,
644    V: TryFrom<Value, Error = Error>,
645{
646    type Error = Error;
647    fn try_from(val: Value) -> Result<Self> {
648        match val {
649            Value::Map(kvs) => {
650                let mut map = HashMap::new();
651                for (k, v) in kvs {
652                    let k = K::try_from(k)?;
653                    let v = V::try_from(v)?;
654                    map.insert(k, v);
655                }
656                Ok(map)
657            }
658            Value::EmptyMap => Ok(HashMap::new()),
659            _ => Err(ConvertError::new("HashMap", format!("{val}")).into()),
660        }
661    }
662}
663
// Discards its first token tree and expands to the second argument.
// Used to emit one copy of an expression per macro repetition, regardless of
// what the repeated token actually is.
macro_rules! replace_expr {
    ($_t:tt $sub:expr) => {
        $sub
    };
}
669
// This macro implements `TryFrom<Value>` for a Rust tuple whose element types
// are the given type parameters.
//
// The conversion accepts only `Value::Tuple` with exactly as many elements as
// the target tuple, and converts each element positionally with the element
// type's own `TryFrom<Value>`.
//
// The error type is `String`. It describes a size mismatch, a non-tuple
// input, or the index and source/target types of the first element that
// failed to convert (the element's own error is discarded).
macro_rules! impl_tuple_from_value {
    ( $($Ti:tt),+ ) => {
        impl<$($Ti),+> TryFrom<Value> for ($($Ti,)+)
        where
            $($Ti: TryFrom<Value>),+
        {
            type Error = String;
            fn try_from(val: Value) -> Result<Self, String> {
                // It is not possible yet to get the number of metavariable repetitions
                // ref: https://github.com/rust-lang/lang-team/issues/28#issue-644523674
                // This is a workaround: build a slice with one `()` per type parameter
                // and take its length.
                let expected_len = <[()]>::len(&[$(replace_expr!(($Ti) ())),*]);

                match val {
                    Value::Tuple(vals) => {
                        if expected_len != vals.len() {
                            return Err(format!("value tuple size mismatch: expected {} columns, got {}", expected_len, vals.len()));
                        }
                        let mut vals_iter = vals.into_iter().enumerate();

                        Ok((
                            $(
                                {
                                    let (col_ix, col_value) = vals_iter
                                        .next()
                                        .unwrap(); // vals_iter size is checked before this code is reached,
                                                   // so it is safe to unwrap
                                    // Capture the source type before `col_value` is consumed,
                                    // so it can be named in the error message.
                                    let t = col_value.get_type();
                                    $Ti::try_from(col_value)
                                        .map_err(|_| format!("failed converting column {} from type({:?}) to type({})", col_ix, t, std::any::type_name::<$Ti>()))?
                                }
                            ,)+
                        ))
                    }
                    _ => Err(format!("expected tuple, got {:?}", val)),
                }
            }
        }
    }
}
711
712// Implement TryFrom<Value> for tuples of 1 up to 22 elements
713impl_tuple_from_value!(T1);
714impl_tuple_from_value!(T1, T2);
715impl_tuple_from_value!(T1, T2, T3);
716impl_tuple_from_value!(T1, T2, T3, T4);
717impl_tuple_from_value!(T1, T2, T3, T4, T5);
718impl_tuple_from_value!(T1, T2, T3, T4, T5, T6);
719impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7);
720impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8);
721impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9);
722impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
723impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
724impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
725impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
726impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
727impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
728impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
729impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17);
730impl_tuple_from_value!(
731    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18
732);
733impl_tuple_from_value!(
734    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19
735);
736impl_tuple_from_value!(
737    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20
738);
739impl_tuple_from_value!(
740    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21
741);
742impl_tuple_from_value!(
743    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21,
744    T22
745);
746
// Implements `TryFrom<Value>` for `Option<T>` so nullable columns can be read:
// `Value::Null` maps to `None`, any other value is converted to `T` (propagating
// that conversion's error) and wrapped in `Some`.
macro_rules! impl_try_from_to_option {
    ($($t:ty),*) => {
        $(
            impl TryFrom<Value> for Option<$t> {
                type Error = Error;
                fn try_from(val: Value) -> Result<Self> {
                    Ok(match val {
                        Value::Null => None,
                        non_null => Some(<$t as TryFrom<Value>>::try_from(non_null)?),
                    })
                }
            }
        )*
    };
}
767
768impl_try_from_to_option!(String);
769impl_try_from_to_option!(bool);
770impl_try_from_to_option!(u8);
771impl_try_from_to_option!(u16);
772impl_try_from_to_option!(u32);
773impl_try_from_to_option!(u64);
774impl_try_from_to_option!(i8);
775impl_try_from_to_option!(i16);
776impl_try_from_to_option!(i32);
777impl_try_from_to_option!(i64);
778impl_try_from_to_option!(f32);
779impl_try_from_to_option!(f64);
780impl_try_from_to_option!(NaiveDateTime);
781impl_try_from_to_option!(NaiveDate);
782
783impl std::fmt::Display for NumberValue {
784    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
785        match self {
786            NumberValue::Int8(i) => write!(f, "{i}"),
787            NumberValue::Int16(i) => write!(f, "{i}"),
788            NumberValue::Int32(i) => write!(f, "{i}"),
789            NumberValue::Int64(i) => write!(f, "{i}"),
790            NumberValue::UInt8(i) => write!(f, "{i}"),
791            NumberValue::UInt16(i) => write!(f, "{i}"),
792            NumberValue::UInt32(i) => write!(f, "{i}"),
793            NumberValue::UInt64(i) => write!(f, "{i}"),
794            NumberValue::Float32(i) => write!(f, "{i}"),
795            NumberValue::Float64(i) => write!(f, "{i}"),
796            NumberValue::Decimal128(v, s) => write!(f, "{}", display_decimal_128(*v, s.scale)),
797            NumberValue::Decimal256(v, s) => write!(f, "{}", display_decimal_256(*v, s.scale)),
798        }
799    }
800}
801
802impl std::fmt::Display for Value {
803    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
804        encode_value(f, self, true)
805    }
806}
807
808// Compatible with Databend, inner values of nested types are quoted.
809fn encode_value(f: &mut std::fmt::Formatter<'_>, val: &Value, raw: bool) -> std::fmt::Result {
810    match val {
811        Value::Null => write!(f, "NULL"),
812        Value::EmptyArray => write!(f, "[]"),
813        Value::EmptyMap => write!(f, "{{}}"),
814        Value::Boolean(b) => {
815            if *b {
816                write!(f, "true")
817            } else {
818                write!(f, "false")
819            }
820        }
821        Value::Number(n) => write!(f, "{n}"),
822        Value::Binary(s) => write!(f, "{}", hex::encode_upper(s)),
823        Value::String(s)
824        | Value::Bitmap(s)
825        | Value::Variant(s)
826        | Value::Interval(s)
827        | Value::Geometry(s)
828        | Value::Geography(s) => {
829            if raw {
830                write!(f, "{s}")
831            } else {
832                write!(f, "'{s}'")
833            }
834        }
835        Value::Timestamp(micros) => {
836            let (mut secs, mut nanos) = (*micros / 1_000_000, (*micros % 1_000_000) * 1_000);
837            if nanos < 0 {
838                secs -= 1;
839                nanos += 1_000_000_000;
840            }
841            let t = DateTime::from_timestamp(secs, nanos as _).unwrap_or_default();
842            let t = t.naive_utc();
843            if raw {
844                write!(f, "{}", t.format(TIMESTAMP_FORMAT))
845            } else {
846                write!(f, "'{}'", t.format(TIMESTAMP_FORMAT))
847            }
848        }
849        Value::Date(i) => {
850            let days = i + DAYS_FROM_CE;
851            let d = NaiveDate::from_num_days_from_ce_opt(days).unwrap_or_default();
852            if raw {
853                write!(f, "{d}")
854            } else {
855                write!(f, "'{d}'")
856            }
857        }
858        Value::Array(vals) => {
859            write!(f, "[")?;
860            for (i, val) in vals.iter().enumerate() {
861                if i > 0 {
862                    write!(f, ",")?;
863                }
864                encode_value(f, val, false)?;
865            }
866            write!(f, "]")?;
867            Ok(())
868        }
869        Value::Map(kvs) => {
870            write!(f, "{{")?;
871            for (i, (key, val)) in kvs.iter().enumerate() {
872                if i > 0 {
873                    write!(f, ",")?;
874                }
875                encode_value(f, key, false)?;
876                write!(f, ":")?;
877                encode_value(f, val, false)?;
878            }
879            write!(f, "}}")?;
880            Ok(())
881        }
882        Value::Tuple(vals) => {
883            write!(f, "(")?;
884            for (i, val) in vals.iter().enumerate() {
885                if i > 0 {
886                    write!(f, ",")?;
887                }
888                encode_value(f, val, false)?;
889            }
890            write!(f, ")")?;
891            Ok(())
892        }
893    }
894}
895
/// Formats the unscaled decimal `num` with `scale` digits after the decimal point.
///
/// With `scale == 0` the integer is printed as-is. Otherwise the output always has
/// at least one digit before the point and exactly `scale` digits after it, e.g.
/// `display_decimal_128(-5, 2) == "-0.05"`.
///
/// The formatting works on the decimal digits of the magnitude instead of dividing
/// by `10^scale`, so it cannot overflow: `i128::MIN` and scales above 38 (where
/// `10^scale` no longer fits in an `i128`) are handled as well.
pub fn display_decimal_128(num: i128, scale: u8) -> String {
    if scale == 0 {
        return num.to_string();
    }
    let scale = scale as usize;
    // `unsigned_abs` is defined for every input, including `i128::MIN`.
    let digits = num.unsigned_abs().to_string();

    let mut buf = String::with_capacity(digits.len().max(scale + 1) + 2);
    if num < 0 {
        buf.push('-');
    }
    if digits.len() <= scale {
        // Magnitude below one: integer part is zero, fraction is left-padded with zeros.
        buf.push_str("0.");
        buf.extend(std::iter::repeat('0').take(scale - digits.len()));
        buf.push_str(&digits);
    } else {
        let (int_part, frac_part) = digits.split_at(digits.len() - scale);
        buf.push_str(int_part);
        buf.push('.');
        buf.push_str(frac_part);
    }
    buf
}
924
925pub fn display_decimal_256(num: i256, scale: u8) -> String {
926    let mut buf = String::new();
927    if scale == 0 {
928        write!(buf, "{num}").unwrap();
929    } else {
930        let pow_scale = i256::from_i128(10i128).wrapping_pow(scale as u32);
931        let width = scale as usize;
932        // -1/10 = 0
933        let (int_part, neg) = if num >= i256::ZERO {
934            (num / pow_scale, "")
935        } else {
936            (-num / pow_scale, "-")
937        };
938        let frac_part = (num % pow_scale).wrapping_abs();
939
940        match frac_part.to_i128() {
941            Some(frac_part) => {
942                write!(buf, "{neg}{int_part}.{frac_part:0>width$}").unwrap();
943            }
944            None => {
945                // fractional part is too big for display,
946                // split it into two parts.
947                let pow = i256::from_i128(10i128).wrapping_pow(38);
948                let frac_high_part = frac_part / pow;
949                let frac_low_part = frac_part % pow;
950                let frac_width = (scale - 38) as usize;
951
952                write!(
953                    buf,
954                    "{neg}{int_part}.{:0>frac_width$}{}",
955                    frac_high_part.to_i128().unwrap(),
956                    frac_low_part.to_i128().unwrap(),
957                )
958                .unwrap();
959            }
960        }
961    }
962    buf
963}
964
965/// assume text is from
966/// used only for expr, so put more weight on readability
967pub fn parse_decimal(text: &str, size: DecimalSize) -> Result<NumberValue> {
968    let mut start = 0;
969    let bytes = text.as_bytes();
970    let mut is_negative = false;
971
972    // Check if the number is negative
973    if bytes[start] == b'-' {
974        is_negative = true;
975        start += 1;
976    }
977
978    while start < text.len() && bytes[start] == b'0' {
979        start += 1
980    }
981    let text = &text[start..];
982    let point_pos = text.find('.');
983    let e_pos = text.find(|c| ['E', 'e'].contains(&c));
984    let (i_part, f_part, e_part) = match (point_pos, e_pos) {
985        (Some(p1), Some(p2)) => (&text[..p1], &text[(p1 + 1)..p2], Some(&text[(p2 + 1)..])),
986        (Some(p), None) => (&text[..p], &text[(p + 1)..], None),
987        (None, Some(p)) => (&text[..p], "", Some(&text[(p + 1)..])),
988        (None, None) => (text, "", None),
989    };
990    let exp = match e_part {
991        Some(s) => s.parse::<i32>()?,
992        None => 0,
993    };
994    if i_part.len() as i32 + exp > 76 {
995        Err(ConvertError::new("decimal", format!("{text:?}")).into())
996    } else {
997        let mut digits = Vec::with_capacity(76);
998        digits.extend_from_slice(i_part.as_bytes());
999        digits.extend_from_slice(f_part.as_bytes());
1000        if digits.is_empty() {
1001            digits.push(b'0')
1002        }
1003        let scale = f_part.len() as i32 - exp;
1004        if scale < 0 {
1005            // e.g 123.1e3
1006            for _ in 0..(-scale) {
1007                digits.push(b'0')
1008            }
1009        };
1010
1011        let precision = std::cmp::min(digits.len(), 76);
1012        let digits = unsafe { std::str::from_utf8_unchecked(&digits[..precision]) };
1013
1014        let result = if size.precision > 38 {
1015            NumberValue::Decimal256(i256::from_string(digits).unwrap(), size)
1016        } else {
1017            NumberValue::Decimal128(digits.parse::<i128>()?, size)
1018        };
1019
1020        // If the number was negative, negate the result
1021        if is_negative {
1022            match result {
1023                NumberValue::Decimal256(val, size) => Ok(NumberValue::Decimal256(-val, size)),
1024                NumberValue::Decimal128(val, size) => Ok(NumberValue::Decimal128(-val, size)),
1025                _ => Ok(result),
1026            }
1027        } else {
1028            Ok(result)
1029        }
1030    }
1031}
1032
/// A time span stored as three independent components.
///
/// The components are kept separately; the `Display` implementation renders
/// `months` as years and months, `days` as days, and `micros` as an
/// `H:MM:SS[.ffffff]` clock value.
1033#[derive(Debug, Copy, Clone, PartialEq, Default)]
1034pub struct Interval {
    /// Number of months (rendered as years plus the remaining months).
1035    pub months: i32,
    /// Number of days.
1036    pub days: i32,
    /// Number of microseconds.
1037    pub micros: i64,
1038}
1039
1040impl Display for Interval {
1041    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1042        let mut buffer = [0u8; 70];
1043        let len = IntervalToStringCast::format(*self, &mut buffer);
1044        write!(f, "{}", String::from_utf8_lossy(&buffer[..len]))
1045    }
1046}
1047
1048struct IntervalToStringCast;
1049
1050impl IntervalToStringCast {
1051    fn format_signed_number(value: i64, buffer: &mut [u8], length: &mut usize) {
1052        let s = value.to_string();
1053        let bytes = s.as_bytes();
1054        buffer[*length..*length + bytes.len()].copy_from_slice(bytes);
1055        *length += bytes.len();
1056    }
1057
1058    fn format_two_digits(value: i64, buffer: &mut [u8], length: &mut usize) {
1059        let s = format!("{:02}", value.abs());
1060        let bytes = s.as_bytes();
1061        buffer[*length..*length + bytes.len()].copy_from_slice(bytes);
1062        *length += bytes.len();
1063    }
1064
1065    fn format_interval_value(value: i32, buffer: &mut [u8], length: &mut usize, name: &str) {
1066        if value == 0 {
1067            return;
1068        }
1069        if *length != 0 {
1070            buffer[*length] = b' ';
1071            *length += 1;
1072        }
1073        Self::format_signed_number(value as i64, buffer, length);
1074        let name_bytes = name.as_bytes();
1075        buffer[*length..*length + name_bytes.len()].copy_from_slice(name_bytes);
1076        *length += name_bytes.len();
1077        if value != 1 && value != -1 {
1078            buffer[*length] = b's';
1079            *length += 1;
1080        }
1081    }
1082
1083    fn format_micros(mut micros: i64, buffer: &mut [u8], length: &mut usize) {
1084        if micros < 0 {
1085            micros = -micros;
1086        }
1087        let s = format!("{micros:06}");
1088        let bytes = s.as_bytes();
1089        buffer[*length..*length + bytes.len()].copy_from_slice(bytes);
1090        *length += bytes.len();
1091
1092        while *length > 0 && buffer[*length - 1] == b'0' {
1093            *length -= 1;
1094        }
1095    }
1096
1097    pub fn format(interval: Interval, buffer: &mut [u8]) -> usize {
1098        let mut length = 0;
1099        if interval.months != 0 {
1100            let years = interval.months / 12;
1101            let months = interval.months - years * 12;
1102            Self::format_interval_value(years, buffer, &mut length, " year");
1103            Self::format_interval_value(months, buffer, &mut length, " month");
1104        }
1105        if interval.days != 0 {
1106            Self::format_interval_value(interval.days, buffer, &mut length, " day");
1107        }
1108        if interval.micros != 0 {
1109            if length != 0 {
1110                buffer[length] = b' ';
1111                length += 1;
1112            }
1113            let mut micros = interval.micros;
1114            if micros < 0 {
1115                buffer[length] = b'-';
1116                length += 1;
1117                micros = -micros;
1118            }
1119            let hour = micros / MINROS_PER_HOUR;
1120            micros -= hour * MINROS_PER_HOUR;
1121            let min = micros / MICROS_PER_MINUTE;
1122            micros -= min * MICROS_PER_MINUTE;
1123            let sec = micros / MICROS_PER_SEC;
1124            micros -= sec * MICROS_PER_SEC;
1125
1126            Self::format_signed_number(hour, buffer, &mut length);
1127            buffer[length] = b':';
1128            length += 1;
1129            Self::format_two_digits(min, buffer, &mut length);
1130            buffer[length] = b':';
1131            length += 1;
1132            Self::format_two_digits(sec, buffer, &mut length);
1133            if micros != 0 {
1134                buffer[length] = b'.';
1135                length += 1;
1136                Self::format_micros(micros, buffer, &mut length);
1137            }
1138        } else if length == 0 {
1139            buffer[..8].copy_from_slice(b"00:00:00");
1140            return 8;
1141        }
1142        length
1143    }
1144}
1145
1146impl Interval {
1147    pub fn from_string(str: &str) -> Result<Self> {
1148        Self::from_cstring(str.as_bytes())
1149    }
1150
1151    pub fn from_cstring(str: &[u8]) -> Result<Self> {
1152        let mut result = Interval::default();
1153        let mut pos = 0;
1154        let len = str.len();
1155        let mut found_any = false;
1156
1157        if len == 0 {
1158            return Err(Error::BadArgument("Empty string".to_string()));
1159        }
1160        match str[pos] {
1161            b'@' => {
1162                pos += 1;
1163            }
1164            b'P' | b'p' => {
1165                return Err(Error::BadArgument(
1166                    "Posix intervals not supported yet".to_string(),
1167                ));
1168            }
1169            _ => {}
1170        }
1171
1172        while pos < len {
1173            match str[pos] {
1174                b' ' | b'\t' | b'\n' => {
1175                    pos += 1;
1176                    continue;
1177                }
1178                b'0'..=b'9' => {
1179                    let (number, fraction, next_pos) = parse_number(&str[pos..])?;
1180                    pos += next_pos;
1181                    let (specifier, next_pos) = parse_identifier(&str[pos..]);
1182
1183                    pos += next_pos;
1184                    let _ = apply_specifier(&mut result, number, fraction, &specifier);
1185                    found_any = true;
1186                }
1187                b'-' => {
1188                    pos += 1;
1189                    let (number, fraction, next_pos) = parse_number(&str[pos..])?;
1190                    let number = -number;
1191                    let fraction = -fraction;
1192
1193                    pos += next_pos;
1194
1195                    let (specifier, next_pos) = parse_identifier(&str[pos..]);
1196
1197                    pos += next_pos;
1198                    let _ = apply_specifier(&mut result, number, fraction, &specifier);
1199                    found_any = true;
1200                }
1201                b'a' | b'A' => {
1202                    if len - pos < 3
1203                        || str[pos + 1] != b'g' && str[pos + 1] != b'G'
1204                        || str[pos + 2] != b'o' && str[pos + 2] != b'O'
1205                    {
1206                        return Err(Error::BadArgument("Invalid 'ago' specifier".to_string()));
1207                    }
1208                    pos += 3;
1209                    while pos < len {
1210                        match str[pos] {
1211                            b' ' | b'\t' | b'\n' => {
1212                                pos += 1;
1213                            }
1214                            _ => {
1215                                return Err(Error::BadArgument(
1216                                    "Trailing characters after 'ago'".to_string(),
1217                                ));
1218                            }
1219                        }
1220                    }
1221                    result.months = -result.months;
1222                    result.days = -result.days;
1223                    result.micros = -result.micros;
1224                    return Ok(result);
1225                }
1226                _ => {
1227                    return Err(Error::BadArgument(format!(
1228                        "Unexpected character at position {pos}"
1229                    )));
1230                }
1231            }
1232        }
1233
1234        if !found_any {
1235            return Err(Error::BadArgument(
1236                "No interval specifiers found".to_string(),
1237            ));
1238        }
1239        Ok(result)
1240    }
1241}
1242
1243fn parse_number(bytes: &[u8]) -> Result<(i64, i64, usize)> {
1244    let mut number: i64 = 0;
1245    let mut fraction: i64 = 0;
1246    let mut pos = 0;
1247
1248    while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1249        number = number
1250            .checked_mul(10)
1251            .ok_or(Error::BadArgument("Number too large".to_string()))?
1252            + (bytes[pos] - b'0') as i64;
1253        pos += 1;
1254    }
1255
1256    if pos < bytes.len() && bytes[pos] == b'.' {
1257        pos += 1;
1258        let mut mult: i64 = 100000;
1259        while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1260            if mult > 0 {
1261                fraction += (bytes[pos] - b'0') as i64 * mult;
1262            }
1263            mult /= 10;
1264            pos += 1;
1265        }
1266    }
1267    if pos < bytes.len() && bytes[pos] == b':' {
1268        // parse time format HH:MM:SS[.FFFFFF]
1269        let time_bytes = &bytes[pos..];
1270        let mut time_pos = 0;
1271        let mut total_micros: i64 = number * 60 * 60 * MICROS_PER_SEC;
1272        let mut colon_count = 0;
1273
1274        while colon_count < 2 && time_bytes.len() > time_pos {
1275            let (minute, _, next_pos) = parse_time_part(&time_bytes[time_pos..])?;
1276            let minute_micros = minute * 60 * MICROS_PER_SEC;
1277            total_micros += minute_micros;
1278            time_pos += next_pos;
1279
1280            if time_bytes.len() > time_pos && time_bytes[time_pos] == b':' {
1281                time_pos += 1;
1282                colon_count += 1;
1283            } else {
1284                break;
1285            }
1286        }
1287        if time_bytes.len() > time_pos {
1288            let (seconds, nanos, next_pos) = parse_time_part_with_nanos(&time_bytes[time_pos..])?;
1289            total_micros += seconds * MICROS_PER_SEC + nanos;
1290            time_pos += next_pos;
1291        }
1292        return Ok((total_micros, 0, pos + time_pos));
1293    }
1294
1295    if pos == 0 {
1296        return Err(Error::BadArgument("Expected number".to_string()));
1297    }
1298
1299    Ok((number, fraction, pos))
1300}
1301
1302fn parse_time_part(bytes: &[u8]) -> Result<(i64, i64, usize)> {
1303    let mut number: i64 = 0;
1304    let mut pos = 0;
1305    while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1306        number = number
1307            .checked_mul(10)
1308            .ok_or(Error::BadArgument("Number too large".to_string()))?
1309            + (bytes[pos] - b'0') as i64;
1310        pos += 1;
1311    }
1312    Ok((number, 0, pos))
1313}
1314
1315fn parse_time_part_with_nanos(bytes: &[u8]) -> Result<(i64, i64, usize)> {
1316    let mut number: i64 = 0;
1317    let mut fraction: i64 = 0;
1318    let mut pos = 0;
1319
1320    while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1321        number = number
1322            .checked_mul(10)
1323            .ok_or(Error::BadArgument("Number too large".to_string()))?
1324            + (bytes[pos] - b'0') as i64;
1325        pos += 1;
1326    }
1327
1328    if pos < bytes.len() && bytes[pos] == b'.' {
1329        pos += 1;
1330        let mut mult: i64 = 100000000;
1331        while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1332            if mult > 0 {
1333                fraction += (bytes[pos] - b'0') as i64 * mult;
1334            }
1335            mult /= 10;
1336            pos += 1;
1337        }
1338    }
1339
1340    Ok((number, fraction, pos))
1341}
1342
/// Reads one ASCII-alphabetic word from the start of `s`, skipping leading
/// spaces, tabs and newlines.
///
/// Returns the word and the offset just past it. When no letter follows the
/// whitespace, the word is empty and the offset points just past the whitespace.
fn parse_identifier(s: &[u8]) -> (String, usize) {
    let word_start = s
        .iter()
        .take_while(|&&b| matches!(b, b' ' | b'\t' | b'\n'))
        .count();
    let word_len = s[word_start..]
        .iter()
        .take_while(|b| b.is_ascii_alphabetic())
        .count();
    let word_end = word_start + word_len;

    // Every collected byte is an ASCII letter, so a byte-to-char mapping is exact.
    let word: String = s[word_start..word_end].iter().map(|&b| char::from(b)).collect();
    (word, word_end)
}
1360
/// Units that may follow a number in interval text.
///
/// `try_get_date_part_specifier` maps the textual spellings to these variants and
/// `apply_specifier` adds the corresponding amount to an `Interval`.
1361#[derive(Debug, PartialEq, Eq)]
1362enum DatePartSpecifier {
    // Units that `apply_specifier` accumulates into `Interval::months`.
1363    Millennium,
1364    Century,
1365    Decade,
1366    Year,
1367    Quarter,
1368    Month,
    // Units that `apply_specifier` accumulates into `Interval::days`.
1369    Day,
1370    Week,
    // Units that `apply_specifier` accumulates into `Interval::micros`.
1371    Microseconds,
1372    Milliseconds,
1373    Second,
1374    Minute,
1375    Hour,
1376}
1377
1378fn try_get_date_part_specifier(specifier_str: &str) -> Result<DatePartSpecifier> {
1379    match specifier_str.to_lowercase().as_str() {
1380        "millennium" | "millennia" => Ok(DatePartSpecifier::Millennium),
1381        "century" | "centuries" => Ok(DatePartSpecifier::Century),
1382        "decade" | "decades" => Ok(DatePartSpecifier::Decade),
1383        "year" | "years" | "y" => Ok(DatePartSpecifier::Year),
1384        "quarter" | "quarters" => Ok(DatePartSpecifier::Quarter),
1385        "month" | "months" | "mon" => Ok(DatePartSpecifier::Month),
1386        "day" | "days" | "d" => Ok(DatePartSpecifier::Day),
1387        "week" | "weeks" | "w" => Ok(DatePartSpecifier::Week),
1388        "microsecond" | "microseconds" | "us" => Ok(DatePartSpecifier::Microseconds),
1389        "millisecond" | "milliseconds" | "ms" => Ok(DatePartSpecifier::Milliseconds),
1390        "second" | "seconds" | "s" => Ok(DatePartSpecifier::Second),
1391        "minute" | "minutes" | "m" => Ok(DatePartSpecifier::Minute),
1392        "hour" | "hours" | "h" => Ok(DatePartSpecifier::Hour),
1393        _ => Err(Error::BadArgument(format!(
1394            "Invalid date part specifier: {specifier_str}"
1395        ))),
1396    }
1397}
1398
// Conversion factors used when parsing and formatting intervals.
// Microseconds per second / millisecond / minute / hour:
1399const MICROS_PER_SEC: i64 = 1_000_000;
1400const MICROS_PER_MSEC: i64 = 1_000;
1401const MICROS_PER_MINUTE: i64 = 60 * MICROS_PER_SEC;
// NOTE(review): `MINROS` looks like a misspelling of `MICROS`; the name is kept
// because other code in this file refers to it.
1402const MINROS_PER_HOUR: i64 = 60 * MICROS_PER_MINUTE;
// Days per week:
1403const DAYS_PER_WEEK: i32 = 7;
// Months per quarter / year / decade / century / millennium:
1404const MONTHS_PER_QUARTER: i32 = 3;
1405const MONTHS_PER_YEAR: i32 = 12;
1406const MONTHS_PER_DECADE: i32 = 120;
1407const MONTHS_PER_CENTURY: i32 = 1200;
1408const MONTHS_PER_MILLENNIUM: i32 = 12000;
1409
1410fn apply_specifier(
1411    result: &mut Interval,
1412    number: i64,
1413    fraction: i64,
1414    specifier_str: &str,
1415) -> Result<()> {
1416    if specifier_str.is_empty() {
1417        result.micros = result
1418            .micros
1419            .checked_add(number)
1420            .ok_or(Error::BadArgument("Overflow".to_string()))?;
1421        result.micros = result
1422            .micros
1423            .checked_add(fraction)
1424            .ok_or(Error::BadArgument("Overflow".to_string()))?;
1425        return Ok(());
1426    }
1427
1428    let specifier = try_get_date_part_specifier(specifier_str)?;
1429    match specifier {
1430        DatePartSpecifier::Millennium => {
1431            result.months = result
1432                .months
1433                .checked_add(
1434                    number
1435                        .checked_mul(MONTHS_PER_MILLENNIUM as i64)
1436                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1437                        .try_into()
1438                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1439                )
1440                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1441        }
1442        DatePartSpecifier::Century => {
1443            result.months = result
1444                .months
1445                .checked_add(
1446                    number
1447                        .checked_mul(MONTHS_PER_CENTURY as i64)
1448                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1449                        .try_into()
1450                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1451                )
1452                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1453        }
1454        DatePartSpecifier::Decade => {
1455            result.months = result
1456                .months
1457                .checked_add(
1458                    number
1459                        .checked_mul(MONTHS_PER_DECADE as i64)
1460                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1461                        .try_into()
1462                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1463                )
1464                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1465        }
1466        DatePartSpecifier::Year => {
1467            result.months = result
1468                .months
1469                .checked_add(
1470                    number
1471                        .checked_mul(MONTHS_PER_YEAR as i64)
1472                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1473                        .try_into()
1474                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1475                )
1476                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1477        }
1478        DatePartSpecifier::Quarter => {
1479            result.months = result
1480                .months
1481                .checked_add(
1482                    number
1483                        .checked_mul(MONTHS_PER_QUARTER as i64)
1484                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1485                        .try_into()
1486                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1487                )
1488                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1489        }
1490        DatePartSpecifier::Month => {
1491            result.months = result
1492                .months
1493                .checked_add(
1494                    number
1495                        .try_into()
1496                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1497                )
1498                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1499        }
1500        DatePartSpecifier::Day => {
1501            result.days = result
1502                .days
1503                .checked_add(
1504                    number
1505                        .try_into()
1506                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1507                )
1508                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1509        }
1510        DatePartSpecifier::Week => {
1511            result.days = result
1512                .days
1513                .checked_add(
1514                    number
1515                        .checked_mul(DAYS_PER_WEEK as i64)
1516                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1517                        .try_into()
1518                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1519                )
1520                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1521        }
1522        DatePartSpecifier::Microseconds => {
1523            result.micros = result
1524                .micros
1525                .checked_add(number)
1526                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1527        }
1528        DatePartSpecifier::Milliseconds => {
1529            result.micros = result
1530                .micros
1531                .checked_add(
1532                    number
1533                        .checked_mul(MICROS_PER_MSEC)
1534                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1535                )
1536                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1537        }
1538        DatePartSpecifier::Second => {
1539            result.micros = result
1540                .micros
1541                .checked_add(
1542                    number
1543                        .checked_mul(MICROS_PER_SEC)
1544                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1545                )
1546                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1547        }
1548        DatePartSpecifier::Minute => {
1549            result.micros = result
1550                .micros
1551                .checked_add(
1552                    number
1553                        .checked_mul(MICROS_PER_MINUTE)
1554                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1555                )
1556                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1557        }
1558        DatePartSpecifier::Hour => {
1559            result.micros = result
1560                .micros
1561                .checked_add(
1562                    number
1563                        .checked_mul(MINROS_PER_HOUR)
1564                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1565                )
1566                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1567        }
1568    }
1569    Ok(())
1570}
1571
1572pub fn parse_geometry(raw_data: &[u8]) -> Result<String> {
1573    let mut data = Cursor::new(raw_data);
1574    let wkt = Ewkt::from_wkb(&mut data, WkbDialect::Ewkb)?;
1575    Ok(wkt.0)
1576}
1577
1578struct ValueDecoder {}
1579
1580impl ValueDecoder {
1581    fn read_field<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
1582        match ty {
1583            DataType::Null => self.read_null(reader),
1584            DataType::EmptyArray => self.read_empty_array(reader),
1585            DataType::EmptyMap => self.read_empty_map(reader),
1586            DataType::Boolean => self.read_bool(reader),
1587            DataType::Number(NumberDataType::Int8) => self.read_int8(reader),
1588            DataType::Number(NumberDataType::Int16) => self.read_int16(reader),
1589            DataType::Number(NumberDataType::Int32) => self.read_int32(reader),
1590            DataType::Number(NumberDataType::Int64) => self.read_int64(reader),
1591            DataType::Number(NumberDataType::UInt8) => self.read_uint8(reader),
1592            DataType::Number(NumberDataType::UInt16) => self.read_uint16(reader),
1593            DataType::Number(NumberDataType::UInt32) => self.read_uint32(reader),
1594            DataType::Number(NumberDataType::UInt64) => self.read_uint64(reader),
1595            DataType::Number(NumberDataType::Float32) => self.read_float32(reader),
1596            DataType::Number(NumberDataType::Float64) => self.read_float64(reader),
1597            DataType::Decimal(DecimalDataType::Decimal128(size)) => self.read_decimal(size, reader),
1598            DataType::Decimal(DecimalDataType::Decimal256(size)) => self.read_decimal(size, reader),
1599            DataType::String => self.read_string(reader),
1600            DataType::Binary => self.read_binary(reader),
1601            DataType::Timestamp => self.read_timestamp(reader),
1602            DataType::Date => self.read_date(reader),
1603            DataType::Bitmap => self.read_bitmap(reader),
1604            DataType::Variant => self.read_variant(reader),
1605            DataType::Geometry => self.read_geometry(reader),
1606            DataType::Interval => self.read_interval(reader),
1607            DataType::Geography => self.read_geography(reader),
1608            DataType::Array(inner_ty) => self.read_array(inner_ty.as_ref(), reader),
1609            DataType::Map(inner_ty) => self.read_map(inner_ty.as_ref(), reader),
1610            DataType::Tuple(inner_tys) => self.read_tuple(inner_tys.as_ref(), reader),
1611            DataType::Nullable(inner_ty) => self.read_nullable(inner_ty.as_ref(), reader),
1612        }
1613    }
1614
1615    fn match_bytes<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>, bs: &[u8]) -> bool {
1616        let pos = reader.checkpoint();
1617        if reader.ignore_bytes(bs) {
1618            true
1619        } else {
1620            reader.rollback(pos);
1621            false
1622        }
1623    }
1624
1625    fn read_null<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1626        if self.match_bytes(reader, NULL_VALUE.as_bytes()) {
1627            Ok(Value::Null)
1628        } else {
1629            let buf = reader.fill_buf()?;
1630            Err(ConvertError::new("null", String::from_utf8_lossy(buf).to_string()).into())
1631        }
1632    }
1633
1634    fn read_bool<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1635        if self.match_bytes(reader, TRUE_VALUE.as_bytes()) {
1636            Ok(Value::Boolean(true))
1637        } else if self.match_bytes(reader, FALSE_VALUE.as_bytes()) {
1638            Ok(Value::Boolean(false))
1639        } else {
1640            let buf = reader.fill_buf()?;
1641            Err(ConvertError::new("boolean", String::from_utf8_lossy(buf).to_string()).into())
1642        }
1643    }
1644
1645    fn read_int8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1646        let v: i8 = reader.read_int_text()?;
1647        Ok(Value::Number(NumberValue::Int8(v)))
1648    }
1649
1650    fn read_int16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1651        let v: i16 = reader.read_int_text()?;
1652        Ok(Value::Number(NumberValue::Int16(v)))
1653    }
1654
1655    fn read_int32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1656        let v: i32 = reader.read_int_text()?;
1657        Ok(Value::Number(NumberValue::Int32(v)))
1658    }
1659
1660    fn read_int64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1661        let v: i64 = reader.read_int_text()?;
1662        Ok(Value::Number(NumberValue::Int64(v)))
1663    }
1664
1665    fn read_uint8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1666        let v: u8 = reader.read_int_text()?;
1667        Ok(Value::Number(NumberValue::UInt8(v)))
1668    }
1669
1670    fn read_uint16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1671        let v: u16 = reader.read_int_text()?;
1672        Ok(Value::Number(NumberValue::UInt16(v)))
1673    }
1674
1675    fn read_uint32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1676        let v: u32 = reader.read_int_text()?;
1677        Ok(Value::Number(NumberValue::UInt32(v)))
1678    }
1679
1680    fn read_uint64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1681        let v: u64 = reader.read_int_text()?;
1682        Ok(Value::Number(NumberValue::UInt64(v)))
1683    }
1684
1685    fn read_float32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1686        let v: f32 = reader.read_float_text()?;
1687        Ok(Value::Number(NumberValue::Float32(v)))
1688    }
1689
1690    fn read_float64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1691        let v: f64 = reader.read_float_text()?;
1692        Ok(Value::Number(NumberValue::Float64(v)))
1693    }
1694
1695    fn read_decimal<R: AsRef<[u8]>>(
1696        &self,
1697        size: &DecimalSize,
1698        reader: &mut Cursor<R>,
1699    ) -> Result<Value> {
1700        let buf = reader.fill_buf()?;
1701        // parser decimal need fractional part.
1702        // 10.00 and 10 is different value.
1703        let (n_in, _) = collect_number(buf);
1704        let v = unsafe { std::str::from_utf8_unchecked(&buf[..n_in]) };
1705        let d = parse_decimal(v, *size)?;
1706        reader.consume(n_in);
1707        Ok(Value::Number(d))
1708    }
1709
1710    fn read_string<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1711        let mut buf = Vec::new();
1712        reader.read_quoted_text(&mut buf, b'\'')?;
1713        Ok(Value::String(unsafe { String::from_utf8_unchecked(buf) }))
1714    }
1715
1716    fn read_binary<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1717        let buf = reader.fill_buf()?;
1718        let n = collect_binary_number(buf);
1719        let v = buf[..n].to_vec();
1720        reader.consume(n);
1721        Ok(Value::Binary(hex::decode(v)?))
1722    }
1723
1724    fn read_date<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1725        let mut buf = Vec::new();
1726        reader.read_quoted_text(&mut buf, b'\'')?;
1727        let v = unsafe { std::str::from_utf8_unchecked(&buf) };
1728        let days = NaiveDate::parse_from_str(v, "%Y-%m-%d")?.num_days_from_ce() - DAYS_FROM_CE;
1729        Ok(Value::Date(days))
1730    }
1731
1732    fn read_timestamp<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1733        let mut buf = Vec::new();
1734        reader.read_quoted_text(&mut buf, b'\'')?;
1735        let v = unsafe { std::str::from_utf8_unchecked(&buf) };
1736        let ts = NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.6f")?
1737            .and_utc()
1738            .timestamp_micros();
1739        Ok(Value::Timestamp(ts))
1740    }
1741
1742    fn read_interval<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1743        let mut buf = Vec::new();
1744        reader.read_quoted_text(&mut buf, b'\'')?;
1745        Ok(Value::Interval(unsafe { String::from_utf8_unchecked(buf) }))
1746    }
1747
1748    fn read_bitmap<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1749        let mut buf = Vec::new();
1750        reader.read_quoted_text(&mut buf, b'\'')?;
1751        Ok(Value::Bitmap(unsafe { String::from_utf8_unchecked(buf) }))
1752    }
1753
1754    fn read_variant<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1755        let mut buf = Vec::new();
1756        reader.read_quoted_text(&mut buf, b'\'')?;
1757        Ok(Value::Variant(unsafe { String::from_utf8_unchecked(buf) }))
1758    }
1759
1760    fn read_geometry<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1761        let mut buf = Vec::new();
1762        reader.read_quoted_text(&mut buf, b'\'')?;
1763        Ok(Value::Geometry(unsafe { String::from_utf8_unchecked(buf) }))
1764    }
1765
1766    fn read_geography<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1767        let mut buf = Vec::new();
1768        reader.read_quoted_text(&mut buf, b'\'')?;
1769        Ok(Value::Geography(unsafe {
1770            String::from_utf8_unchecked(buf)
1771        }))
1772    }
1773
1774    fn read_nullable<R: AsRef<[u8]>>(
1775        &self,
1776        ty: &DataType,
1777        reader: &mut Cursor<R>,
1778    ) -> Result<Value> {
1779        match self.read_null(reader) {
1780            Ok(val) => Ok(val),
1781            Err(_) => self.read_field(ty, reader),
1782        }
1783    }
1784
1785    fn read_empty_array<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1786        reader.must_ignore_byte(b'[')?;
1787        reader.must_ignore_byte(b']')?;
1788        Ok(Value::EmptyArray)
1789    }
1790
1791    fn read_empty_map<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1792        reader.must_ignore_byte(b'{')?;
1793        reader.must_ignore_byte(b'}')?;
1794        Ok(Value::EmptyArray)
1795    }
1796
1797    fn read_array<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
1798        let mut vals = Vec::new();
1799        reader.must_ignore_byte(b'[')?;
1800        for idx in 0.. {
1801            let _ = reader.ignore_white_spaces();
1802            if reader.ignore_byte(b']') {
1803                break;
1804            }
1805            if idx != 0 {
1806                reader.must_ignore_byte(b',')?;
1807            }
1808            let _ = reader.ignore_white_spaces();
1809            let val = self.read_field(ty, reader)?;
1810            vals.push(val);
1811        }
1812        Ok(Value::Array(vals))
1813    }
1814
1815    fn read_map<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
1816        const KEY: usize = 0;
1817        const VALUE: usize = 1;
1818        let mut kvs = Vec::new();
1819        reader.must_ignore_byte(b'{')?;
1820        match ty {
1821            DataType::Tuple(inner_tys) => {
1822                for idx in 0.. {
1823                    let _ = reader.ignore_white_spaces();
1824                    if reader.ignore_byte(b'}') {
1825                        break;
1826                    }
1827                    if idx != 0 {
1828                        reader.must_ignore_byte(b',')?;
1829                    }
1830                    let _ = reader.ignore_white_spaces();
1831                    let key = self.read_field(&inner_tys[KEY], reader)?;
1832                    let _ = reader.ignore_white_spaces();
1833                    reader.must_ignore_byte(b':')?;
1834                    let _ = reader.ignore_white_spaces();
1835                    let val = self.read_field(&inner_tys[VALUE], reader)?;
1836                    kvs.push((key, val));
1837                }
1838                Ok(Value::Map(kvs))
1839            }
1840            _ => unreachable!(),
1841        }
1842    }
1843
1844    fn read_tuple<R: AsRef<[u8]>>(
1845        &self,
1846        tys: &[DataType],
1847        reader: &mut Cursor<R>,
1848    ) -> Result<Value> {
1849        let mut vals = Vec::new();
1850        reader.must_ignore_byte(b'(')?;
1851        for (idx, ty) in tys.iter().enumerate() {
1852            let _ = reader.ignore_white_spaces();
1853            if idx != 0 {
1854                reader.must_ignore_byte(b',')?;
1855            }
1856            let _ = reader.ignore_white_spaces();
1857            let val = self.read_field(ty, reader)?;
1858            vals.push(val);
1859        }
1860        reader.must_ignore_byte(b')')?;
1861        Ok(Value::Tuple(vals))
1862    }
1863}
1864
/// The in-memory representation of the MonthDayMicros variant of the "Interval" logical type.
///
/// All three components share one `i128`: bits 96..128 hold the months,
/// bits 64..96 the days and bits 0..64 the microseconds.
#[derive(Debug, Copy, Clone, Default, PartialEq, PartialOrd, Ord, Eq, Hash)]
#[allow(non_camel_case_types)]
#[repr(C)]
pub struct months_days_micros(pub i128);

/// Mask selecting the low 64 bits, where the microseconds are stored.
pub const MICROS_MASK: i128 = 0xFFFFFFFFFFFFFFFF;
/// Mask selecting one 32-bit field (days or months) after it has been shifted down to bit 0.
pub const DAYS_MONTHS_MASK: i128 = 0xFFFFFFFF;

impl months_days_micros {
    /// Packs `months`, `days` and `microseconds` into a new [`months_days_micros`].
    pub fn new(months: i32, days: i32, microseconds: i64) -> Self {
        // The sign-extension bits of `months` are shifted out of the top of the i128,
        // so only its 32-bit pattern remains.
        let months_field = i128::from(months) << 96;
        // `days` and `microseconds` go through the unsigned type of the same width so
        // that a negative value contributes its raw bit pattern only and does not
        // smear sign bits over the fields above it.
        let days_field = i128::from(days as u32) << 64;
        let micros_field = i128::from(microseconds as u64);

        Self(months_field | days_field | micros_field)
    }

    /// The months component (bits 96..128).
    #[inline]
    pub fn months(&self) -> i32 {
        let shifted = self.0 >> 96;
        (shifted & DAYS_MONTHS_MASK) as i32
    }

    /// The days component (bits 64..96).
    #[inline]
    pub fn days(&self) -> i32 {
        let shifted = self.0 >> 64;
        (shifted & DAYS_MONTHS_MASK) as i32
    }

    /// The microseconds component (bits 0..64).
    #[inline]
    pub fn microseconds(&self) -> i64 {
        let low = self.0 & MICROS_MASK;
        low as i64
    }
}