clickhouse_arrow/formats/
arrow.rs

1use arrow::array::RecordBatch;
2use arrow::datatypes::SchemaRef;
3use bytes::BytesMut;
4
5use super::DeserializerState;
6use super::protocol_data::{EmptyBlock, ProtocolData};
7use crate::Type;
8use crate::arrow::ArrowDeserializerState;
9use crate::compression::{DecompressionReader, compress_data_sync};
10use crate::connection::ClientMetadata;
11use crate::io::{ClickHouseRead, ClickHouseWrite};
12use crate::native::protocol::CompressionMethod;
13use crate::prelude::*;
14
/// Marker type selecting the Arrow client format.
///
/// This is a zero-field struct, not a trait: it carries no data and exists only so the format
/// traits can be implemented on it. With this format, native `ClickHouse` blocks are read into
/// arrow `RecordBatch`es and arrow `RecordBatch`es are written out as native blocks.
#[derive(Debug, Clone, Copy)]
pub struct ArrowFormat {}
21
22impl ClientFormat for ArrowFormat {
23    type Data = RecordBatch;
24
25    const FORMAT: &'static str = "Arrow";
26}
27
28impl super::sealed::ClientFormatImpl<RecordBatch> for ArrowFormat {
29    type Deser = ArrowDeserializerState;
30    type Schema = SchemaRef;
31    type Ser = ();
32
33    fn finish_deser(state: &mut DeserializerState<Self::Deser>) {
34        state.deserializer().builders.clear();
35        state.deserializer().buffer.clear();
36    }
37
38    async fn write<W: ClickHouseWrite>(
39        writer: &mut W,
40        batch: RecordBatch,
41        qid: Qid,
42        header: Option<&[(String, Type)]>,
43        revision: u64,
44        metadata: ClientMetadata,
45    ) -> Result<()> {
46        if let CompressionMethod::None = metadata.compression {
47            batch
48                .write_async(writer, revision, header, metadata.arrow_options)
49                .instrument(trace_span!("serialize_block"))
50                .await
51                .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "serialize"))?;
52        } else {
53            let mut raw = BytesMut::with_capacity(batch.get_array_memory_size());
54            batch
55                .write(&mut raw, revision, header, metadata.arrow_options)
56                .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "serialize"))?;
57            compress_data_sync(writer, raw.freeze(), metadata.compression)
58                .await
59                .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "compressing"))?;
60        }
61
62        Ok(())
63    }
64
65    async fn read<R: ClickHouseRead + 'static>(
66        reader: &mut R,
67        revision: u64,
68        metadata: ClientMetadata,
69        state: &mut DeserializerState<Self::Deser>,
70    ) -> Result<Option<RecordBatch>> {
71        let arrow_options = metadata.arrow_options;
72        if let CompressionMethod::None = metadata.compression {
73            RecordBatch::read_async(reader, revision, arrow_options, state).await
74        } else {
75            let mut decompressor = DecompressionReader::new(metadata.compression, reader).await?;
76            RecordBatch::read_async(&mut decompressor, revision, arrow_options, state).await
77        }
78        .inspect_err(|error| error!(?error, "deserializing arrow record batch"))
79        .map(RecordBatch::into_option)
80    }
81}