katniss_pb2arrow/
record_conversion.rs1use arrow_array::builder::*;
2use arrow_array::RecordBatch;
3use arrow_schema::SchemaRef;
4use prost_reflect::DynamicMessage;
5
6use self::builder_appending::append_all_fields;
7use self::builder_creation::BuilderFactory;
8use crate::ArrowBatchProps;
9use crate::KatnissArrowError;
10use crate::Result;
11
12mod builder_appending;
13mod builder_creation;
14
15pub struct RecordConverter {
18 pub(crate) schema: SchemaRef,
19 builder: StructBuilder, }
21
22impl RecordConverter {
23 pub fn try_new(props: &ArrowBatchProps) -> Result<Self> {
24 let batch_size = props.records_per_arrow_batch;
25 let factory = BuilderFactory::new_with_dictionary(props.dictionaries.clone());
26 let builder = factory.try_from_fields(props.schema.fields().to_owned(), batch_size)?;
27 Ok(Self {
28 schema: props.schema.clone(),
29 builder,
30 })
31 }
32
33 pub fn append_message(&mut self, msg: &DynamicMessage) -> Result<()> {
35 append_all_fields(self.schema.fields(), &mut self.builder, Some(msg))
36 }
37
38 pub fn records(&mut self) -> RecordBatch {
40 let struct_array = self.builder.finish();
41 RecordBatch::from(&struct_array)
42 }
43
44 pub fn len(&self) -> usize {
46 self.builder.len()
47 }
48
49 #[must_use]
50 pub fn is_empty(&self) -> bool {
51 self.len() == 0
52 }
53}
54
55impl TryFrom<&ArrowBatchProps> for RecordConverter {
56 type Error = KatnissArrowError;
57 fn try_from(props: &ArrowBatchProps) -> Result<Self> {
58 RecordConverter::try_new(props)
59 }
60}