clickhouse_client/query/fmt/rowbin/
mod.rs

1//! RowBin format
2
3use std::{
4    collections::HashMap,
5    io::{Read, Write},
6    str::FromStr,
7};
8
9use ethnum::{I256, U256};
10use uuid::Uuid;
11
12use crate::{
13    error::Error,
14    query::QueryData,
15    value::{Type, Value},
16};
17
18use super::Formatter;
19
20#[cfg(test)]
21mod tests;
22
/// Formatter for the RowBinary encoding.
///
/// With both flags off, payloads carry row data only. The flags add leading
/// headers, first with the column names and then with the column types.
#[derive(Debug, Clone, Default)]
pub struct RowBinFormatter {
    /// Whether the payload starts with a header listing the column names
    with_names: bool,
    /// Whether a header listing the column types follows the names header
    with_types: bool,
}
31
32impl RowBinFormatter {
33    /// Creates a new [RowBinaryFormatter]
34    pub fn new() -> Self {
35        Self::default()
36    }
37
38    /// Creates a new [RowBinaryFormatter] with names
39    pub fn with_names() -> Self {
40        Self {
41            with_names: true,
42            with_types: false,
43        }
44    }
45
46    /// Creates a new [RowBinaryFormatter] with names and types
47    pub fn with_names_and_types() -> Self {
48        Self {
49            with_names: true,
50            with_types: true,
51        }
52    }
53}
54
55impl Formatter for RowBinFormatter {
56    fn serialize_value(&self, value: Value) -> Vec<u8> {
57        self.format_value(value)
58    }
59
60    fn serialize_query_data(&self, data: QueryData) -> Result<Vec<u8>, Error> {
61        self.format_data(data)
62    }
63
64    fn deserialize_value(&self, bytes: &[u8], ty: Type) -> Result<Value, Error> {
65        let mut bytes = bytes;
66        let value = self.parse_value(&mut bytes, ty)?;
67        if !bytes.is_empty() {
68            return Err(Error::new("Value bytes has remaining bytes"));
69        }
70        Ok(value)
71    }
72
73    fn deserialize_query_data(
74        &self,
75        bytes: &[u8],
76        mapping: Option<&[(&str, Type)]>,
77    ) -> Result<QueryData, Error> {
78        let mut bytes = bytes;
79        self.parse_data(&mut bytes, mapping)
80    }
81}
82
83impl RowBinFormatter {
84    /// Formats a value
85    #[allow(clippy::only_used_in_recursion)]
86    fn format_value(&self, value: Value) -> Vec<u8> {
87        /// Implements the nullable variant for formatting
88        macro_rules! impl_nullable {
89            ($VAL:tt, $VAR:ident) => {
90                match $VAL {
91                    Some(v) => {
92                        let mut buf = vec![0x00];
93                        let mut bytes = self.format_value(Value::$VAR(v));
94                        buf.append(&mut bytes);
95                        buf
96                    }
97                    None => vec![0x01],
98                }
99            };
100        }
101
102        match value {
103            Value::UInt8(v) => v.to_le_bytes().to_vec(),
104            Value::UInt16(v) => v.to_le_bytes().to_vec(),
105            Value::UInt32(v) => v.to_le_bytes().to_vec(),
106            Value::UInt64(v) => v.to_le_bytes().to_vec(),
107            Value::UInt128(v) => v.to_le_bytes().to_vec(),
108            Value::UInt256(_) => {
109                let u256: U256 = value.try_into().unwrap();
110                u256.to_le_bytes().to_vec()
111            }
112            Value::Int8(v) => v.to_le_bytes().to_vec(),
113            Value::Int16(v) => v.to_le_bytes().to_vec(),
114            Value::Int32(v) => v.to_le_bytes().to_vec(),
115            Value::Int64(v) => v.to_le_bytes().to_vec(),
116            Value::Int128(v) => v.to_le_bytes().to_vec(),
117            Value::Int256(_) => {
118                let i256: I256 = value.try_into().unwrap();
119                i256.to_le_bytes().to_vec()
120            }
121            Value::Float32(v) => v.to_le_bytes().to_vec(),
122            Value::Float64(v) => v.to_le_bytes().to_vec(),
123            Value::Bool(v) => {
124                if v {
125                    vec![0x01]
126                } else {
127                    vec![0x00]
128                }
129            }
130            Value::String(v) => {
131                let mut buf = vec![];
132                leb128::write::unsigned(&mut buf, v.len() as u64).unwrap();
133                buf.write_all(v.as_bytes()).unwrap();
134                buf
135            }
136            Value::UUID(v) => {
137                // NB: in RowBinary, the UUID is represented as 2 u64 in little endian
138                let (w1, w2) = Uuid::from_bytes(v).as_u64_pair();
139                let mut buf = w1.to_le_bytes().to_vec();
140                buf.append(&mut w2.to_le_bytes().to_vec());
141                buf
142            }
143            Value::Date(v) => v.to_le_bytes().to_vec(),
144            Value::Date32(v) => v.to_le_bytes().to_vec(),
145            Value::DateTime(v) => v.to_le_bytes().to_vec(),
146            Value::DateTime64(v) => v.to_le_bytes().to_vec(),
147            Value::Enum8(v) => v.to_le_bytes().to_vec(),
148            Value::Enum16(v) => v.to_le_bytes().to_vec(),
149            Value::Array(v) => {
150                let mut buf = vec![];
151                leb128::write::unsigned(&mut buf, v.len() as u64).unwrap();
152                let values = v
153                    .into_iter()
154                    .flat_map(|value| self.format_value(value))
155                    .collect::<Vec<_>>();
156                buf.write_all(&values).unwrap();
157                buf
158            }
159            Value::Tuple(v) => v
160                .into_iter()
161                .flat_map(|value| self.format_value(value))
162                .collect::<Vec<_>>(),
163            Value::Map(map) => {
164                let mut buf = vec![];
165                leb128::write::unsigned(&mut buf, map.len() as u64).unwrap();
166                for (k, v) in map {
167                    let mut key_bytes = self.format_value(Value::String(k));
168                    let mut val_bytes = self.format_value(v);
169                    buf.append(&mut key_bytes);
170                    buf.append(&mut val_bytes);
171                }
172                buf
173            }
174            Value::Nested(fields) => {
175                let mut buf = vec![];
176                for (k, v) in fields {
177                    let mut key_bytes = self.format_value(Value::String(k));
178                    let mut val_bytes = self.format_value(v);
179                    buf.append(&mut key_bytes);
180                    buf.append(&mut val_bytes);
181                }
182                buf
183            }
184            Value::NullableUInt8(v) => impl_nullable!(v, UInt8),
185            Value::NullableUInt16(v) => impl_nullable!(v, UInt16),
186            Value::NullableUInt32(v) => impl_nullable!(v, UInt32),
187            Value::NullableUInt64(v) => impl_nullable!(v, UInt64),
188            Value::NullableUInt128(v) => impl_nullable!(v, UInt128),
189            Value::NullableUInt256(v) => impl_nullable!(v, UInt256),
190            Value::NullableInt8(v) => impl_nullable!(v, Int8),
191            Value::NullableInt16(v) => impl_nullable!(v, Int16),
192            Value::NullableInt32(v) => impl_nullable!(v, Int32),
193            Value::NullableInt64(v) => impl_nullable!(v, Int64),
194            Value::NullableInt128(v) => impl_nullable!(v, Int128),
195            Value::NullableInt256(v) => impl_nullable!(v, Int256),
196            Value::NullableFloat32(v) => impl_nullable!(v, Float32),
197            Value::NullableFloat64(v) => impl_nullable!(v, Float64),
198            Value::NullableBool(v) => impl_nullable!(v, Bool),
199            Value::NullableString(v) => impl_nullable!(v, String),
200            Value::NullableUUID(v) => impl_nullable!(v, UUID),
201            Value::NullableDate(v) => impl_nullable!(v, Date),
202            Value::NullableDate32(v) => impl_nullable!(v, Date32),
203            Value::NullableDateTime(v) => impl_nullable!(v, DateTime),
204            Value::NullableDateTime64(v) => impl_nullable!(v, DateTime64),
205            Value::NullableEnum8(v) => impl_nullable!(v, Enum8),
206            Value::NullableEnum16(v) => impl_nullable!(v, Enum16),
207        }
208    }
209
210    /// Formats a table
211    fn format_data(&self, data: QueryData) -> Result<Vec<u8>, Error> {
212        let mut buf = vec![];
213        let parts = data.into_parts();
214
215        // column names
216        if self.with_names {
217            if let Some(names) = parts.names {
218                leb128::write::unsigned(&mut buf, names.len().try_into()?).unwrap();
219                for name in names {
220                    let bytes = self.format_value(Value::String(name));
221                    buf.write_all(&bytes)?;
222                }
223            } else {
224                return Err(Error::new("Table is missing the column names"));
225            }
226        }
227
228        // column types
229        if self.with_types {
230            if let Some(types) = parts.types {
231                let types = types.into_iter().map(|t| t.to_string()).collect::<Vec<_>>();
232                leb128::write::unsigned(&mut buf, types.len().try_into()?).unwrap();
233                for ty in types {
234                    let bytes = self.format_value(Value::String(ty));
235                    buf.write_all(&bytes)?;
236                }
237            } else {
238                return Err(Error::new("Table is missing the column types"));
239            }
240        }
241
242        for row in parts.rows {
243            for value in row {
244                let bytes = self.format_value(value);
245                buf.write_all(&bytes)?;
246            }
247        }
248
249        Ok(buf)
250    }
251
252    /// Parses a value
253    fn parse_value(&self, bytes: &mut &[u8], ty: Type) -> Result<Value, Error> {
254        /// Implements the nullable variant for parsing
255        macro_rules! impl_nullable {
256            ($NULL_TY:tt, $TY:expr) => {{
257                let mut buf = [0x00_u8; 1];
258                bytes.read_exact(&mut buf)?;
259                match buf {
260                    [0x01] => Ok(Value::$NULL_TY(None)),
261                    [0x00] => match self.parse_value(bytes, $TY)?.into_nullable() {
262                        Some(v) => Ok(v),
263                        None => Err(Error::new("Invalid nullable value")),
264                    },
265                    _ => Err(Error::new("Invalid nullable value")),
266                }
267            }};
268        }
269
270        match ty {
271            Type::UInt8 => {
272                let mut buf = [0x00_u8; 1];
273                bytes.read_exact(&mut buf)?;
274                let v = u8::from_le_bytes(buf);
275                Ok(Value::UInt8(v))
276            }
277            Type::UInt16 => {
278                let mut buf = [0x00_u8; 2];
279                bytes.read_exact(&mut buf)?;
280                let v = u16::from_le_bytes(buf);
281                Ok(Value::UInt16(v))
282            }
283            Type::UInt32 => {
284                let mut buf = [0x00_u8; 4];
285                bytes.read_exact(&mut buf)?;
286                let v = u32::from_le_bytes(buf);
287                Ok(Value::UInt32(v))
288            }
289            Type::UInt64 => {
290                let mut buf = [0x00_u8; 8];
291                bytes.read_exact(&mut buf)?;
292                let v = u64::from_le_bytes(buf);
293                Ok(Value::UInt64(v))
294            }
295            Type::UInt128 => {
296                let mut buf = [0x00_u8; 16];
297                bytes.read_exact(&mut buf)?;
298                let v = u128::from_le_bytes(buf);
299                Ok(Value::UInt128(v))
300            }
301            Type::UInt256 => {
302                let mut buf = [0x00_u8; 32];
303                bytes.read_exact(&mut buf)?;
304                let v = U256::from_le_bytes(buf);
305                Ok(Value::UInt256(v.into_words().into()))
306            }
307            Type::Int8 => {
308                let mut buf = [0x00_u8; 1];
309                bytes.read_exact(&mut buf)?;
310                let v = i8::from_le_bytes(buf);
311                Ok(Value::Int8(v))
312            }
313            Type::Int16 => {
314                let mut buf = [0x00_u8; 2];
315                bytes.read_exact(&mut buf)?;
316                let v = i16::from_le_bytes(buf);
317                Ok(Value::Int16(v))
318            }
319            Type::Int32 => {
320                let mut buf = [0x00_u8; 4];
321                bytes.read_exact(&mut buf)?;
322                let v = i32::from_le_bytes(buf);
323                Ok(Value::Int32(v))
324            }
325            Type::Int64 => {
326                let mut buf = [0x00_u8; 8];
327                bytes.read_exact(&mut buf)?;
328                let v = i64::from_le_bytes(buf);
329                Ok(Value::Int64(v))
330            }
331            Type::Int128 => {
332                let mut buf = [0x00_u8; 16];
333                bytes.read_exact(&mut buf)?;
334                let v = i128::from_le_bytes(buf);
335                Ok(Value::Int128(v))
336            }
337            Type::Int256 => {
338                let mut buf = [0x00_u8; 32];
339                bytes.read_exact(&mut buf)?;
340                let v = I256::from_le_bytes(buf);
341                Ok(Value::Int256(v.into_words().into()))
342            }
343            Type::Float32 => {
344                let mut buf = [0x00_u8; 4];
345                bytes.read_exact(&mut buf)?;
346                let v = f32::from_le_bytes(buf);
347                Ok(Value::Float32(v))
348            }
349            Type::Float64 => {
350                let mut buf = [0x00_u8; 8];
351                bytes.read_exact(&mut buf)?;
352                let v = f64::from_le_bytes(buf);
353                Ok(Value::Float64(v))
354            }
355            Type::Decimal(_, _) => {
356                unimplemented!("RowBinary format Decimal")
357            }
358            Type::Decimal32(_) => {
359                unimplemented!("RowBinary format Decimal32")
360            }
361            Type::Decimal64(_) => {
362                unimplemented!("RowBinary format Decimal64")
363            }
364            Type::Decimal128(_) => {
365                unimplemented!("RowBinary format Decimal128")
366            }
367            Type::Decimal256(_) => {
368                unimplemented!("RowBinary format Decimal256")
369            }
370            Type::Bool => {
371                let mut buf = [0x00_u8; 1];
372                bytes.read_exact(&mut buf)?;
373                match buf {
374                    [0x00] => Ok(Value::Bool(false)),
375                    [0x01] => Ok(Value::Bool(true)),
376                    _ => Err(Error::new("Invalid bool value")),
377                }
378            }
379            Type::String => {
380                let n: usize = leb128::read::unsigned(bytes)?.try_into()?;
381                let mut buf = vec![0x00_u8; n];
382                bytes.read_exact(&mut buf)?;
383                let s = String::from_utf8(buf)?;
384                Ok(Value::String(s))
385            }
386            Type::FixedString(n) => {
387                let mut buf = vec![0x00_u8; n.into()];
388                bytes.read_exact(&mut buf)?;
389                let s = String::from_utf8(buf)?;
390                Ok(Value::String(s))
391            }
392            Type::UUID => {
393                // NB: in RowBinary, the UUID is represented as 2 u64 in little endian
394                let mut buf = [0x00_u8; 8];
395                bytes.read_exact(&mut buf)?;
396                let w1 = u64::from_le_bytes(buf);
397                bytes.read_exact(&mut buf)?;
398                let w2 = u64::from_le_bytes(buf);
399                let uuid = Uuid::from_u64_pair(w1, w2);
400                Ok(Value::UUID(uuid.into_bytes()))
401            }
402            Type::Date => {
403                let mut buf = [0x00_u8; 2];
404                bytes.read_exact(&mut buf)?;
405                let v = u16::from_le_bytes(buf);
406                Ok(Value::Date(v))
407            }
408            Type::Date32 => {
409                let mut buf = [0x00_u8; 4];
410                bytes.read_exact(&mut buf)?;
411                let v = i32::from_le_bytes(buf);
412                Ok(Value::Date32(v))
413            }
414            Type::DateTime => {
415                let mut buf = [0x00_u8; 4];
416                bytes.read_exact(&mut buf)?;
417                let v = u32::from_le_bytes(buf);
418                Ok(Value::DateTime(v))
419            }
420            Type::DateTime64(_) => {
421                let mut buf = [0x00_u8; 8];
422                bytes.read_exact(&mut buf)?;
423                let v = i64::from_le_bytes(buf);
424                Ok(Value::DateTime64(v))
425            }
426            Type::Enum8(_) => {
427                let mut buf = [0x00_u8; 1];
428                bytes.read_exact(&mut buf)?;
429                let v = i8::from_le_bytes(buf);
430                Ok(Value::Enum8(v))
431            }
432            Type::Enum16(_) => {
433                let mut buf = [0x00_u8; 2];
434                bytes.read_exact(&mut buf)?;
435                let v = i16::from_le_bytes(buf);
436                Ok(Value::Enum16(v))
437            }
438            Type::Array(ty) => {
439                let mut values = vec![];
440                let n = leb128::read::unsigned(bytes)?;
441                for _ in 0..n {
442                    let value = self.parse_value(bytes, (*ty).clone())?;
443                    values.push(value);
444                }
445                Ok(Value::Array(values))
446            }
447            Type::Tuple(types) => {
448                let mut values = vec![];
449                for ty in types {
450                    let value = self.parse_value(bytes, ty)?;
451                    values.push(value);
452                }
453                Ok(Value::Tuple(values))
454            }
455            Type::Map(_ty_key, ty_val) => {
456                let mut map = HashMap::new();
457                let n = leb128::read::unsigned(bytes)?;
458                for _ in 0..n {
459                    let key = self.parse_value_str(bytes)?;
460                    let value = self.parse_value(bytes, (*ty_val).clone())?;
461                    map.insert(key, value);
462                }
463                Ok(Value::Map(map))
464            }
465            Type::Nested(fields) => {
466                let mut map = HashMap::new();
467                for (_name, ty) in fields {
468                    let key = self.parse_value_str(bytes)?;
469                    let value = self.parse_value(bytes, ty)?;
470                    map.insert(key, value);
471                }
472                Ok(Value::Nested(map))
473            }
474            Type::NullableUInt8 => impl_nullable!(NullableUInt8, Type::UInt8),
475            Type::NullableUInt16 => impl_nullable!(NullableUInt16, Type::UInt16),
476            Type::NullableUInt32 => impl_nullable!(NullableUInt32, Type::UInt32),
477            Type::NullableUInt64 => impl_nullable!(NullableUInt64, Type::UInt64),
478            Type::NullableUInt128 => impl_nullable!(NullableUInt128, Type::UInt128),
479            Type::NullableUInt256 => impl_nullable!(NullableUInt256, Type::UInt256),
480            Type::NullableInt8 => impl_nullable!(NullableInt8, Type::Int8),
481            Type::NullableInt16 => impl_nullable!(NullableInt16, Type::Int16),
482            Type::NullableInt32 => impl_nullable!(NullableInt32, Type::Int32),
483            Type::NullableInt64 => impl_nullable!(NullableInt64, Type::Int64),
484            Type::NullableInt128 => impl_nullable!(NullableInt128, Type::Int128),
485            Type::NullableInt256 => impl_nullable!(NullableInt256, Type::Int256),
486            Type::NullableFloat32 => impl_nullable!(NullableFloat32, Type::Float32),
487            Type::NullableFloat64 => impl_nullable!(NullableFloat64, Type::Float64),
488            Type::NullableDecimal(_, _) => unimplemented!("RowBinary format Decimal"),
489            Type::NullableDecimal32(_) => unimplemented!("RowBinary format Decimal32"),
490            Type::NullableDecimal64(_) => unimplemented!("RowBinary format Decimal64"),
491            Type::NullableDecimal128(_) => unimplemented!("RowBinary format Decimal128"),
492            Type::NullableDecimal256(_) => unimplemented!("RowBinary format Decimal256"),
493            Type::NullableBool => impl_nullable!(NullableBool, Type::Bool),
494            Type::NullableString => impl_nullable!(NullableString, Type::String),
495            Type::NullableFixedString(n) => impl_nullable!(NullableString, Type::FixedString(n)),
496            Type::NullableUUID => impl_nullable!(NullableUUID, Type::UUID),
497            Type::NullableDate => impl_nullable!(NullableDate, Type::Date),
498            Type::NullableDate32 => impl_nullable!(NullableDate32, Type::Date32),
499            Type::NullableDateTime => impl_nullable!(NullableDateTime, Type::DateTime),
500            Type::NullableDateTime64(p) => impl_nullable!(NullableDateTime64, Type::DateTime64(p)),
501            Type::NullableEnum8(variants) => impl_nullable!(NullableEnum8, Type::Enum8(variants)),
502            Type::NullableEnum16(variants) => {
503                impl_nullable!(NullableEnum16, Type::Enum16(variants))
504            }
505        }
506    }
507
508    /// Parses a value as a string
509    fn parse_value_str(&self, bytes: &mut &[u8]) -> Result<String, Error> {
510        let n: usize = leb128::read::unsigned(bytes)?.try_into()?;
511        let mut buf = vec![0x00_u8; n];
512        bytes.read_exact(&mut buf)?;
513        Ok(String::from_utf8(buf)?)
514    }
515
516    fn parse_data(
517        &self,
518        bytes: &mut &[u8],
519        mapping: Option<&[(&str, Type)]>,
520    ) -> Result<QueryData, Error> {
521        // parse names + types from the buffer
522        let mut data = if self.with_names {
523            let n = leb128::read::unsigned(bytes).unwrap().try_into()?;
524            let mut names = vec![];
525            for _i in 0..n {
526                let name = self.parse_value_str(bytes)?;
527                names.push(name);
528            }
529
530            if self.with_types {
531                let mut types = vec![];
532                let mut names_and_types = vec![];
533                for i in 0..n {
534                    let ty_str = self.parse_value_str(bytes)?;
535                    let ty = Type::from_str(&ty_str)?;
536                    types.push(ty.clone());
537                    let name = names.get(i).ok_or(Error::new("Missing column name"))?;
538                    names_and_types.push((name.as_str(), ty));
539                }
540                QueryData::with_names_and_types(names_and_types)
541            } else {
542                let names = names.iter().map(String::as_str).collect();
543                QueryData::with_names(names)
544            }
545        } else {
546            QueryData::no_headers()
547        };
548
549        // parse rows from the buffer
550        let types = if let Some(types) = data.get_types() {
551            types
552        } else if let Some(mapping) = mapping {
553            mapping.iter().map(|(_, t)| t.clone()).collect()
554        } else {
555            return Err(Error::new("Deserializing data requires a mapping table"));
556        };
557
558        // parse rows
559        while !bytes.is_empty() {
560            // loop on each columns
561            let mut row = vec![];
562            for ty in &types {
563                let value = self.parse_value(bytes, ty.clone())?;
564                row.push(value);
565            }
566            data.add_row(row);
567        }
568
569        Ok(data)
570    }
571}