polars_arrow/io/avro/read/
mod.rs

1//! APIs to read from Avro format to arrow.
2use std::io::Read;
3
4use avro_schema::file::FileMetadata;
5use avro_schema::read::fallible_streaming_iterator::FallibleStreamingIterator;
6use avro_schema::read::{BlockStreamingIterator, block_iterator};
7use avro_schema::schema::Field as AvroField;
8
9mod deserialize;
10pub use deserialize::deserialize;
11use polars_error::PolarsResult;
12
13mod nested;
14mod schema;
15mod util;
16
17pub use schema::infer_schema;
18
19use crate::array::Array;
20use crate::datatypes::ArrowSchema;
21use crate::record_batch::RecordBatchT;
22
23/// Single threaded, blocking reader of Avro; [`Iterator`] of [`RecordBatchT`].
24pub struct Reader<R: Read> {
25    iter: BlockStreamingIterator<R>,
26    avro_fields: Vec<AvroField>,
27    fields: ArrowSchema,
28    projection: Vec<bool>,
29}
30
31impl<R: Read> Reader<R> {
32    /// Creates a new [`Reader`].
33    pub fn new(
34        reader: R,
35        metadata: FileMetadata,
36        fields: ArrowSchema,
37        projection: Option<Vec<bool>>,
38    ) -> Self {
39        let projection = projection.unwrap_or_else(|| fields.iter().map(|_| true).collect());
40
41        Self {
42            iter: block_iterator(reader, metadata.compression, metadata.marker),
43            avro_fields: metadata.record.fields,
44            fields,
45            projection,
46        }
47    }
48
49    /// Deconstructs itself into its internal reader
50    pub fn into_inner(self) -> R {
51        self.iter.into_inner()
52    }
53}
54
55impl<R: Read> Iterator for Reader<R> {
56    type Item = PolarsResult<RecordBatchT<Box<dyn Array>>>;
57
58    fn next(&mut self) -> Option<Self::Item> {
59        let fields = &self.fields;
60        let avro_fields = &self.avro_fields;
61        let projection = &self.projection;
62
63        self.iter
64            .next()
65            .transpose()
66            .map(|maybe_block| deserialize(maybe_block?, fields, avro_fields, projection))
67    }
68}