// clickhouse_arrow/native/block.rs

1use std::str::FromStr;
2
3use indexmap::IndexMap;
4use tokio::io::{AsyncReadExt, AsyncWriteExt};
5
6use super::block_info::BlockInfo;
7use super::protocol::DBMS_MIN_PROTOCOL_VERSION_WITH_CUSTOM_SERIALIZATION;
8use crate::deserialize::ClickHouseNativeDeserializer;
9use crate::formats::protocol_data::ProtocolData;
10use crate::formats::{DeserializerState, SerializerState};
11use crate::io::{ClickHouseBytesRead, ClickHouseBytesWrite, ClickHouseRead, ClickHouseWrite};
12use crate::native::values::Value;
13use crate::prelude::*;
14use crate::serialize::ClickHouseNativeSerializer;
15use crate::{Error, Result, Row, Type};
16
/// A chunk of data in columnar form.
///
/// Column values are stored *flattened* in `column_data`: the first `rows`
/// entries belong to the first column in `column_types`, the next `rows`
/// entries to the second column, and so on.
#[derive(Debug, Clone, Default)]
pub struct Block {
    /// Metadata about the block
    pub info:         BlockInfo,
    /// The number of rows contained in the block
    pub rows:         u64,
    /// The type of each column by name, in order.
    pub column_types: Vec<(String, Type)>,
    /// The data of each column by name, in order. All `Value` should correspond to the associated
    /// type in `column_types`.
    pub column_data:  Vec<Value>,
}
30
/// Row-oriented iterator over a [`Block`]'s columnar data, returned by
/// [`Block::take_iter_rows`]. Each item is one row: a
/// `(column name, column type, value)` tuple per column.
pub struct BlockRowValueIter<'a, I>
where
    I: Iterator<Item = Value>,
{
    /// One `(name, type, value iterator)` entry per column; the per-column
    /// iterators are advanced in lock-step to produce rows.
    column_data: Vec<(&'a str, &'a Type, I)>,
}
38
39impl<'a, I> Iterator for BlockRowValueIter<'a, I>
40where
41    I: Iterator<Item = Value>,
42{
43    type Item = Vec<(&'a str, &'a Type, Value)>;
44
45    fn next(&mut self) -> Option<Self::Item> {
46        if self.column_data.is_empty() {
47            return None;
48        }
49        let mut out = Vec::new();
50        for (name, type_, pop) in &mut self.column_data {
51            out.push((*name, *type_, pop.next()?));
52        }
53        Some(out)
54    }
55}
56
57impl Block {
58    /// Iterate over all rows with owned values.
59    pub fn take_iter_rows(&mut self) -> BlockRowValueIter<'_, impl Iterator<Item = Value>> {
60        #[allow(clippy::cast_possible_truncation)]
61        let rows = self.rows as usize;
62        let mut column_data = std::mem::take(&mut self.column_data);
63        let mut out = Vec::with_capacity(rows);
64        for (name, type_) in &self.column_types {
65            let mut column = Vec::with_capacity(rows);
66            let column_slice = column_data.drain(..rows);
67            column.extend(column_slice);
68            out.push((&**name, type_.strip_low_cardinality(), column.into_iter()));
69        }
70        BlockRowValueIter { column_data: out }
71    }
72
73    /// Estimate the serialized size of this block for buffer allocation
74    pub fn estimate_size(&self) -> usize {
75        let mut size = 16; // BlockInfo + columns count + rows count
76
77        #[allow(clippy::cast_possible_truncation)]
78        let rows = self.rows as usize;
79
80        for (name, type_) in &self.column_types {
81            // Column name + type string
82            size += name.len() + type_.to_string().len() + 10; // +10 for length prefixes and overhead
83
84            // Estimate data size
85            size += rows * type_.estimate_capacity();
86        }
87
88        // Add 20% buffer for overhead
89        size * 6 / 5
90    }
91
92    /// Create a block from a vector of rows and a schema.
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if the number of rows does not match the number of columns, serializing
97    /// fails, or the field cannot be found in the schema.
98    pub fn from_rows<T: Row>(rows: Vec<T>, schema: Vec<(String, Type)>) -> Result<Self> {
99        let row_len = rows.len();
100        let row_col_len = schema.len() * rows.len();
101
102        let mut columns = schema
103            .iter()
104            .map(|(name, _)| (name.clone(), Vec::with_capacity(rows.len())))
105            .collect::<IndexMap<String, Vec<_>>>();
106
107        rows.into_iter()
108            .enumerate()
109            .map(|(i, x)| {
110                x.serialize_row(&schema)
111                    .inspect_err(|error| error!(?error, "serialize error during insert (ROW {i})"))
112                    .map(|r| (i, r))
113            })
114            .try_for_each(|result| -> Result<()> {
115                let (i, x) = result?;
116                for (key, value) in x {
117                    let type_ = &schema
118                        .iter()
119                        .find(|(n, _)| n == &*key)
120                        .ok_or_else(|| {
121                            Error::Protocol(format!(
122                                "missing type for data in row {i}, column: {key}"
123                            ))
124                        })?
125                        .1;
126                    type_.validate_value(&value).inspect_err(|error| {
127                        tracing::error!(
128                            ?error,
129                            ?value,
130                            ?key,
131                            ?type_,
132                            "Value validation failed for row {i}"
133                        );
134                    })?;
135                    let column = columns.get_mut(key.as_ref()).ok_or(Error::Protocol(format!(
136                        "missing column for data in row {i}, column: {key}"
137                    )))?;
138                    column.push(value);
139                }
140                Ok(())
141            })?;
142
143        let mut column_data = Vec::with_capacity(row_col_len);
144
145        // Move the values into a flattened vector
146        for (_, mut values) in columns.drain(..) {
147            column_data.append(&mut values);
148        }
149
150        Ok(Block {
151            info: BlockInfo::default(),
152            rows: row_len as u64,
153            column_types: schema,
154            column_data,
155        })
156    }
157}
158
159impl ProtocolData<Self, ()> for Block {
160    type Options = ();
161
162    async fn write_async<W: ClickHouseWrite>(
163        mut self,
164        writer: &mut W,
165        revision: u64,
166        _header: Option<&[(String, Type)]>,
167        _options: (),
168    ) -> Result<()> {
169        if revision > 0 {
170            self.info.write_async(writer).await?;
171        }
172
173        let columns = self.column_types.len();
174
175        #[allow(clippy::cast_possible_truncation)]
176        let rows = self.rows as usize;
177
178        writer.write_var_uint(columns as u64).await?;
179        writer.write_var_uint(self.rows).await?;
180
181        for (name, type_) in self.column_types {
182            let mut values = Vec::with_capacity(rows);
183            values.extend(self.column_data.drain(..rows));
184
185            if values.len() != rows {
186                return Err(Error::Protocol(format!(
187                    "row and column length mismatch. {} != {}",
188                    values.len(),
189                    rows
190                )));
191            }
192
193            // EncodeStart
194            writer.write_string(&name).await?;
195            writer.write_string(type_.to_string()).await?;
196
197            if self.rows > 0 {
198                if revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CUSTOM_SERIALIZATION {
199                    writer.write_u8(0).await?;
200                }
201
202                let mut state = SerializerState::default();
203                type_.serialize_prefix_async(writer, &mut state).await?;
204                type_.serialize_column(values, writer, &mut state).await?;
205            }
206        }
207        Ok(())
208    }
209
210    fn write<W: ClickHouseBytesWrite>(
211        mut self,
212        writer: &mut W,
213        revision: u64,
214        _header: Option<&[(String, Type)]>,
215        _options: (),
216    ) -> Result<()> {
217        if revision > 0 {
218            self.info.write(writer)?;
219        }
220
221        let columns = self.column_types.len();
222
223        #[allow(clippy::cast_possible_truncation)]
224        let rows = self.rows as usize;
225
226        writer.put_var_uint(columns as u64)?;
227        writer.put_var_uint(self.rows)?;
228
229        for (name, type_) in self.column_types {
230            let mut values = Vec::with_capacity(rows);
231            values.extend(self.column_data.drain(..rows));
232
233            if values.len() != rows {
234                return Err(Error::Protocol(format!(
235                    "row and column length mismatch. {} != {}",
236                    values.len(),
237                    rows
238                )));
239            }
240
241            // EncodeStart
242            writer.put_string(&name)?;
243            writer.put_string(type_.to_string())?;
244
245            if self.rows > 0 {
246                if revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CUSTOM_SERIALIZATION {
247                    writer.put_u8(0);
248                }
249
250                let mut state = SerializerState::default();
251                type_.serialize_prefix(writer, &mut state);
252                type_.serialize_column_sync(values, writer, &mut state)?;
253            }
254        }
255        Ok(())
256    }
257
258    async fn read_async<R: ClickHouseRead>(
259        reader: &mut R,
260        revision: u64,
261        _options: (),
262        state: &mut DeserializerState,
263    ) -> Result<Self> {
264        let info =
265            if revision > 0 { BlockInfo::read_async(reader).await? } else { BlockInfo::default() };
266
267        #[allow(clippy::cast_possible_truncation)]
268        let columns = reader.read_var_uint().await? as usize;
269        let rows = reader.read_var_uint().await?;
270
271        let mut block = Block {
272            info,
273            rows,
274            column_types: Vec::with_capacity(columns),
275            column_data: Vec::with_capacity(columns),
276        };
277
278        for i in 0..columns {
279            let name = reader
280                .read_utf8_string()
281                .await
282                .inspect_err(|e| error!("reading column name (index {i}): {e}"))?;
283
284            let type_name = reader
285                .read_utf8_string()
286                .await
287                .inspect_err(|e| error!("reading column type (name {name}): {e}"))?;
288
289            // TODO: implement
290            let mut _has_custom_serialization = false;
291            if revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CUSTOM_SERIALIZATION {
292                _has_custom_serialization = reader.read_u8().await? != 0;
293            }
294
295            let type_ = Type::from_str(&type_name).inspect_err(|error| {
296                error!(?error, "Type deserialize failed: name={name}, type={type_name}");
297            })?;
298
299            let mut row_data = if rows > 0 {
300                type_.deserialize_prefix_async(reader, state).await?;
301
302                #[allow(clippy::cast_possible_truncation)]
303                type_
304                    .deserialize_column(reader, rows as usize, state)
305                    .await
306                    .inspect_err(|e| error!("deserialize (name {name}): {e}"))?
307            } else {
308                vec![]
309            };
310
311            block.column_types.push((name, type_));
312            block.column_data.append(&mut row_data);
313        }
314
315        Ok(block)
316    }
317
318    fn read<R: ClickHouseBytesRead + 'static>(
319        reader: &mut R,
320        revision: u64,
321        _options: (),
322        state: &mut DeserializerState,
323    ) -> Result<Self> {
324        let info = if revision > 0 { BlockInfo::read(reader)? } else { BlockInfo::default() };
325
326        #[allow(clippy::cast_possible_truncation)]
327        let columns = reader.try_get_var_uint()? as usize;
328        let rows = reader.try_get_var_uint()?;
329
330        let mut block = Block {
331            info,
332            rows,
333            column_types: Vec::with_capacity(columns),
334            column_data: Vec::with_capacity(columns),
335        };
336
337        for i in 0..columns {
338            let name = String::from_utf8(
339                reader
340                    .try_get_string()
341                    .inspect_err(|e| error!("reading column name (index {i}): {e}"))?
342                    .to_vec(),
343            )?;
344
345            let type_name = String::from_utf8(
346                reader
347                    .try_get_string()
348                    .inspect_err(|e| error!("reading column type (name {name}): {e}"))?
349                    .to_vec(),
350            )?;
351
352            // TODO: implement
353            let mut _has_custom_serialization = false;
354            if revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CUSTOM_SERIALIZATION {
355                _has_custom_serialization = reader.try_get_u8()? != 0;
356            }
357
358            let type_ = Type::from_str(&type_name).inspect_err(|error| {
359                error!(?error, "Type deserialize failed: name={name}, type={type_name}");
360            })?;
361
362            #[allow(clippy::cast_possible_truncation)]
363            let mut row_data = if rows > 0 {
364                type_.deserialize_prefix(reader)?;
365                type_
366                    .deserialize_column_sync(reader, rows as usize, state)
367                    .inspect_err(|e| error!("deserialize (name {name}): {e}"))?
368            } else {
369                vec![]
370            };
371
372            block.column_types.push((name, type_));
373            block.column_data.append(&mut row_data);
374        }
375
376        Ok(block)
377    }
378}