clickhouse_arrow/formats/
native.rs1use bytes::BytesMut;
2
3use super::DeserializerState;
4use super::protocol_data::{EmptyBlock, ProtocolData};
5use crate::Type;
6use crate::client::connection::ClientMetadata;
7use crate::compression::{compress_data_sync, decompress_data_async};
8use crate::io::{ClickHouseRead, ClickHouseWrite};
9use crate::native::block::Block;
10use crate::native::protocol::CompressionMethod;
11use crate::prelude::*;
12
13#[derive(Debug, Clone, Copy)]
18pub struct NativeFormat {}
19
20impl ClientFormat for NativeFormat {
21 type Data = Block;
22
23 const FORMAT: &'static str = "Native";
24}
25
26impl super::sealed::ClientFormatImpl<Block> for NativeFormat {
27 type Deser = ();
28 type Schema = Vec<(String, Type)>;
29 type Ser = ();
30
31 async fn read<R: ClickHouseRead + 'static>(
32 reader: &mut R,
33 revision: u64,
34 metadata: ClientMetadata,
35 state: &mut DeserializerState,
36 ) -> Result<Option<Block>> {
37 Ok(if let CompressionMethod::None = metadata.compression {
38 Block::read_async(reader, revision, (), state).await?.into_option()
39 } else {
40 let mut buffer =
41 BytesMut::from_iter(decompress_data_async(reader, metadata.compression).await?);
42 Block::read(&mut buffer, revision, (), state)?.into_option()
43 })
44 }
45
46 async fn write<W: ClickHouseWrite>(
47 writer: &mut W,
48 data: Block,
49 qid: Qid,
50 header: Option<&[(String, Type)]>,
51 revision: u64,
52 metadata: ClientMetadata,
53 ) -> Result<()> {
54 if let CompressionMethod::None = metadata.compression {
55 data.write_async(writer, revision, header, ())
56 .instrument(trace_span!("serialize_block"))
57 .await
58 .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "(block:uncompressed)"))
59 } else {
60 let estimated_size = data.estimate_size();
62 let mut buffer = BytesMut::with_capacity(estimated_size);
63
64 data.write(&mut buffer, revision, header, ())
65 .inspect_err(|error| error!(?error, {ATT_QID} = %qid, "(block:compressed)"))?;
66
67 compress_data_sync(writer, buffer.freeze(), metadata.compression)
68 .instrument(trace_span!("compress_block"))
69 .await
70 .inspect_err(|error| error!(?error, {ATT_QID} = %qid, "compressing"))
71 }
72 }
73}