1use super::{NumberValue, Value, DAYS_FROM_CE, TIMESTAMP_FORMAT, TIMESTAMP_TIMEZONE_FORMAT};
16use crate::_macro_internal::Error;
17use crate::cursor_ext::{
18 collect_binary_number, collect_number, BufferReadStringExt, ReadBytesExt, ReadCheckPointExt,
19 ReadNumberExt,
20};
21use crate::error::{ConvertError, Result};
22use chrono::{Datelike, NaiveDate};
23use databend_client::schema::{DataType, DecimalDataType, DecimalSize, NumberDataType};
24use ethnum::i256;
25use hex;
26use jiff::{civil::DateTime as JiffDateTime, tz::TimeZone, Zoned};
27use serde::Deserialize;
28use serde_json::{value::RawValue, Deserializer};
29use std::io::{BufRead, Cursor};
30use std::str::FromStr;
31
/// Text that stands for a SQL NULL; matched by `read_null` and by the
/// top-level `Nullable` conversion.
const NULL_VALUE: &str = "NULL";
/// Text that `read_bool` decodes as `true`.
const TRUE_VALUE: &str = "1";
/// Text that `read_bool` decodes as `false`.
const FALSE_VALUE: &str = "0";
35
36impl TryFrom<(&DataType, Option<String>, &TimeZone)> for Value {
37 type Error = Error;
38
39 fn try_from((t, v, tz): (&DataType, Option<String>, &TimeZone)) -> Result<Self> {
40 match v {
41 Some(v) => Self::try_from((t, v, tz)),
42 None => match t {
43 DataType::Null => Ok(Self::Null),
44 DataType::Nullable(_) => Ok(Self::Null),
45 _ => Err(Error::InvalidResponse(
46 "NULL value for non-nullable field".to_string(),
47 )),
48 },
49 }
50 }
51}
52
53impl TryFrom<(&DataType, String, &TimeZone)> for Value {
54 type Error = Error;
55
56 fn try_from((t, v, tz): (&DataType, String, &TimeZone)) -> Result<Self> {
57 match t {
58 DataType::Null => Ok(Self::Null),
59 DataType::EmptyArray => Ok(Self::EmptyArray),
60 DataType::EmptyMap => Ok(Self::EmptyMap),
61 DataType::Boolean => Ok(Self::Boolean(v == "1")),
62 DataType::Binary => Ok(Self::Binary(hex::decode(v)?)),
63 DataType::String => Ok(Self::String(v)),
64 DataType::Number(NumberDataType::Int8) => {
65 Ok(Self::Number(NumberValue::Int8(v.parse()?)))
66 }
67 DataType::Number(NumberDataType::Int16) => {
68 Ok(Self::Number(NumberValue::Int16(v.parse()?)))
69 }
70 DataType::Number(NumberDataType::Int32) => {
71 Ok(Self::Number(NumberValue::Int32(v.parse()?)))
72 }
73 DataType::Number(NumberDataType::Int64) => {
74 Ok(Self::Number(NumberValue::Int64(v.parse()?)))
75 }
76 DataType::Number(NumberDataType::UInt8) => {
77 Ok(Self::Number(NumberValue::UInt8(v.parse()?)))
78 }
79 DataType::Number(NumberDataType::UInt16) => {
80 Ok(Self::Number(NumberValue::UInt16(v.parse()?)))
81 }
82 DataType::Number(NumberDataType::UInt32) => {
83 Ok(Self::Number(NumberValue::UInt32(v.parse()?)))
84 }
85 DataType::Number(NumberDataType::UInt64) => {
86 Ok(Self::Number(NumberValue::UInt64(v.parse()?)))
87 }
88 DataType::Number(NumberDataType::Float32) => {
89 Ok(Self::Number(NumberValue::Float32(v.parse()?)))
90 }
91 DataType::Number(NumberDataType::Float64) => {
92 Ok(Self::Number(NumberValue::Float64(v.parse()?)))
93 }
94 DataType::Decimal(DecimalDataType::Decimal64(size)) => {
95 let d = parse_decimal(v.as_str(), *size)?;
96 Ok(Self::Number(d))
97 }
98 DataType::Decimal(DecimalDataType::Decimal128(size)) => {
99 let d = parse_decimal(v.as_str(), *size)?;
100 Ok(Self::Number(d))
101 }
102 DataType::Decimal(DecimalDataType::Decimal256(size)) => {
103 let d = parse_decimal(v.as_str(), *size)?;
104 Ok(Self::Number(d))
105 }
106 DataType::Timestamp => parse_timestamp(v.as_str(), tz),
107 DataType::TimestampTz => {
108 let t = Zoned::strptime(TIMESTAMP_TIMEZONE_FORMAT, v.as_str())?;
109 Ok(Self::TimestampTz(t))
110 }
111 DataType::Date => Ok(Self::Date(
112 NaiveDate::parse_from_str(v.as_str(), "%Y-%m-%d")?.num_days_from_ce()
113 - DAYS_FROM_CE,
114 )),
115 DataType::Bitmap => Ok(Self::Bitmap(v)),
116 DataType::Variant => Ok(Self::Variant(v)),
117 DataType::Geometry => Ok(Self::Geometry(v)),
118 DataType::Geography => Ok(Self::Geography(v)),
119 DataType::Interval => Ok(Self::Interval(v)),
120 DataType::Array(_) | DataType::Map(_) | DataType::Tuple(_) | DataType::Vector(_) => {
121 let mut reader = Cursor::new(v.as_str());
122 let decoder = ValueDecoder {
123 timezone: tz.clone(),
124 };
125 decoder.read_field(t, &mut reader)
126 }
127 DataType::Nullable(inner) => match inner.as_ref() {
128 DataType::String => Ok(Self::String(v.to_string())),
129 _ => {
130 if v == NULL_VALUE {
133 Ok(Self::Null)
134 } else {
135 Self::try_from((inner.as_ref(), v, tz))
136 }
137 }
138 },
139 }
140 }
141}
142
/// Decoder for the textual form of field values, driven by a [`DataType`].
struct ValueDecoder {
    /// Time zone handed to `parse_timestamp` when decoding `Timestamp` fields.
    pub timezone: TimeZone,
}
146
147impl ValueDecoder {
148 pub(super) fn read_field<R: AsRef<[u8]>>(
149 &self,
150 ty: &DataType,
151 reader: &mut Cursor<R>,
152 ) -> Result<Value> {
153 match ty {
154 DataType::Null => self.read_null(reader),
155 DataType::EmptyArray => self.read_empty_array(reader),
156 DataType::EmptyMap => self.read_empty_map(reader),
157 DataType::Boolean => self.read_bool(reader),
158 DataType::Number(NumberDataType::Int8) => self.read_int8(reader),
159 DataType::Number(NumberDataType::Int16) => self.read_int16(reader),
160 DataType::Number(NumberDataType::Int32) => self.read_int32(reader),
161 DataType::Number(NumberDataType::Int64) => self.read_int64(reader),
162 DataType::Number(NumberDataType::UInt8) => self.read_uint8(reader),
163 DataType::Number(NumberDataType::UInt16) => self.read_uint16(reader),
164 DataType::Number(NumberDataType::UInt32) => self.read_uint32(reader),
165 DataType::Number(NumberDataType::UInt64) => self.read_uint64(reader),
166 DataType::Number(NumberDataType::Float32) => self.read_float32(reader),
167 DataType::Number(NumberDataType::Float64) => self.read_float64(reader),
168 DataType::Decimal(DecimalDataType::Decimal64(size)) => self.read_decimal(size, reader),
169 DataType::Decimal(DecimalDataType::Decimal128(size)) => self.read_decimal(size, reader),
170 DataType::Decimal(DecimalDataType::Decimal256(size)) => self.read_decimal(size, reader),
171 DataType::String => self.read_string(reader),
172 DataType::Binary => self.read_binary(reader),
173 DataType::Timestamp => self.read_timestamp(reader),
174 DataType::TimestampTz => self.read_timestamp_tz(reader),
175 DataType::Date => self.read_date(reader),
176 DataType::Bitmap => self.read_bitmap(reader),
177 DataType::Variant => self.read_variant(reader),
178 DataType::Geometry => self.read_geometry(reader),
179 DataType::Interval => self.read_interval(reader),
180 DataType::Geography => self.read_geography(reader),
181 DataType::Array(inner_ty) => self.read_array(inner_ty.as_ref(), reader),
182 DataType::Map(inner_ty) => self.read_map(inner_ty.as_ref(), reader),
183 DataType::Tuple(inner_tys) => self.read_tuple(inner_tys.as_ref(), reader),
184 DataType::Vector(dimension) => self.read_vector(*dimension as usize, reader),
185 DataType::Nullable(inner_ty) => self.read_nullable(inner_ty.as_ref(), reader),
186 }
187 }
188
189 fn match_bytes<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>, bs: &[u8]) -> bool {
190 let pos = reader.checkpoint();
191 if reader.ignore_bytes(bs) {
192 true
193 } else {
194 reader.rollback(pos);
195 false
196 }
197 }
198
199 fn read_null<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
200 if self.match_bytes(reader, NULL_VALUE.as_bytes()) {
201 Ok(Value::Null)
202 } else {
203 let buf = reader.fill_buf()?;
204 Err(ConvertError::new("null", String::from_utf8_lossy(buf).to_string()).into())
205 }
206 }
207
208 fn read_bool<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
209 if self.match_bytes(reader, TRUE_VALUE.as_bytes()) {
210 Ok(Value::Boolean(true))
211 } else if self.match_bytes(reader, FALSE_VALUE.as_bytes()) {
212 Ok(Value::Boolean(false))
213 } else {
214 let buf = reader.fill_buf()?;
215 Err(ConvertError::new("boolean", String::from_utf8_lossy(buf).to_string()).into())
216 }
217 }
218
219 fn read_int8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
220 let v: i8 = reader.read_int_text()?;
221 Ok(Value::Number(NumberValue::Int8(v)))
222 }
223
224 fn read_int16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
225 let v: i16 = reader.read_int_text()?;
226 Ok(Value::Number(NumberValue::Int16(v)))
227 }
228
229 fn read_int32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
230 let v: i32 = reader.read_int_text()?;
231 Ok(Value::Number(NumberValue::Int32(v)))
232 }
233
234 fn read_int64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
235 let v: i64 = reader.read_int_text()?;
236 Ok(Value::Number(NumberValue::Int64(v)))
237 }
238
239 fn read_uint8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
240 let v: u8 = reader.read_int_text()?;
241 Ok(Value::Number(NumberValue::UInt8(v)))
242 }
243
244 fn read_uint16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
245 let v: u16 = reader.read_int_text()?;
246 Ok(Value::Number(NumberValue::UInt16(v)))
247 }
248
249 fn read_uint32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
250 let v: u32 = reader.read_int_text()?;
251 Ok(Value::Number(NumberValue::UInt32(v)))
252 }
253
254 fn read_uint64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
255 let v: u64 = reader.read_int_text()?;
256 Ok(Value::Number(NumberValue::UInt64(v)))
257 }
258
259 fn read_float32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
260 let v: f32 = reader.read_float_text()?;
261 Ok(Value::Number(NumberValue::Float32(v)))
262 }
263
264 fn read_float64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
265 let v: f64 = reader.read_float_text()?;
266 Ok(Value::Number(NumberValue::Float64(v)))
267 }
268
269 fn read_decimal<R: AsRef<[u8]>>(
270 &self,
271 size: &DecimalSize,
272 reader: &mut Cursor<R>,
273 ) -> Result<Value> {
274 let buf = reader.fill_buf()?;
275 let (n_in, _) = collect_number(buf);
278 let v = unsafe { std::str::from_utf8_unchecked(&buf[..n_in]) };
279 let d = parse_decimal(v, *size)?;
280 reader.consume(n_in);
281 Ok(Value::Number(d))
282 }
283
284 fn read_string<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
285 let mut buf = Vec::new();
286 if reader.read_quoted_text(&mut buf, b'"').is_err() {
287 reader.read_quoted_text(&mut buf, b'\'')?;
288 }
289 Ok(Value::String(unsafe { String::from_utf8_unchecked(buf) }))
290 }
291
292 fn read_binary<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
293 let buf = reader.fill_buf()?;
294 let n = collect_binary_number(buf);
295 let v = buf[..n].to_vec();
296 reader.consume(n);
297 Ok(Value::Binary(hex::decode(v)?))
298 }
299
300 fn read_date<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
301 let mut buf = Vec::new();
302 if reader.read_quoted_text(&mut buf, b'"').is_err() {
303 reader.read_quoted_text(&mut buf, b'\'')?;
304 }
305 let v = unsafe { std::str::from_utf8_unchecked(&buf) };
306 let days = NaiveDate::parse_from_str(v, "%Y-%m-%d")?.num_days_from_ce() - DAYS_FROM_CE;
307 Ok(Value::Date(days))
308 }
309
310 fn read_timestamp<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
311 let mut buf = Vec::new();
312 if reader.read_quoted_text(&mut buf, b'"').is_err() {
313 reader.read_quoted_text(&mut buf, b'\'')?;
314 }
315 let v = unsafe { std::str::from_utf8_unchecked(&buf) };
316 parse_timestamp(v, &self.timezone)
317 }
318
319 fn read_timestamp_tz<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
320 let mut buf = Vec::new();
321 if reader.read_quoted_text(&mut buf, b'"').is_err() {
322 reader.read_quoted_text(&mut buf, b'\'')?;
323 }
324 let v = unsafe { std::str::from_utf8_unchecked(&buf) };
325 let t = Zoned::strptime(TIMESTAMP_TIMEZONE_FORMAT, v)?;
326 Ok(Value::TimestampTz(t))
327 }
328
329 fn read_interval<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
330 let mut buf = Vec::new();
331 if reader.read_quoted_text(&mut buf, b'"').is_err() {
332 reader.read_quoted_text(&mut buf, b'\'')?;
333 }
334 Ok(Value::Interval(unsafe { String::from_utf8_unchecked(buf) }))
335 }
336
337 fn read_bitmap<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
338 let mut buf = Vec::new();
339 if reader.read_quoted_text(&mut buf, b'"').is_err() {
340 reader.read_quoted_text(&mut buf, b'\'')?;
341 }
342 Ok(Value::Bitmap(unsafe { String::from_utf8_unchecked(buf) }))
343 }
344
345 fn read_variant<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
346 if let Ok(val) = self.read_json(reader) {
347 Ok(Value::Variant(val))
348 } else {
349 let mut buf = Vec::new();
350 reader.read_quoted_text(&mut buf, b'\'')?;
351 Ok(Value::Variant(unsafe { String::from_utf8_unchecked(buf) }))
352 }
353 }
354
355 fn read_geometry<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
356 let mut buf = Vec::new();
357 if reader.read_quoted_text(&mut buf, b'"').is_ok()
358 || reader.read_quoted_text(&mut buf, b'\'').is_ok()
359 {
360 Ok(Value::Geometry(unsafe { String::from_utf8_unchecked(buf) }))
361 } else {
362 let val = self.read_json(reader)?;
363 Ok(Value::Geometry(val))
364 }
365 }
366
367 fn read_geography<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
368 let mut buf = Vec::new();
369 if reader.read_quoted_text(&mut buf, b'"').is_ok()
370 || reader.read_quoted_text(&mut buf, b'\'').is_ok()
371 {
372 Ok(Value::Geography(unsafe {
373 String::from_utf8_unchecked(buf)
374 }))
375 } else {
376 let val = self.read_json(reader)?;
377 Ok(Value::Geography(val))
378 }
379 }
380
381 fn read_nullable<R: AsRef<[u8]>>(
382 &self,
383 ty: &DataType,
384 reader: &mut Cursor<R>,
385 ) -> Result<Value> {
386 match self.read_null(reader) {
387 Ok(val) => Ok(val),
388 Err(_) => self.read_field(ty, reader),
389 }
390 }
391
392 fn read_empty_array<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
393 reader.must_ignore_byte(b'[')?;
394 reader.must_ignore_byte(b']')?;
395 Ok(Value::EmptyArray)
396 }
397
398 fn read_empty_map<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
399 reader.must_ignore_byte(b'{')?;
400 reader.must_ignore_byte(b'}')?;
401 Ok(Value::EmptyArray)
402 }
403
404 fn read_array<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
405 let mut vals = Vec::new();
406 reader.must_ignore_byte(b'[')?;
407 for idx in 0.. {
408 let _ = reader.ignore_white_spaces();
409 if reader.ignore_byte(b']') {
410 break;
411 }
412 if idx != 0 {
413 reader.must_ignore_byte(b',')?;
414 }
415 let _ = reader.ignore_white_spaces();
416 let val = self.read_field(ty, reader)?;
417 vals.push(val);
418 }
419 Ok(Value::Array(vals))
420 }
421
422 fn read_vector<R: AsRef<[u8]>>(
423 &self,
424 dimension: usize,
425 reader: &mut Cursor<R>,
426 ) -> Result<Value> {
427 let mut vals = Vec::with_capacity(dimension);
428 reader.must_ignore_byte(b'[')?;
429 for idx in 0..dimension {
430 let _ = reader.ignore_white_spaces();
431 if idx > 0 {
432 reader.must_ignore_byte(b',')?;
433 }
434 let _ = reader.ignore_white_spaces();
435 let val: f32 = reader.read_float_text()?;
436 vals.push(val);
437 }
438 reader.must_ignore_byte(b']')?;
439 Ok(Value::Vector(vals))
440 }
441
442 fn read_map<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
443 const KEY: usize = 0;
444 const VALUE: usize = 1;
445 let mut kvs = Vec::new();
446 reader.must_ignore_byte(b'{')?;
447 match ty {
448 DataType::Tuple(inner_tys) => {
449 for idx in 0.. {
450 let _ = reader.ignore_white_spaces();
451 if reader.ignore_byte(b'}') {
452 break;
453 }
454 if idx != 0 {
455 reader.must_ignore_byte(b',')?;
456 }
457 let _ = reader.ignore_white_spaces();
458 let key = self.read_field(&inner_tys[KEY], reader)?;
459 let _ = reader.ignore_white_spaces();
460 reader.must_ignore_byte(b':')?;
461 let _ = reader.ignore_white_spaces();
462 let val = self.read_field(&inner_tys[VALUE], reader)?;
463 kvs.push((key, val));
464 }
465 Ok(Value::Map(kvs))
466 }
467 _ => unreachable!(),
468 }
469 }
470
471 fn read_tuple<R: AsRef<[u8]>>(
472 &self,
473 tys: &[DataType],
474 reader: &mut Cursor<R>,
475 ) -> Result<Value> {
476 let mut vals = Vec::new();
477 reader.must_ignore_byte(b'(')?;
478 for (idx, ty) in tys.iter().enumerate() {
479 let _ = reader.ignore_white_spaces();
480 if idx != 0 {
481 reader.must_ignore_byte(b',')?;
482 }
483 let _ = reader.ignore_white_spaces();
484 let val = self.read_field(ty, reader)?;
485 vals.push(val);
486 }
487 reader.must_ignore_byte(b')')?;
488 Ok(Value::Tuple(vals))
489 }
490
491 fn read_json<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<String> {
492 let start = reader.position() as usize;
493 let data = reader.get_ref().as_ref();
494 let mut deserializer = Deserializer::from_slice(&data[start..]);
495 let raw: Box<RawValue> = Box::<RawValue>::deserialize(&mut deserializer)?;
496 reader.set_position((start + raw.get().len()) as u64);
497 Ok(raw.to_string())
498 }
499}
500
501fn parse_timestamp(ts_string: &str, tz: &TimeZone) -> Result<Value> {
502 let local = JiffDateTime::strptime(TIMESTAMP_FORMAT, ts_string)?;
503 let dt_with_tz = local.to_zoned(tz.clone()).map_err(|e| {
504 Error::Parsing(format!(
505 "time {ts_string} not exists in timezone {tz:?}: {e}"
506 ))
507 })?;
508 Ok(Value::Timestamp(dt_with_tz))
509}
510
511fn parse_decimal(text: &str, size: DecimalSize) -> Result<NumberValue> {
512 let mut start = 0;
513 let bytes = text.as_bytes();
514 let mut is_negative = false;
515
516 if bytes[start] == b'-' {
518 is_negative = true;
519 start += 1;
520 }
521
522 while start < text.len() && bytes[start] == b'0' {
523 start += 1
524 }
525 let text = &text[start..];
526 let point_pos = text.find('.');
527 let e_pos = text.find(|c| ['E', 'e'].contains(&c));
528 let (i_part, f_part, e_part) = match (point_pos, e_pos) {
529 (Some(p1), Some(p2)) => (&text[..p1], &text[(p1 + 1)..p2], Some(&text[(p2 + 1)..])),
530 (Some(p), None) => (&text[..p], &text[(p + 1)..], None),
531 (None, Some(p)) => (&text[..p], "", Some(&text[(p + 1)..])),
532 (None, None) => (text, "", None),
533 };
534 let exp = match e_part {
535 Some(s) => s.parse::<i32>()?,
536 None => 0,
537 };
538 if i_part.len() as i32 + exp > 76 {
539 Err(ConvertError::new("decimal", format!("{text:?}")).into())
540 } else {
541 let mut digits = Vec::with_capacity(76);
542 digits.extend_from_slice(i_part.as_bytes());
543 digits.extend_from_slice(f_part.as_bytes());
544 if digits.is_empty() {
545 digits.push(b'0')
546 }
547 let scale = f_part.len() as i32 - exp;
548 if scale < 0 {
549 for _ in 0..(-scale) {
551 digits.push(b'0')
552 }
553 };
554
555 let precision = std::cmp::min(digits.len(), 76);
556 let digits = unsafe { std::str::from_utf8_unchecked(&digits[..precision]) };
557
558 let result = if size.precision > 38 {
559 NumberValue::Decimal256(i256::from_str(digits).unwrap(), size)
560 } else if size.precision > 19 {
561 NumberValue::Decimal128(digits.parse::<i128>()?, size)
562 } else {
563 NumberValue::Decimal64(digits.parse::<i64>()?, size)
564 };
565
566 if is_negative {
568 match result {
569 NumberValue::Decimal256(val, size) => Ok(NumberValue::Decimal256(-val, size)),
570 NumberValue::Decimal128(val, size) => Ok(NumberValue::Decimal128(-val, size)),
571 NumberValue::Decimal64(val, size) => Ok(NumberValue::Decimal64(-val, size)),
572 _ => Ok(result),
573 }
574 } else {
575 Ok(result)
576 }
577 }
578}