1use super::{NumberValue, Value, DAYS_FROM_CE, TIMESTAMP_FORMAT, TIMESTAMP_TIMEZONE_FORMAT};
16use crate::_macro_internal::Error;
17use crate::cursor_ext::{
18 collect_binary_number, collect_number, BufferReadStringExt, ReadBytesExt, ReadCheckPointExt,
19 ReadNumberExt,
20};
21use crate::error::{ConvertError, Result};
22use chrono::{Datelike, NaiveDate};
23use databend_client::schema::{DataType, DecimalDataType, DecimalSize, NumberDataType};
24use ethnum::i256;
25use hex;
26use jiff::{civil::DateTime as JiffDateTime, tz::TimeZone, Zoned};
27use std::io::{BufRead, Cursor};
28use std::str::FromStr;
29
/// Literal that is decoded as `Value::Null`.
const NULL_VALUE: &str = "NULL";
/// Literal that `ValueDecoder::read_bool` decodes as `true`.
const TRUE_VALUE: &str = "1";
/// Literal that `ValueDecoder::read_bool` decodes as `false`.
const FALSE_VALUE: &str = "0";
33
34impl TryFrom<(&DataType, Option<String>, &TimeZone)> for Value {
35 type Error = Error;
36
37 fn try_from((t, v, tz): (&DataType, Option<String>, &TimeZone)) -> Result<Self> {
38 match v {
39 Some(v) => Self::try_from((t, v, tz)),
40 None => match t {
41 DataType::Null => Ok(Self::Null),
42 DataType::Nullable(_) => Ok(Self::Null),
43 _ => Err(Error::InvalidResponse(
44 "NULL value for non-nullable field".to_string(),
45 )),
46 },
47 }
48 }
49}
50
51impl TryFrom<(&DataType, String, &TimeZone)> for Value {
52 type Error = Error;
53
54 fn try_from((t, v, tz): (&DataType, String, &TimeZone)) -> Result<Self> {
55 match t {
56 DataType::Null => Ok(Self::Null),
57 DataType::EmptyArray => Ok(Self::EmptyArray),
58 DataType::EmptyMap => Ok(Self::EmptyMap),
59 DataType::Boolean => Ok(Self::Boolean(v == "1")),
60 DataType::Binary => Ok(Self::Binary(hex::decode(v)?)),
61 DataType::String => Ok(Self::String(v)),
62 DataType::Number(NumberDataType::Int8) => {
63 Ok(Self::Number(NumberValue::Int8(v.parse()?)))
64 }
65 DataType::Number(NumberDataType::Int16) => {
66 Ok(Self::Number(NumberValue::Int16(v.parse()?)))
67 }
68 DataType::Number(NumberDataType::Int32) => {
69 Ok(Self::Number(NumberValue::Int32(v.parse()?)))
70 }
71 DataType::Number(NumberDataType::Int64) => {
72 Ok(Self::Number(NumberValue::Int64(v.parse()?)))
73 }
74 DataType::Number(NumberDataType::UInt8) => {
75 Ok(Self::Number(NumberValue::UInt8(v.parse()?)))
76 }
77 DataType::Number(NumberDataType::UInt16) => {
78 Ok(Self::Number(NumberValue::UInt16(v.parse()?)))
79 }
80 DataType::Number(NumberDataType::UInt32) => {
81 Ok(Self::Number(NumberValue::UInt32(v.parse()?)))
82 }
83 DataType::Number(NumberDataType::UInt64) => {
84 Ok(Self::Number(NumberValue::UInt64(v.parse()?)))
85 }
86 DataType::Number(NumberDataType::Float32) => {
87 Ok(Self::Number(NumberValue::Float32(v.parse()?)))
88 }
89 DataType::Number(NumberDataType::Float64) => {
90 Ok(Self::Number(NumberValue::Float64(v.parse()?)))
91 }
92 DataType::Decimal(DecimalDataType::Decimal128(size)) => {
93 let d = parse_decimal(v.as_str(), *size)?;
94 Ok(Self::Number(d))
95 }
96 DataType::Decimal(DecimalDataType::Decimal256(size)) => {
97 let d = parse_decimal(v.as_str(), *size)?;
98 Ok(Self::Number(d))
99 }
100 DataType::Timestamp => parse_timestamp(v.as_str(), tz),
101 DataType::TimestampTz => {
102 let t = Zoned::strptime(TIMESTAMP_TIMEZONE_FORMAT, v.as_str())?;
103 Ok(Self::TimestampTz(t))
104 }
105 DataType::Date => Ok(Self::Date(
106 NaiveDate::parse_from_str(v.as_str(), "%Y-%m-%d")?.num_days_from_ce()
107 - DAYS_FROM_CE,
108 )),
109 DataType::Bitmap => Ok(Self::Bitmap(v)),
110 DataType::Variant => Ok(Self::Variant(v)),
111 DataType::Geometry => Ok(Self::Geometry(v)),
112 DataType::Geography => Ok(Self::Geography(v)),
113 DataType::Interval => Ok(Self::Interval(v)),
114 DataType::Array(_) | DataType::Map(_) | DataType::Tuple(_) | DataType::Vector(_) => {
115 let mut reader = Cursor::new(v.as_str());
116 let decoder = ValueDecoder {
117 timezone: tz.clone(),
118 };
119 decoder.read_field(t, &mut reader)
120 }
121 DataType::Nullable(inner) => match inner.as_ref() {
122 DataType::String => Ok(Self::String(v.to_string())),
123 _ => {
124 if v == NULL_VALUE {
127 Ok(Self::Null)
128 } else {
129 Self::try_from((inner.as_ref(), v, tz))
130 }
131 }
132 },
133 }
134 }
135}
136
/// Cursor-based text decoder used for values of container types
/// (`Array`, `Map`, `Tuple`, `Vector`) and everything nested inside them.
struct ValueDecoder {
    /// Time zone applied when decoding `Timestamp` fields.
    pub timezone: TimeZone,
}
140
141impl ValueDecoder {
142 pub(super) fn read_field<R: AsRef<[u8]>>(
143 &self,
144 ty: &DataType,
145 reader: &mut Cursor<R>,
146 ) -> Result<Value> {
147 match ty {
148 DataType::Null => self.read_null(reader),
149 DataType::EmptyArray => self.read_empty_array(reader),
150 DataType::EmptyMap => self.read_empty_map(reader),
151 DataType::Boolean => self.read_bool(reader),
152 DataType::Number(NumberDataType::Int8) => self.read_int8(reader),
153 DataType::Number(NumberDataType::Int16) => self.read_int16(reader),
154 DataType::Number(NumberDataType::Int32) => self.read_int32(reader),
155 DataType::Number(NumberDataType::Int64) => self.read_int64(reader),
156 DataType::Number(NumberDataType::UInt8) => self.read_uint8(reader),
157 DataType::Number(NumberDataType::UInt16) => self.read_uint16(reader),
158 DataType::Number(NumberDataType::UInt32) => self.read_uint32(reader),
159 DataType::Number(NumberDataType::UInt64) => self.read_uint64(reader),
160 DataType::Number(NumberDataType::Float32) => self.read_float32(reader),
161 DataType::Number(NumberDataType::Float64) => self.read_float64(reader),
162 DataType::Decimal(DecimalDataType::Decimal128(size)) => self.read_decimal(size, reader),
163 DataType::Decimal(DecimalDataType::Decimal256(size)) => self.read_decimal(size, reader),
164 DataType::String => self.read_string(reader),
165 DataType::Binary => self.read_binary(reader),
166 DataType::Timestamp => self.read_timestamp(reader),
167 DataType::TimestampTz => self.read_timestamp_tz(reader),
168 DataType::Date => self.read_date(reader),
169 DataType::Bitmap => self.read_bitmap(reader),
170 DataType::Variant => self.read_variant(reader),
171 DataType::Geometry => self.read_geometry(reader),
172 DataType::Interval => self.read_interval(reader),
173 DataType::Geography => self.read_geography(reader),
174 DataType::Array(inner_ty) => self.read_array(inner_ty.as_ref(), reader),
175 DataType::Map(inner_ty) => self.read_map(inner_ty.as_ref(), reader),
176 DataType::Tuple(inner_tys) => self.read_tuple(inner_tys.as_ref(), reader),
177 DataType::Vector(dimension) => self.read_vector(*dimension as usize, reader),
178 DataType::Nullable(inner_ty) => self.read_nullable(inner_ty.as_ref(), reader),
179 }
180 }
181
182 fn match_bytes<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>, bs: &[u8]) -> bool {
183 let pos = reader.checkpoint();
184 if reader.ignore_bytes(bs) {
185 true
186 } else {
187 reader.rollback(pos);
188 false
189 }
190 }
191
192 fn read_null<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
193 if self.match_bytes(reader, NULL_VALUE.as_bytes()) {
194 Ok(Value::Null)
195 } else {
196 let buf = reader.fill_buf()?;
197 Err(ConvertError::new("null", String::from_utf8_lossy(buf).to_string()).into())
198 }
199 }
200
201 fn read_bool<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
202 if self.match_bytes(reader, TRUE_VALUE.as_bytes()) {
203 Ok(Value::Boolean(true))
204 } else if self.match_bytes(reader, FALSE_VALUE.as_bytes()) {
205 Ok(Value::Boolean(false))
206 } else {
207 let buf = reader.fill_buf()?;
208 Err(ConvertError::new("boolean", String::from_utf8_lossy(buf).to_string()).into())
209 }
210 }
211
212 fn read_int8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
213 let v: i8 = reader.read_int_text()?;
214 Ok(Value::Number(NumberValue::Int8(v)))
215 }
216
217 fn read_int16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
218 let v: i16 = reader.read_int_text()?;
219 Ok(Value::Number(NumberValue::Int16(v)))
220 }
221
222 fn read_int32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
223 let v: i32 = reader.read_int_text()?;
224 Ok(Value::Number(NumberValue::Int32(v)))
225 }
226
227 fn read_int64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
228 let v: i64 = reader.read_int_text()?;
229 Ok(Value::Number(NumberValue::Int64(v)))
230 }
231
232 fn read_uint8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
233 let v: u8 = reader.read_int_text()?;
234 Ok(Value::Number(NumberValue::UInt8(v)))
235 }
236
237 fn read_uint16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
238 let v: u16 = reader.read_int_text()?;
239 Ok(Value::Number(NumberValue::UInt16(v)))
240 }
241
242 fn read_uint32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
243 let v: u32 = reader.read_int_text()?;
244 Ok(Value::Number(NumberValue::UInt32(v)))
245 }
246
247 fn read_uint64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
248 let v: u64 = reader.read_int_text()?;
249 Ok(Value::Number(NumberValue::UInt64(v)))
250 }
251
252 fn read_float32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
253 let v: f32 = reader.read_float_text()?;
254 Ok(Value::Number(NumberValue::Float32(v)))
255 }
256
257 fn read_float64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
258 let v: f64 = reader.read_float_text()?;
259 Ok(Value::Number(NumberValue::Float64(v)))
260 }
261
262 fn read_decimal<R: AsRef<[u8]>>(
263 &self,
264 size: &DecimalSize,
265 reader: &mut Cursor<R>,
266 ) -> Result<Value> {
267 let buf = reader.fill_buf()?;
268 let (n_in, _) = collect_number(buf);
271 let v = unsafe { std::str::from_utf8_unchecked(&buf[..n_in]) };
272 let d = parse_decimal(v, *size)?;
273 reader.consume(n_in);
274 Ok(Value::Number(d))
275 }
276
277 fn read_string<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
278 let mut buf = Vec::new();
279 reader.read_quoted_text(&mut buf, b'\'')?;
280 Ok(Value::String(unsafe { String::from_utf8_unchecked(buf) }))
281 }
282
283 fn read_binary<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
284 let buf = reader.fill_buf()?;
285 let n = collect_binary_number(buf);
286 let v = buf[..n].to_vec();
287 reader.consume(n);
288 Ok(Value::Binary(hex::decode(v)?))
289 }
290
291 fn read_date<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
292 let mut buf = Vec::new();
293 reader.read_quoted_text(&mut buf, b'\'')?;
294 let v = unsafe { std::str::from_utf8_unchecked(&buf) };
295 let days = NaiveDate::parse_from_str(v, "%Y-%m-%d")?.num_days_from_ce() - DAYS_FROM_CE;
296 Ok(Value::Date(days))
297 }
298
299 fn read_timestamp<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
300 let mut buf = Vec::new();
301 reader.read_quoted_text(&mut buf, b'\'')?;
302 let v = unsafe { std::str::from_utf8_unchecked(&buf) };
303 parse_timestamp(v, &self.timezone)
304 }
305
306 fn read_timestamp_tz<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
307 let mut buf = Vec::new();
308 reader.read_quoted_text(&mut buf, b'\'')?;
309 let v = unsafe { std::str::from_utf8_unchecked(&buf) };
310 let t = Zoned::strptime(TIMESTAMP_TIMEZONE_FORMAT, v)?;
311 Ok(Value::TimestampTz(t))
312 }
313
314 fn read_interval<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
315 let mut buf = Vec::new();
316 reader.read_quoted_text(&mut buf, b'\'')?;
317 Ok(Value::Interval(unsafe { String::from_utf8_unchecked(buf) }))
318 }
319
320 fn read_bitmap<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
321 let mut buf = Vec::new();
322 reader.read_quoted_text(&mut buf, b'\'')?;
323 Ok(Value::Bitmap(unsafe { String::from_utf8_unchecked(buf) }))
324 }
325
326 fn read_variant<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
327 let mut buf = Vec::new();
328 reader.read_quoted_text(&mut buf, b'\'')?;
329 Ok(Value::Variant(unsafe { String::from_utf8_unchecked(buf) }))
330 }
331
332 fn read_geometry<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
333 let mut buf = Vec::new();
334 reader.read_quoted_text(&mut buf, b'\'')?;
335 Ok(Value::Geometry(unsafe { String::from_utf8_unchecked(buf) }))
336 }
337
338 fn read_geography<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
339 let mut buf = Vec::new();
340 reader.read_quoted_text(&mut buf, b'\'')?;
341 Ok(Value::Geography(unsafe {
342 String::from_utf8_unchecked(buf)
343 }))
344 }
345
346 fn read_nullable<R: AsRef<[u8]>>(
347 &self,
348 ty: &DataType,
349 reader: &mut Cursor<R>,
350 ) -> Result<Value> {
351 match self.read_null(reader) {
352 Ok(val) => Ok(val),
353 Err(_) => self.read_field(ty, reader),
354 }
355 }
356
357 fn read_empty_array<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
358 reader.must_ignore_byte(b'[')?;
359 reader.must_ignore_byte(b']')?;
360 Ok(Value::EmptyArray)
361 }
362
363 fn read_empty_map<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
364 reader.must_ignore_byte(b'{')?;
365 reader.must_ignore_byte(b'}')?;
366 Ok(Value::EmptyArray)
367 }
368
369 fn read_array<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
370 let mut vals = Vec::new();
371 reader.must_ignore_byte(b'[')?;
372 for idx in 0.. {
373 let _ = reader.ignore_white_spaces();
374 if reader.ignore_byte(b']') {
375 break;
376 }
377 if idx != 0 {
378 reader.must_ignore_byte(b',')?;
379 }
380 let _ = reader.ignore_white_spaces();
381 let val = self.read_field(ty, reader)?;
382 vals.push(val);
383 }
384 Ok(Value::Array(vals))
385 }
386
387 fn read_vector<R: AsRef<[u8]>>(
388 &self,
389 dimension: usize,
390 reader: &mut Cursor<R>,
391 ) -> Result<Value> {
392 let mut vals = Vec::with_capacity(dimension);
393 reader.must_ignore_byte(b'[')?;
394 for idx in 0..dimension {
395 let _ = reader.ignore_white_spaces();
396 if idx > 0 {
397 reader.must_ignore_byte(b',')?;
398 }
399 let _ = reader.ignore_white_spaces();
400 let val: f32 = reader.read_float_text()?;
401 vals.push(val);
402 }
403 reader.must_ignore_byte(b']')?;
404 Ok(Value::Vector(vals))
405 }
406
407 fn read_map<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
408 const KEY: usize = 0;
409 const VALUE: usize = 1;
410 let mut kvs = Vec::new();
411 reader.must_ignore_byte(b'{')?;
412 match ty {
413 DataType::Tuple(inner_tys) => {
414 for idx in 0.. {
415 let _ = reader.ignore_white_spaces();
416 if reader.ignore_byte(b'}') {
417 break;
418 }
419 if idx != 0 {
420 reader.must_ignore_byte(b',')?;
421 }
422 let _ = reader.ignore_white_spaces();
423 let key = self.read_field(&inner_tys[KEY], reader)?;
424 let _ = reader.ignore_white_spaces();
425 reader.must_ignore_byte(b':')?;
426 let _ = reader.ignore_white_spaces();
427 let val = self.read_field(&inner_tys[VALUE], reader)?;
428 kvs.push((key, val));
429 }
430 Ok(Value::Map(kvs))
431 }
432 _ => unreachable!(),
433 }
434 }
435
436 fn read_tuple<R: AsRef<[u8]>>(
437 &self,
438 tys: &[DataType],
439 reader: &mut Cursor<R>,
440 ) -> Result<Value> {
441 let mut vals = Vec::new();
442 reader.must_ignore_byte(b'(')?;
443 for (idx, ty) in tys.iter().enumerate() {
444 let _ = reader.ignore_white_spaces();
445 if idx != 0 {
446 reader.must_ignore_byte(b',')?;
447 }
448 let _ = reader.ignore_white_spaces();
449 let val = self.read_field(ty, reader)?;
450 vals.push(val);
451 }
452 reader.must_ignore_byte(b')')?;
453 Ok(Value::Tuple(vals))
454 }
455}
456
457fn parse_timestamp(ts_string: &str, tz: &TimeZone) -> Result<Value> {
458 let local = JiffDateTime::strptime(TIMESTAMP_FORMAT, ts_string)?;
459 let dt_with_tz = local.to_zoned(tz.clone()).map_err(|e| {
460 Error::Parsing(format!(
461 "time {ts_string} not exists in timezone {tz:?}: {e}"
462 ))
463 })?;
464 Ok(Value::Timestamp(dt_with_tz))
465}
466
467fn parse_decimal(text: &str, size: DecimalSize) -> Result<NumberValue> {
468 let mut start = 0;
469 let bytes = text.as_bytes();
470 let mut is_negative = false;
471
472 if bytes[start] == b'-' {
474 is_negative = true;
475 start += 1;
476 }
477
478 while start < text.len() && bytes[start] == b'0' {
479 start += 1
480 }
481 let text = &text[start..];
482 let point_pos = text.find('.');
483 let e_pos = text.find(|c| ['E', 'e'].contains(&c));
484 let (i_part, f_part, e_part) = match (point_pos, e_pos) {
485 (Some(p1), Some(p2)) => (&text[..p1], &text[(p1 + 1)..p2], Some(&text[(p2 + 1)..])),
486 (Some(p), None) => (&text[..p], &text[(p + 1)..], None),
487 (None, Some(p)) => (&text[..p], "", Some(&text[(p + 1)..])),
488 (None, None) => (text, "", None),
489 };
490 let exp = match e_part {
491 Some(s) => s.parse::<i32>()?,
492 None => 0,
493 };
494 if i_part.len() as i32 + exp > 76 {
495 Err(ConvertError::new("decimal", format!("{text:?}")).into())
496 } else {
497 let mut digits = Vec::with_capacity(76);
498 digits.extend_from_slice(i_part.as_bytes());
499 digits.extend_from_slice(f_part.as_bytes());
500 if digits.is_empty() {
501 digits.push(b'0')
502 }
503 let scale = f_part.len() as i32 - exp;
504 if scale < 0 {
505 for _ in 0..(-scale) {
507 digits.push(b'0')
508 }
509 };
510
511 let precision = std::cmp::min(digits.len(), 76);
512 let digits = unsafe { std::str::from_utf8_unchecked(&digits[..precision]) };
513
514 let result = if size.precision > 38 {
515 NumberValue::Decimal256(i256::from_str(digits).unwrap(), size)
516 } else {
517 NumberValue::Decimal128(digits.parse::<i128>()?, size)
518 };
519
520 if is_negative {
522 match result {
523 NumberValue::Decimal256(val, size) => Ok(NumberValue::Decimal256(-val, size)),
524 NumberValue::Decimal128(val, size) => Ok(NumberValue::Decimal128(-val, size)),
525 _ => Ok(result),
526 }
527 } else {
528 Ok(result)
529 }
530 }
531}