clickhouse_readonly/block/
mod.rs

1use std::{cmp, default::Default, fmt, io::Read, marker::PhantomData};
2
3use ethnum::I256;
4
5use crate::{
6    binary::{Encoder, ReadEx},
7    column::{self, ArcColumnWrapper, Column, ColumnFrom, ColumnType, Simple},
8    error::{Error, FromSqlError, Result},
9    protocol,
10    types::{FromSql, SqlType},
11    Complex,
12};
13
14use self::chunk_iterator::ChunkIterator;
15pub(crate) use self::row::BlockRef;
16pub use self::{
17    block_info::BlockInfo,
18    builder::{RCons, RNil, RowBuilder},
19    row::{Row, Rows},
20};
21
22mod block_info;
23mod builder;
24mod chunk_iterator;
25mod row;
26
/// Chunk size handed to `Block::chunks` by `send_data` when a block is sent
/// (2^20 = 1_048_576).
const INSERT_BLOCK_SIZE: usize = 1 << 20;
/// Capacity given to blocks created with `Block::new`.
const DEFAULT_CAPACITY: usize = 100;
29
30pub trait ColumnIdx {
31    fn get_index<K: ColumnType>(&self, columns: &[Column<K>]) -> Result<usize>;
32}
33
34pub trait Sliceable {
35    fn slice_type() -> SqlType;
36}
37
/// Implements `Sliceable` for every `RustType: SqlTypeVariant` pair given,
/// making `slice_type()` return `SqlType::SqlTypeVariant`.
macro_rules! sliceable {
    ( $($rust_ty:ty: $variant:ident),* ) => {
        $(
            impl Sliceable for $rust_ty {
                fn slice_type() -> SqlType {
                    SqlType::$variant
                }
            }
        )*
    };
}
49
50sliceable! {
51    u8: UInt8,
52    u16: UInt16,
53    u32: UInt32,
54    u64: UInt64,
55
56    i8: Int8,
57    i16: Int16,
58    i32: Int32,
59    i64: Int64,
60    I256: Int256
61}
62
63/// Represents Clickhouse Block
64#[derive(Default)]
65pub struct Block<K: ColumnType = Simple> {
66    info: BlockInfo,
67    columns: Vec<Column<K>>,
68    capacity: usize,
69}
70
71impl Block<Simple> {
72    pub(crate) fn concat(blocks: &[Self]) -> Block<Complex> {
73        let first = blocks.first().expect("blocks should not be empty.");
74
75        for block in blocks {
76            assert_eq!(
77                first.column_count(),
78                block.column_count(),
79                "all columns should have the same size."
80            );
81        }
82
83        let num_columns = first.column_count();
84        let mut columns = Vec::with_capacity(num_columns);
85        for i in 0_usize..num_columns {
86            let chunks = blocks.iter().map(|block| &block.columns[i]);
87            columns.push(Column::concat(chunks));
88        }
89
90        Block {
91            info: first.info,
92            columns,
93            capacity: blocks.iter().map(|b| b.capacity).sum(),
94        }
95    }
96}
97
98impl<L: ColumnType, R: ColumnType> PartialEq<Block<R>> for Block<L> {
99    fn eq(&self, other: &Block<R>) -> bool {
100        if self.columns.len() != other.columns.len() {
101            return false;
102        }
103
104        for i in 0..self.columns.len() {
105            if self.columns[i] != other.columns[i] {
106                return false;
107            }
108        }
109
110        true
111    }
112}
113
114impl<K: ColumnType> Clone for Block<K> {
115    fn clone(&self) -> Self {
116        Self {
117            info: self.info,
118            columns: self.columns.iter().map(|c| (*c).clone()).collect(),
119            capacity: self.capacity,
120        }
121    }
122}
123
124impl<K: ColumnType> AsRef<Block<K>> for Block<K> {
125    fn as_ref(&self) -> &Self {
126        self
127    }
128}
129
130impl ColumnIdx for usize {
131    #[inline(always)]
132    fn get_index<K: ColumnType>(&self, _: &[Column<K>]) -> Result<usize> {
133        Ok(*self)
134    }
135}
136
137impl<'a> ColumnIdx for &'a str {
138    fn get_index<K: ColumnType>(&self, columns: &[Column<K>]) -> Result<usize> {
139        match columns
140            .iter()
141            .enumerate()
142            .find(|(_, column)| column.name() == *self)
143        {
144            None => Err(Error::FromSql(FromSqlError::OutOfRange)),
145            Some((index, _)) => Ok(index),
146        }
147    }
148}
149
150impl ColumnIdx for String {
151    fn get_index<K: ColumnType>(&self, columns: &[Column<K>]) -> Result<usize> {
152        self.as_str().get_index(columns)
153    }
154}
155
156impl Block {
157    /// Constructs a new, empty `Block`.
158    pub fn new() -> Self {
159        Self::with_capacity(DEFAULT_CAPACITY)
160    }
161
162    /// Constructs a new, empty `Block` with the specified capacity.
163    pub fn with_capacity(capacity: usize) -> Self {
164        Self {
165            info: Default::default(),
166            columns: vec![],
167            capacity,
168        }
169    }
170
171    pub(crate) fn load<R>(reader: &mut R, tz: chrono_tz::Tz) -> Result<Self>
172    where
173        R: Read + ReadEx,
174    {
175        Self::raw_load(reader, tz)
176    }
177
178    fn raw_load<R>(reader: &mut R, tz: chrono_tz::Tz) -> Result<Block<Simple>>
179    where
180        R: ReadEx,
181    {
182        let mut block = Block::new();
183        block.info = BlockInfo::read(reader)?;
184
185        let num_columns = reader.read_uvarint()?;
186        let num_rows = reader.read_uvarint()?;
187
188        for _ in 0..num_columns {
189            let column = Column::read(reader, num_rows as usize, tz)?;
190            block.append_column(column);
191        }
192
193        Ok(block)
194    }
195}
196
197impl<K: ColumnType> Block<K> {
198    /// Return the number of rows in the current block.
199    pub fn row_count(&self) -> usize {
200        match self.columns.first() {
201            None => 0,
202            Some(column) => column.len(),
203        }
204    }
205
206    /// Return the number of columns in the current block.
207    pub fn column_count(&self) -> usize {
208        self.columns.len()
209    }
210
211    /// This method returns a slice of columns.
212    #[inline(always)]
213    pub fn columns(&self) -> &[Column<K>] {
214        &self.columns
215    }
216
217    fn append_column(&mut self, column: Column<K>) {
218        let column_len = column.len();
219
220        if !self.columns.is_empty() && self.row_count() != column_len {
221            panic!("all columns in block must have same size.")
222        }
223
224        self.columns.push(column);
225    }
226
227    /// Get the value of a particular cell of the block.
228    pub fn get<'a, T, I>(&'a self, row: usize, col: I) -> Result<T>
229    where
230        T: FromSql<'a>,
231        I: ColumnIdx + Copy,
232    {
233        let column_index = col.get_index(self.columns())?;
234        T::from_sql(self.columns[column_index].at(row))
235    }
236
237    /// Add new column into this block
238    pub fn add_column<S>(self, name: &str, values: S) -> Self
239    where
240        S: ColumnFrom,
241    {
242        self.column(name, values)
243    }
244
245    /// Add new column into this block
246    pub fn column<S>(mut self, name: &str, values: S) -> Self
247    where
248        S: ColumnFrom,
249    {
250        let data = S::column_from::<ArcColumnWrapper>(values);
251        let column = column::new_column(name, data);
252
253        self.append_column(column);
254        self
255    }
256
257    /// Returns true if the block contains no elements.
258    pub fn is_empty(&self) -> bool {
259        self.columns.is_empty()
260    }
261
262    /// This method returns a iterator of rows.
263    pub fn rows(&self) -> Rows<K> {
264        Rows {
265            row: 0,
266            block_ref: BlockRef::Borrowed(self),
267            kind: PhantomData,
268        }
269    }
270
271    /// This method is a convenient way to pass row into a block.
272    pub fn push<B: RowBuilder>(&mut self, row: B) -> Result<()> {
273        row.apply(self)
274    }
275
276    /// This method finds a column by identifier.
277    pub fn get_column<I>(&self, col: I) -> Result<&Column<K>>
278    where
279        I: ColumnIdx + Copy,
280    {
281        let column_index = col.get_index(self.columns())?;
282        let column = &self.columns[column_index];
283        Ok(column)
284    }
285}
286
287impl<K: ColumnType> Block<K> {
288    pub(crate) fn write(&self, encoder: &mut Encoder) {
289        self.info.write(encoder);
290        encoder.uvarint(self.column_count() as u64);
291        encoder.uvarint(self.row_count() as u64);
292
293        for column in &self.columns {
294            column.write(encoder);
295        }
296    }
297
298    pub(crate) fn send_data(&self, encoder: &mut Encoder) {
299        encoder.uvarint(protocol::CLIENT_DATA);
300        encoder.string(""); // temporary table
301        for chunk in self.chunks(INSERT_BLOCK_SIZE) {
302            chunk.write(encoder);
303        }
304    }
305
306    pub(crate) fn chunks(&self, n: usize) -> ChunkIterator<K> {
307        ChunkIterator::new(n, self)
308    }
309}
310
311impl<K: ColumnType> fmt::Debug for Block<K> {
312    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
313        let titles: Vec<&str> = self.columns.iter().map(|column| column.name()).collect();
314
315        let cells: Vec<_> = self.columns.iter().map(|col| text_cells(col)).collect();
316
317        let titles_len: Vec<_> = titles
318            .iter()
319            .map(|t| t.chars().count())
320            .zip(cells.iter().map(|w| column_width(w)))
321            .map(|(a, b)| cmp::max(a, b))
322            .collect();
323
324        print_line(f, &titles_len, "\n\u{250c}", '┬', "\u{2510}\n")?;
325
326        for (i, title) in titles.iter().enumerate() {
327            write!(f, "\u{2502}{:>width$} ", title, width = titles_len[i] + 1)?;
328        }
329        write!(f, "\u{2502}")?;
330
331        if self.row_count() > 0 {
332            print_line(f, &titles_len, "\n\u{251c}", '┼', "\u{2524}\n")?;
333        }
334
335        for j in 0..self.row_count() {
336            for (i, col) in cells.iter().enumerate() {
337                write!(f, "\u{2502}{:>width$} ", col[j], width = titles_len[i] + 1)?;
338            }
339
340            let new_line = (j + 1) != self.row_count();
341            write!(f, "\u{2502}{}", if new_line { "\n" } else { "" })?;
342        }
343
344        print_line(f, &titles_len, "\n\u{2514}", '┴', "\u{2518}")
345    }
346}
347
/// Returns the widest cell in `column`, measured in `char`s (0 if empty).
///
/// Counting `char`s rather than bytes matches how titles are measured in the
/// `Debug` impl and how `{:>width$}` padding counts, so non-ASCII cells no
/// longer inflate the column width.
fn column_width(column: &[String]) -> usize {
    column
        .iter()
        .map(|cell| cell.chars().count())
        .max()
        .unwrap_or(0)
}
351
/// Writes one horizontal table rule.
///
/// The rule is `left`, then one run of `─` per entry of `lens` (each
/// `len + 2` long) separated by `center`, then `right`.
fn print_line(
    f: &mut fmt::Formatter,
    lens: &[usize],
    left: &str,
    center: char,
    right: &str,
) -> fmt::Result {
    f.write_str(left)?;
    if let Some((&head, tail)) = lens.split_first() {
        write!(f, "{:\u{2500}>width$}", "", width = head + 2)?;
        for &len in tail {
            write!(f, "{}{:\u{2500}>width$}", center, "", width = len + 2)?;
        }
    }
    f.write_str(right)
}
369
370fn text_cells<K: ColumnType>(data: &Column<K>) -> Vec<String> {
371    (0..data.len()).map(|i| format!("{}", data.at(i))).collect()
372}