katniss_pb2arrow/
record_conversion.rs

1use arrow_array::builder::*;
2use arrow_array::RecordBatch;
3use arrow_schema::SchemaRef;
4use prost_reflect::DynamicMessage;
5
6use self::builder_appending::append_all_fields;
7use self::builder_creation::BuilderFactory;
8use crate::ArrowBatchProps;
9use crate::KatnissArrowError;
10use crate::Result;
11
12mod builder_appending;
13mod builder_creation;
14
/// Converts records from protobuf to Arrow.
///
/// Appended records are held in the internal builder until `records()` is
/// called, which drains the builder into a record batch.
pub struct RecordConverter {
    /// Arrow schema whose fields drive how each message is appended.
    pub(crate) schema: SchemaRef,
    /// Struct builder accumulating the rows appended so far.
    builder: StructBuilder, // fields align with schema
}
21
22impl RecordConverter {
23    pub fn try_new(props: &ArrowBatchProps) -> Result<Self> {
24        let batch_size = props.records_per_arrow_batch;
25        let factory = BuilderFactory::new_with_dictionary(props.dictionaries.clone());
26        let builder = factory.try_from_fields(props.schema.fields().to_owned(), batch_size)?;
27        Ok(Self {
28            schema: props.schema.clone(),
29            builder,
30        })
31    }
32
33    /// Append a new protobuf message to this batch
34    pub fn append_message(&mut self, msg: &DynamicMessage) -> Result<()> {
35        append_all_fields(self.schema.fields(), &mut self.builder, Some(msg))
36    }
37
38    /// Returns record batch and resets the builder
39    pub fn records(&mut self) -> RecordBatch {
40        let struct_array = self.builder.finish();
41        RecordBatch::from(&struct_array)
42    }
43
44    /// Number of rows in this batch so far
45    pub fn len(&self) -> usize {
46        self.builder.len()
47    }
48
49    #[must_use]
50    pub fn is_empty(&self) -> bool {
51        self.len() == 0
52    }
53}
54
55impl TryFrom<&ArrowBatchProps> for RecordConverter {
56    type Error = KatnissArrowError;
57    fn try_from(props: &ArrowBatchProps) -> Result<Self> {
58        RecordConverter::try_new(props)
59    }
60}