// clickhouse_arrow/formats/arrow.rs
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use bytes::BytesMut;

use super::DeserializerState;
use super::protocol_data::{EmptyBlock, ProtocolData};
use crate::Type;
use crate::arrow::ArrowDeserializerState;
use crate::compression::{DecompressionReader, compress_data_sync};
use crate::connection::ClientMetadata;
use crate::io::{ClickHouseRead, ClickHouseWrite};
use crate::native::protocol::CompressionMethod;
use crate::prelude::*;

/// Marker type that selects the Arrow data path.
///
/// The struct has no fields and therefore carries no state; it exists so that
/// the format traits in this module can be implemented on it. Its trait
/// implementations use `RecordBatch` as the unit of data.
#[derive(Debug, Clone, Copy)]
pub struct ArrowFormat {}

22impl ClientFormat for ArrowFormat {
23 type Data = RecordBatch;
24
25 const FORMAT: &'static str = "Arrow";
26}
27
28impl super::sealed::ClientFormatImpl<RecordBatch> for ArrowFormat {
29 type Deser = ArrowDeserializerState;
30 type Schema = SchemaRef;
31 type Ser = ();
32
33 fn finish_deser(state: &mut DeserializerState<Self::Deser>) {
34 state.deserializer().builders.clear();
35 state.deserializer().buffer.clear();
36 }
37
38 async fn write<W: ClickHouseWrite>(
39 writer: &mut W,
40 batch: RecordBatch,
41 qid: Qid,
42 header: Option<&[(String, Type)]>,
43 revision: u64,
44 metadata: ClientMetadata,
45 ) -> Result<()> {
46 if let CompressionMethod::None = metadata.compression {
47 batch
48 .write_async(writer, revision, header, metadata.arrow_options)
49 .instrument(trace_span!("serialize_block"))
50 .await
51 .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "serialize"))?;
52 } else {
53 let mut raw = BytesMut::with_capacity(batch.get_array_memory_size());
54 batch
55 .write(&mut raw, revision, header, metadata.arrow_options)
56 .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "serialize"))?;
57 compress_data_sync(writer, raw.freeze(), metadata.compression)
58 .await
59 .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "compressing"))?;
60 }
61
62 Ok(())
63 }
64
65 async fn read<R: ClickHouseRead + 'static>(
66 reader: &mut R,
67 revision: u64,
68 metadata: ClientMetadata,
69 state: &mut DeserializerState<Self::Deser>,
70 ) -> Result<Option<RecordBatch>> {
71 let arrow_options = metadata.arrow_options;
72 if let CompressionMethod::None = metadata.compression {
73 RecordBatch::read_async(reader, revision, arrow_options, state).await
74 } else {
75 let mut decompressor = DecompressionReader::new(metadata.compression, reader).await?;
76 RecordBatch::read_async(&mut decompressor, revision, arrow_options, state).await
77 }
78 .inspect_err(|error| error!(?error, "deserializing arrow record batch"))
79 .map(RecordBatch::into_option)
80 }
81}