polars_arrow/io/avro/read/
mod.rs1use std::io::Read;
3
4use avro_schema::file::FileMetadata;
5use avro_schema::read::fallible_streaming_iterator::FallibleStreamingIterator;
6use avro_schema::read::{BlockStreamingIterator, block_iterator};
7use avro_schema::schema::Field as AvroField;
8
9mod deserialize;
10pub use deserialize::deserialize;
11use polars_error::PolarsResult;
12
13mod nested;
14mod schema;
15mod util;
16
17pub use schema::infer_schema;
18
19use crate::array::Array;
20use crate::datatypes::ArrowSchema;
21use crate::record_batch::RecordBatchT;
22
23pub struct Reader<R: Read> {
25 iter: BlockStreamingIterator<R>,
26 avro_fields: Vec<AvroField>,
27 fields: ArrowSchema,
28 projection: Vec<bool>,
29}
30
31impl<R: Read> Reader<R> {
32 pub fn new(
34 reader: R,
35 metadata: FileMetadata,
36 fields: ArrowSchema,
37 projection: Option<Vec<bool>>,
38 ) -> Self {
39 let projection = projection.unwrap_or_else(|| fields.iter().map(|_| true).collect());
40
41 Self {
42 iter: block_iterator(reader, metadata.compression, metadata.marker),
43 avro_fields: metadata.record.fields,
44 fields,
45 projection,
46 }
47 }
48
49 pub fn into_inner(self) -> R {
51 self.iter.into_inner()
52 }
53}
54
55impl<R: Read> Iterator for Reader<R> {
56 type Item = PolarsResult<RecordBatchT<Box<dyn Array>>>;
57
58 fn next(&mut self) -> Option<Self::Item> {
59 let fields = &self.fields;
60 let avro_fields = &self.avro_fields;
61 let projection = &self.projection;
62
63 self.iter
64 .next()
65 .transpose()
66 .map(|maybe_block| deserialize(maybe_block?, fields, avro_fields, projection))
67 }
68}