databend_driver_core/
value.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter, Write};
17use std::hash::Hash;
18use std::io::BufRead;
19use std::io::Cursor;
20
21use arrow_buffer::i256;
22use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime};
23use geozero::wkb::FromWkb;
24use geozero::wkb::WkbDialect;
25use geozero::wkt::Ewkt;
26
27use crate::cursor_ext::{
28    collect_binary_number, collect_number, BufferReadStringExt, ReadBytesExt, ReadCheckPointExt,
29    ReadNumberExt,
30};
31use crate::error::{ConvertError, Error, Result};
32use crate::schema::{DataType, DecimalDataType, DecimalSize, NumberDataType};
33
34#[cfg(feature = "flight-sql")]
35use {
36    crate::schema::{
37        ARROW_EXT_TYPE_BITMAP, ARROW_EXT_TYPE_EMPTY_ARRAY, ARROW_EXT_TYPE_EMPTY_MAP,
38        ARROW_EXT_TYPE_GEOGRAPHY, ARROW_EXT_TYPE_GEOMETRY, ARROW_EXT_TYPE_INTERVAL,
39        ARROW_EXT_TYPE_VARIANT, EXTENSION_KEY,
40    },
41    arrow_array::{
42        Array as ArrowArray, BinaryArray, BooleanArray, Date32Array, Decimal128Array,
43        Decimal256Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
44        LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, MapArray, StringArray,
45        StringViewArray, StructArray, TimestampMicrosecondArray, UInt16Array, UInt32Array,
46        UInt64Array, UInt8Array,
47    },
48    arrow_schema::{DataType as ArrowDataType, Field as ArrowField, TimeUnit},
49    jsonb::RawJsonb,
50    std::sync::Arc,
51};
52
53// Thu 1970-01-01 is R.D. 719163
54const DAYS_FROM_CE: i32 = 719_163;
55const NULL_VALUE: &str = "NULL";
56const TRUE_VALUE: &str = "1";
57const FALSE_VALUE: &str = "0";
58const TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.6f";
59
60#[derive(Clone, Debug, PartialEq)]
61pub enum NumberValue {
62    Int8(i8),
63    Int16(i16),
64    Int32(i32),
65    Int64(i64),
66    UInt8(u8),
67    UInt16(u16),
68    UInt32(u32),
69    UInt64(u64),
70    Float32(f32),
71    Float64(f64),
72    Decimal128(i128, DecimalSize),
73    Decimal256(i256, DecimalSize),
74}
75
76#[derive(Clone, Debug, PartialEq)]
77pub enum Value {
78    Null,
79    EmptyArray,
80    EmptyMap,
81    Boolean(bool),
82    Binary(Vec<u8>),
83    String(String),
84    Number(NumberValue),
85    /// Microseconds from 1970-01-01 00:00:00 UTC
86    Timestamp(i64),
87    Date(i32),
88    Array(Vec<Value>),
89    Map(Vec<(Value, Value)>),
90    Tuple(Vec<Value>),
91    Bitmap(String),
92    Variant(String),
93    Geometry(String),
94    Geography(String),
95    Interval(String),
96}
97
98impl Value {
99    pub fn get_type(&self) -> DataType {
100        match self {
101            Self::Null => DataType::Null,
102            Self::EmptyArray => DataType::EmptyArray,
103            Self::EmptyMap => DataType::EmptyMap,
104            Self::Boolean(_) => DataType::Boolean,
105            Self::Binary(_) => DataType::Binary,
106            Self::String(_) => DataType::String,
107            Self::Number(n) => match n {
108                NumberValue::Int8(_) => DataType::Number(NumberDataType::Int8),
109                NumberValue::Int16(_) => DataType::Number(NumberDataType::Int16),
110                NumberValue::Int32(_) => DataType::Number(NumberDataType::Int32),
111                NumberValue::Int64(_) => DataType::Number(NumberDataType::Int64),
112                NumberValue::UInt8(_) => DataType::Number(NumberDataType::UInt8),
113                NumberValue::UInt16(_) => DataType::Number(NumberDataType::UInt16),
114                NumberValue::UInt32(_) => DataType::Number(NumberDataType::UInt32),
115                NumberValue::UInt64(_) => DataType::Number(NumberDataType::UInt64),
116                NumberValue::Float32(_) => DataType::Number(NumberDataType::Float32),
117                NumberValue::Float64(_) => DataType::Number(NumberDataType::Float64),
118                NumberValue::Decimal128(_, s) => DataType::Decimal(DecimalDataType::Decimal128(*s)),
119                NumberValue::Decimal256(_, s) => DataType::Decimal(DecimalDataType::Decimal256(*s)),
120            },
121            Self::Timestamp(_) => DataType::Timestamp,
122
123            Self::Date(_) => DataType::Date,
124            Self::Interval(_) => DataType::Interval,
125            Self::Array(vals) => {
126                if vals.is_empty() {
127                    DataType::EmptyArray
128                } else {
129                    DataType::Array(Box::new(vals[0].get_type()))
130                }
131            }
132            Self::Map(kvs) => {
133                if kvs.is_empty() {
134                    DataType::EmptyMap
135                } else {
136                    let inner_ty = DataType::Tuple(vec![kvs[0].0.get_type(), kvs[0].1.get_type()]);
137                    DataType::Map(Box::new(inner_ty))
138                }
139            }
140            Self::Tuple(vals) => {
141                let inner_tys = vals.iter().map(|v| v.get_type()).collect::<Vec<_>>();
142                DataType::Tuple(inner_tys)
143            }
144            Self::Bitmap(_) => DataType::Bitmap,
145            Self::Variant(_) => DataType::Variant,
146            Self::Geometry(_) => DataType::Geometry,
147            Self::Geography(_) => DataType::Geography,
148        }
149    }
150}
151
152impl TryFrom<(&DataType, Option<String>)> for Value {
153    type Error = Error;
154
155    fn try_from((t, v): (&DataType, Option<String>)) -> Result<Self> {
156        match v {
157            Some(v) => Self::try_from((t, v)),
158            None => match t {
159                DataType::Null => Ok(Self::Null),
160                DataType::Nullable(_) => Ok(Self::Null),
161                _ => Err(Error::InvalidResponse(
162                    "NULL value for non-nullable field".to_string(),
163                )),
164            },
165        }
166    }
167}
168
169impl TryFrom<(&DataType, String)> for Value {
170    type Error = Error;
171
172    fn try_from((t, v): (&DataType, String)) -> Result<Self> {
173        match t {
174            DataType::Null => Ok(Self::Null),
175            DataType::EmptyArray => Ok(Self::EmptyArray),
176            DataType::EmptyMap => Ok(Self::EmptyMap),
177            DataType::Boolean => Ok(Self::Boolean(v == "1")),
178            DataType::Binary => Ok(Self::Binary(hex::decode(v)?)),
179            DataType::String => Ok(Self::String(v)),
180            DataType::Number(NumberDataType::Int8) => {
181                Ok(Self::Number(NumberValue::Int8(v.parse()?)))
182            }
183            DataType::Number(NumberDataType::Int16) => {
184                Ok(Self::Number(NumberValue::Int16(v.parse()?)))
185            }
186            DataType::Number(NumberDataType::Int32) => {
187                Ok(Self::Number(NumberValue::Int32(v.parse()?)))
188            }
189            DataType::Number(NumberDataType::Int64) => {
190                Ok(Self::Number(NumberValue::Int64(v.parse()?)))
191            }
192            DataType::Number(NumberDataType::UInt8) => {
193                Ok(Self::Number(NumberValue::UInt8(v.parse()?)))
194            }
195            DataType::Number(NumberDataType::UInt16) => {
196                Ok(Self::Number(NumberValue::UInt16(v.parse()?)))
197            }
198            DataType::Number(NumberDataType::UInt32) => {
199                Ok(Self::Number(NumberValue::UInt32(v.parse()?)))
200            }
201            DataType::Number(NumberDataType::UInt64) => {
202                Ok(Self::Number(NumberValue::UInt64(v.parse()?)))
203            }
204            DataType::Number(NumberDataType::Float32) => {
205                Ok(Self::Number(NumberValue::Float32(v.parse()?)))
206            }
207            DataType::Number(NumberDataType::Float64) => {
208                Ok(Self::Number(NumberValue::Float64(v.parse()?)))
209            }
210            DataType::Decimal(DecimalDataType::Decimal128(size)) => {
211                let d = parse_decimal(v.as_str(), *size)?;
212                Ok(Self::Number(d))
213            }
214            DataType::Decimal(DecimalDataType::Decimal256(size)) => {
215                let d = parse_decimal(v.as_str(), *size)?;
216                Ok(Self::Number(d))
217            }
218            DataType::Timestamp => Ok(Self::Timestamp(
219                NaiveDateTime::parse_from_str(v.as_str(), "%Y-%m-%d %H:%M:%S%.6f")?
220                    .and_utc()
221                    .timestamp_micros(),
222            )),
223            DataType::Date => Ok(Self::Date(
224                NaiveDate::parse_from_str(v.as_str(), "%Y-%m-%d")?.num_days_from_ce()
225                    - DAYS_FROM_CE,
226            )),
227            DataType::Bitmap => Ok(Self::Bitmap(v)),
228            DataType::Variant => Ok(Self::Variant(v)),
229            DataType::Geometry => Ok(Self::Geometry(v)),
230            DataType::Geography => Ok(Self::Geography(v)),
231            DataType::Interval => Ok(Self::Interval(v)),
232            DataType::Array(_) | DataType::Map(_) | DataType::Tuple(_) => {
233                let mut reader = Cursor::new(v.as_str());
234                let decoder = ValueDecoder {};
235                decoder.read_field(t, &mut reader)
236            }
237            DataType::Nullable(inner) => match inner.as_ref() {
238                DataType::String => Ok(Self::String(v.to_string())),
239                _ => {
240                    // not string type, try to check if it is NULL
241                    // for compatible with old version server
242                    if v == NULL_VALUE {
243                        Ok(Self::Null)
244                    } else {
245                        Self::try_from((inner.as_ref(), v))
246                    }
247                }
248            },
249        }
250    }
251}
252
253#[cfg(feature = "flight-sql")]
254impl TryFrom<(&ArrowField, &Arc<dyn ArrowArray>, usize)> for Value {
255    type Error = Error;
256    fn try_from(
257        (field, array, seq): (&ArrowField, &Arc<dyn ArrowArray>, usize),
258    ) -> std::result::Result<Self, Self::Error> {
259        if let Some(extend_type) = field.metadata().get(EXTENSION_KEY) {
260            return match extend_type.as_str() {
261                ARROW_EXT_TYPE_EMPTY_ARRAY => Ok(Value::EmptyArray),
262                ARROW_EXT_TYPE_EMPTY_MAP => Ok(Value::EmptyMap),
263                ARROW_EXT_TYPE_VARIANT => {
264                    if field.is_nullable() && array.is_null(seq) {
265                        return Ok(Value::Null);
266                    }
267                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
268                        Some(array) => {
269                            Ok(Value::Variant(RawJsonb::new(array.value(seq)).to_string()))
270                        }
271                        None => Err(ConvertError::new("variant", format!("{:?}", array)).into()),
272                    }
273                }
274                ARROW_EXT_TYPE_INTERVAL => {
275                    if field.is_nullable() && array.is_null(seq) {
276                        return Ok(Value::Null);
277                    }
278                    match array.as_any().downcast_ref::<Decimal128Array>() {
279                        Some(array) => {
280                            let res = months_days_micros(array.value(seq));
281                            Ok(Value::Interval(
282                                Interval {
283                                    months: res.months(),
284                                    days: res.days(),
285                                    micros: res.microseconds(),
286                                }
287                                .to_string(),
288                            ))
289                        }
290                        None => Err(ConvertError::new("Interval", format!("{:?}", array)).into()),
291                    }
292                }
293                ARROW_EXT_TYPE_BITMAP => {
294                    if field.is_nullable() && array.is_null(seq) {
295                        return Ok(Value::Null);
296                    }
297                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
298                        Some(array) => {
299                            let rb = roaring::RoaringTreemap::deserialize_from(array.value(seq))
300                                .expect("failed to deserialize bitmap");
301                            let raw = rb.into_iter().collect::<Vec<_>>();
302                            let s = itertools::join(raw.iter(), ",");
303                            Ok(Value::Bitmap(s))
304                        }
305                        None => Err(ConvertError::new("bitmap", format!("{:?}", array)).into()),
306                    }
307                }
308                ARROW_EXT_TYPE_GEOMETRY => {
309                    if field.is_nullable() && array.is_null(seq) {
310                        return Ok(Value::Null);
311                    }
312                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
313                        Some(array) => {
314                            let wkt = parse_geometry(array.value(seq))?;
315                            Ok(Value::Geometry(wkt))
316                        }
317                        None => Err(ConvertError::new("geometry", format!("{:?}", array)).into()),
318                    }
319                }
320                ARROW_EXT_TYPE_GEOGRAPHY => {
321                    if field.is_nullable() && array.is_null(seq) {
322                        return Ok(Value::Null);
323                    }
324                    match array.as_any().downcast_ref::<LargeBinaryArray>() {
325                        Some(array) => {
326                            let wkt = parse_geometry(array.value(seq))?;
327                            Ok(Value::Geography(wkt))
328                        }
329                        None => Err(ConvertError::new("geography", format!("{:?}", array)).into()),
330                    }
331                }
332                _ => Err(ConvertError::new(
333                    "extension",
334                    format!(
335                        "Unsupported extension datatype for arrow field: {:?}",
336                        field
337                    ),
338                )
339                .into()),
340            };
341        }
342
343        if field.is_nullable() && array.is_null(seq) {
344            return Ok(Value::Null);
345        }
346        match field.data_type() {
347            ArrowDataType::Null => Ok(Value::Null),
348            ArrowDataType::Boolean => match array.as_any().downcast_ref::<BooleanArray>() {
349                Some(array) => Ok(Value::Boolean(array.value(seq))),
350                None => Err(ConvertError::new("bool", format!("{:?}", array)).into()),
351            },
352            ArrowDataType::Int8 => match array.as_any().downcast_ref::<Int8Array>() {
353                Some(array) => Ok(Value::Number(NumberValue::Int8(array.value(seq)))),
354                None => Err(ConvertError::new("int8", format!("{:?}", array)).into()),
355            },
356            ArrowDataType::Int16 => match array.as_any().downcast_ref::<Int16Array>() {
357                Some(array) => Ok(Value::Number(NumberValue::Int16(array.value(seq)))),
358                None => Err(ConvertError::new("int16", format!("{:?}", array)).into()),
359            },
360            ArrowDataType::Int32 => match array.as_any().downcast_ref::<Int32Array>() {
361                Some(array) => Ok(Value::Number(NumberValue::Int32(array.value(seq)))),
362                None => Err(ConvertError::new("int64", format!("{:?}", array)).into()),
363            },
364            ArrowDataType::Int64 => match array.as_any().downcast_ref::<Int64Array>() {
365                Some(array) => Ok(Value::Number(NumberValue::Int64(array.value(seq)))),
366                None => Err(ConvertError::new("int64", format!("{:?}", array)).into()),
367            },
368            ArrowDataType::UInt8 => match array.as_any().downcast_ref::<UInt8Array>() {
369                Some(array) => Ok(Value::Number(NumberValue::UInt8(array.value(seq)))),
370                None => Err(ConvertError::new("uint8", format!("{:?}", array)).into()),
371            },
372            ArrowDataType::UInt16 => match array.as_any().downcast_ref::<UInt16Array>() {
373                Some(array) => Ok(Value::Number(NumberValue::UInt16(array.value(seq)))),
374                None => Err(ConvertError::new("uint16", format!("{:?}", array)).into()),
375            },
376            ArrowDataType::UInt32 => match array.as_any().downcast_ref::<UInt32Array>() {
377                Some(array) => Ok(Value::Number(NumberValue::UInt32(array.value(seq)))),
378                None => Err(ConvertError::new("uint32", format!("{:?}", array)).into()),
379            },
380            ArrowDataType::UInt64 => match array.as_any().downcast_ref::<UInt64Array>() {
381                Some(array) => Ok(Value::Number(NumberValue::UInt64(array.value(seq)))),
382                None => Err(ConvertError::new("uint64", format!("{:?}", array)).into()),
383            },
384            ArrowDataType::Float32 => match array.as_any().downcast_ref::<Float32Array>() {
385                Some(array) => Ok(Value::Number(NumberValue::Float32(array.value(seq)))),
386                None => Err(ConvertError::new("float32", format!("{:?}", array)).into()),
387            },
388            ArrowDataType::Float64 => match array.as_any().downcast_ref::<Float64Array>() {
389                Some(array) => Ok(Value::Number(NumberValue::Float64(array.value(seq)))),
390                None => Err(ConvertError::new("float64", format!("{:?}", array)).into()),
391            },
392
393            ArrowDataType::Decimal128(p, s) => {
394                match array.as_any().downcast_ref::<Decimal128Array>() {
395                    Some(array) => Ok(Value::Number(NumberValue::Decimal128(
396                        array.value(seq),
397                        DecimalSize {
398                            precision: *p,
399                            scale: *s as u8,
400                        },
401                    ))),
402                    None => Err(ConvertError::new("Decimal128", format!("{:?}", array)).into()),
403                }
404            }
405            ArrowDataType::Decimal256(p, s) => {
406                match array.as_any().downcast_ref::<Decimal256Array>() {
407                    Some(array) => Ok(Value::Number(NumberValue::Decimal256(
408                        array.value(seq),
409                        DecimalSize {
410                            precision: *p,
411                            scale: *s as u8,
412                        },
413                    ))),
414                    None => Err(ConvertError::new("Decimal256", format!("{:?}", array)).into()),
415                }
416            }
417
418            ArrowDataType::Binary => match array.as_any().downcast_ref::<BinaryArray>() {
419                Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
420                None => Err(ConvertError::new("binary", format!("{:?}", array)).into()),
421            },
422            ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
423                match array.as_any().downcast_ref::<LargeBinaryArray>() {
424                    Some(array) => Ok(Value::Binary(array.value(seq).to_vec())),
425                    None => Err(ConvertError::new("large binary", format!("{:?}", array)).into()),
426                }
427            }
428            ArrowDataType::Utf8 => match array.as_any().downcast_ref::<StringArray>() {
429                Some(array) => Ok(Value::String(array.value(seq).to_string())),
430                None => Err(ConvertError::new("string", format!("{:?}", array)).into()),
431            },
432            ArrowDataType::LargeUtf8 => match array.as_any().downcast_ref::<LargeStringArray>() {
433                Some(array) => Ok(Value::String(array.value(seq).to_string())),
434                None => Err(ConvertError::new("large string", format!("{:?}", array)).into()),
435            },
436            ArrowDataType::Utf8View => match array.as_any().downcast_ref::<StringViewArray>() {
437                Some(array) => Ok(Value::String(array.value(seq).to_string())),
438                None => Err(ConvertError::new("string view", format!("{:?}", array)).into()),
439            },
440            // we only support timestamp in microsecond in databend
441            ArrowDataType::Timestamp(unit, tz) => {
442                match array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
443                    Some(array) => {
444                        if unit != &TimeUnit::Microsecond {
445                            return Err(ConvertError::new("timestamp", format!("{:?}", array))
446                                .with_message(format!(
447                                    "unsupported timestamp unit: {:?}, only support microsecond",
448                                    unit
449                                ))
450                                .into());
451                        }
452                        let ts = array.value(seq);
453                        match tz {
454                            None => Ok(Value::Timestamp(ts)),
455                            Some(tz) => Err(ConvertError::new("timestamp", format!("{:?}", array))
456                                .with_message(format!("non-UTC timezone not supported: {:?}", tz))
457                                .into()),
458                        }
459                    }
460                    None => Err(ConvertError::new("timestamp", format!("{:?}", array)).into()),
461                }
462            }
463            ArrowDataType::Date32 => match array.as_any().downcast_ref::<Date32Array>() {
464                Some(array) => Ok(Value::Date(array.value(seq))),
465                None => Err(ConvertError::new("date", format!("{:?}", array)).into()),
466            },
467            ArrowDataType::List(f) => match array.as_any().downcast_ref::<ListArray>() {
468                Some(array) => {
469                    let inner_array = unsafe { array.value_unchecked(seq) };
470                    let mut values = Vec::with_capacity(inner_array.len());
471                    for i in 0..inner_array.len() {
472                        let value = Value::try_from((f.as_ref(), &inner_array, i))?;
473                        values.push(value);
474                    }
475                    Ok(Value::Array(values))
476                }
477                None => Err(ConvertError::new("list", format!("{:?}", array)).into()),
478            },
479            ArrowDataType::LargeList(f) => match array.as_any().downcast_ref::<LargeListArray>() {
480                Some(array) => {
481                    let inner_array = unsafe { array.value_unchecked(seq) };
482                    let mut values = Vec::with_capacity(inner_array.len());
483                    for i in 0..inner_array.len() {
484                        let value = Value::try_from((f.as_ref(), &inner_array, i))?;
485                        values.push(value);
486                    }
487                    Ok(Value::Array(values))
488                }
489                None => Err(ConvertError::new("large list", format!("{:?}", array)).into()),
490            },
491            ArrowDataType::Map(f, _) => match array.as_any().downcast_ref::<MapArray>() {
492                Some(array) => {
493                    if let ArrowDataType::Struct(fs) = f.data_type() {
494                        let inner_array = unsafe { array.value_unchecked(seq) };
495                        let mut values = Vec::with_capacity(inner_array.len());
496                        for i in 0..inner_array.len() {
497                            let key = Value::try_from((fs[0].as_ref(), inner_array.column(0), i))?;
498                            let val = Value::try_from((fs[1].as_ref(), inner_array.column(1), i))?;
499                            values.push((key, val));
500                        }
501                        Ok(Value::Map(values))
502                    } else {
503                        Err(
504                            ConvertError::new("invalid map inner type", format!("{:?}", array))
505                                .into(),
506                        )
507                    }
508                }
509                None => Err(ConvertError::new("map", format!("{:?}", array)).into()),
510            },
511            ArrowDataType::Struct(fs) => match array.as_any().downcast_ref::<StructArray>() {
512                Some(array) => {
513                    let mut values = Vec::with_capacity(array.len());
514                    for (f, inner_array) in fs.iter().zip(array.columns().iter()) {
515                        let value = Value::try_from((f.as_ref(), inner_array, seq))?;
516                        values.push(value);
517                    }
518                    Ok(Value::Tuple(values))
519                }
520                None => Err(ConvertError::new("struct", format!("{:?}", array)).into()),
521            },
522            _ => Err(ConvertError::new("unsupported data type", format!("{:?}", array)).into()),
523        }
524    }
525}
526
527impl TryFrom<Value> for String {
528    type Error = Error;
529    fn try_from(val: Value) -> Result<Self> {
530        match val {
531            Value::String(s) => Ok(s),
532            Value::Bitmap(s) => Ok(s),
533            Value::Number(NumberValue::Decimal128(v, s)) => Ok(display_decimal_128(v, s.scale)),
534            Value::Number(NumberValue::Decimal256(v, s)) => Ok(display_decimal_256(v, s.scale)),
535            Value::Geometry(s) => Ok(s),
536            Value::Geography(s) => Ok(s),
537            Value::Interval(s) => Ok(s),
538            Value::Variant(s) => Ok(s),
539            _ => Err(ConvertError::new("string", format!("{:?}", val)).into()),
540        }
541    }
542}
543
544impl TryFrom<Value> for bool {
545    type Error = Error;
546    fn try_from(val: Value) -> Result<Self> {
547        match val {
548            Value::Boolean(b) => Ok(b),
549            Value::Number(n) => Ok(n != NumberValue::Int8(0)),
550            _ => Err(ConvertError::new("bool", format!("{:?}", val)).into()),
551        }
552    }
553}
554
555// This macro implements TryFrom for NumberValue
556macro_rules! impl_try_from_number_value {
557    ($($t:ty),*) => {
558        $(
559            impl TryFrom<Value> for $t {
560                type Error = Error;
561                fn try_from(val: Value) -> Result<Self> {
562                    match val {
563                        Value::Number(NumberValue::Int8(i)) => Ok(i as $t),
564                        Value::Number(NumberValue::Int16(i)) => Ok(i as $t),
565                        Value::Number(NumberValue::Int32(i)) => Ok(i as $t),
566                        Value::Number(NumberValue::Int64(i)) => Ok(i as $t),
567                        Value::Number(NumberValue::UInt8(i)) => Ok(i as $t),
568                        Value::Number(NumberValue::UInt16(i)) => Ok(i as $t),
569                        Value::Number(NumberValue::UInt32(i)) => Ok(i as $t),
570                        Value::Number(NumberValue::UInt64(i)) => Ok(i as $t),
571                        Value::Number(NumberValue::Float32(i)) => Ok(i as $t),
572                        Value::Number(NumberValue::Float64(i)) => Ok(i as $t),
573                        Value::Date(i) => Ok(i as $t),
574                        Value::Timestamp(i) => Ok(i as $t),
575                        _ => Err(ConvertError::new("number", format!("{:?}", val)).into()),
576                    }
577                }
578            }
579        )*
580    };
581}
582
583impl_try_from_number_value!(u8);
584impl_try_from_number_value!(u16);
585impl_try_from_number_value!(u32);
586impl_try_from_number_value!(u64);
587impl_try_from_number_value!(i8);
588impl_try_from_number_value!(i16);
589impl_try_from_number_value!(i32);
590impl_try_from_number_value!(i64);
591impl_try_from_number_value!(f32);
592impl_try_from_number_value!(f64);
593
594impl TryFrom<Value> for NaiveDateTime {
595    type Error = Error;
596    fn try_from(val: Value) -> Result<Self> {
597        match val {
598            Value::Timestamp(i) => {
599                let secs = i / 1_000_000;
600                let nanos = ((i % 1_000_000) * 1000) as u32;
601                match DateTime::from_timestamp(secs, nanos) {
602                    Some(t) => Ok(t.naive_utc()),
603                    None => Err(ConvertError::new("NaiveDateTime", "".to_string()).into()),
604                }
605            }
606            _ => Err(ConvertError::new("NaiveDateTime", format!("{}", val)).into()),
607        }
608    }
609}
610
611impl TryFrom<Value> for NaiveDate {
612    type Error = Error;
613    fn try_from(val: Value) -> Result<Self> {
614        match val {
615            Value::Date(i) => {
616                let days = i + DAYS_FROM_CE;
617                match NaiveDate::from_num_days_from_ce_opt(days) {
618                    Some(d) => Ok(d),
619                    None => Err(ConvertError::new("NaiveDate", "".to_string()).into()),
620                }
621            }
622            _ => Err(ConvertError::new("NaiveDate", format!("{}", val)).into()),
623        }
624    }
625}
626
627impl<V> TryFrom<Value> for Vec<V>
628where
629    V: TryFrom<Value, Error = Error>,
630{
631    type Error = Error;
632    fn try_from(val: Value) -> Result<Self> {
633        match val {
634            Value::Binary(vals) => vals
635                .into_iter()
636                .map(|v| V::try_from(Value::Number(NumberValue::UInt8(v))))
637                .collect(),
638            Value::Array(vals) => vals.into_iter().map(V::try_from).collect(),
639            Value::EmptyArray => Ok(vec![]),
640            _ => Err(ConvertError::new("Vec", format!("{}", val)).into()),
641        }
642    }
643}
644
645impl<K, V> TryFrom<Value> for HashMap<K, V>
646where
647    K: TryFrom<Value, Error = Error> + Eq + Hash,
648    V: TryFrom<Value, Error = Error>,
649{
650    type Error = Error;
651    fn try_from(val: Value) -> Result<Self> {
652        match val {
653            Value::Map(kvs) => {
654                let mut map = HashMap::new();
655                for (k, v) in kvs {
656                    let k = K::try_from(k)?;
657                    let v = V::try_from(v)?;
658                    map.insert(k, v);
659                }
660                Ok(map)
661            }
662            Value::EmptyMap => Ok(HashMap::new()),
663            _ => Err(ConvertError::new("HashMap", format!("{}", val)).into()),
664        }
665    }
666}
667
668macro_rules! replace_expr {
669    ($_t:tt $sub:expr) => {
670        $sub
671    };
672}
673
674// This macro implements TryFrom for tuple of types
675macro_rules! impl_tuple_from_value {
676    ( $($Ti:tt),+ ) => {
677        impl<$($Ti),+> TryFrom<Value> for ($($Ti,)+)
678        where
679            $($Ti: TryFrom<Value>),+
680        {
681            type Error = String;
682            fn try_from(val: Value) -> Result<Self, String> {
683                // It is not possible yet to get the number of metavariable repetitions
684                // ref: https://github.com/rust-lang/lang-team/issues/28#issue-644523674
685                // This is a workaround
686                let expected_len = <[()]>::len(&[$(replace_expr!(($Ti) ())),*]);
687
688                match val {
689                    Value::Tuple(vals) => {
690                        if expected_len != vals.len() {
691                            return Err(format!("value tuple size mismatch: expected {} columns, got {}", expected_len, vals.len()));
692                        }
693                        let mut vals_iter = vals.into_iter().enumerate();
694
695                        Ok((
696                            $(
697                                {
698                                    let (col_ix, col_value) = vals_iter
699                                        .next()
700                                        .unwrap(); // vals_iter size is checked before this code is reached,
701                                                   // so it is safe to unwrap
702                                    let t = col_value.get_type();
703                                    $Ti::try_from(col_value)
704                                        .map_err(|_| format!("failed converting column {} from type({:?}) to type({})", col_ix, t, std::any::type_name::<$Ti>()))?
705                                }
706                            ,)+
707                        ))
708                    }
709                    _ => Err(format!("expected tuple, got {:?}", val)),
710                }
711            }
712        }
713    }
714}
715
716// Implement From Value for tuples of size up to 16
717impl_tuple_from_value!(T1);
718impl_tuple_from_value!(T1, T2);
719impl_tuple_from_value!(T1, T2, T3);
720impl_tuple_from_value!(T1, T2, T3, T4);
721impl_tuple_from_value!(T1, T2, T3, T4, T5);
722impl_tuple_from_value!(T1, T2, T3, T4, T5, T6);
723impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7);
724impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8);
725impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9);
726impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
727impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
728impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
729impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
730impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
731impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
732impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
733impl_tuple_from_value!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17);
734impl_tuple_from_value!(
735    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18
736);
737impl_tuple_from_value!(
738    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19
739);
740impl_tuple_from_value!(
741    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20
742);
743impl_tuple_from_value!(
744    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21
745);
746impl_tuple_from_value!(
747    T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21,
748    T22
749);
750
751// This macro implements TryFrom to Option for Nullable column
752macro_rules! impl_try_from_to_option {
753    ($($t:ty),*) => {
754        $(
755            impl TryFrom<Value> for Option<$t> {
756                type Error = Error;
757                fn try_from(val: Value) -> Result<Self> {
758                    match val {
759                        Value::Null => Ok(None),
760                        _ => {
761                            let inner: $t = val.try_into()?;
762                            Ok(Some(inner))
763                        },
764                    }
765
766                }
767            }
768        )*
769    };
770}
771
772impl_try_from_to_option!(String);
773impl_try_from_to_option!(bool);
774impl_try_from_to_option!(u8);
775impl_try_from_to_option!(u16);
776impl_try_from_to_option!(u32);
777impl_try_from_to_option!(u64);
778impl_try_from_to_option!(i8);
779impl_try_from_to_option!(i16);
780impl_try_from_to_option!(i32);
781impl_try_from_to_option!(i64);
782impl_try_from_to_option!(f32);
783impl_try_from_to_option!(f64);
784impl_try_from_to_option!(NaiveDateTime);
785impl_try_from_to_option!(NaiveDate);
786
787impl std::fmt::Display for NumberValue {
788    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
789        match self {
790            NumberValue::Int8(i) => write!(f, "{}", i),
791            NumberValue::Int16(i) => write!(f, "{}", i),
792            NumberValue::Int32(i) => write!(f, "{}", i),
793            NumberValue::Int64(i) => write!(f, "{}", i),
794            NumberValue::UInt8(i) => write!(f, "{}", i),
795            NumberValue::UInt16(i) => write!(f, "{}", i),
796            NumberValue::UInt32(i) => write!(f, "{}", i),
797            NumberValue::UInt64(i) => write!(f, "{}", i),
798            NumberValue::Float32(i) => write!(f, "{}", i),
799            NumberValue::Float64(i) => write!(f, "{}", i),
800            NumberValue::Decimal128(v, s) => write!(f, "{}", display_decimal_128(*v, s.scale)),
801            NumberValue::Decimal256(v, s) => write!(f, "{}", display_decimal_256(*v, s.scale)),
802        }
803    }
804}
805
806impl std::fmt::Display for Value {
807    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
808        encode_value(f, self, true)
809    }
810}
811
812// Compatible with Databend, inner values of nested types are quoted.
813fn encode_value(f: &mut std::fmt::Formatter<'_>, val: &Value, raw: bool) -> std::fmt::Result {
814    match val {
815        Value::Null => write!(f, "NULL"),
816        Value::EmptyArray => write!(f, "[]"),
817        Value::EmptyMap => write!(f, "{{}}"),
818        Value::Boolean(b) => {
819            if *b {
820                write!(f, "true")
821            } else {
822                write!(f, "false")
823            }
824        }
825        Value::Number(n) => write!(f, "{}", n),
826        Value::Binary(s) => write!(f, "{}", hex::encode_upper(s)),
827        Value::String(s)
828        | Value::Bitmap(s)
829        | Value::Variant(s)
830        | Value::Interval(s)
831        | Value::Geometry(s)
832        | Value::Geography(s) => {
833            if raw {
834                write!(f, "{}", s)
835            } else {
836                write!(f, "'{}'", s)
837            }
838        }
839        Value::Timestamp(micros) => {
840            let (mut secs, mut nanos) = (*micros / 1_000_000, (*micros % 1_000_000) * 1_000);
841            if nanos < 0 {
842                secs -= 1;
843                nanos += 1_000_000_000;
844            }
845            let t = DateTime::from_timestamp(secs, nanos as _).unwrap_or_default();
846            let t = t.naive_utc();
847            if raw {
848                write!(f, "{}", t.format(TIMESTAMP_FORMAT))
849            } else {
850                write!(f, "'{}'", t.format(TIMESTAMP_FORMAT))
851            }
852        }
853        Value::Date(i) => {
854            let days = i + DAYS_FROM_CE;
855            let d = NaiveDate::from_num_days_from_ce_opt(days).unwrap_or_default();
856            if raw {
857                write!(f, "{}", d)
858            } else {
859                write!(f, "'{}'", d)
860            }
861        }
862        Value::Array(vals) => {
863            write!(f, "[")?;
864            for (i, val) in vals.iter().enumerate() {
865                if i > 0 {
866                    write!(f, ",")?;
867                }
868                encode_value(f, val, false)?;
869            }
870            write!(f, "]")?;
871            Ok(())
872        }
873        Value::Map(kvs) => {
874            write!(f, "{{")?;
875            for (i, (key, val)) in kvs.iter().enumerate() {
876                if i > 0 {
877                    write!(f, ",")?;
878                }
879                encode_value(f, key, false)?;
880                write!(f, ":")?;
881                encode_value(f, val, false)?;
882            }
883            write!(f, "}}")?;
884            Ok(())
885        }
886        Value::Tuple(vals) => {
887            write!(f, "(")?;
888            for (i, val) in vals.iter().enumerate() {
889                if i > 0 {
890                    write!(f, ",")?;
891                }
892                encode_value(f, val, false)?;
893            }
894            write!(f, ")")?;
895            Ok(())
896        }
897    }
898}
899
900pub fn display_decimal_128(num: i128, scale: u8) -> String {
901    let mut buf = String::new();
902    if scale == 0 {
903        write!(buf, "{}", num).unwrap();
904    } else {
905        let pow_scale = 10_i128.pow(scale as u32);
906        if num >= 0 {
907            write!(
908                buf,
909                "{}.{:0>width$}",
910                num / pow_scale,
911                (num % pow_scale).abs(),
912                width = scale as usize
913            )
914            .unwrap();
915        } else {
916            write!(
917                buf,
918                "-{}.{:0>width$}",
919                -num / pow_scale,
920                (num % pow_scale).abs(),
921                width = scale as usize
922            )
923            .unwrap();
924        }
925    }
926    buf
927}
928
929pub fn display_decimal_256(num: i256, scale: u8) -> String {
930    let mut buf = String::new();
931    if scale == 0 {
932        write!(buf, "{}", num).unwrap();
933    } else {
934        let pow_scale = i256::from_i128(10i128).wrapping_pow(scale as u32);
935        let width = scale as usize;
936        // -1/10 = 0
937        let (int_part, neg) = if num >= i256::ZERO {
938            (num / pow_scale, "")
939        } else {
940            (-num / pow_scale, "-")
941        };
942        let frac_part = (num % pow_scale).wrapping_abs();
943
944        match frac_part.to_i128() {
945            Some(frac_part) => {
946                write!(
947                    buf,
948                    "{}{}.{:0>width$}",
949                    neg,
950                    int_part,
951                    frac_part,
952                    width = width
953                )
954                .unwrap();
955            }
956            None => {
957                // fractional part is too big for display,
958                // split it into two parts.
959                let pow = i256::from_i128(10i128).wrapping_pow(38);
960                let frac_high_part = frac_part / pow;
961                let frac_low_part = frac_part % pow;
962                let frac_width = (scale - 38) as usize;
963
964                write!(
965                    buf,
966                    "{}{}.{:0>width$}{}",
967                    neg,
968                    int_part,
969                    frac_high_part.to_i128().unwrap(),
970                    frac_low_part.to_i128().unwrap(),
971                    width = frac_width
972                )
973                .unwrap();
974            }
975        }
976    }
977    buf
978}
979
980/// assume text is from
981/// used only for expr, so put more weight on readability
982pub fn parse_decimal(text: &str, size: DecimalSize) -> Result<NumberValue> {
983    let mut start = 0;
984    let bytes = text.as_bytes();
985    let mut is_negative = false;
986
987    // Check if the number is negative
988    if bytes[start] == b'-' {
989        is_negative = true;
990        start += 1;
991    }
992
993    while start < text.len() && bytes[start] == b'0' {
994        start += 1
995    }
996    let text = &text[start..];
997    let point_pos = text.find('.');
998    let e_pos = text.find(|c| ['E', 'e'].contains(&c));
999    let (i_part, f_part, e_part) = match (point_pos, e_pos) {
1000        (Some(p1), Some(p2)) => (&text[..p1], &text[(p1 + 1)..p2], Some(&text[(p2 + 1)..])),
1001        (Some(p), None) => (&text[..p], &text[(p + 1)..], None),
1002        (None, Some(p)) => (&text[..p], "", Some(&text[(p + 1)..])),
1003        (None, None) => (text, "", None),
1004    };
1005    let exp = match e_part {
1006        Some(s) => s.parse::<i32>()?,
1007        None => 0,
1008    };
1009    if i_part.len() as i32 + exp > 76 {
1010        Err(ConvertError::new("decimal", format!("{:?}", text)).into())
1011    } else {
1012        let mut digits = Vec::with_capacity(76);
1013        digits.extend_from_slice(i_part.as_bytes());
1014        digits.extend_from_slice(f_part.as_bytes());
1015        if digits.is_empty() {
1016            digits.push(b'0')
1017        }
1018        let scale = f_part.len() as i32 - exp;
1019        if scale < 0 {
1020            // e.g 123.1e3
1021            for _ in 0..(-scale) {
1022                digits.push(b'0')
1023            }
1024        };
1025
1026        let precision = std::cmp::min(digits.len(), 76);
1027        let digits = unsafe { std::str::from_utf8_unchecked(&digits[..precision]) };
1028
1029        let result = if size.precision > 38 {
1030            NumberValue::Decimal256(i256::from_string(digits).unwrap(), size)
1031        } else {
1032            NumberValue::Decimal128(digits.parse::<i128>()?, size)
1033        };
1034
1035        // If the number was negative, negate the result
1036        if is_negative {
1037            match result {
1038                NumberValue::Decimal256(val, size) => Ok(NumberValue::Decimal256(-val, size)),
1039                NumberValue::Decimal128(val, size) => Ok(NumberValue::Decimal128(-val, size)),
1040                _ => Ok(result),
1041            }
1042        } else {
1043            Ok(result)
1044        }
1045    }
1046}
1047
1048#[derive(Debug, Copy, Clone, PartialEq, Default)]
1049pub struct Interval {
1050    pub months: i32,
1051    pub days: i32,
1052    pub micros: i64,
1053}
1054
1055impl Display for Interval {
1056    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1057        let mut buffer = [0u8; 70];
1058        let len = IntervalToStringCast::format(*self, &mut buffer);
1059        write!(f, "{}", String::from_utf8_lossy(&buffer[..len]))
1060    }
1061}
1062
1063struct IntervalToStringCast;
1064
1065impl IntervalToStringCast {
1066    fn format_signed_number(value: i64, buffer: &mut [u8], length: &mut usize) {
1067        let s = value.to_string();
1068        let bytes = s.as_bytes();
1069        buffer[*length..*length + bytes.len()].copy_from_slice(bytes);
1070        *length += bytes.len();
1071    }
1072
1073    fn format_two_digits(value: i64, buffer: &mut [u8], length: &mut usize) {
1074        let s = format!("{:02}", value.abs());
1075        let bytes = s.as_bytes();
1076        buffer[*length..*length + bytes.len()].copy_from_slice(bytes);
1077        *length += bytes.len();
1078    }
1079
1080    fn format_interval_value(value: i32, buffer: &mut [u8], length: &mut usize, name: &str) {
1081        if value == 0 {
1082            return;
1083        }
1084        if *length != 0 {
1085            buffer[*length] = b' ';
1086            *length += 1;
1087        }
1088        Self::format_signed_number(value as i64, buffer, length);
1089        let name_bytes = name.as_bytes();
1090        buffer[*length..*length + name_bytes.len()].copy_from_slice(name_bytes);
1091        *length += name_bytes.len();
1092        if value != 1 && value != -1 {
1093            buffer[*length] = b's';
1094            *length += 1;
1095        }
1096    }
1097
1098    fn format_micros(mut micros: i64, buffer: &mut [u8], length: &mut usize) {
1099        if micros < 0 {
1100            micros = -micros;
1101        }
1102        let s = format!("{:06}", micros);
1103        let bytes = s.as_bytes();
1104        buffer[*length..*length + bytes.len()].copy_from_slice(bytes);
1105        *length += bytes.len();
1106
1107        while *length > 0 && buffer[*length - 1] == b'0' {
1108            *length -= 1;
1109        }
1110    }
1111
1112    pub fn format(interval: Interval, buffer: &mut [u8]) -> usize {
1113        let mut length = 0;
1114        if interval.months != 0 {
1115            let years = interval.months / 12;
1116            let months = interval.months - years * 12;
1117            Self::format_interval_value(years, buffer, &mut length, " year");
1118            Self::format_interval_value(months, buffer, &mut length, " month");
1119        }
1120        if interval.days != 0 {
1121            Self::format_interval_value(interval.days, buffer, &mut length, " day");
1122        }
1123        if interval.micros != 0 {
1124            if length != 0 {
1125                buffer[length] = b' ';
1126                length += 1;
1127            }
1128            let mut micros = interval.micros;
1129            if micros < 0 {
1130                buffer[length] = b'-';
1131                length += 1;
1132                micros = -micros;
1133            }
1134            let hour = micros / MINROS_PER_HOUR;
1135            micros -= hour * MINROS_PER_HOUR;
1136            let min = micros / MICROS_PER_MINUTE;
1137            micros -= min * MICROS_PER_MINUTE;
1138            let sec = micros / MICROS_PER_SEC;
1139            micros -= sec * MICROS_PER_SEC;
1140
1141            Self::format_signed_number(hour, buffer, &mut length);
1142            buffer[length] = b':';
1143            length += 1;
1144            Self::format_two_digits(min, buffer, &mut length);
1145            buffer[length] = b':';
1146            length += 1;
1147            Self::format_two_digits(sec, buffer, &mut length);
1148            if micros != 0 {
1149                buffer[length] = b'.';
1150                length += 1;
1151                Self::format_micros(micros, buffer, &mut length);
1152            }
1153        } else if length == 0 {
1154            buffer[..8].copy_from_slice(b"00:00:00");
1155            return 8;
1156        }
1157        length
1158    }
1159}
1160
1161impl Interval {
1162    pub fn from_string(str: &str) -> Result<Self> {
1163        Self::from_cstring(str.as_bytes())
1164    }
1165
1166    pub fn from_cstring(str: &[u8]) -> Result<Self> {
1167        let mut result = Interval::default();
1168        let mut pos = 0;
1169        let len = str.len();
1170        let mut found_any = false;
1171
1172        if len == 0 {
1173            return Err(Error::BadArgument("Empty string".to_string()));
1174        }
1175        match str[pos] {
1176            b'@' => {
1177                pos += 1;
1178            }
1179            b'P' | b'p' => {
1180                return Err(Error::BadArgument(
1181                    "Posix intervals not supported yet".to_string(),
1182                ));
1183            }
1184            _ => {}
1185        }
1186
1187        while pos < len {
1188            match str[pos] {
1189                b' ' | b'\t' | b'\n' => {
1190                    pos += 1;
1191                    continue;
1192                }
1193                b'0'..=b'9' => {
1194                    let (number, fraction, next_pos) = parse_number(&str[pos..])?;
1195                    pos += next_pos;
1196                    let (specifier, next_pos) = parse_identifier(&str[pos..]);
1197
1198                    pos += next_pos;
1199                    let _ = apply_specifier(&mut result, number, fraction, &specifier);
1200                    found_any = true;
1201                }
1202                b'-' => {
1203                    pos += 1;
1204                    let (number, fraction, next_pos) = parse_number(&str[pos..])?;
1205                    let number = -number;
1206                    let fraction = -fraction;
1207
1208                    pos += next_pos;
1209
1210                    let (specifier, next_pos) = parse_identifier(&str[pos..]);
1211
1212                    pos += next_pos;
1213                    let _ = apply_specifier(&mut result, number, fraction, &specifier);
1214                    found_any = true;
1215                }
1216                b'a' | b'A' => {
1217                    if len - pos < 3
1218                        || str[pos + 1] != b'g' && str[pos + 1] != b'G'
1219                        || str[pos + 2] != b'o' && str[pos + 2] != b'O'
1220                    {
1221                        return Err(Error::BadArgument("Invalid 'ago' specifier".to_string()));
1222                    }
1223                    pos += 3;
1224                    while pos < len {
1225                        match str[pos] {
1226                            b' ' | b'\t' | b'\n' => {
1227                                pos += 1;
1228                            }
1229                            _ => {
1230                                return Err(Error::BadArgument(
1231                                    "Trailing characters after 'ago'".to_string(),
1232                                ));
1233                            }
1234                        }
1235                    }
1236                    result.months = -result.months;
1237                    result.days = -result.days;
1238                    result.micros = -result.micros;
1239                    return Ok(result);
1240                }
1241                _ => {
1242                    return Err(Error::BadArgument(format!(
1243                        "Unexpected character at position {}",
1244                        pos
1245                    )));
1246                }
1247            }
1248        }
1249
1250        if !found_any {
1251            return Err(Error::BadArgument(
1252                "No interval specifiers found".to_string(),
1253            ));
1254        }
1255        Ok(result)
1256    }
1257}
1258
1259fn parse_number(bytes: &[u8]) -> Result<(i64, i64, usize)> {
1260    let mut number: i64 = 0;
1261    let mut fraction: i64 = 0;
1262    let mut pos = 0;
1263
1264    while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1265        number = number
1266            .checked_mul(10)
1267            .ok_or(Error::BadArgument("Number too large".to_string()))?
1268            + (bytes[pos] - b'0') as i64;
1269        pos += 1;
1270    }
1271
1272    if pos < bytes.len() && bytes[pos] == b'.' {
1273        pos += 1;
1274        let mut mult: i64 = 100000;
1275        while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1276            if mult > 0 {
1277                fraction += (bytes[pos] - b'0') as i64 * mult;
1278            }
1279            mult /= 10;
1280            pos += 1;
1281        }
1282    }
1283    if pos < bytes.len() && bytes[pos] == b':' {
1284        // parse time format HH:MM:SS[.FFFFFF]
1285        let time_bytes = &bytes[pos..];
1286        let mut time_pos = 0;
1287        let mut total_micros: i64 = number * 60 * 60 * MICROS_PER_SEC;
1288        let mut colon_count = 0;
1289
1290        while colon_count < 2 && time_bytes.len() > time_pos {
1291            let (minute, _, next_pos) = parse_time_part(&time_bytes[time_pos..])?;
1292            let minute_micros = minute * 60 * MICROS_PER_SEC;
1293            total_micros += minute_micros;
1294            time_pos += next_pos;
1295
1296            if time_bytes.len() > time_pos && time_bytes[time_pos] == b':' {
1297                time_pos += 1;
1298                colon_count += 1;
1299            } else {
1300                break;
1301            }
1302        }
1303        if time_bytes.len() > time_pos {
1304            let (seconds, nanos, next_pos) = parse_time_part_with_nanos(&time_bytes[time_pos..])?;
1305            total_micros += seconds * MICROS_PER_SEC + nanos;
1306            time_pos += next_pos;
1307        }
1308        return Ok((total_micros, 0, pos + time_pos));
1309    }
1310
1311    if pos == 0 {
1312        return Err(Error::BadArgument("Expected number".to_string()));
1313    }
1314
1315    Ok((number, fraction, pos))
1316}
1317
1318fn parse_time_part(bytes: &[u8]) -> Result<(i64, i64, usize)> {
1319    let mut number: i64 = 0;
1320    let mut pos = 0;
1321    while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1322        number = number
1323            .checked_mul(10)
1324            .ok_or(Error::BadArgument("Number too large".to_string()))?
1325            + (bytes[pos] - b'0') as i64;
1326        pos += 1;
1327    }
1328    Ok((number, 0, pos))
1329}
1330
1331fn parse_time_part_with_nanos(bytes: &[u8]) -> Result<(i64, i64, usize)> {
1332    let mut number: i64 = 0;
1333    let mut fraction: i64 = 0;
1334    let mut pos = 0;
1335
1336    while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1337        number = number
1338            .checked_mul(10)
1339            .ok_or(Error::BadArgument("Number too large".to_string()))?
1340            + (bytes[pos] - b'0') as i64;
1341        pos += 1;
1342    }
1343
1344    if pos < bytes.len() && bytes[pos] == b'.' {
1345        pos += 1;
1346        let mut mult: i64 = 100000000;
1347        while pos < bytes.len() && bytes[pos].is_ascii_digit() {
1348            if mult > 0 {
1349                fraction += (bytes[pos] - b'0') as i64 * mult;
1350            }
1351            mult /= 10;
1352            pos += 1;
1353        }
1354    }
1355
1356    Ok((number, fraction, pos))
1357}
1358
1359fn parse_identifier(s: &[u8]) -> (String, usize) {
1360    let mut pos = 0;
1361    while pos < s.len() && (s[pos] == b' ' || s[pos] == b'\t' || s[pos] == b'\n') {
1362        pos += 1;
1363    }
1364    let start_pos = pos;
1365    while pos < s.len() && (s[pos].is_ascii_alphabetic()) {
1366        pos += 1;
1367    }
1368
1369    if pos == start_pos {
1370        return ("".to_string(), pos);
1371    }
1372
1373    let identifier = String::from_utf8_lossy(&s[start_pos..pos]).to_string();
1374    (identifier, pos)
1375}
1376
1377#[derive(Debug, PartialEq, Eq)]
1378enum DatePartSpecifier {
1379    Millennium,
1380    Century,
1381    Decade,
1382    Year,
1383    Quarter,
1384    Month,
1385    Day,
1386    Week,
1387    Microseconds,
1388    Milliseconds,
1389    Second,
1390    Minute,
1391    Hour,
1392}
1393
1394fn try_get_date_part_specifier(specifier_str: &str) -> Result<DatePartSpecifier> {
1395    match specifier_str.to_lowercase().as_str() {
1396        "millennium" | "millennia" => Ok(DatePartSpecifier::Millennium),
1397        "century" | "centuries" => Ok(DatePartSpecifier::Century),
1398        "decade" | "decades" => Ok(DatePartSpecifier::Decade),
1399        "year" | "years" | "y" => Ok(DatePartSpecifier::Year),
1400        "quarter" | "quarters" => Ok(DatePartSpecifier::Quarter),
1401        "month" | "months" | "mon" => Ok(DatePartSpecifier::Month),
1402        "day" | "days" | "d" => Ok(DatePartSpecifier::Day),
1403        "week" | "weeks" | "w" => Ok(DatePartSpecifier::Week),
1404        "microsecond" | "microseconds" | "us" => Ok(DatePartSpecifier::Microseconds),
1405        "millisecond" | "milliseconds" | "ms" => Ok(DatePartSpecifier::Milliseconds),
1406        "second" | "seconds" | "s" => Ok(DatePartSpecifier::Second),
1407        "minute" | "minutes" | "m" => Ok(DatePartSpecifier::Minute),
1408        "hour" | "hours" | "h" => Ok(DatePartSpecifier::Hour),
1409        _ => Err(Error::BadArgument(format!(
1410            "Invalid date part specifier: {}",
1411            specifier_str
1412        ))),
1413    }
1414}
1415
1416const MICROS_PER_SEC: i64 = 1_000_000;
1417const MICROS_PER_MSEC: i64 = 1_000;
1418const MICROS_PER_MINUTE: i64 = 60 * MICROS_PER_SEC;
1419const MINROS_PER_HOUR: i64 = 60 * MICROS_PER_MINUTE;
1420const DAYS_PER_WEEK: i32 = 7;
1421const MONTHS_PER_QUARTER: i32 = 3;
1422const MONTHS_PER_YEAR: i32 = 12;
1423const MONTHS_PER_DECADE: i32 = 120;
1424const MONTHS_PER_CENTURY: i32 = 1200;
1425const MONTHS_PER_MILLENNIUM: i32 = 12000;
1426
1427fn apply_specifier(
1428    result: &mut Interval,
1429    number: i64,
1430    fraction: i64,
1431    specifier_str: &str,
1432) -> Result<()> {
1433    if specifier_str.is_empty() {
1434        result.micros = result
1435            .micros
1436            .checked_add(number)
1437            .ok_or(Error::BadArgument("Overflow".to_string()))?;
1438        result.micros = result
1439            .micros
1440            .checked_add(fraction)
1441            .ok_or(Error::BadArgument("Overflow".to_string()))?;
1442        return Ok(());
1443    }
1444
1445    let specifier = try_get_date_part_specifier(specifier_str)?;
1446    match specifier {
1447        DatePartSpecifier::Millennium => {
1448            result.months = result
1449                .months
1450                .checked_add(
1451                    number
1452                        .checked_mul(MONTHS_PER_MILLENNIUM as i64)
1453                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1454                        .try_into()
1455                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1456                )
1457                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1458        }
1459        DatePartSpecifier::Century => {
1460            result.months = result
1461                .months
1462                .checked_add(
1463                    number
1464                        .checked_mul(MONTHS_PER_CENTURY as i64)
1465                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1466                        .try_into()
1467                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1468                )
1469                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1470        }
1471        DatePartSpecifier::Decade => {
1472            result.months = result
1473                .months
1474                .checked_add(
1475                    number
1476                        .checked_mul(MONTHS_PER_DECADE as i64)
1477                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1478                        .try_into()
1479                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1480                )
1481                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1482        }
1483        DatePartSpecifier::Year => {
1484            result.months = result
1485                .months
1486                .checked_add(
1487                    number
1488                        .checked_mul(MONTHS_PER_YEAR as i64)
1489                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1490                        .try_into()
1491                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1492                )
1493                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1494        }
1495        DatePartSpecifier::Quarter => {
1496            result.months = result
1497                .months
1498                .checked_add(
1499                    number
1500                        .checked_mul(MONTHS_PER_QUARTER as i64)
1501                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1502                        .try_into()
1503                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1504                )
1505                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1506        }
1507        DatePartSpecifier::Month => {
1508            result.months = result
1509                .months
1510                .checked_add(
1511                    number
1512                        .try_into()
1513                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1514                )
1515                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1516        }
1517        DatePartSpecifier::Day => {
1518            result.days = result
1519                .days
1520                .checked_add(
1521                    number
1522                        .try_into()
1523                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1524                )
1525                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1526        }
1527        DatePartSpecifier::Week => {
1528            result.days = result
1529                .days
1530                .checked_add(
1531                    number
1532                        .checked_mul(DAYS_PER_WEEK as i64)
1533                        .ok_or(Error::BadArgument("Overflow".to_string()))?
1534                        .try_into()
1535                        .map_err(|_| Error::BadArgument("Overflow".to_string()))?,
1536                )
1537                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1538        }
1539        DatePartSpecifier::Microseconds => {
1540            result.micros = result
1541                .micros
1542                .checked_add(number)
1543                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1544        }
1545        DatePartSpecifier::Milliseconds => {
1546            result.micros = result
1547                .micros
1548                .checked_add(
1549                    number
1550                        .checked_mul(MICROS_PER_MSEC)
1551                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1552                )
1553                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1554        }
1555        DatePartSpecifier::Second => {
1556            result.micros = result
1557                .micros
1558                .checked_add(
1559                    number
1560                        .checked_mul(MICROS_PER_SEC)
1561                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1562                )
1563                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1564        }
1565        DatePartSpecifier::Minute => {
1566            result.micros = result
1567                .micros
1568                .checked_add(
1569                    number
1570                        .checked_mul(MICROS_PER_MINUTE)
1571                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1572                )
1573                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1574        }
1575        DatePartSpecifier::Hour => {
1576            result.micros = result
1577                .micros
1578                .checked_add(
1579                    number
1580                        .checked_mul(MINROS_PER_HOUR)
1581                        .ok_or(Error::BadArgument("Overflow".to_string()))?,
1582                )
1583                .ok_or(Error::BadArgument("Overflow".to_string()))?;
1584        }
1585    }
1586    Ok(())
1587}
1588
1589pub fn parse_geometry(raw_data: &[u8]) -> Result<String> {
1590    let mut data = Cursor::new(raw_data);
1591    let wkt = Ewkt::from_wkb(&mut data, WkbDialect::Ewkb)?;
1592    Ok(wkt.0)
1593}
1594
1595struct ValueDecoder {}
1596
1597impl ValueDecoder {
1598    fn read_field<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
1599        match ty {
1600            DataType::Null => self.read_null(reader),
1601            DataType::EmptyArray => self.read_empty_array(reader),
1602            DataType::EmptyMap => self.read_empty_map(reader),
1603            DataType::Boolean => self.read_bool(reader),
1604            DataType::Number(NumberDataType::Int8) => self.read_int8(reader),
1605            DataType::Number(NumberDataType::Int16) => self.read_int16(reader),
1606            DataType::Number(NumberDataType::Int32) => self.read_int32(reader),
1607            DataType::Number(NumberDataType::Int64) => self.read_int64(reader),
1608            DataType::Number(NumberDataType::UInt8) => self.read_uint8(reader),
1609            DataType::Number(NumberDataType::UInt16) => self.read_uint16(reader),
1610            DataType::Number(NumberDataType::UInt32) => self.read_uint32(reader),
1611            DataType::Number(NumberDataType::UInt64) => self.read_uint64(reader),
1612            DataType::Number(NumberDataType::Float32) => self.read_float32(reader),
1613            DataType::Number(NumberDataType::Float64) => self.read_float64(reader),
1614            DataType::Decimal(DecimalDataType::Decimal128(size)) => self.read_decimal(size, reader),
1615            DataType::Decimal(DecimalDataType::Decimal256(size)) => self.read_decimal(size, reader),
1616            DataType::String => self.read_string(reader),
1617            DataType::Binary => self.read_binary(reader),
1618            DataType::Timestamp => self.read_timestamp(reader),
1619            DataType::Date => self.read_date(reader),
1620            DataType::Bitmap => self.read_bitmap(reader),
1621            DataType::Variant => self.read_variant(reader),
1622            DataType::Geometry => self.read_geometry(reader),
1623            DataType::Interval => self.read_interval(reader),
1624            DataType::Geography => self.read_geography(reader),
1625            DataType::Array(inner_ty) => self.read_array(inner_ty.as_ref(), reader),
1626            DataType::Map(inner_ty) => self.read_map(inner_ty.as_ref(), reader),
1627            DataType::Tuple(inner_tys) => self.read_tuple(inner_tys.as_ref(), reader),
1628            DataType::Nullable(inner_ty) => self.read_nullable(inner_ty.as_ref(), reader),
1629        }
1630    }
1631
1632    fn match_bytes<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>, bs: &[u8]) -> bool {
1633        let pos = reader.checkpoint();
1634        if reader.ignore_bytes(bs) {
1635            true
1636        } else {
1637            reader.rollback(pos);
1638            false
1639        }
1640    }
1641
1642    fn read_null<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1643        if self.match_bytes(reader, NULL_VALUE.as_bytes()) {
1644            Ok(Value::Null)
1645        } else {
1646            let buf = reader.fill_buf()?;
1647            Err(ConvertError::new("null", String::from_utf8_lossy(buf).to_string()).into())
1648        }
1649    }
1650
1651    fn read_bool<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1652        if self.match_bytes(reader, TRUE_VALUE.as_bytes()) {
1653            Ok(Value::Boolean(true))
1654        } else if self.match_bytes(reader, FALSE_VALUE.as_bytes()) {
1655            Ok(Value::Boolean(false))
1656        } else {
1657            let buf = reader.fill_buf()?;
1658            Err(ConvertError::new("boolean", String::from_utf8_lossy(buf).to_string()).into())
1659        }
1660    }
1661
1662    fn read_int8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1663        let v: i8 = reader.read_int_text()?;
1664        Ok(Value::Number(NumberValue::Int8(v)))
1665    }
1666
1667    fn read_int16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1668        let v: i16 = reader.read_int_text()?;
1669        Ok(Value::Number(NumberValue::Int16(v)))
1670    }
1671
1672    fn read_int32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1673        let v: i32 = reader.read_int_text()?;
1674        Ok(Value::Number(NumberValue::Int32(v)))
1675    }
1676
1677    fn read_int64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1678        let v: i64 = reader.read_int_text()?;
1679        Ok(Value::Number(NumberValue::Int64(v)))
1680    }
1681
1682    fn read_uint8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1683        let v: u8 = reader.read_int_text()?;
1684        Ok(Value::Number(NumberValue::UInt8(v)))
1685    }
1686
1687    fn read_uint16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1688        let v: u16 = reader.read_int_text()?;
1689        Ok(Value::Number(NumberValue::UInt16(v)))
1690    }
1691
1692    fn read_uint32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1693        let v: u32 = reader.read_int_text()?;
1694        Ok(Value::Number(NumberValue::UInt32(v)))
1695    }
1696
1697    fn read_uint64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1698        let v: u64 = reader.read_int_text()?;
1699        Ok(Value::Number(NumberValue::UInt64(v)))
1700    }
1701
1702    fn read_float32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1703        let v: f32 = reader.read_float_text()?;
1704        Ok(Value::Number(NumberValue::Float32(v)))
1705    }
1706
1707    fn read_float64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1708        let v: f64 = reader.read_float_text()?;
1709        Ok(Value::Number(NumberValue::Float64(v)))
1710    }
1711
1712    fn read_decimal<R: AsRef<[u8]>>(
1713        &self,
1714        size: &DecimalSize,
1715        reader: &mut Cursor<R>,
1716    ) -> Result<Value> {
1717        let buf = reader.fill_buf()?;
1718        // parser decimal need fractional part.
1719        // 10.00 and 10 is different value.
1720        let (n_in, _) = collect_number(buf);
1721        let v = unsafe { std::str::from_utf8_unchecked(&buf[..n_in]) };
1722        let d = parse_decimal(v, *size)?;
1723        reader.consume(n_in);
1724        Ok(Value::Number(d))
1725    }
1726
1727    fn read_string<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1728        let mut buf = Vec::new();
1729        reader.read_quoted_text(&mut buf, b'\'')?;
1730        Ok(Value::String(unsafe { String::from_utf8_unchecked(buf) }))
1731    }
1732
1733    fn read_binary<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1734        let buf = reader.fill_buf()?;
1735        let n = collect_binary_number(buf);
1736        let v = buf[..n].to_vec();
1737        reader.consume(n);
1738        Ok(Value::Binary(hex::decode(v)?))
1739    }
1740
1741    fn read_date<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1742        let mut buf = Vec::new();
1743        reader.read_quoted_text(&mut buf, b'\'')?;
1744        let v = unsafe { std::str::from_utf8_unchecked(&buf) };
1745        let days = NaiveDate::parse_from_str(v, "%Y-%m-%d")?.num_days_from_ce() - DAYS_FROM_CE;
1746        Ok(Value::Date(days))
1747    }
1748
1749    fn read_timestamp<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1750        let mut buf = Vec::new();
1751        reader.read_quoted_text(&mut buf, b'\'')?;
1752        let v = unsafe { std::str::from_utf8_unchecked(&buf) };
1753        let ts = NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.6f")?
1754            .and_utc()
1755            .timestamp_micros();
1756        Ok(Value::Timestamp(ts))
1757    }
1758
1759    fn read_interval<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1760        let mut buf = Vec::new();
1761        reader.read_quoted_text(&mut buf, b'\'')?;
1762        Ok(Value::Interval(unsafe { String::from_utf8_unchecked(buf) }))
1763    }
1764
1765    fn read_bitmap<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1766        let mut buf = Vec::new();
1767        reader.read_quoted_text(&mut buf, b'\'')?;
1768        Ok(Value::Bitmap(unsafe { String::from_utf8_unchecked(buf) }))
1769    }
1770
1771    fn read_variant<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1772        let mut buf = Vec::new();
1773        reader.read_quoted_text(&mut buf, b'\'')?;
1774        Ok(Value::Variant(unsafe { String::from_utf8_unchecked(buf) }))
1775    }
1776
1777    fn read_geometry<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1778        let mut buf = Vec::new();
1779        reader.read_quoted_text(&mut buf, b'\'')?;
1780        Ok(Value::Geometry(unsafe { String::from_utf8_unchecked(buf) }))
1781    }
1782
1783    fn read_geography<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1784        let mut buf = Vec::new();
1785        reader.read_quoted_text(&mut buf, b'\'')?;
1786        Ok(Value::Geography(unsafe {
1787            String::from_utf8_unchecked(buf)
1788        }))
1789    }
1790
1791    fn read_nullable<R: AsRef<[u8]>>(
1792        &self,
1793        ty: &DataType,
1794        reader: &mut Cursor<R>,
1795    ) -> Result<Value> {
1796        match self.read_null(reader) {
1797            Ok(val) => Ok(val),
1798            Err(_) => self.read_field(ty, reader),
1799        }
1800    }
1801
1802    fn read_empty_array<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1803        reader.must_ignore_byte(b'[')?;
1804        reader.must_ignore_byte(b']')?;
1805        Ok(Value::EmptyArray)
1806    }
1807
1808    fn read_empty_map<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
1809        reader.must_ignore_byte(b'{')?;
1810        reader.must_ignore_byte(b'}')?;
1811        Ok(Value::EmptyArray)
1812    }
1813
1814    fn read_array<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
1815        let mut vals = Vec::new();
1816        reader.must_ignore_byte(b'[')?;
1817        for idx in 0.. {
1818            let _ = reader.ignore_white_spaces();
1819            if reader.ignore_byte(b']') {
1820                break;
1821            }
1822            if idx != 0 {
1823                reader.must_ignore_byte(b',')?;
1824            }
1825            let _ = reader.ignore_white_spaces();
1826            let val = self.read_field(ty, reader)?;
1827            vals.push(val);
1828        }
1829        Ok(Value::Array(vals))
1830    }
1831
1832    fn read_map<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
1833        const KEY: usize = 0;
1834        const VALUE: usize = 1;
1835        let mut kvs = Vec::new();
1836        reader.must_ignore_byte(b'{')?;
1837        match ty {
1838            DataType::Tuple(inner_tys) => {
1839                for idx in 0.. {
1840                    let _ = reader.ignore_white_spaces();
1841                    if reader.ignore_byte(b'}') {
1842                        break;
1843                    }
1844                    if idx != 0 {
1845                        reader.must_ignore_byte(b',')?;
1846                    }
1847                    let _ = reader.ignore_white_spaces();
1848                    let key = self.read_field(&inner_tys[KEY], reader)?;
1849                    let _ = reader.ignore_white_spaces();
1850                    reader.must_ignore_byte(b':')?;
1851                    let _ = reader.ignore_white_spaces();
1852                    let val = self.read_field(&inner_tys[VALUE], reader)?;
1853                    kvs.push((key, val));
1854                }
1855                Ok(Value::Map(kvs))
1856            }
1857            _ => unreachable!(),
1858        }
1859    }
1860
1861    fn read_tuple<R: AsRef<[u8]>>(
1862        &self,
1863        tys: &[DataType],
1864        reader: &mut Cursor<R>,
1865    ) -> Result<Value> {
1866        let mut vals = Vec::new();
1867        reader.must_ignore_byte(b'(')?;
1868        for (idx, ty) in tys.iter().enumerate() {
1869            let _ = reader.ignore_white_spaces();
1870            if idx != 0 {
1871                reader.must_ignore_byte(b',')?;
1872            }
1873            let _ = reader.ignore_white_spaces();
1874            let val = self.read_field(ty, reader)?;
1875            vals.push(val);
1876        }
1877        reader.must_ignore_byte(b')')?;
1878        Ok(Value::Tuple(vals))
1879    }
1880}
1881
1882/// The in-memory representation of the MonthDayMicros variant of the "Interval" logical type.
1883#[derive(Debug, Copy, Clone, Default, PartialEq, PartialOrd, Ord, Eq, Hash)]
1884#[allow(non_camel_case_types)]
1885#[repr(C)]
1886pub struct months_days_micros(pub i128);
1887
1888/// Mask for extracting the lower 64 bits (microseconds).
1889pub const MICROS_MASK: i128 = 0xFFFFFFFFFFFFFFFF;
1890/// Mask for extracting the middle 32 bits (days or months).
1891pub const DAYS_MONTHS_MASK: i128 = 0xFFFFFFFF;
1892
1893impl months_days_micros {
1894    /// A new [`months_days_micros`].
1895    pub fn new(months: i32, days: i32, microseconds: i64) -> Self {
1896        let months_bits = (months as i128) << 96;
1897        // converting to u32 before i128 ensures we’re working with the raw, unsigned bit pattern of the i32 value,
1898        // preventing unwanted sign extension when that value is later used within the i128.
1899        let days_bits = ((days as u32) as i128) << 64;
1900        let micros_bits = (microseconds as u64) as i128;
1901
1902        Self(months_bits | days_bits | micros_bits)
1903    }
1904
1905    #[inline]
1906    pub fn months(&self) -> i32 {
1907        // Decoding logic
1908        ((self.0 >> 96) & DAYS_MONTHS_MASK) as i32
1909    }
1910
1911    #[inline]
1912    pub fn days(&self) -> i32 {
1913        ((self.0 >> 64) & DAYS_MONTHS_MASK) as i32
1914    }
1915
1916    #[inline]
1917    pub fn microseconds(&self) -> i64 {
1918        (self.0 & MICROS_MASK) as i64
1919    }
1920}