databend_driver_core/value/
string_decoder.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::{NumberValue, Value, DAYS_FROM_CE, TIMESTAMP_FORMAT, TIMESTAMP_TIMEZONE_FORMAT};
16use crate::_macro_internal::Error;
17use crate::cursor_ext::{
18    collect_binary_number, collect_number, BufferReadStringExt, ReadBytesExt, ReadCheckPointExt,
19    ReadNumberExt,
20};
21use crate::error::{ConvertError, Result};
22use chrono::{Datelike, NaiveDate};
23use databend_client::schema::{DataType, DecimalDataType, DecimalSize, NumberDataType};
24use ethnum::i256;
25use hex;
26use jiff::{civil::DateTime as JiffDateTime, tz::TimeZone, Zoned};
27use std::io::{BufRead, Cursor};
28use std::str::FromStr;
29
/// Textual marker for a null cell; matched by `read_null` and compared against in the
/// `Nullable` branch of the string-to-`Value` conversion.
30const NULL_VALUE: &str = "NULL";
/// Text that `read_bool` decodes as `true`.
31const TRUE_VALUE: &str = "1";
/// Text that `read_bool` decodes as `false`.
32const FALSE_VALUE: &str = "0";
33
34impl TryFrom<(&DataType, Option<String>, &TimeZone)> for Value {
35    type Error = Error;
36
37    fn try_from((t, v, tz): (&DataType, Option<String>, &TimeZone)) -> Result<Self> {
38        match v {
39            Some(v) => Self::try_from((t, v, tz)),
40            None => match t {
41                DataType::Null => Ok(Self::Null),
42                DataType::Nullable(_) => Ok(Self::Null),
43                _ => Err(Error::InvalidResponse(
44                    "NULL value for non-nullable field".to_string(),
45                )),
46            },
47        }
48    }
49}
50
51impl TryFrom<(&DataType, String, &TimeZone)> for Value {
52    type Error = Error;
53
54    fn try_from((t, v, tz): (&DataType, String, &TimeZone)) -> Result<Self> {
55        match t {
56            DataType::Null => Ok(Self::Null),
57            DataType::EmptyArray => Ok(Self::EmptyArray),
58            DataType::EmptyMap => Ok(Self::EmptyMap),
59            DataType::Boolean => Ok(Self::Boolean(v == "1")),
60            DataType::Binary => Ok(Self::Binary(hex::decode(v)?)),
61            DataType::String => Ok(Self::String(v)),
62            DataType::Number(NumberDataType::Int8) => {
63                Ok(Self::Number(NumberValue::Int8(v.parse()?)))
64            }
65            DataType::Number(NumberDataType::Int16) => {
66                Ok(Self::Number(NumberValue::Int16(v.parse()?)))
67            }
68            DataType::Number(NumberDataType::Int32) => {
69                Ok(Self::Number(NumberValue::Int32(v.parse()?)))
70            }
71            DataType::Number(NumberDataType::Int64) => {
72                Ok(Self::Number(NumberValue::Int64(v.parse()?)))
73            }
74            DataType::Number(NumberDataType::UInt8) => {
75                Ok(Self::Number(NumberValue::UInt8(v.parse()?)))
76            }
77            DataType::Number(NumberDataType::UInt16) => {
78                Ok(Self::Number(NumberValue::UInt16(v.parse()?)))
79            }
80            DataType::Number(NumberDataType::UInt32) => {
81                Ok(Self::Number(NumberValue::UInt32(v.parse()?)))
82            }
83            DataType::Number(NumberDataType::UInt64) => {
84                Ok(Self::Number(NumberValue::UInt64(v.parse()?)))
85            }
86            DataType::Number(NumberDataType::Float32) => {
87                Ok(Self::Number(NumberValue::Float32(v.parse()?)))
88            }
89            DataType::Number(NumberDataType::Float64) => {
90                Ok(Self::Number(NumberValue::Float64(v.parse()?)))
91            }
92            DataType::Decimal(DecimalDataType::Decimal64(size)) => {
93                let d = parse_decimal(v.as_str(), *size)?;
94                Ok(Self::Number(d))
95            }
96            DataType::Decimal(DecimalDataType::Decimal128(size)) => {
97                let d = parse_decimal(v.as_str(), *size)?;
98                Ok(Self::Number(d))
99            }
100            DataType::Decimal(DecimalDataType::Decimal256(size)) => {
101                let d = parse_decimal(v.as_str(), *size)?;
102                Ok(Self::Number(d))
103            }
104            DataType::Timestamp => parse_timestamp(v.as_str(), tz),
105            DataType::TimestampTz => {
106                let t = Zoned::strptime(TIMESTAMP_TIMEZONE_FORMAT, v.as_str())?;
107                Ok(Self::TimestampTz(t))
108            }
109            DataType::Date => Ok(Self::Date(
110                NaiveDate::parse_from_str(v.as_str(), "%Y-%m-%d")?.num_days_from_ce()
111                    - DAYS_FROM_CE,
112            )),
113            DataType::Bitmap => Ok(Self::Bitmap(v)),
114            DataType::Variant => Ok(Self::Variant(v)),
115            DataType::Geometry => Ok(Self::Geometry(v)),
116            DataType::Geography => Ok(Self::Geography(v)),
117            DataType::Interval => Ok(Self::Interval(v)),
118            DataType::Array(_) | DataType::Map(_) | DataType::Tuple(_) | DataType::Vector(_) => {
119                let mut reader = Cursor::new(v.as_str());
120                let decoder = ValueDecoder {
121                    timezone: tz.clone(),
122                };
123                decoder.read_field(t, &mut reader)
124            }
125            DataType::Nullable(inner) => match inner.as_ref() {
126                DataType::String => Ok(Self::String(v.to_string())),
127                _ => {
128                    // not string type, try to check if it is NULL
129                    // for compatible with old version server
130                    if v == NULL_VALUE {
131                        Ok(Self::Null)
132                    } else {
133                        Self::try_from((inner.as_ref(), v, tz))
134                    }
135                }
136            },
137        }
138    }
139}
140
141struct ValueDecoder {
142    pub timezone: TimeZone,
143}
144
145impl ValueDecoder {
146    pub(super) fn read_field<R: AsRef<[u8]>>(
147        &self,
148        ty: &DataType,
149        reader: &mut Cursor<R>,
150    ) -> Result<Value> {
151        match ty {
152            DataType::Null => self.read_null(reader),
153            DataType::EmptyArray => self.read_empty_array(reader),
154            DataType::EmptyMap => self.read_empty_map(reader),
155            DataType::Boolean => self.read_bool(reader),
156            DataType::Number(NumberDataType::Int8) => self.read_int8(reader),
157            DataType::Number(NumberDataType::Int16) => self.read_int16(reader),
158            DataType::Number(NumberDataType::Int32) => self.read_int32(reader),
159            DataType::Number(NumberDataType::Int64) => self.read_int64(reader),
160            DataType::Number(NumberDataType::UInt8) => self.read_uint8(reader),
161            DataType::Number(NumberDataType::UInt16) => self.read_uint16(reader),
162            DataType::Number(NumberDataType::UInt32) => self.read_uint32(reader),
163            DataType::Number(NumberDataType::UInt64) => self.read_uint64(reader),
164            DataType::Number(NumberDataType::Float32) => self.read_float32(reader),
165            DataType::Number(NumberDataType::Float64) => self.read_float64(reader),
166            DataType::Decimal(DecimalDataType::Decimal64(size)) => self.read_decimal(size, reader),
167            DataType::Decimal(DecimalDataType::Decimal128(size)) => self.read_decimal(size, reader),
168            DataType::Decimal(DecimalDataType::Decimal256(size)) => self.read_decimal(size, reader),
169            DataType::String => self.read_string(reader),
170            DataType::Binary => self.read_binary(reader),
171            DataType::Timestamp => self.read_timestamp(reader),
172            DataType::TimestampTz => self.read_timestamp_tz(reader),
173            DataType::Date => self.read_date(reader),
174            DataType::Bitmap => self.read_bitmap(reader),
175            DataType::Variant => self.read_variant(reader),
176            DataType::Geometry => self.read_geometry(reader),
177            DataType::Interval => self.read_interval(reader),
178            DataType::Geography => self.read_geography(reader),
179            DataType::Array(inner_ty) => self.read_array(inner_ty.as_ref(), reader),
180            DataType::Map(inner_ty) => self.read_map(inner_ty.as_ref(), reader),
181            DataType::Tuple(inner_tys) => self.read_tuple(inner_tys.as_ref(), reader),
182            DataType::Vector(dimension) => self.read_vector(*dimension as usize, reader),
183            DataType::Nullable(inner_ty) => self.read_nullable(inner_ty.as_ref(), reader),
184        }
185    }
186
187    fn match_bytes<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>, bs: &[u8]) -> bool {
188        let pos = reader.checkpoint();
189        if reader.ignore_bytes(bs) {
190            true
191        } else {
192            reader.rollback(pos);
193            false
194        }
195    }
196
197    fn read_null<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
198        if self.match_bytes(reader, NULL_VALUE.as_bytes()) {
199            Ok(Value::Null)
200        } else {
201            let buf = reader.fill_buf()?;
202            Err(ConvertError::new("null", String::from_utf8_lossy(buf).to_string()).into())
203        }
204    }
205
206    fn read_bool<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
207        if self.match_bytes(reader, TRUE_VALUE.as_bytes()) {
208            Ok(Value::Boolean(true))
209        } else if self.match_bytes(reader, FALSE_VALUE.as_bytes()) {
210            Ok(Value::Boolean(false))
211        } else {
212            let buf = reader.fill_buf()?;
213            Err(ConvertError::new("boolean", String::from_utf8_lossy(buf).to_string()).into())
214        }
215    }
216
217    fn read_int8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
218        let v: i8 = reader.read_int_text()?;
219        Ok(Value::Number(NumberValue::Int8(v)))
220    }
221
222    fn read_int16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
223        let v: i16 = reader.read_int_text()?;
224        Ok(Value::Number(NumberValue::Int16(v)))
225    }
226
227    fn read_int32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
228        let v: i32 = reader.read_int_text()?;
229        Ok(Value::Number(NumberValue::Int32(v)))
230    }
231
232    fn read_int64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
233        let v: i64 = reader.read_int_text()?;
234        Ok(Value::Number(NumberValue::Int64(v)))
235    }
236
237    fn read_uint8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
238        let v: u8 = reader.read_int_text()?;
239        Ok(Value::Number(NumberValue::UInt8(v)))
240    }
241
242    fn read_uint16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
243        let v: u16 = reader.read_int_text()?;
244        Ok(Value::Number(NumberValue::UInt16(v)))
245    }
246
247    fn read_uint32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
248        let v: u32 = reader.read_int_text()?;
249        Ok(Value::Number(NumberValue::UInt32(v)))
250    }
251
252    fn read_uint64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
253        let v: u64 = reader.read_int_text()?;
254        Ok(Value::Number(NumberValue::UInt64(v)))
255    }
256
257    fn read_float32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
258        let v: f32 = reader.read_float_text()?;
259        Ok(Value::Number(NumberValue::Float32(v)))
260    }
261
262    fn read_float64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
263        let v: f64 = reader.read_float_text()?;
264        Ok(Value::Number(NumberValue::Float64(v)))
265    }
266
267    fn read_decimal<R: AsRef<[u8]>>(
268        &self,
269        size: &DecimalSize,
270        reader: &mut Cursor<R>,
271    ) -> Result<Value> {
272        let buf = reader.fill_buf()?;
273        // parser decimal need fractional part.
274        // 10.00 and 10 is different value.
275        let (n_in, _) = collect_number(buf);
276        let v = unsafe { std::str::from_utf8_unchecked(&buf[..n_in]) };
277        let d = parse_decimal(v, *size)?;
278        reader.consume(n_in);
279        Ok(Value::Number(d))
280    }
281
282    fn read_string<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
283        let mut buf = Vec::new();
284        reader.read_quoted_text(&mut buf, b'\'')?;
285        Ok(Value::String(unsafe { String::from_utf8_unchecked(buf) }))
286    }
287
288    fn read_binary<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
289        let buf = reader.fill_buf()?;
290        let n = collect_binary_number(buf);
291        let v = buf[..n].to_vec();
292        reader.consume(n);
293        Ok(Value::Binary(hex::decode(v)?))
294    }
295
296    fn read_date<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
297        let mut buf = Vec::new();
298        reader.read_quoted_text(&mut buf, b'\'')?;
299        let v = unsafe { std::str::from_utf8_unchecked(&buf) };
300        let days = NaiveDate::parse_from_str(v, "%Y-%m-%d")?.num_days_from_ce() - DAYS_FROM_CE;
301        Ok(Value::Date(days))
302    }
303
304    fn read_timestamp<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
305        let mut buf = Vec::new();
306        reader.read_quoted_text(&mut buf, b'\'')?;
307        let v = unsafe { std::str::from_utf8_unchecked(&buf) };
308        parse_timestamp(v, &self.timezone)
309    }
310
311    fn read_timestamp_tz<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
312        let mut buf = Vec::new();
313        reader.read_quoted_text(&mut buf, b'\'')?;
314        let v = unsafe { std::str::from_utf8_unchecked(&buf) };
315        let t = Zoned::strptime(TIMESTAMP_TIMEZONE_FORMAT, v)?;
316        Ok(Value::TimestampTz(t))
317    }
318
319    fn read_interval<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
320        let mut buf = Vec::new();
321        reader.read_quoted_text(&mut buf, b'\'')?;
322        Ok(Value::Interval(unsafe { String::from_utf8_unchecked(buf) }))
323    }
324
325    fn read_bitmap<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
326        let mut buf = Vec::new();
327        reader.read_quoted_text(&mut buf, b'\'')?;
328        Ok(Value::Bitmap(unsafe { String::from_utf8_unchecked(buf) }))
329    }
330
331    fn read_variant<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
332        let mut buf = Vec::new();
333        reader.read_quoted_text(&mut buf, b'\'')?;
334        Ok(Value::Variant(unsafe { String::from_utf8_unchecked(buf) }))
335    }
336
337    fn read_geometry<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
338        let mut buf = Vec::new();
339        reader.read_quoted_text(&mut buf, b'\'')?;
340        Ok(Value::Geometry(unsafe { String::from_utf8_unchecked(buf) }))
341    }
342
343    fn read_geography<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
344        let mut buf = Vec::new();
345        reader.read_quoted_text(&mut buf, b'\'')?;
346        Ok(Value::Geography(unsafe {
347            String::from_utf8_unchecked(buf)
348        }))
349    }
350
351    fn read_nullable<R: AsRef<[u8]>>(
352        &self,
353        ty: &DataType,
354        reader: &mut Cursor<R>,
355    ) -> Result<Value> {
356        match self.read_null(reader) {
357            Ok(val) => Ok(val),
358            Err(_) => self.read_field(ty, reader),
359        }
360    }
361
362    fn read_empty_array<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
363        reader.must_ignore_byte(b'[')?;
364        reader.must_ignore_byte(b']')?;
365        Ok(Value::EmptyArray)
366    }
367
368    fn read_empty_map<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
369        reader.must_ignore_byte(b'{')?;
370        reader.must_ignore_byte(b'}')?;
371        Ok(Value::EmptyArray)
372    }
373
374    fn read_array<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
375        let mut vals = Vec::new();
376        reader.must_ignore_byte(b'[')?;
377        for idx in 0.. {
378            let _ = reader.ignore_white_spaces();
379            if reader.ignore_byte(b']') {
380                break;
381            }
382            if idx != 0 {
383                reader.must_ignore_byte(b',')?;
384            }
385            let _ = reader.ignore_white_spaces();
386            let val = self.read_field(ty, reader)?;
387            vals.push(val);
388        }
389        Ok(Value::Array(vals))
390    }
391
392    fn read_vector<R: AsRef<[u8]>>(
393        &self,
394        dimension: usize,
395        reader: &mut Cursor<R>,
396    ) -> Result<Value> {
397        let mut vals = Vec::with_capacity(dimension);
398        reader.must_ignore_byte(b'[')?;
399        for idx in 0..dimension {
400            let _ = reader.ignore_white_spaces();
401            if idx > 0 {
402                reader.must_ignore_byte(b',')?;
403            }
404            let _ = reader.ignore_white_spaces();
405            let val: f32 = reader.read_float_text()?;
406            vals.push(val);
407        }
408        reader.must_ignore_byte(b']')?;
409        Ok(Value::Vector(vals))
410    }
411
412    fn read_map<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
413        const KEY: usize = 0;
414        const VALUE: usize = 1;
415        let mut kvs = Vec::new();
416        reader.must_ignore_byte(b'{')?;
417        match ty {
418            DataType::Tuple(inner_tys) => {
419                for idx in 0.. {
420                    let _ = reader.ignore_white_spaces();
421                    if reader.ignore_byte(b'}') {
422                        break;
423                    }
424                    if idx != 0 {
425                        reader.must_ignore_byte(b',')?;
426                    }
427                    let _ = reader.ignore_white_spaces();
428                    let key = self.read_field(&inner_tys[KEY], reader)?;
429                    let _ = reader.ignore_white_spaces();
430                    reader.must_ignore_byte(b':')?;
431                    let _ = reader.ignore_white_spaces();
432                    let val = self.read_field(&inner_tys[VALUE], reader)?;
433                    kvs.push((key, val));
434                }
435                Ok(Value::Map(kvs))
436            }
437            _ => unreachable!(),
438        }
439    }
440
441    fn read_tuple<R: AsRef<[u8]>>(
442        &self,
443        tys: &[DataType],
444        reader: &mut Cursor<R>,
445    ) -> Result<Value> {
446        let mut vals = Vec::new();
447        reader.must_ignore_byte(b'(')?;
448        for (idx, ty) in tys.iter().enumerate() {
449            let _ = reader.ignore_white_spaces();
450            if idx != 0 {
451                reader.must_ignore_byte(b',')?;
452            }
453            let _ = reader.ignore_white_spaces();
454            let val = self.read_field(ty, reader)?;
455            vals.push(val);
456        }
457        reader.must_ignore_byte(b')')?;
458        Ok(Value::Tuple(vals))
459    }
460}
461
462fn parse_timestamp(ts_string: &str, tz: &TimeZone) -> Result<Value> {
463    let local = JiffDateTime::strptime(TIMESTAMP_FORMAT, ts_string)?;
464    let dt_with_tz = local.to_zoned(tz.clone()).map_err(|e| {
465        Error::Parsing(format!(
466            "time {ts_string} not exists in timezone {tz:?}: {e}"
467        ))
468    })?;
469    Ok(Value::Timestamp(dt_with_tz))
470}
471
472fn parse_decimal(text: &str, size: DecimalSize) -> Result<NumberValue> {
473    let mut start = 0;
474    let bytes = text.as_bytes();
475    let mut is_negative = false;
476
477    // Check if the number is negative
478    if bytes[start] == b'-' {
479        is_negative = true;
480        start += 1;
481    }
482
483    while start < text.len() && bytes[start] == b'0' {
484        start += 1
485    }
486    let text = &text[start..];
487    let point_pos = text.find('.');
488    let e_pos = text.find(|c| ['E', 'e'].contains(&c));
489    let (i_part, f_part, e_part) = match (point_pos, e_pos) {
490        (Some(p1), Some(p2)) => (&text[..p1], &text[(p1 + 1)..p2], Some(&text[(p2 + 1)..])),
491        (Some(p), None) => (&text[..p], &text[(p + 1)..], None),
492        (None, Some(p)) => (&text[..p], "", Some(&text[(p + 1)..])),
493        (None, None) => (text, "", None),
494    };
495    let exp = match e_part {
496        Some(s) => s.parse::<i32>()?,
497        None => 0,
498    };
499    if i_part.len() as i32 + exp > 76 {
500        Err(ConvertError::new("decimal", format!("{text:?}")).into())
501    } else {
502        let mut digits = Vec::with_capacity(76);
503        digits.extend_from_slice(i_part.as_bytes());
504        digits.extend_from_slice(f_part.as_bytes());
505        if digits.is_empty() {
506            digits.push(b'0')
507        }
508        let scale = f_part.len() as i32 - exp;
509        if scale < 0 {
510            // e.g 123.1e3
511            for _ in 0..(-scale) {
512                digits.push(b'0')
513            }
514        };
515
516        let precision = std::cmp::min(digits.len(), 76);
517        let digits = unsafe { std::str::from_utf8_unchecked(&digits[..precision]) };
518
519        let result = if size.precision > 38 {
520            NumberValue::Decimal256(i256::from_str(digits).unwrap(), size)
521        } else if size.precision > 19 {
522            NumberValue::Decimal128(digits.parse::<i128>()?, size)
523        } else {
524            NumberValue::Decimal64(digits.parse::<i64>()?, size)
525        };
526
527        // If the number was negative, negate the result
528        if is_negative {
529            match result {
530                NumberValue::Decimal256(val, size) => Ok(NumberValue::Decimal256(-val, size)),
531                NumberValue::Decimal128(val, size) => Ok(NumberValue::Decimal128(-val, size)),
532                NumberValue::Decimal64(val, size) => Ok(NumberValue::Decimal64(-val, size)),
533                _ => Ok(result),
534            }
535        } else {
536            Ok(result)
537        }
538    }
539}