clickhouse_arrow/formats/
native.rs

1use bytes::BytesMut;
2
3use super::DeserializerState;
4use super::protocol_data::{EmptyBlock, ProtocolData};
5use crate::Type;
6use crate::client::connection::ClientMetadata;
7use crate::compression::{compress_data_sync, decompress_data_async};
8use crate::io::{ClickHouseRead, ClickHouseWrite};
9use crate::native::block::Block;
10use crate::native::protocol::CompressionMethod;
11use crate::prelude::*;
12
13/// Marker for Native format.
14///
15/// Read native `ClickHouse` blocks into this library's `Block` struct and write `Block`s into the
16/// provided writer.
17#[derive(Debug, Clone, Copy)]
18pub struct NativeFormat {}
19
20impl ClientFormat for NativeFormat {
21    type Data = Block;
22
23    const FORMAT: &'static str = "Native";
24}
25
26impl super::sealed::ClientFormatImpl<Block> for NativeFormat {
27    type Deser = ();
28    type Schema = Vec<(String, Type)>;
29    type Ser = ();
30
31    async fn read<R: ClickHouseRead + 'static>(
32        reader: &mut R,
33        revision: u64,
34        metadata: ClientMetadata,
35        state: &mut DeserializerState,
36    ) -> Result<Option<Block>> {
37        Ok(if let CompressionMethod::None = metadata.compression {
38            Block::read_async(reader, revision, (), state).await?.into_option()
39        } else {
40            let mut buffer =
41                BytesMut::from_iter(decompress_data_async(reader, metadata.compression).await?);
42            Block::read(&mut buffer, revision, (), state)?.into_option()
43        })
44    }
45
46    async fn write<W: ClickHouseWrite>(
47        writer: &mut W,
48        data: Block,
49        qid: Qid,
50        header: Option<&[(String, Type)]>,
51        revision: u64,
52        metadata: ClientMetadata,
53    ) -> Result<()> {
54        if let CompressionMethod::None = metadata.compression {
55            data.write_async(writer, revision, header, ())
56                .instrument(trace_span!("serialize_block"))
57                .await
58                .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "(block:uncompressed)"))
59        } else {
60            // Hybrid approach: sync serialization + async compression
61            let estimated_size = data.estimate_size();
62            let mut buffer = BytesMut::with_capacity(estimated_size);
63
64            data.write(&mut buffer, revision, header, ())
65                .inspect_err(|error| error!(?error, {ATT_QID} = %qid, "(block:compressed)"))?;
66
67            compress_data_sync(writer, buffer.freeze(), metadata.compression)
68                .instrument(trace_span!("compress_block"))
69                .await
70                .inspect_err(|error| error!(?error, {ATT_QID} = %qid, "compressing"))
71        }
72    }
73}