clickhouse_readonly/block/
mod.rs1use std::{cmp, default::Default, fmt, io::Read, marker::PhantomData};
2
3use ethnum::I256;
4
5use crate::{
6 binary::{Encoder, ReadEx},
7 column::{self, ArcColumnWrapper, Column, ColumnFrom, ColumnType, Simple},
8 error::{Error, FromSqlError, Result},
9 protocol,
10 types::{FromSql, SqlType},
11 Complex,
12};
13
14use self::chunk_iterator::ChunkIterator;
15pub(crate) use self::row::BlockRef;
16pub use self::{
17 block_info::BlockInfo,
18 builder::{RCons, RNil, RowBuilder},
19 row::{Row, Rows},
20};
21
22mod block_info;
23mod builder;
24mod chunk_iterator;
25mod row;
26
27const INSERT_BLOCK_SIZE: usize = 1_048_576;
28const DEFAULT_CAPACITY: usize = 100;
29
30pub trait ColumnIdx {
31 fn get_index<K: ColumnType>(&self, columns: &[Column<K>]) -> Result<usize>;
32}
33
34pub trait Sliceable {
35 fn slice_type() -> SqlType;
36}
37
38macro_rules! sliceable {
39 ( $($t:ty: $k:ident),* ) => {
40 $(
41 impl Sliceable for $t {
42 fn slice_type() -> SqlType {
43 SqlType::$k
44 }
45 }
46 )*
47 };
48}
49
50sliceable! {
51 u8: UInt8,
52 u16: UInt16,
53 u32: UInt32,
54 u64: UInt64,
55
56 i8: Int8,
57 i16: Int16,
58 i32: Int32,
59 i64: Int64,
60 I256: Int256
61}
62
63#[derive(Default)]
65pub struct Block<K: ColumnType = Simple> {
66 info: BlockInfo,
67 columns: Vec<Column<K>>,
68 capacity: usize,
69}
70
71impl Block<Simple> {
72 pub(crate) fn concat(blocks: &[Self]) -> Block<Complex> {
73 let first = blocks.first().expect("blocks should not be empty.");
74
75 for block in blocks {
76 assert_eq!(
77 first.column_count(),
78 block.column_count(),
79 "all columns should have the same size."
80 );
81 }
82
83 let num_columns = first.column_count();
84 let mut columns = Vec::with_capacity(num_columns);
85 for i in 0_usize..num_columns {
86 let chunks = blocks.iter().map(|block| &block.columns[i]);
87 columns.push(Column::concat(chunks));
88 }
89
90 Block {
91 info: first.info,
92 columns,
93 capacity: blocks.iter().map(|b| b.capacity).sum(),
94 }
95 }
96}
97
98impl<L: ColumnType, R: ColumnType> PartialEq<Block<R>> for Block<L> {
99 fn eq(&self, other: &Block<R>) -> bool {
100 if self.columns.len() != other.columns.len() {
101 return false;
102 }
103
104 for i in 0..self.columns.len() {
105 if self.columns[i] != other.columns[i] {
106 return false;
107 }
108 }
109
110 true
111 }
112}
113
114impl<K: ColumnType> Clone for Block<K> {
115 fn clone(&self) -> Self {
116 Self {
117 info: self.info,
118 columns: self.columns.iter().map(|c| (*c).clone()).collect(),
119 capacity: self.capacity,
120 }
121 }
122}
123
124impl<K: ColumnType> AsRef<Block<K>> for Block<K> {
125 fn as_ref(&self) -> &Self {
126 self
127 }
128}
129
130impl ColumnIdx for usize {
131 #[inline(always)]
132 fn get_index<K: ColumnType>(&self, _: &[Column<K>]) -> Result<usize> {
133 Ok(*self)
134 }
135}
136
137impl<'a> ColumnIdx for &'a str {
138 fn get_index<K: ColumnType>(&self, columns: &[Column<K>]) -> Result<usize> {
139 match columns
140 .iter()
141 .enumerate()
142 .find(|(_, column)| column.name() == *self)
143 {
144 None => Err(Error::FromSql(FromSqlError::OutOfRange)),
145 Some((index, _)) => Ok(index),
146 }
147 }
148}
149
150impl ColumnIdx for String {
151 fn get_index<K: ColumnType>(&self, columns: &[Column<K>]) -> Result<usize> {
152 self.as_str().get_index(columns)
153 }
154}
155
156impl Block {
157 pub fn new() -> Self {
159 Self::with_capacity(DEFAULT_CAPACITY)
160 }
161
162 pub fn with_capacity(capacity: usize) -> Self {
164 Self {
165 info: Default::default(),
166 columns: vec![],
167 capacity,
168 }
169 }
170
171 pub(crate) fn load<R>(reader: &mut R, tz: chrono_tz::Tz) -> Result<Self>
172 where
173 R: Read + ReadEx,
174 {
175 Self::raw_load(reader, tz)
176 }
177
178 fn raw_load<R>(reader: &mut R, tz: chrono_tz::Tz) -> Result<Block<Simple>>
179 where
180 R: ReadEx,
181 {
182 let mut block = Block::new();
183 block.info = BlockInfo::read(reader)?;
184
185 let num_columns = reader.read_uvarint()?;
186 let num_rows = reader.read_uvarint()?;
187
188 for _ in 0..num_columns {
189 let column = Column::read(reader, num_rows as usize, tz)?;
190 block.append_column(column);
191 }
192
193 Ok(block)
194 }
195}
196
197impl<K: ColumnType> Block<K> {
198 pub fn row_count(&self) -> usize {
200 match self.columns.first() {
201 None => 0,
202 Some(column) => column.len(),
203 }
204 }
205
206 pub fn column_count(&self) -> usize {
208 self.columns.len()
209 }
210
211 #[inline(always)]
213 pub fn columns(&self) -> &[Column<K>] {
214 &self.columns
215 }
216
217 fn append_column(&mut self, column: Column<K>) {
218 let column_len = column.len();
219
220 if !self.columns.is_empty() && self.row_count() != column_len {
221 panic!("all columns in block must have same size.")
222 }
223
224 self.columns.push(column);
225 }
226
227 pub fn get<'a, T, I>(&'a self, row: usize, col: I) -> Result<T>
229 where
230 T: FromSql<'a>,
231 I: ColumnIdx + Copy,
232 {
233 let column_index = col.get_index(self.columns())?;
234 T::from_sql(self.columns[column_index].at(row))
235 }
236
237 pub fn add_column<S>(self, name: &str, values: S) -> Self
239 where
240 S: ColumnFrom,
241 {
242 self.column(name, values)
243 }
244
245 pub fn column<S>(mut self, name: &str, values: S) -> Self
247 where
248 S: ColumnFrom,
249 {
250 let data = S::column_from::<ArcColumnWrapper>(values);
251 let column = column::new_column(name, data);
252
253 self.append_column(column);
254 self
255 }
256
257 pub fn is_empty(&self) -> bool {
259 self.columns.is_empty()
260 }
261
262 pub fn rows(&self) -> Rows<K> {
264 Rows {
265 row: 0,
266 block_ref: BlockRef::Borrowed(self),
267 kind: PhantomData,
268 }
269 }
270
271 pub fn push<B: RowBuilder>(&mut self, row: B) -> Result<()> {
273 row.apply(self)
274 }
275
276 pub fn get_column<I>(&self, col: I) -> Result<&Column<K>>
278 where
279 I: ColumnIdx + Copy,
280 {
281 let column_index = col.get_index(self.columns())?;
282 let column = &self.columns[column_index];
283 Ok(column)
284 }
285}
286
287impl<K: ColumnType> Block<K> {
288 pub(crate) fn write(&self, encoder: &mut Encoder) {
289 self.info.write(encoder);
290 encoder.uvarint(self.column_count() as u64);
291 encoder.uvarint(self.row_count() as u64);
292
293 for column in &self.columns {
294 column.write(encoder);
295 }
296 }
297
298 pub(crate) fn send_data(&self, encoder: &mut Encoder) {
299 encoder.uvarint(protocol::CLIENT_DATA);
300 encoder.string(""); for chunk in self.chunks(INSERT_BLOCK_SIZE) {
302 chunk.write(encoder);
303 }
304 }
305
306 pub(crate) fn chunks(&self, n: usize) -> ChunkIterator<K> {
307 ChunkIterator::new(n, self)
308 }
309}
310
311impl<K: ColumnType> fmt::Debug for Block<K> {
312 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
313 let titles: Vec<&str> = self.columns.iter().map(|column| column.name()).collect();
314
315 let cells: Vec<_> = self.columns.iter().map(|col| text_cells(col)).collect();
316
317 let titles_len: Vec<_> = titles
318 .iter()
319 .map(|t| t.chars().count())
320 .zip(cells.iter().map(|w| column_width(w)))
321 .map(|(a, b)| cmp::max(a, b))
322 .collect();
323
324 print_line(f, &titles_len, "\n\u{250c}", '┬', "\u{2510}\n")?;
325
326 for (i, title) in titles.iter().enumerate() {
327 write!(f, "\u{2502}{:>width$} ", title, width = titles_len[i] + 1)?;
328 }
329 write!(f, "\u{2502}")?;
330
331 if self.row_count() > 0 {
332 print_line(f, &titles_len, "\n\u{251c}", '┼', "\u{2524}\n")?;
333 }
334
335 for j in 0..self.row_count() {
336 for (i, col) in cells.iter().enumerate() {
337 write!(f, "\u{2502}{:>width$} ", col[j], width = titles_len[i] + 1)?;
338 }
339
340 let new_line = (j + 1) != self.row_count();
341 write!(f, "\u{2502}{}", if new_line { "\n" } else { "" })?;
342 }
343
344 print_line(f, &titles_len, "\n\u{2514}", '┴', "\u{2518}")
345 }
346}
347
348fn column_width(column: &[String]) -> usize {
349 column.iter().map(|cell| cell.len()).max().unwrap_or(0)
350}
351
352fn print_line(
353 f: &mut fmt::Formatter,
354 lens: &[usize],
355 left: &str,
356 center: char,
357 right: &str,
358) -> fmt::Result {
359 write!(f, "{}", left)?;
360 for (i, len) in lens.iter().enumerate() {
361 if i != 0 {
362 write!(f, "{}", center)?;
363 }
364
365 write!(f, "{:\u{2500}>width$}", "", width = len + 2)?;
366 }
367 write!(f, "{}", right)
368}
369
370fn text_cells<K: ColumnType>(data: &Column<K>) -> Vec<String> {
371 (0..data.len()).map(|i| format!("{}", data.at(i))).collect()
372}