1use std::{
4 collections::HashMap,
5 io::{Read, Write},
6 str::FromStr,
7};
8
9use ethnum::{I256, U256};
10use uuid::Uuid;
11
12use crate::{
13 error::Error,
14 query::QueryData,
15 value::{Type, Value},
16};
17
18use super::Formatter;
19
20#[cfg(test)]
21mod tests;
22
/// Formatter for the RowBinary wire format.
///
/// The two flags select which header sections accompany the row data:
/// column names only, column names plus column types, or no header at all.
#[derive(Debug, Clone, Default)]
pub struct RowBinFormatter {
    /// When set, a column-name header is written and expected.
    with_names: bool,
    /// When set, a column-type header is written and expected.
    with_types: bool,
}

impl RowBinFormatter {
    /// Creates a formatter that neither writes nor expects any header.
    pub fn new() -> Self {
        Self {
            with_names: false,
            with_types: false,
        }
    }

    /// Creates a formatter whose header carries the column names only.
    pub fn with_names() -> Self {
        Self {
            with_names: true,
            ..Self::default()
        }
    }

    /// Creates a formatter whose header carries both column names and column types.
    pub fn with_names_and_types() -> Self {
        Self {
            with_types: true,
            ..Self::with_names()
        }
    }
}
54
55impl Formatter for RowBinFormatter {
56 fn serialize_value(&self, value: Value) -> Vec<u8> {
57 self.format_value(value)
58 }
59
60 fn serialize_query_data(&self, data: QueryData) -> Result<Vec<u8>, Error> {
61 self.format_data(data)
62 }
63
64 fn deserialize_value(&self, bytes: &[u8], ty: Type) -> Result<Value, Error> {
65 let mut bytes = bytes;
66 let value = self.parse_value(&mut bytes, ty)?;
67 if !bytes.is_empty() {
68 return Err(Error::new("Value bytes has remaining bytes"));
69 }
70 Ok(value)
71 }
72
73 fn deserialize_query_data(
74 &self,
75 bytes: &[u8],
76 mapping: Option<&[(&str, Type)]>,
77 ) -> Result<QueryData, Error> {
78 let mut bytes = bytes;
79 self.parse_data(&mut bytes, mapping)
80 }
81}
82
83impl RowBinFormatter {
84 #[allow(clippy::only_used_in_recursion)]
86 fn format_value(&self, value: Value) -> Vec<u8> {
87 macro_rules! impl_nullable {
89 ($VAL:tt, $VAR:ident) => {
90 match $VAL {
91 Some(v) => {
92 let mut buf = vec![0x00];
93 let mut bytes = self.format_value(Value::$VAR(v));
94 buf.append(&mut bytes);
95 buf
96 }
97 None => vec![0x01],
98 }
99 };
100 }
101
102 match value {
103 Value::UInt8(v) => v.to_le_bytes().to_vec(),
104 Value::UInt16(v) => v.to_le_bytes().to_vec(),
105 Value::UInt32(v) => v.to_le_bytes().to_vec(),
106 Value::UInt64(v) => v.to_le_bytes().to_vec(),
107 Value::UInt128(v) => v.to_le_bytes().to_vec(),
108 Value::UInt256(_) => {
109 let u256: U256 = value.try_into().unwrap();
110 u256.to_le_bytes().to_vec()
111 }
112 Value::Int8(v) => v.to_le_bytes().to_vec(),
113 Value::Int16(v) => v.to_le_bytes().to_vec(),
114 Value::Int32(v) => v.to_le_bytes().to_vec(),
115 Value::Int64(v) => v.to_le_bytes().to_vec(),
116 Value::Int128(v) => v.to_le_bytes().to_vec(),
117 Value::Int256(_) => {
118 let i256: I256 = value.try_into().unwrap();
119 i256.to_le_bytes().to_vec()
120 }
121 Value::Float32(v) => v.to_le_bytes().to_vec(),
122 Value::Float64(v) => v.to_le_bytes().to_vec(),
123 Value::Bool(v) => {
124 if v {
125 vec![0x01]
126 } else {
127 vec![0x00]
128 }
129 }
130 Value::String(v) => {
131 let mut buf = vec![];
132 leb128::write::unsigned(&mut buf, v.len() as u64).unwrap();
133 buf.write_all(v.as_bytes()).unwrap();
134 buf
135 }
136 Value::UUID(v) => {
137 let (w1, w2) = Uuid::from_bytes(v).as_u64_pair();
139 let mut buf = w1.to_le_bytes().to_vec();
140 buf.append(&mut w2.to_le_bytes().to_vec());
141 buf
142 }
143 Value::Date(v) => v.to_le_bytes().to_vec(),
144 Value::Date32(v) => v.to_le_bytes().to_vec(),
145 Value::DateTime(v) => v.to_le_bytes().to_vec(),
146 Value::DateTime64(v) => v.to_le_bytes().to_vec(),
147 Value::Enum8(v) => v.to_le_bytes().to_vec(),
148 Value::Enum16(v) => v.to_le_bytes().to_vec(),
149 Value::Array(v) => {
150 let mut buf = vec![];
151 leb128::write::unsigned(&mut buf, v.len() as u64).unwrap();
152 let values = v
153 .into_iter()
154 .flat_map(|value| self.format_value(value))
155 .collect::<Vec<_>>();
156 buf.write_all(&values).unwrap();
157 buf
158 }
159 Value::Tuple(v) => v
160 .into_iter()
161 .flat_map(|value| self.format_value(value))
162 .collect::<Vec<_>>(),
163 Value::Map(map) => {
164 let mut buf = vec![];
165 leb128::write::unsigned(&mut buf, map.len() as u64).unwrap();
166 for (k, v) in map {
167 let mut key_bytes = self.format_value(Value::String(k));
168 let mut val_bytes = self.format_value(v);
169 buf.append(&mut key_bytes);
170 buf.append(&mut val_bytes);
171 }
172 buf
173 }
174 Value::Nested(fields) => {
175 let mut buf = vec![];
176 for (k, v) in fields {
177 let mut key_bytes = self.format_value(Value::String(k));
178 let mut val_bytes = self.format_value(v);
179 buf.append(&mut key_bytes);
180 buf.append(&mut val_bytes);
181 }
182 buf
183 }
184 Value::NullableUInt8(v) => impl_nullable!(v, UInt8),
185 Value::NullableUInt16(v) => impl_nullable!(v, UInt16),
186 Value::NullableUInt32(v) => impl_nullable!(v, UInt32),
187 Value::NullableUInt64(v) => impl_nullable!(v, UInt64),
188 Value::NullableUInt128(v) => impl_nullable!(v, UInt128),
189 Value::NullableUInt256(v) => impl_nullable!(v, UInt256),
190 Value::NullableInt8(v) => impl_nullable!(v, Int8),
191 Value::NullableInt16(v) => impl_nullable!(v, Int16),
192 Value::NullableInt32(v) => impl_nullable!(v, Int32),
193 Value::NullableInt64(v) => impl_nullable!(v, Int64),
194 Value::NullableInt128(v) => impl_nullable!(v, Int128),
195 Value::NullableInt256(v) => impl_nullable!(v, Int256),
196 Value::NullableFloat32(v) => impl_nullable!(v, Float32),
197 Value::NullableFloat64(v) => impl_nullable!(v, Float64),
198 Value::NullableBool(v) => impl_nullable!(v, Bool),
199 Value::NullableString(v) => impl_nullable!(v, String),
200 Value::NullableUUID(v) => impl_nullable!(v, UUID),
201 Value::NullableDate(v) => impl_nullable!(v, Date),
202 Value::NullableDate32(v) => impl_nullable!(v, Date32),
203 Value::NullableDateTime(v) => impl_nullable!(v, DateTime),
204 Value::NullableDateTime64(v) => impl_nullable!(v, DateTime64),
205 Value::NullableEnum8(v) => impl_nullable!(v, Enum8),
206 Value::NullableEnum16(v) => impl_nullable!(v, Enum16),
207 }
208 }
209
210 fn format_data(&self, data: QueryData) -> Result<Vec<u8>, Error> {
212 let mut buf = vec![];
213 let parts = data.into_parts();
214
215 if self.with_names {
217 if let Some(names) = parts.names {
218 leb128::write::unsigned(&mut buf, names.len().try_into()?).unwrap();
219 for name in names {
220 let bytes = self.format_value(Value::String(name));
221 buf.write_all(&bytes)?;
222 }
223 } else {
224 return Err(Error::new("Table is missing the column names"));
225 }
226 }
227
228 if self.with_types {
230 if let Some(types) = parts.types {
231 let types = types.into_iter().map(|t| t.to_string()).collect::<Vec<_>>();
232 leb128::write::unsigned(&mut buf, types.len().try_into()?).unwrap();
233 for ty in types {
234 let bytes = self.format_value(Value::String(ty));
235 buf.write_all(&bytes)?;
236 }
237 } else {
238 return Err(Error::new("Table is missing the column types"));
239 }
240 }
241
242 for row in parts.rows {
243 for value in row {
244 let bytes = self.format_value(value);
245 buf.write_all(&bytes)?;
246 }
247 }
248
249 Ok(buf)
250 }
251
252 fn parse_value(&self, bytes: &mut &[u8], ty: Type) -> Result<Value, Error> {
254 macro_rules! impl_nullable {
256 ($NULL_TY:tt, $TY:expr) => {{
257 let mut buf = [0x00_u8; 1];
258 bytes.read_exact(&mut buf)?;
259 match buf {
260 [0x01] => Ok(Value::$NULL_TY(None)),
261 [0x00] => match self.parse_value(bytes, $TY)?.into_nullable() {
262 Some(v) => Ok(v),
263 None => Err(Error::new("Invalid nullable value")),
264 },
265 _ => Err(Error::new("Invalid nullable value")),
266 }
267 }};
268 }
269
270 match ty {
271 Type::UInt8 => {
272 let mut buf = [0x00_u8; 1];
273 bytes.read_exact(&mut buf)?;
274 let v = u8::from_le_bytes(buf);
275 Ok(Value::UInt8(v))
276 }
277 Type::UInt16 => {
278 let mut buf = [0x00_u8; 2];
279 bytes.read_exact(&mut buf)?;
280 let v = u16::from_le_bytes(buf);
281 Ok(Value::UInt16(v))
282 }
283 Type::UInt32 => {
284 let mut buf = [0x00_u8; 4];
285 bytes.read_exact(&mut buf)?;
286 let v = u32::from_le_bytes(buf);
287 Ok(Value::UInt32(v))
288 }
289 Type::UInt64 => {
290 let mut buf = [0x00_u8; 8];
291 bytes.read_exact(&mut buf)?;
292 let v = u64::from_le_bytes(buf);
293 Ok(Value::UInt64(v))
294 }
295 Type::UInt128 => {
296 let mut buf = [0x00_u8; 16];
297 bytes.read_exact(&mut buf)?;
298 let v = u128::from_le_bytes(buf);
299 Ok(Value::UInt128(v))
300 }
301 Type::UInt256 => {
302 let mut buf = [0x00_u8; 32];
303 bytes.read_exact(&mut buf)?;
304 let v = U256::from_le_bytes(buf);
305 Ok(Value::UInt256(v.into_words().into()))
306 }
307 Type::Int8 => {
308 let mut buf = [0x00_u8; 1];
309 bytes.read_exact(&mut buf)?;
310 let v = i8::from_le_bytes(buf);
311 Ok(Value::Int8(v))
312 }
313 Type::Int16 => {
314 let mut buf = [0x00_u8; 2];
315 bytes.read_exact(&mut buf)?;
316 let v = i16::from_le_bytes(buf);
317 Ok(Value::Int16(v))
318 }
319 Type::Int32 => {
320 let mut buf = [0x00_u8; 4];
321 bytes.read_exact(&mut buf)?;
322 let v = i32::from_le_bytes(buf);
323 Ok(Value::Int32(v))
324 }
325 Type::Int64 => {
326 let mut buf = [0x00_u8; 8];
327 bytes.read_exact(&mut buf)?;
328 let v = i64::from_le_bytes(buf);
329 Ok(Value::Int64(v))
330 }
331 Type::Int128 => {
332 let mut buf = [0x00_u8; 16];
333 bytes.read_exact(&mut buf)?;
334 let v = i128::from_le_bytes(buf);
335 Ok(Value::Int128(v))
336 }
337 Type::Int256 => {
338 let mut buf = [0x00_u8; 32];
339 bytes.read_exact(&mut buf)?;
340 let v = I256::from_le_bytes(buf);
341 Ok(Value::Int256(v.into_words().into()))
342 }
343 Type::Float32 => {
344 let mut buf = [0x00_u8; 4];
345 bytes.read_exact(&mut buf)?;
346 let v = f32::from_le_bytes(buf);
347 Ok(Value::Float32(v))
348 }
349 Type::Float64 => {
350 let mut buf = [0x00_u8; 8];
351 bytes.read_exact(&mut buf)?;
352 let v = f64::from_le_bytes(buf);
353 Ok(Value::Float64(v))
354 }
355 Type::Decimal(_, _) => {
356 unimplemented!("RowBinary format Decimal")
357 }
358 Type::Decimal32(_) => {
359 unimplemented!("RowBinary format Decimal32")
360 }
361 Type::Decimal64(_) => {
362 unimplemented!("RowBinary format Decimal64")
363 }
364 Type::Decimal128(_) => {
365 unimplemented!("RowBinary format Decimal128")
366 }
367 Type::Decimal256(_) => {
368 unimplemented!("RowBinary format Decimal256")
369 }
370 Type::Bool => {
371 let mut buf = [0x00_u8; 1];
372 bytes.read_exact(&mut buf)?;
373 match buf {
374 [0x00] => Ok(Value::Bool(false)),
375 [0x01] => Ok(Value::Bool(true)),
376 _ => Err(Error::new("Invalid bool value")),
377 }
378 }
379 Type::String => {
380 let n: usize = leb128::read::unsigned(bytes)?.try_into()?;
381 let mut buf = vec![0x00_u8; n];
382 bytes.read_exact(&mut buf)?;
383 let s = String::from_utf8(buf)?;
384 Ok(Value::String(s))
385 }
386 Type::FixedString(n) => {
387 let mut buf = vec![0x00_u8; n.into()];
388 bytes.read_exact(&mut buf)?;
389 let s = String::from_utf8(buf)?;
390 Ok(Value::String(s))
391 }
392 Type::UUID => {
393 let mut buf = [0x00_u8; 8];
395 bytes.read_exact(&mut buf)?;
396 let w1 = u64::from_le_bytes(buf);
397 bytes.read_exact(&mut buf)?;
398 let w2 = u64::from_le_bytes(buf);
399 let uuid = Uuid::from_u64_pair(w1, w2);
400 Ok(Value::UUID(uuid.into_bytes()))
401 }
402 Type::Date => {
403 let mut buf = [0x00_u8; 2];
404 bytes.read_exact(&mut buf)?;
405 let v = u16::from_le_bytes(buf);
406 Ok(Value::Date(v))
407 }
408 Type::Date32 => {
409 let mut buf = [0x00_u8; 4];
410 bytes.read_exact(&mut buf)?;
411 let v = i32::from_le_bytes(buf);
412 Ok(Value::Date32(v))
413 }
414 Type::DateTime => {
415 let mut buf = [0x00_u8; 4];
416 bytes.read_exact(&mut buf)?;
417 let v = u32::from_le_bytes(buf);
418 Ok(Value::DateTime(v))
419 }
420 Type::DateTime64(_) => {
421 let mut buf = [0x00_u8; 8];
422 bytes.read_exact(&mut buf)?;
423 let v = i64::from_le_bytes(buf);
424 Ok(Value::DateTime64(v))
425 }
426 Type::Enum8(_) => {
427 let mut buf = [0x00_u8; 1];
428 bytes.read_exact(&mut buf)?;
429 let v = i8::from_le_bytes(buf);
430 Ok(Value::Enum8(v))
431 }
432 Type::Enum16(_) => {
433 let mut buf = [0x00_u8; 2];
434 bytes.read_exact(&mut buf)?;
435 let v = i16::from_le_bytes(buf);
436 Ok(Value::Enum16(v))
437 }
438 Type::Array(ty) => {
439 let mut values = vec![];
440 let n = leb128::read::unsigned(bytes)?;
441 for _ in 0..n {
442 let value = self.parse_value(bytes, (*ty).clone())?;
443 values.push(value);
444 }
445 Ok(Value::Array(values))
446 }
447 Type::Tuple(types) => {
448 let mut values = vec![];
449 for ty in types {
450 let value = self.parse_value(bytes, ty)?;
451 values.push(value);
452 }
453 Ok(Value::Tuple(values))
454 }
455 Type::Map(_ty_key, ty_val) => {
456 let mut map = HashMap::new();
457 let n = leb128::read::unsigned(bytes)?;
458 for _ in 0..n {
459 let key = self.parse_value_str(bytes)?;
460 let value = self.parse_value(bytes, (*ty_val).clone())?;
461 map.insert(key, value);
462 }
463 Ok(Value::Map(map))
464 }
465 Type::Nested(fields) => {
466 let mut map = HashMap::new();
467 for (_name, ty) in fields {
468 let key = self.parse_value_str(bytes)?;
469 let value = self.parse_value(bytes, ty)?;
470 map.insert(key, value);
471 }
472 Ok(Value::Nested(map))
473 }
474 Type::NullableUInt8 => impl_nullable!(NullableUInt8, Type::UInt8),
475 Type::NullableUInt16 => impl_nullable!(NullableUInt16, Type::UInt16),
476 Type::NullableUInt32 => impl_nullable!(NullableUInt32, Type::UInt32),
477 Type::NullableUInt64 => impl_nullable!(NullableUInt64, Type::UInt64),
478 Type::NullableUInt128 => impl_nullable!(NullableUInt128, Type::UInt128),
479 Type::NullableUInt256 => impl_nullable!(NullableUInt256, Type::UInt256),
480 Type::NullableInt8 => impl_nullable!(NullableInt8, Type::Int8),
481 Type::NullableInt16 => impl_nullable!(NullableInt16, Type::Int16),
482 Type::NullableInt32 => impl_nullable!(NullableInt32, Type::Int32),
483 Type::NullableInt64 => impl_nullable!(NullableInt64, Type::Int64),
484 Type::NullableInt128 => impl_nullable!(NullableInt128, Type::Int128),
485 Type::NullableInt256 => impl_nullable!(NullableInt256, Type::Int256),
486 Type::NullableFloat32 => impl_nullable!(NullableFloat32, Type::Float32),
487 Type::NullableFloat64 => impl_nullable!(NullableFloat64, Type::Float64),
488 Type::NullableDecimal(_, _) => unimplemented!("RowBinary format Decimal"),
489 Type::NullableDecimal32(_) => unimplemented!("RowBinary format Decimal32"),
490 Type::NullableDecimal64(_) => unimplemented!("RowBinary format Decimal64"),
491 Type::NullableDecimal128(_) => unimplemented!("RowBinary format Decimal128"),
492 Type::NullableDecimal256(_) => unimplemented!("RowBinary format Decimal256"),
493 Type::NullableBool => impl_nullable!(NullableBool, Type::Bool),
494 Type::NullableString => impl_nullable!(NullableString, Type::String),
495 Type::NullableFixedString(n) => impl_nullable!(NullableString, Type::FixedString(n)),
496 Type::NullableUUID => impl_nullable!(NullableUUID, Type::UUID),
497 Type::NullableDate => impl_nullable!(NullableDate, Type::Date),
498 Type::NullableDate32 => impl_nullable!(NullableDate32, Type::Date32),
499 Type::NullableDateTime => impl_nullable!(NullableDateTime, Type::DateTime),
500 Type::NullableDateTime64(p) => impl_nullable!(NullableDateTime64, Type::DateTime64(p)),
501 Type::NullableEnum8(variants) => impl_nullable!(NullableEnum8, Type::Enum8(variants)),
502 Type::NullableEnum16(variants) => {
503 impl_nullable!(NullableEnum16, Type::Enum16(variants))
504 }
505 }
506 }
507
508 fn parse_value_str(&self, bytes: &mut &[u8]) -> Result<String, Error> {
510 let n: usize = leb128::read::unsigned(bytes)?.try_into()?;
511 let mut buf = vec![0x00_u8; n];
512 bytes.read_exact(&mut buf)?;
513 Ok(String::from_utf8(buf)?)
514 }
515
516 fn parse_data(
517 &self,
518 bytes: &mut &[u8],
519 mapping: Option<&[(&str, Type)]>,
520 ) -> Result<QueryData, Error> {
521 let mut data = if self.with_names {
523 let n = leb128::read::unsigned(bytes).unwrap().try_into()?;
524 let mut names = vec![];
525 for _i in 0..n {
526 let name = self.parse_value_str(bytes)?;
527 names.push(name);
528 }
529
530 if self.with_types {
531 let mut types = vec![];
532 let mut names_and_types = vec![];
533 for i in 0..n {
534 let ty_str = self.parse_value_str(bytes)?;
535 let ty = Type::from_str(&ty_str)?;
536 types.push(ty.clone());
537 let name = names.get(i).ok_or(Error::new("Missing column name"))?;
538 names_and_types.push((name.as_str(), ty));
539 }
540 QueryData::with_names_and_types(names_and_types)
541 } else {
542 let names = names.iter().map(String::as_str).collect();
543 QueryData::with_names(names)
544 }
545 } else {
546 QueryData::no_headers()
547 };
548
549 let types = if let Some(types) = data.get_types() {
551 types
552 } else if let Some(mapping) = mapping {
553 mapping.iter().map(|(_, t)| t.clone()).collect()
554 } else {
555 return Err(Error::new("Deserializing data requires a mapping table"));
556 };
557
558 while !bytes.is_empty() {
560 let mut row = vec![];
562 for ty in &types {
563 let value = self.parse_value(bytes, ty.clone())?;
564 row.push(value);
565 }
566 data.add_row(row);
567 }
568
569 Ok(data)
570 }
571}