1use super::{NumberValue, Value, DAYS_FROM_CE, TIMESTAMP_FORMAT, TIMESTAMP_TIMEZONE_FORMAT};
16use crate::_macro_internal::Error;
17use crate::cursor_ext::{
18 collect_binary_number, collect_number, BufferReadStringExt, ReadBytesExt, ReadCheckPointExt,
19 ReadNumberExt,
20};
21use crate::error::{ConvertError, Result};
22use chrono::{Datelike, NaiveDate};
23use databend_client::schema::{DataType, DecimalDataType, DecimalSize, NumberDataType};
24use ethnum::i256;
25use hex;
26use jiff::{civil::DateTime as JiffDateTime, tz::TimeZone, Zoned};
27use std::io::{BufRead, Cursor};
28use std::str::FromStr;
29
/// Text that stands for a null value; matched by `ValueDecoder::read_null` and compared
/// against the raw text of non-string nullable columns.
const NULL_VALUE: &str = "NULL";
/// Text that `ValueDecoder::read_bool` decodes as `true`.
const TRUE_VALUE: &str = "1";
/// Text that `ValueDecoder::read_bool` decodes as `false`.
const FALSE_VALUE: &str = "0";
33
34impl TryFrom<(&DataType, Option<String>, &TimeZone)> for Value {
35 type Error = Error;
36
37 fn try_from((t, v, tz): (&DataType, Option<String>, &TimeZone)) -> Result<Self> {
38 match v {
39 Some(v) => Self::try_from((t, v, tz)),
40 None => match t {
41 DataType::Null => Ok(Self::Null),
42 DataType::Nullable(_) => Ok(Self::Null),
43 _ => Err(Error::InvalidResponse(
44 "NULL value for non-nullable field".to_string(),
45 )),
46 },
47 }
48 }
49}
50
51impl TryFrom<(&DataType, String, &TimeZone)> for Value {
52 type Error = Error;
53
54 fn try_from((t, v, tz): (&DataType, String, &TimeZone)) -> Result<Self> {
55 match t {
56 DataType::Null => Ok(Self::Null),
57 DataType::EmptyArray => Ok(Self::EmptyArray),
58 DataType::EmptyMap => Ok(Self::EmptyMap),
59 DataType::Boolean => Ok(Self::Boolean(v == "1")),
60 DataType::Binary => Ok(Self::Binary(hex::decode(v)?)),
61 DataType::String => Ok(Self::String(v)),
62 DataType::Number(NumberDataType::Int8) => {
63 Ok(Self::Number(NumberValue::Int8(v.parse()?)))
64 }
65 DataType::Number(NumberDataType::Int16) => {
66 Ok(Self::Number(NumberValue::Int16(v.parse()?)))
67 }
68 DataType::Number(NumberDataType::Int32) => {
69 Ok(Self::Number(NumberValue::Int32(v.parse()?)))
70 }
71 DataType::Number(NumberDataType::Int64) => {
72 Ok(Self::Number(NumberValue::Int64(v.parse()?)))
73 }
74 DataType::Number(NumberDataType::UInt8) => {
75 Ok(Self::Number(NumberValue::UInt8(v.parse()?)))
76 }
77 DataType::Number(NumberDataType::UInt16) => {
78 Ok(Self::Number(NumberValue::UInt16(v.parse()?)))
79 }
80 DataType::Number(NumberDataType::UInt32) => {
81 Ok(Self::Number(NumberValue::UInt32(v.parse()?)))
82 }
83 DataType::Number(NumberDataType::UInt64) => {
84 Ok(Self::Number(NumberValue::UInt64(v.parse()?)))
85 }
86 DataType::Number(NumberDataType::Float32) => {
87 Ok(Self::Number(NumberValue::Float32(v.parse()?)))
88 }
89 DataType::Number(NumberDataType::Float64) => {
90 Ok(Self::Number(NumberValue::Float64(v.parse()?)))
91 }
92 DataType::Decimal(DecimalDataType::Decimal64(size)) => {
93 let d = parse_decimal(v.as_str(), *size)?;
94 Ok(Self::Number(d))
95 }
96 DataType::Decimal(DecimalDataType::Decimal128(size)) => {
97 let d = parse_decimal(v.as_str(), *size)?;
98 Ok(Self::Number(d))
99 }
100 DataType::Decimal(DecimalDataType::Decimal256(size)) => {
101 let d = parse_decimal(v.as_str(), *size)?;
102 Ok(Self::Number(d))
103 }
104 DataType::Timestamp => parse_timestamp(v.as_str(), tz),
105 DataType::TimestampTz => {
106 let t = Zoned::strptime(TIMESTAMP_TIMEZONE_FORMAT, v.as_str())?;
107 Ok(Self::TimestampTz(t))
108 }
109 DataType::Date => Ok(Self::Date(
110 NaiveDate::parse_from_str(v.as_str(), "%Y-%m-%d")?.num_days_from_ce()
111 - DAYS_FROM_CE,
112 )),
113 DataType::Bitmap => Ok(Self::Bitmap(v)),
114 DataType::Variant => Ok(Self::Variant(v)),
115 DataType::Geometry => Ok(Self::Geometry(v)),
116 DataType::Geography => Ok(Self::Geography(v)),
117 DataType::Interval => Ok(Self::Interval(v)),
118 DataType::Array(_) | DataType::Map(_) | DataType::Tuple(_) | DataType::Vector(_) => {
119 let mut reader = Cursor::new(v.as_str());
120 let decoder = ValueDecoder {
121 timezone: tz.clone(),
122 };
123 decoder.read_field(t, &mut reader)
124 }
125 DataType::Nullable(inner) => match inner.as_ref() {
126 DataType::String => Ok(Self::String(v.to_string())),
127 _ => {
128 if v == NULL_VALUE {
131 Ok(Self::Null)
132 } else {
133 Self::try_from((inner.as_ref(), v, tz))
134 }
135 }
136 },
137 }
138 }
139}
140
/// Decoder for values read from a byte cursor, one field at a time
/// (see `ValueDecoder::read_field`).
struct ValueDecoder {
    /// Time zone handed to `parse_timestamp` when a `Timestamp` field is read.
    pub timezone: TimeZone,
}
144
145impl ValueDecoder {
146 pub(super) fn read_field<R: AsRef<[u8]>>(
147 &self,
148 ty: &DataType,
149 reader: &mut Cursor<R>,
150 ) -> Result<Value> {
151 match ty {
152 DataType::Null => self.read_null(reader),
153 DataType::EmptyArray => self.read_empty_array(reader),
154 DataType::EmptyMap => self.read_empty_map(reader),
155 DataType::Boolean => self.read_bool(reader),
156 DataType::Number(NumberDataType::Int8) => self.read_int8(reader),
157 DataType::Number(NumberDataType::Int16) => self.read_int16(reader),
158 DataType::Number(NumberDataType::Int32) => self.read_int32(reader),
159 DataType::Number(NumberDataType::Int64) => self.read_int64(reader),
160 DataType::Number(NumberDataType::UInt8) => self.read_uint8(reader),
161 DataType::Number(NumberDataType::UInt16) => self.read_uint16(reader),
162 DataType::Number(NumberDataType::UInt32) => self.read_uint32(reader),
163 DataType::Number(NumberDataType::UInt64) => self.read_uint64(reader),
164 DataType::Number(NumberDataType::Float32) => self.read_float32(reader),
165 DataType::Number(NumberDataType::Float64) => self.read_float64(reader),
166 DataType::Decimal(DecimalDataType::Decimal64(size)) => self.read_decimal(size, reader),
167 DataType::Decimal(DecimalDataType::Decimal128(size)) => self.read_decimal(size, reader),
168 DataType::Decimal(DecimalDataType::Decimal256(size)) => self.read_decimal(size, reader),
169 DataType::String => self.read_string(reader),
170 DataType::Binary => self.read_binary(reader),
171 DataType::Timestamp => self.read_timestamp(reader),
172 DataType::TimestampTz => self.read_timestamp_tz(reader),
173 DataType::Date => self.read_date(reader),
174 DataType::Bitmap => self.read_bitmap(reader),
175 DataType::Variant => self.read_variant(reader),
176 DataType::Geometry => self.read_geometry(reader),
177 DataType::Interval => self.read_interval(reader),
178 DataType::Geography => self.read_geography(reader),
179 DataType::Array(inner_ty) => self.read_array(inner_ty.as_ref(), reader),
180 DataType::Map(inner_ty) => self.read_map(inner_ty.as_ref(), reader),
181 DataType::Tuple(inner_tys) => self.read_tuple(inner_tys.as_ref(), reader),
182 DataType::Vector(dimension) => self.read_vector(*dimension as usize, reader),
183 DataType::Nullable(inner_ty) => self.read_nullable(inner_ty.as_ref(), reader),
184 }
185 }
186
187 fn match_bytes<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>, bs: &[u8]) -> bool {
188 let pos = reader.checkpoint();
189 if reader.ignore_bytes(bs) {
190 true
191 } else {
192 reader.rollback(pos);
193 false
194 }
195 }
196
197 fn read_null<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
198 if self.match_bytes(reader, NULL_VALUE.as_bytes()) {
199 Ok(Value::Null)
200 } else {
201 let buf = reader.fill_buf()?;
202 Err(ConvertError::new("null", String::from_utf8_lossy(buf).to_string()).into())
203 }
204 }
205
206 fn read_bool<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
207 if self.match_bytes(reader, TRUE_VALUE.as_bytes()) {
208 Ok(Value::Boolean(true))
209 } else if self.match_bytes(reader, FALSE_VALUE.as_bytes()) {
210 Ok(Value::Boolean(false))
211 } else {
212 let buf = reader.fill_buf()?;
213 Err(ConvertError::new("boolean", String::from_utf8_lossy(buf).to_string()).into())
214 }
215 }
216
217 fn read_int8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
218 let v: i8 = reader.read_int_text()?;
219 Ok(Value::Number(NumberValue::Int8(v)))
220 }
221
222 fn read_int16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
223 let v: i16 = reader.read_int_text()?;
224 Ok(Value::Number(NumberValue::Int16(v)))
225 }
226
227 fn read_int32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
228 let v: i32 = reader.read_int_text()?;
229 Ok(Value::Number(NumberValue::Int32(v)))
230 }
231
232 fn read_int64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
233 let v: i64 = reader.read_int_text()?;
234 Ok(Value::Number(NumberValue::Int64(v)))
235 }
236
237 fn read_uint8<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
238 let v: u8 = reader.read_int_text()?;
239 Ok(Value::Number(NumberValue::UInt8(v)))
240 }
241
242 fn read_uint16<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
243 let v: u16 = reader.read_int_text()?;
244 Ok(Value::Number(NumberValue::UInt16(v)))
245 }
246
247 fn read_uint32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
248 let v: u32 = reader.read_int_text()?;
249 Ok(Value::Number(NumberValue::UInt32(v)))
250 }
251
252 fn read_uint64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
253 let v: u64 = reader.read_int_text()?;
254 Ok(Value::Number(NumberValue::UInt64(v)))
255 }
256
257 fn read_float32<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
258 let v: f32 = reader.read_float_text()?;
259 Ok(Value::Number(NumberValue::Float32(v)))
260 }
261
262 fn read_float64<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
263 let v: f64 = reader.read_float_text()?;
264 Ok(Value::Number(NumberValue::Float64(v)))
265 }
266
267 fn read_decimal<R: AsRef<[u8]>>(
268 &self,
269 size: &DecimalSize,
270 reader: &mut Cursor<R>,
271 ) -> Result<Value> {
272 let buf = reader.fill_buf()?;
273 let (n_in, _) = collect_number(buf);
276 let v = unsafe { std::str::from_utf8_unchecked(&buf[..n_in]) };
277 let d = parse_decimal(v, *size)?;
278 reader.consume(n_in);
279 Ok(Value::Number(d))
280 }
281
282 fn read_string<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
283 let mut buf = Vec::new();
284 reader.read_quoted_text(&mut buf, b'\'')?;
285 Ok(Value::String(unsafe { String::from_utf8_unchecked(buf) }))
286 }
287
288 fn read_binary<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
289 let buf = reader.fill_buf()?;
290 let n = collect_binary_number(buf);
291 let v = buf[..n].to_vec();
292 reader.consume(n);
293 Ok(Value::Binary(hex::decode(v)?))
294 }
295
296 fn read_date<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
297 let mut buf = Vec::new();
298 reader.read_quoted_text(&mut buf, b'\'')?;
299 let v = unsafe { std::str::from_utf8_unchecked(&buf) };
300 let days = NaiveDate::parse_from_str(v, "%Y-%m-%d")?.num_days_from_ce() - DAYS_FROM_CE;
301 Ok(Value::Date(days))
302 }
303
304 fn read_timestamp<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
305 let mut buf = Vec::new();
306 reader.read_quoted_text(&mut buf, b'\'')?;
307 let v = unsafe { std::str::from_utf8_unchecked(&buf) };
308 parse_timestamp(v, &self.timezone)
309 }
310
311 fn read_timestamp_tz<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
312 let mut buf = Vec::new();
313 reader.read_quoted_text(&mut buf, b'\'')?;
314 let v = unsafe { std::str::from_utf8_unchecked(&buf) };
315 let t = Zoned::strptime(TIMESTAMP_TIMEZONE_FORMAT, v)?;
316 Ok(Value::TimestampTz(t))
317 }
318
319 fn read_interval<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
320 let mut buf = Vec::new();
321 reader.read_quoted_text(&mut buf, b'\'')?;
322 Ok(Value::Interval(unsafe { String::from_utf8_unchecked(buf) }))
323 }
324
325 fn read_bitmap<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
326 let mut buf = Vec::new();
327 reader.read_quoted_text(&mut buf, b'\'')?;
328 Ok(Value::Bitmap(unsafe { String::from_utf8_unchecked(buf) }))
329 }
330
331 fn read_variant<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
332 let mut buf = Vec::new();
333 reader.read_quoted_text(&mut buf, b'\'')?;
334 Ok(Value::Variant(unsafe { String::from_utf8_unchecked(buf) }))
335 }
336
337 fn read_geometry<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
338 let mut buf = Vec::new();
339 reader.read_quoted_text(&mut buf, b'\'')?;
340 Ok(Value::Geometry(unsafe { String::from_utf8_unchecked(buf) }))
341 }
342
343 fn read_geography<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
344 let mut buf = Vec::new();
345 reader.read_quoted_text(&mut buf, b'\'')?;
346 Ok(Value::Geography(unsafe {
347 String::from_utf8_unchecked(buf)
348 }))
349 }
350
351 fn read_nullable<R: AsRef<[u8]>>(
352 &self,
353 ty: &DataType,
354 reader: &mut Cursor<R>,
355 ) -> Result<Value> {
356 match self.read_null(reader) {
357 Ok(val) => Ok(val),
358 Err(_) => self.read_field(ty, reader),
359 }
360 }
361
362 fn read_empty_array<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
363 reader.must_ignore_byte(b'[')?;
364 reader.must_ignore_byte(b']')?;
365 Ok(Value::EmptyArray)
366 }
367
368 fn read_empty_map<R: AsRef<[u8]>>(&self, reader: &mut Cursor<R>) -> Result<Value> {
369 reader.must_ignore_byte(b'{')?;
370 reader.must_ignore_byte(b'}')?;
371 Ok(Value::EmptyArray)
372 }
373
374 fn read_array<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
375 let mut vals = Vec::new();
376 reader.must_ignore_byte(b'[')?;
377 for idx in 0.. {
378 let _ = reader.ignore_white_spaces();
379 if reader.ignore_byte(b']') {
380 break;
381 }
382 if idx != 0 {
383 reader.must_ignore_byte(b',')?;
384 }
385 let _ = reader.ignore_white_spaces();
386 let val = self.read_field(ty, reader)?;
387 vals.push(val);
388 }
389 Ok(Value::Array(vals))
390 }
391
392 fn read_vector<R: AsRef<[u8]>>(
393 &self,
394 dimension: usize,
395 reader: &mut Cursor<R>,
396 ) -> Result<Value> {
397 let mut vals = Vec::with_capacity(dimension);
398 reader.must_ignore_byte(b'[')?;
399 for idx in 0..dimension {
400 let _ = reader.ignore_white_spaces();
401 if idx > 0 {
402 reader.must_ignore_byte(b',')?;
403 }
404 let _ = reader.ignore_white_spaces();
405 let val: f32 = reader.read_float_text()?;
406 vals.push(val);
407 }
408 reader.must_ignore_byte(b']')?;
409 Ok(Value::Vector(vals))
410 }
411
412 fn read_map<R: AsRef<[u8]>>(&self, ty: &DataType, reader: &mut Cursor<R>) -> Result<Value> {
413 const KEY: usize = 0;
414 const VALUE: usize = 1;
415 let mut kvs = Vec::new();
416 reader.must_ignore_byte(b'{')?;
417 match ty {
418 DataType::Tuple(inner_tys) => {
419 for idx in 0.. {
420 let _ = reader.ignore_white_spaces();
421 if reader.ignore_byte(b'}') {
422 break;
423 }
424 if idx != 0 {
425 reader.must_ignore_byte(b',')?;
426 }
427 let _ = reader.ignore_white_spaces();
428 let key = self.read_field(&inner_tys[KEY], reader)?;
429 let _ = reader.ignore_white_spaces();
430 reader.must_ignore_byte(b':')?;
431 let _ = reader.ignore_white_spaces();
432 let val = self.read_field(&inner_tys[VALUE], reader)?;
433 kvs.push((key, val));
434 }
435 Ok(Value::Map(kvs))
436 }
437 _ => unreachable!(),
438 }
439 }
440
441 fn read_tuple<R: AsRef<[u8]>>(
442 &self,
443 tys: &[DataType],
444 reader: &mut Cursor<R>,
445 ) -> Result<Value> {
446 let mut vals = Vec::new();
447 reader.must_ignore_byte(b'(')?;
448 for (idx, ty) in tys.iter().enumerate() {
449 let _ = reader.ignore_white_spaces();
450 if idx != 0 {
451 reader.must_ignore_byte(b',')?;
452 }
453 let _ = reader.ignore_white_spaces();
454 let val = self.read_field(ty, reader)?;
455 vals.push(val);
456 }
457 reader.must_ignore_byte(b')')?;
458 Ok(Value::Tuple(vals))
459 }
460}
461
462fn parse_timestamp(ts_string: &str, tz: &TimeZone) -> Result<Value> {
463 let local = JiffDateTime::strptime(TIMESTAMP_FORMAT, ts_string)?;
464 let dt_with_tz = local.to_zoned(tz.clone()).map_err(|e| {
465 Error::Parsing(format!(
466 "time {ts_string} not exists in timezone {tz:?}: {e}"
467 ))
468 })?;
469 Ok(Value::Timestamp(dt_with_tz))
470}
471
472fn parse_decimal(text: &str, size: DecimalSize) -> Result<NumberValue> {
473 let mut start = 0;
474 let bytes = text.as_bytes();
475 let mut is_negative = false;
476
477 if bytes[start] == b'-' {
479 is_negative = true;
480 start += 1;
481 }
482
483 while start < text.len() && bytes[start] == b'0' {
484 start += 1
485 }
486 let text = &text[start..];
487 let point_pos = text.find('.');
488 let e_pos = text.find(|c| ['E', 'e'].contains(&c));
489 let (i_part, f_part, e_part) = match (point_pos, e_pos) {
490 (Some(p1), Some(p2)) => (&text[..p1], &text[(p1 + 1)..p2], Some(&text[(p2 + 1)..])),
491 (Some(p), None) => (&text[..p], &text[(p + 1)..], None),
492 (None, Some(p)) => (&text[..p], "", Some(&text[(p + 1)..])),
493 (None, None) => (text, "", None),
494 };
495 let exp = match e_part {
496 Some(s) => s.parse::<i32>()?,
497 None => 0,
498 };
499 if i_part.len() as i32 + exp > 76 {
500 Err(ConvertError::new("decimal", format!("{text:?}")).into())
501 } else {
502 let mut digits = Vec::with_capacity(76);
503 digits.extend_from_slice(i_part.as_bytes());
504 digits.extend_from_slice(f_part.as_bytes());
505 if digits.is_empty() {
506 digits.push(b'0')
507 }
508 let scale = f_part.len() as i32 - exp;
509 if scale < 0 {
510 for _ in 0..(-scale) {
512 digits.push(b'0')
513 }
514 };
515
516 let precision = std::cmp::min(digits.len(), 76);
517 let digits = unsafe { std::str::from_utf8_unchecked(&digits[..precision]) };
518
519 let result = if size.precision > 38 {
520 NumberValue::Decimal256(i256::from_str(digits).unwrap(), size)
521 } else if size.precision > 19 {
522 NumberValue::Decimal128(digits.parse::<i128>()?, size)
523 } else {
524 NumberValue::Decimal64(digits.parse::<i64>()?, size)
525 };
526
527 if is_negative {
529 match result {
530 NumberValue::Decimal256(val, size) => Ok(NumberValue::Decimal256(-val, size)),
531 NumberValue::Decimal128(val, size) => Ok(NumberValue::Decimal128(-val, size)),
532 NumberValue::Decimal64(val, size) => Ok(NumberValue::Decimal64(-val, size)),
533 _ => Ok(result),
534 }
535 } else {
536 Ok(result)
537 }
538 }
539}