1use std::io::Write;
2
3use anyhow::Result;
4use log::{debug, info};
5
6use crate::{Table, TableField, Writer};
7
8pub struct Packer<W> {
9 writer: Writer<W>,
10 table: Table,
11}
12
13impl<W: Write + Send + 'static> Packer<W> {
14 pub fn new(inner: W, schema: &[TableField]) -> Result<Self> {
15 Ok(Self {
16 writer: Writer::new(vec![inner], schema)?,
17 table: Table::with_capacity(&schema.iter().map(|f| f.kind).collect::<Vec<_>>(), 0),
18 })
19 }
20
21 pub fn table(&mut self) -> &mut Table {
22 &mut self.table
23 }
24
25 pub fn find_field(&self, name: &str) -> Option<(usize, &TableField)> {
26 self.writer.find_field(name)
27 }
28
29 pub fn consider_flushing(&mut self) -> Result<()> {
30 self.table.check_consistent()?;
31
32 if self.table.mem_estimate() > 512 * 1024 * 1024 {
33 self.flush()?;
34 } else if self.table.rows() % (64 * 1024) == 0 {
35 let before = self.table.mem_estimate();
36 self.table.finish_bulk_push()?;
37 let mem_estimate = self.table.mem_estimate();
38 let rows = self.table.rows().max(1);
39 debug!(
40 "didn't flush ({} rows, ~{}MB (est: ~{}MB), ~{}bytes/row)",
41 rows,
42 mem_estimate / 1024 / 1024,
43 before / 1024 / 1024,
44 mem_estimate / rows
45 );
46 }
47
48 Ok(())
49 }
50
51 pub fn flush(&mut self) -> Result<()> {
52 let rows = self.table.rows();
53 if 0 == rows {
54 return Ok(());
55 }
56
57 self.table.finish_bulk_push()?;
59 let mem_estimate = self.table.mem_estimate();
60
61 info!(
62 "submitting row group ({} rows, ~{}MB, ~{}bytes/row)",
63 rows,
64 mem_estimate / 1024 / 1024,
65 mem_estimate / rows
66 );
67
68 let batch = self.table.take_batch();
69
70 self.writer.submit_batch(batch)?;
71
72 Ok(())
73 }
74
75 pub fn finish(mut self) -> Result<W> {
76 self.flush()?;
77 Ok(self.writer.finish()?.pop().expect("exactly one"))
78 }
79}