1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
use std::io::Write;
use anyhow::Result;
use log::{debug, info};
use crate::{Table, TableField, Writer};
/// Buffers rows in an in-memory `Table` and periodically submits them as
/// row groups to an underlying `Writer` wrapping the sink `W`.
pub struct Packer<W> {
// sink-facing writer; constructed over exactly one inner `W` (see `new`/`finish`)
writer: Writer<W>,
// staging area for rows not yet submitted to `writer`
table: Table,
}
impl<W: Write + Send + 'static> Packer<W> {
/// Builds a packer that stages rows matching `schema` and writes them
/// into `inner`.
///
/// # Errors
/// Propagates any failure from constructing the underlying `Writer`.
pub fn new(inner: W, schema: &[TableField]) -> Result<Self> {
let kinds: Vec<_> = schema.iter().map(|f| f.kind).collect();
let writer = Writer::new(vec![inner], schema)?;
let table = Table::with_capacity(&kinds, 0);
Ok(Self { writer, table })
}
/// Mutable access to the staging table; callers push rows here.
pub fn table(&mut self) -> &mut Table {
&mut self.table
}
/// Looks up a schema field by name, returning its index and definition.
pub fn find_field(&self, name: &str) -> Option<(usize, &TableField)> {
self.writer.find_field(name)
}
/// Flushes the staged rows once the memory estimate crosses ~512 MiB;
/// otherwise, at every 64 Ki-row boundary, compacts the staging table
/// and logs its size without flushing.
pub fn consider_flushing(&mut self) -> Result<()> {
self.table.check_consistent()?;
let mem_limit = 512 * 1024 * 1024;
if self.table.mem_estimate() > mem_limit {
return self.flush();
}
let report_interval = 64 * 1024;
if self.table.rows() % report_interval != 0 {
return Ok(());
}
// Periodic compaction plus a size report, without actually flushing.
let before = self.table.mem_estimate();
self.table.finish_bulk_push()?;
let after = self.table.mem_estimate();
// `.max(1)` keeps the bytes/row division safe when the table is empty.
let rows = self.table.rows().max(1);
debug!(
"didn't flush ({} rows, ~{}MB (est: ~{}MB), ~{}bytes/row)",
rows,
after / 1024 / 1024,
before / 1024 / 1024,
after / rows
);
Ok(())
}
/// Compacts and submits every staged row as one batch; no-op when empty.
fn flush(&mut self) -> Result<()> {
let rows = self.table.rows();
if rows == 0 {
return Ok(());
}
self.table.finish_bulk_push()?;
let mem_estimate = self.table.mem_estimate();
info!(
"submitting row group ({} rows, ~{}MB, ~{}bytes/row)",
rows,
mem_estimate / 1024 / 1024,
mem_estimate / rows
);
self.writer.submit_batch(self.table.take_batch())?;
Ok(())
}
/// Flushes any remaining rows, shuts down the writer, and hands back
/// the inner sink.
pub fn finish(mut self) -> Result<W> {
self.flush()?;
let mut inners = self.writer.finish()?;
// `new` wrapped exactly one sink, so exactly one comes back out.
Ok(inners.pop().expect("exactly one"))
}
}