clickhouse_arrow/formats/arrow.rs

1use arrow::array::RecordBatch;
2use arrow::datatypes::SchemaRef;
3use bytes::BytesMut;
4
5use super::DeserializerState;
6use super::protocol_data::{EmptyBlock, ProtocolData};
7use crate::Type;
8use crate::arrow::ArrowDeserializerState;
9use crate::compression::{compress_data_sync, decompress_data_async};
10use crate::connection::ClientMetadata;
11use crate::io::{ClickHouseRead, ClickHouseWrite};
12use crate::native::protocol::CompressionMethod;
13use crate::prelude::*;
14
/// Marker type selecting the Arrow client format.
///
/// Read native `ClickHouse` blocks into arrow `RecordBatch`es and write arrow `RecordBatch`es into
/// native blocks.
///
/// This is a zero-sized unit struct: it can be written as `ArrowFormat` (or `ArrowFormat {}`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ArrowFormat;
21
22impl ClientFormat for ArrowFormat {
23    type Data = RecordBatch;
24
25    const FORMAT: &'static str = "Arrow";
26}
27
28impl super::sealed::ClientFormatImpl<RecordBatch> for ArrowFormat {
29    type Deser = ArrowDeserializerState;
30    type Schema = SchemaRef;
31    type Ser = ();
32
33    fn finish_deser(state: &mut DeserializerState<Self::Deser>) {
34        state.deserializer().builders.clear();
35        state.deserializer().buffer.clear();
36    }
37
38    async fn write<W: ClickHouseWrite>(
39        writer: &mut W,
40        batch: RecordBatch,
41        qid: Qid,
42        header: Option<&[(String, Type)]>,
43        revision: u64,
44        metadata: ClientMetadata,
45    ) -> Result<()> {
46        if let CompressionMethod::None = metadata.compression {
47            batch
48                .write_async(writer, revision, header, metadata.arrow_options)
49                .instrument(trace_span!("serialize_block"))
50                .await
51                .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "serialize"))?;
52        } else {
53            let mut raw = BytesMut::with_capacity(batch.get_array_memory_size());
54            batch
55                .write(&mut raw, revision, header, metadata.arrow_options)
56                .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "serialize"))?;
57            compress_data_sync(writer, raw.freeze(), metadata.compression)
58                .await
59                .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "compressing"))?;
60        }
61
62        Ok(())
63    }
64
65    async fn read<R: ClickHouseRead + 'static>(
66        reader: &mut R,
67        revision: u64,
68        metadata: ClientMetadata,
69        state: &mut DeserializerState<Self::Deser>,
70    ) -> Result<Option<RecordBatch>> {
71        let arrow_options = metadata.arrow_options;
72        if let CompressionMethod::None = metadata.compression {
73            RecordBatch::read_async(reader, revision, arrow_options, state).await
74        } else {
75            let mut buffer =
76                BytesMut::from_iter(decompress_data_async(reader, metadata.compression).await?);
77            // TODO: Spawn onto an executor "state.executor"
78            RecordBatch::read(&mut buffer, revision, arrow_options, state)
79        }
80        .inspect_err(|error| error!(?error, "deserializing arrow record batch"))
81        .map(RecordBatch::into_option)
82    }
83}