1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
mod iterator;
mod serialize;
use iterator::StreamingIterator;
use std::io::Write;
pub use csv::{ByteRecord, Writer, WriterBuilder};
pub use serialize::*;
use crate::record_batch::RecordBatch;
use crate::{datatypes::Schema, error::Result};
/// Creates one streaming serializer per column of `batch`, in column order.
///
/// Each serializer borrows its column and `options` for the lifetime `'a`.
///
/// # Errors
/// Returns the first error produced by `new_serializer`. Columns after the
/// failing one are not processed.
fn new_serializers<'a>(
    batch: &'a RecordBatch,
    options: &'a SerializeOptions,
) -> Result<Vec<Box<dyn StreamingIterator<Item = [u8]> + 'a>>> {
    let mut per_column = Vec::with_capacity(batch.num_columns());
    for array in batch.columns().iter() {
        per_column.push(new_serializer(array.as_ref(), options)?);
    }
    Ok(per_column)
}
/// Serializes every row of `batch` into its own [`ByteRecord`].
///
/// The returned vector has `batch.num_rows()` records. Each record holds one
/// field per column, in column order, taken from the column serializers built
/// with `options`.
///
/// # Errors
/// Propagates any error raised while building the column serializers.
///
/// # Panics
/// Panics if a column serializer yields fewer than `batch.num_rows()` items.
/// NOTE(review): presumably impossible for a well-formed batch; confirm.
pub fn serialize(batch: &RecordBatch, options: &SerializeOptions) -> Result<Vec<ByteRecord>> {
    let mut columns = new_serializers(batch, options)?;
    let width = batch.num_columns();
    let rows: Vec<ByteRecord> = (0..batch.num_rows())
        .map(|_| {
            let mut row = ByteRecord::with_capacity(0, width);
            for column in columns.iter_mut() {
                row.push_field(column.next().unwrap());
            }
            row
        })
        .collect();
    Ok(rows)
}
/// Writes every row of `batch` to `writer`, one CSV record per row.
///
/// Only data rows are written; this function does not emit a header. A single
/// [`ByteRecord`] buffer is reused across rows and cleared after each write.
///
/// # Errors
/// Propagates errors from building the column serializers and from `writer`.
/// Rows after the first failing write are not written.
///
/// # Panics
/// Panics if a column serializer yields fewer than `batch.num_rows()` items.
pub fn write_batch<W: Write>(
    writer: &mut Writer<W>,
    batch: &RecordBatch,
    options: &SerializeOptions,
) -> Result<()> {
    let mut columns = new_serializers(batch, options)?;
    let mut row = ByteRecord::with_capacity(0, batch.num_columns());
    for _ in 0..batch.num_rows() {
        for column in columns.iter_mut() {
            row.push_field(column.next().unwrap());
        }
        writer.write_byte_record(&row)?;
        // Reuse the same buffer for the next row.
        row.clear();
    }
    Ok(())
}
/// Writes the field names of `schema`, in schema order, as a single CSV record.
///
/// The borrowed names are streamed straight into `writer`. `Writer::write_record`
/// accepts any iterator of `AsRef<[u8]>` items, so no per-field `String` or
/// intermediate `Vec` is allocated.
///
/// # Errors
/// Propagates any error returned by `writer`.
pub fn write_header<W: Write>(writer: &mut Writer<W>, schema: &Schema) -> Result<()> {
    writer.write_record(schema.fields().iter().map(|field| field.name()))?;
    Ok(())
}