clickhouse_arrow/formats/
native.rs

1use bytes::BytesMut;
2
3use super::DeserializerState;
4use super::protocol_data::{EmptyBlock, ProtocolData};
5use crate::Type;
6use crate::client::connection::ClientMetadata;
7use crate::compression::{DecompressionReader, compress_data_sync};
8use crate::io::{ClickHouseRead, ClickHouseWrite};
9use crate::native::block::Block;
10use crate::native::protocol::CompressionMethod;
11use crate::prelude::*;
12
/// Zero-sized marker type that selects `ClickHouse`'s `Native` format.
///
/// Data handled through this format is exchanged as this library's `Block` values: blocks are
/// deserialized from a reader and serialized into a caller-supplied writer.
#[derive(Clone, Copy, Debug)]
pub struct NativeFormat {}
19
20impl ClientFormat for NativeFormat {
21    type Data = Block;
22
23    const FORMAT: &'static str = "Native";
24}
25
26impl super::sealed::ClientFormatImpl<Block> for NativeFormat {
27    type Deser = ();
28    type Schema = Vec<(String, Type)>;
29    type Ser = ();
30
31    async fn read<R: ClickHouseRead + 'static>(
32        reader: &mut R,
33        revision: u64,
34        metadata: ClientMetadata,
35        state: &mut DeserializerState,
36    ) -> Result<Option<Block>> {
37        Ok(if let CompressionMethod::None = metadata.compression {
38            Block::read_async(reader, revision, (), state).await?.into_option()
39        } else {
40            let mut decompressor = DecompressionReader::new(metadata.compression, reader).await?;
41            Block::read_async(&mut decompressor, revision, (), state).await?.into_option()
42        })
43    }
44
45    async fn write<W: ClickHouseWrite>(
46        writer: &mut W,
47        data: Block,
48        qid: Qid,
49        header: Option<&[(String, Type)]>,
50        revision: u64,
51        metadata: ClientMetadata,
52    ) -> Result<()> {
53        if let CompressionMethod::None = metadata.compression {
54            data.write_async(writer, revision, header, ())
55                .instrument(trace_span!("serialize_block"))
56                .await
57                .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "(block:uncompressed)"))
58        } else {
59            // Hybrid approach: sync serialization + async compression
60            let estimated_size = data.estimate_size();
61            let mut buffer = BytesMut::with_capacity(estimated_size);
62
63            data.write(&mut buffer, revision, header, ())
64                .inspect_err(|error| error!(?error, {ATT_QID} = %qid, "(block:compressed)"))?;
65
66            compress_data_sync(writer, buffer.freeze(), metadata.compression)
67                .instrument(trace_span!("compress_block"))
68                .await
69                .inspect_err(|error| error!(?error, {ATT_QID} = %qid, "compressing"))
70        }
71    }
72}