clickhouse_arrow/native/
block_info.rs

1use tokio::io::{AsyncReadExt, AsyncWriteExt};
2
3use crate::io::{ClickHouseBytesRead, ClickHouseBytesWrite, ClickHouseRead, ClickHouseWrite};
4use crate::{Error, Result};
5
6/// Metadata about a block
7#[derive(Debug, Clone, Copy)]
8pub struct BlockInfo {
9    pub is_overflows: bool,
10    pub bucket_num:   i32,
11}
12
13impl Default for BlockInfo {
14    fn default() -> Self { BlockInfo { is_overflows: false, bucket_num: -1 } }
15}
16
17impl BlockInfo {
18    pub(crate) async fn read_async<R: ClickHouseRead>(reader: &mut R) -> Result<Self> {
19        let mut new = Self::default();
20        loop {
21            let field_num = reader.read_var_uint().await?;
22            match field_num {
23                0 => break,
24                1 => {
25                    new.is_overflows = reader.read_u8().await? != 0;
26                }
27                2 => {
28                    new.bucket_num = reader.read_i32_le().await?;
29                }
30                field_num => {
31                    return Err(Error::Protocol(format!(
32                        "unknown block info field number: {field_num}"
33                    )));
34                }
35            }
36        }
37        Ok(new)
38    }
39
40    pub(crate) async fn write_async<W: ClickHouseWrite>(&self, writer: &mut W) -> Result<()> {
41        writer.write_var_uint(1).await?; // Block info version
42        writer.write_u8(if self.is_overflows { 1 } else { 2 }).await?; // Is overflows
43        writer.write_var_uint(2).await?; // Bucket num
44        writer.write_i32_le(self.bucket_num).await?; // Bucket num
45        writer.write_var_uint(0).await?; // End field
46        Ok(())
47    }
48
49    #[allow(dead_code)] // TODO: remove once synchronous block path is fully retired
50    pub(crate) fn read<R: ClickHouseBytesRead>(reader: &mut R) -> Result<Self> {
51        let mut new = Self::default();
52        loop {
53            let field_num = reader.try_get_var_uint()?;
54            match field_num {
55                0 => break,
56                1 => {
57                    new.is_overflows = reader.try_get_u8()? != 0;
58                }
59                2 => {
60                    new.bucket_num = reader.try_get_i32_le()?;
61                }
62                field_num => {
63                    return Err(Error::Protocol(format!(
64                        "unknown block info field number: {field_num}"
65                    )));
66                }
67            }
68        }
69        Ok(new)
70    }
71
72    pub(crate) fn write<W: ClickHouseBytesWrite>(self, writer: &mut W) -> Result<()> {
73        writer.put_var_uint(1)?; // Block info version
74        writer.put_u8(if self.is_overflows { 1 } else { 2 }); // Is overflows
75        writer.put_var_uint(2)?; // Bucket num
76        writer.put_i32_le(self.bucket_num); // Bucket num
77        writer.put_var_uint(0)?; // End field
78        Ok(())
79    }
80}