clickhouse_arrow/formats/
native.rs1use bytes::BytesMut;
2
3use super::DeserializerState;
4use super::protocol_data::{EmptyBlock, ProtocolData};
5use crate::Type;
6use crate::client::connection::ClientMetadata;
7use crate::compression::{DecompressionReader, compress_data_sync};
8use crate::io::{ClickHouseRead, ClickHouseWrite};
9use crate::native::block::Block;
10use crate::native::protocol::CompressionMethod;
11use crate::prelude::*;
12
/// Marker type for the `Native` client format.
///
/// The struct carries no fields; it exists only so format-specific behaviour
/// can be attached to it through trait implementations (in this file it is
/// paired with [`Block`] as its data type).
#[derive(Clone, Copy, Debug)]
pub struct NativeFormat {}
19
20impl ClientFormat for NativeFormat {
21 type Data = Block;
22
23 const FORMAT: &'static str = "Native";
24}
25
26impl super::sealed::ClientFormatImpl<Block> for NativeFormat {
27 type Deser = ();
28 type Schema = Vec<(String, Type)>;
29 type Ser = ();
30
31 async fn read<R: ClickHouseRead + 'static>(
32 reader: &mut R,
33 revision: u64,
34 metadata: ClientMetadata,
35 state: &mut DeserializerState,
36 ) -> Result<Option<Block>> {
37 Ok(if let CompressionMethod::None = metadata.compression {
38 Block::read_async(reader, revision, (), state).await?.into_option()
39 } else {
40 let mut decompressor = DecompressionReader::new(metadata.compression, reader).await?;
41 Block::read_async(&mut decompressor, revision, (), state).await?.into_option()
42 })
43 }
44
45 async fn write<W: ClickHouseWrite>(
46 writer: &mut W,
47 data: Block,
48 qid: Qid,
49 header: Option<&[(String, Type)]>,
50 revision: u64,
51 metadata: ClientMetadata,
52 ) -> Result<()> {
53 if let CompressionMethod::None = metadata.compression {
54 data.write_async(writer, revision, header, ())
55 .instrument(trace_span!("serialize_block"))
56 .await
57 .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "(block:uncompressed)"))
58 } else {
59 let estimated_size = data.estimate_size();
61 let mut buffer = BytesMut::with_capacity(estimated_size);
62
63 data.write(&mut buffer, revision, header, ())
64 .inspect_err(|error| error!(?error, {ATT_QID} = %qid, "(block:compressed)"))?;
65
66 compress_data_sync(writer, buffer.freeze(), metadata.compression)
67 .instrument(trace_span!("compress_block"))
68 .await
69 .inspect_err(|error| error!(?error, {ATT_QID} = %qid, "compressing"))
70 }
71 }
72}