1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
pub use arrow::io::avro::avro_schema::file::Compression;
use arrow::io::avro::avro_schema::{self};
use arrow::io::avro::write;
pub use Compression as AvroCompression;
use super::*;
/// Writes a `DataFrame` to an Apache Avro container file.
///
/// Constructed via [`SerWriter::new`]; block compression is optional and
/// defaults to `None` (see `with_compression`).
#[must_use]
pub struct AvroWriter<W> {
// Destination the Avro header and data blocks are written to.
writer: W,
// Optional per-block compression; `None` writes uncompressed blocks.
compression: Option<AvroCompression>,
}
impl<W> AvroWriter<W>
where
    W: Write,
{
    /// Set the compression used for data blocks when the frame is written.
    ///
    /// Passing `None` (the default) leaves blocks uncompressed.
    pub fn with_compression(self, compression: Option<AvroCompression>) -> Self {
        Self { compression, ..self }
    }
}
impl<W> SerWriter<W> for AvroWriter<W>
where
    W: Write,
{
    /// Create a writer targeting `writer` with compression disabled.
    fn new(writer: W) -> Self {
        Self {
            writer,
            compression: None,
        }
    }

    /// Serialize `df` as an Avro container file into the underlying writer.
    ///
    /// # Errors
    /// Returns an error if the schema cannot be converted to an Avro record,
    /// or if compressing/writing any block fails.
    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
        let schema = df.schema().to_arrow();
        let record = write::to_record(&schema)?;
        let mut data = vec![];
        let mut compressed_block = avro_schema::file::CompressedBlock::default();

        // BUGFIX: the Avro file header (magic bytes, schema JSON, codec,
        // sync marker) must be written exactly once, before any data blocks.
        // The original emitted it inside the chunk loop, producing a corrupt
        // container for any DataFrame with more than one chunk.
        avro_schema::write::write_metadata(&mut self.writer, record.clone(), self.compression)
            .map_err(convert_err)?;

        for chunk in df.iter_chunks() {
            // One serializer per column, paired with its Avro field schema.
            let mut serializers = chunk
                .iter()
                .zip(record.fields.iter())
                .map(|(array, field)| write::new_serializer(array.as_ref(), &field.schema))
                .collect::<Vec<_>>();

            // Hand the reusable buffer to the block; taken back below so the
            // allocation is recycled across iterations.
            let mut block =
                avro_schema::file::Block::new(chunk.arrays()[0].len(), std::mem::take(&mut data));

            write::serialize(&mut serializers, &mut block);

            let _was_compressed =
                avro_schema::write::compress(&mut block, &mut compressed_block, self.compression)
                    .map_err(convert_err)?;

            avro_schema::write::write_block(&mut self.writer, &compressed_block)
                .map_err(convert_err)?;

            // Reclaim and reset the buffers for the next chunk.
            data = block.data;
            data.clear();
            compressed_block.data.clear();
            compressed_block.number_of_rows = 0;
        }
        Ok(())
    }
}