Skip to main content

clickhouse_driver/protocol/
block.rs

1use core::marker::PhantomData;
2use std::{
3    fmt,
4    io::{self, Write},
5    iter::Iterator,
6};
7
8use super::code::*;
9use super::column::{AsInColumn, ColumnDataAdapter, Deserialize, Row};
10use super::encoder::Encoder;
11use super::value::IntoColumn;
12use super::ServerWriter;
13use crate::client::ServerInfo;
14use crate::compression::LZ4CompressionWrapper;
15use crate::types::{Field, FIELD_NONE, FIELD_NULLABLE};
16use chrono_tz::Tz;
17
18pub struct RowIterator<'a> {
19    block: &'a ServerBlock,
20    id: u64,
21}
22
23impl<'a> Iterator for RowIterator<'a> {
24    type Item = Row<'a>;
25    fn next(&mut self) -> Option<Row<'a>> {
26        if self.id >= self.block.rows {
27            None
28        } else {
29            let id = self.id;
30            self.id += 1;
31            let row = unsafe { Row::create(self.block, id) };
32            Some(row)
33        }
34    }
35}
36
37pub struct ItemIterator<'a, D: Deserialize> {
38    block: &'a ServerBlock,
39    id: u64,
40    phantom: PhantomData<&'a D>,
41}
42
43impl<'a, D: Deserialize> Iterator for ItemIterator<'a, D> {
44    type Item = D;
45    fn next(&mut self) -> Option<D> {
46        if self.id >= self.block.rows {
47            None
48        } else {
49            let id = self.id;
50            self.id += 1;
51            let row = unsafe { Row::create(self.block, id) };
52            Some(<D as Deserialize>::deserialize(row).expect("unexpected deserialization error"))
53        }
54    }
55}
56
57/// Inserting data column struct
58/// Clickhouse serialization format
59/// ------column-------------
60///  FNM TNM  DATA
61/// |---|---|---------|
62/// FNM - field name, Clickhouse table column name
63/// TNM - type name, Clickhouse sql serialized field type (Int64, String, FixedString(10)...)
64/// DATA - serialized data array, has data specific format. Integer data (u)int8|16|32|64|128,
65///   float f32|f64, Decimal and other fixed length data
66///   (Date, DateTime, UUID, Enum8, Enum16...) are serialized as array of little-endian binary representation.
67///   String column is serialized as Variant String - VarInt string length + string byte array
68///   FixedString is serialized as array of string data (without length)
69///   Nullable data type precedes array of null flags represented as  array of u8 where 0-null, 1-notnull
70pub(crate) struct BlockInfo {
71    pub(super) cols: u64,
72    pub(super) rows: u64,
73    pub(super) overflow: bool,
74    pub(super) bucket: u32,
75}
76
77impl std::fmt::Debug for BlockInfo {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        f.debug_struct("BlockInfo")
80            .field("columns", &self.cols)
81            .field("rows", &self.rows)
82            .field("overflow", &self.overflow)
83            .field("bucket", &self.bucket)
84            .finish()
85    }
86}
87
88impl BlockInfo {
89    /// Clickhouse empty block is used as a marker of end of data transfer
90    pub(super) fn is_empty(&self) -> bool {
91        self.rows == 0 && self.cols == 0
92    }
93}
94/// Output data holder
95pub struct Block<'b> {
96    columns: Vec<ColumnDataAdapter<'b>>,
97    /// Clickhouse overflow flag. @note not used now
98    overflow: u8,
99    /// The number of rows in each column
100    rows: usize,
101    /// Clickhouse table name
102    pub(crate) table: &'b str,
103}
104
105impl<'b> Block<'b> {
106    pub fn new(table: &'b str) -> Block<'b> {
107        Block {
108            overflow: 0,
109            columns: Vec::new(),
110            rows: 0,
111            table,
112        }
113    }
114
115    /// Returns the number of columns
116    #[inline]
117    pub fn column_count(&self) -> usize {
118        self.columns.len()
119    }
120    /// Returns the number of rows
121    #[inline]
122    pub fn row_count(&self) -> usize {
123        self.rows
124    }
125
126    /// Returns whether the block has any columns
127    #[inline]
128    pub fn is_empty(&self) -> bool {
129        self.columns.is_empty()
130    }
131    /// Iterate over collection of columns.
132    /// Each column has it own data, so it's wrapped by adapter that provides common interface
133    /// for data encoding and type check
134    pub fn column_iter(&self) -> std::slice::Iter<ColumnDataAdapter> {
135        self.columns.iter()
136    }
137
138    /// Check if the added column has the same number of rows as the others
139    fn set_rows(&mut self, rows: usize) {
140        if !self.columns.is_empty() {
141            if self.rows != rows {
142                panic!("block columns must have the same length")
143            }
144        } else {
145            self.rows = rows;
146        };
147    }
148    /// Add new column to the block
149    /// NOTE! columns should be added in order of INSERT query
150    pub fn add<T: 'b>(mut self, name: &'b str, data: Vec<T>) -> Self
151    where
152        T: IntoColumn<'b>,
153    {
154        self.set_rows(data.len());
155
156        self.columns.push(ColumnDataAdapter {
157            name,
158            flag: FIELD_NONE,
159            data: IntoColumn::to_column(data),
160        });
161        self
162    }
163    /// Add new column to block.
164    /// In contrast to `add` method, here we add column for Nullable data types.
165    /// Option None value is used  as null data.
166    pub fn add_nullable<T: 'b>(mut self, name: &'b str, data: Vec<Option<T>>) -> Self
167    where
168        Option<T>: IntoColumn<'b>,
169        T: Default,
170    {
171        self.set_rows(data.len());
172
173        self.columns.push(ColumnDataAdapter {
174            name,
175            flag: FIELD_NULLABLE,
176            data: IntoColumn::to_column(data),
177        });
178        self
179    }
180}
181/// Block column name and type.
182///
183///
184/// BlockColumnHeader is a part of column set of ServerBlock and used to
185/// retrieve raw server data and convert it to rust data type for SELECT statements;
186/// or to validate output data for INSERT statements.
187/// To prevent superfluous server load by provided bunch of incorrect data
188/// driver should validate the Block against server table structure
189/// before it been sent to server.
190pub struct BlockColumnHeader {
191    pub(crate) field: Field,
192    pub(crate) name: String,
193}
194
195impl fmt::Debug for BlockColumnHeader {
196    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197        f.debug_struct("Pool")
198            .field("name", &self.name)
199            .field("field.type", &self.field.sql_type)
200            .finish()
201    }
202}
203
204/// input Block column data.
205pub struct BlockColumn {
206    pub(crate) header: BlockColumnHeader,
207    pub(crate) data: Box<dyn AsInColumn>,
208}
209
210impl BlockColumn {
211    pub(crate) fn into_header(self) -> BlockColumnHeader {
212        self.header
213    }
214}
215
216impl std::fmt::Debug for BlockColumn {
217    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218        f.debug_struct("BlockColumn")
219            .field("sql_type", &self.header.field.sql_type)
220            .field("name", &self.header.name)
221            .finish()
222    }
223}
224
225#[derive(Debug)]
226pub struct ServerBlock {
227    pub(crate) columns: Vec<BlockColumn>,
228    pub(crate) rows: u64,
229    pub(crate) timezone: Tz,
230}
231
232impl ServerBlock {
233    #[inline]
234    pub(crate) fn into_columns(self) -> Vec<BlockColumn> {
235        self.columns
236    }
237
238    #[inline]
239    pub(crate) fn into_headers(self) -> Vec<BlockColumnHeader> {
240        self.into_columns()
241            .into_iter()
242            .map(|c| c.into_header())
243            .collect()
244    }
245
246    pub fn iter_rows(&self) -> RowIterator {
247        RowIterator { block: self, id: 0 }
248    }
249    pub fn iter<D: Deserialize>(&self) -> ItemIterator<D> {
250        ItemIterator {
251            block: self,
252            id: 0,
253            phantom: PhantomData,
254        }
255    }
256
257    #[inline]
258    pub fn column_count(&self) -> u64 {
259        self.columns.len() as u64
260    }
261    #[inline]
262    pub fn row_count(&self) -> u64 {
263        self.rows
264    }
265}
266
267/// Provide specific implementation of ServerWriter trait for Empty and Data blocks
268pub(crate) trait AsBlock {
269    fn dump(&self, cx: &ServerInfo, writer: &mut dyn Write) -> std::io::Result<()>;
270}
271
272impl<B: AsBlock> ServerWriter for B {
273    /// Serialize Block according to  server capabilities and client settings.
274    /// Compress content by LZ4 if corresponding option is set
275    fn write(&self, cx: &ServerInfo, writer: &mut dyn Write) -> std::io::Result<()> {
276        CLIENT_DATA.encode(writer)?;
277
278        // Temporary table
279        if cx.revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES {
280            ().encode(writer)?;
281        }
282
283        if !cx.compression.is_none() {
284            let mut compress = LZ4CompressionWrapper::new(writer);
285            self.dump(cx, &mut compress)?;
286            compress.flush()
287        } else {
288            self.dump(cx, writer)?;
289            writer.flush()
290        }
291    }
292}
/// Block without columns and rows.
///
/// The empty block message is a special data message that indicates the end
/// of the data stream. In response to it Clickhouse returns a success or
/// failure status.
pub struct EmptyBlock;
297
298/// Optimized byte sequence for empty block
299impl AsBlock for EmptyBlock {
300    /// Write block content to output stream
301    #[inline]
302    fn dump(&self, cx: &ServerInfo, writer: &mut dyn Write) -> std::io::Result<()> {
303        let revision = cx.revision;
304        if revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO {
305            // 1  [0]    - ?
306            // 0  [1]    - overflow
307            // 2  [2]    - ?
308            // -1 [3..7] - bucket num as int32
309            // 0  [8]    - ?
310
311            [1u8, 0, 2, 0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0]
312                .as_ref()
313                .encode(writer)
314        } else {
315            // columns, rows
316            [0u8, 0u8].as_ref().encode(writer)
317        }
318    }
319}
320/// OutputBlockWrapper is adapter that combine provided by caller
321/// columns data (Block) and
322/// Clickhouse server table metadata (   BlockColumnHeader[] )
323pub(super) struct OutputBlockWrapper<'b> {
324    pub(super) inner: &'b Block<'b>,
325    pub(super) columns: &'b Vec<BlockColumnHeader>,
326}
327
328impl OutputBlockWrapper<'_> {
329    fn is_empty(&self) -> bool {
330        self.columns.is_empty()
331    }
332}
333
334impl<'b> AsBlock for OutputBlockWrapper<'b> {
335    /// Write block content to output stream
336    fn dump(&self, cx: &ServerInfo, writer: &mut dyn Write) -> io::Result<()> {
337        if self.is_empty() {
338            return EmptyBlock.dump(cx, writer);
339        }
340
341        let revision = cx.revision;
342        if revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO {
343            [1u8, self.inner.overflow, 2, 0xFF, 0xFF, 0xFF, 0xFF, 0]
344                .as_ref()
345                .encode(writer)?;
346        };
347
348        (self.columns.len() as u64).encode(writer)?;
349        (self.inner.rows as u64).encode(writer)?;
350
351        for (head, col) in self.columns.iter().zip(self.inner.columns.iter()) {
352            head.name.encode(writer)?;
353            head.field.encode(writer)?;
354            col.data.encode(&head.field, writer)?;
355        }
356
357        Ok(())
358    }
359}