clickhouse_driver/protocol/
block.rs1use core::marker::PhantomData;
2use std::{
3 fmt,
4 io::{self, Write},
5 iter::Iterator,
6};
7
8use super::code::*;
9use super::column::{AsInColumn, ColumnDataAdapter, Deserialize, Row};
10use super::encoder::Encoder;
11use super::value::IntoColumn;
12use super::ServerWriter;
13use crate::client::ServerInfo;
14use crate::compression::LZ4CompressionWrapper;
15use crate::types::{Field, FIELD_NONE, FIELD_NULLABLE};
16use chrono_tz::Tz;
17
18pub struct RowIterator<'a> {
19 block: &'a ServerBlock,
20 id: u64,
21}
22
23impl<'a> Iterator for RowIterator<'a> {
24 type Item = Row<'a>;
25 fn next(&mut self) -> Option<Row<'a>> {
26 if self.id >= self.block.rows {
27 None
28 } else {
29 let id = self.id;
30 self.id += 1;
31 let row = unsafe { Row::create(self.block, id) };
32 Some(row)
33 }
34 }
35}
36
37pub struct ItemIterator<'a, D: Deserialize> {
38 block: &'a ServerBlock,
39 id: u64,
40 phantom: PhantomData<&'a D>,
41}
42
43impl<'a, D: Deserialize> Iterator for ItemIterator<'a, D> {
44 type Item = D;
45 fn next(&mut self) -> Option<D> {
46 if self.id >= self.block.rows {
47 None
48 } else {
49 let id = self.id;
50 self.id += 1;
51 let row = unsafe { Row::create(self.block, id) };
52 Some(<D as Deserialize>::deserialize(row).expect("unexpected deserialization error"))
53 }
54 }
55}
56
57pub(crate) struct BlockInfo {
71 pub(super) cols: u64,
72 pub(super) rows: u64,
73 pub(super) overflow: bool,
74 pub(super) bucket: u32,
75}
76
77impl std::fmt::Debug for BlockInfo {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 f.debug_struct("BlockInfo")
80 .field("columns", &self.cols)
81 .field("rows", &self.rows)
82 .field("overflow", &self.overflow)
83 .field("bucket", &self.bucket)
84 .finish()
85 }
86}
87
88impl BlockInfo {
89 pub(super) fn is_empty(&self) -> bool {
91 self.rows == 0 && self.cols == 0
92 }
93}
94pub struct Block<'b> {
96 columns: Vec<ColumnDataAdapter<'b>>,
97 overflow: u8,
99 rows: usize,
101 pub(crate) table: &'b str,
103}
104
105impl<'b> Block<'b> {
106 pub fn new(table: &'b str) -> Block<'b> {
107 Block {
108 overflow: 0,
109 columns: Vec::new(),
110 rows: 0,
111 table,
112 }
113 }
114
115 #[inline]
117 pub fn column_count(&self) -> usize {
118 self.columns.len()
119 }
120 #[inline]
122 pub fn row_count(&self) -> usize {
123 self.rows
124 }
125
126 #[inline]
128 pub fn is_empty(&self) -> bool {
129 self.columns.is_empty()
130 }
131 pub fn column_iter(&self) -> std::slice::Iter<ColumnDataAdapter> {
135 self.columns.iter()
136 }
137
138 fn set_rows(&mut self, rows: usize) {
140 if !self.columns.is_empty() {
141 if self.rows != rows {
142 panic!("block columns must have the same length")
143 }
144 } else {
145 self.rows = rows;
146 };
147 }
148 pub fn add<T: 'b>(mut self, name: &'b str, data: Vec<T>) -> Self
151 where
152 T: IntoColumn<'b>,
153 {
154 self.set_rows(data.len());
155
156 self.columns.push(ColumnDataAdapter {
157 name,
158 flag: FIELD_NONE,
159 data: IntoColumn::to_column(data),
160 });
161 self
162 }
163 pub fn add_nullable<T: 'b>(mut self, name: &'b str, data: Vec<Option<T>>) -> Self
167 where
168 Option<T>: IntoColumn<'b>,
169 T: Default,
170 {
171 self.set_rows(data.len());
172
173 self.columns.push(ColumnDataAdapter {
174 name,
175 flag: FIELD_NULLABLE,
176 data: IntoColumn::to_column(data),
177 });
178 self
179 }
180}
181pub struct BlockColumnHeader {
191 pub(crate) field: Field,
192 pub(crate) name: String,
193}
194
195impl fmt::Debug for BlockColumnHeader {
196 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197 f.debug_struct("Pool")
198 .field("name", &self.name)
199 .field("field.type", &self.field.sql_type)
200 .finish()
201 }
202}
203
204pub struct BlockColumn {
206 pub(crate) header: BlockColumnHeader,
207 pub(crate) data: Box<dyn AsInColumn>,
208}
209
210impl BlockColumn {
211 pub(crate) fn into_header(self) -> BlockColumnHeader {
212 self.header
213 }
214}
215
216impl std::fmt::Debug for BlockColumn {
217 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218 f.debug_struct("BlockColumn")
219 .field("sql_type", &self.header.field.sql_type)
220 .field("name", &self.header.name)
221 .finish()
222 }
223}
224
225#[derive(Debug)]
226pub struct ServerBlock {
227 pub(crate) columns: Vec<BlockColumn>,
228 pub(crate) rows: u64,
229 pub(crate) timezone: Tz,
230}
231
232impl ServerBlock {
233 #[inline]
234 pub(crate) fn into_columns(self) -> Vec<BlockColumn> {
235 self.columns
236 }
237
238 #[inline]
239 pub(crate) fn into_headers(self) -> Vec<BlockColumnHeader> {
240 self.into_columns()
241 .into_iter()
242 .map(|c| c.into_header())
243 .collect()
244 }
245
246 pub fn iter_rows(&self) -> RowIterator {
247 RowIterator { block: self, id: 0 }
248 }
249 pub fn iter<D: Deserialize>(&self) -> ItemIterator<D> {
250 ItemIterator {
251 block: self,
252 id: 0,
253 phantom: PhantomData,
254 }
255 }
256
257 #[inline]
258 pub fn column_count(&self) -> u64 {
259 self.columns.len() as u64
260 }
261 #[inline]
262 pub fn row_count(&self) -> u64 {
263 self.rows
264 }
265}
266
267pub(crate) trait AsBlock {
269 fn dump(&self, cx: &ServerInfo, writer: &mut dyn Write) -> std::io::Result<()>;
270}
271
272impl<B: AsBlock> ServerWriter for B {
273 fn write(&self, cx: &ServerInfo, writer: &mut dyn Write) -> std::io::Result<()> {
276 CLIENT_DATA.encode(writer)?;
277
278 if cx.revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES {
280 ().encode(writer)?;
281 }
282
283 if !cx.compression.is_none() {
284 let mut compress = LZ4CompressionWrapper::new(writer);
285 self.dump(cx, &mut compress)?;
286 compress.flush()
287 } else {
288 self.dump(cx, writer)?;
289 writer.flush()
290 }
291 }
292}
293pub struct EmptyBlock;
297
298impl AsBlock for EmptyBlock {
300 #[inline]
302 fn dump(&self, cx: &ServerInfo, writer: &mut dyn Write) -> std::io::Result<()> {
303 let revision = cx.revision;
304 if revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO {
305 [1u8, 0, 2, 0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0]
312 .as_ref()
313 .encode(writer)
314 } else {
315 [0u8, 0u8].as_ref().encode(writer)
317 }
318 }
319}
320pub(super) struct OutputBlockWrapper<'b> {
324 pub(super) inner: &'b Block<'b>,
325 pub(super) columns: &'b Vec<BlockColumnHeader>,
326}
327
328impl OutputBlockWrapper<'_> {
329 fn is_empty(&self) -> bool {
330 self.columns.is_empty()
331 }
332}
333
334impl<'b> AsBlock for OutputBlockWrapper<'b> {
335 fn dump(&self, cx: &ServerInfo, writer: &mut dyn Write) -> io::Result<()> {
337 if self.is_empty() {
338 return EmptyBlock.dump(cx, writer);
339 }
340
341 let revision = cx.revision;
342 if revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO {
343 [1u8, self.inner.overflow, 2, 0xFF, 0xFF, 0xFF, 0xFF, 0]
344 .as_ref()
345 .encode(writer)?;
346 };
347
348 (self.columns.len() as u64).encode(writer)?;
349 (self.inner.rows as u64).encode(writer)?;
350
351 for (head, col) in self.columns.iter().zip(self.inner.columns.iter()) {
352 head.name.encode(writer)?;
353 head.field.encode(writer)?;
354 col.data.encode(&head.field, writer)?;
355 }
356
357 Ok(())
358 }
359}