pack_it/
packer.rs

1use std::io::Write;
2
3use anyhow::Result;
4use log::{debug, info};
5
6use crate::{Table, TableField, Writer};
7
8pub struct Packer<W> {
9    writer: Writer<W>,
10    table: Table,
11}
12
13impl<W: Write + Send + 'static> Packer<W> {
14    pub fn new(inner: W, schema: &[TableField]) -> Result<Self> {
15        Ok(Self {
16            writer: Writer::new(vec![inner], schema)?,
17            table: Table::with_capacity(&schema.iter().map(|f| f.kind).collect::<Vec<_>>(), 0),
18        })
19    }
20
21    pub fn table(&mut self) -> &mut Table {
22        &mut self.table
23    }
24
25    pub fn find_field(&self, name: &str) -> Option<(usize, &TableField)> {
26        self.writer.find_field(name)
27    }
28
29    pub fn consider_flushing(&mut self) -> Result<()> {
30        self.table.check_consistent()?;
31
32        if self.table.mem_estimate() > 512 * 1024 * 1024 {
33            self.flush()?;
34        } else if self.table.rows() % (64 * 1024) == 0 {
35            let before = self.table.mem_estimate();
36            self.table.finish_bulk_push()?;
37            let mem_estimate = self.table.mem_estimate();
38            let rows = self.table.rows().max(1);
39            debug!(
40                "didn't flush ({} rows, ~{}MB (est: ~{}MB), ~{}bytes/row)",
41                rows,
42                mem_estimate / 1024 / 1024,
43                before / 1024 / 1024,
44                mem_estimate / rows
45            );
46        }
47
48        Ok(())
49    }
50
51    pub fn flush(&mut self) -> Result<()> {
52        let rows = self.table.rows();
53        if 0 == rows {
54            return Ok(());
55        }
56
57        // update the memory estimate, and check consistent
58        self.table.finish_bulk_push()?;
59        let mem_estimate = self.table.mem_estimate();
60
61        info!(
62            "submitting row group ({} rows, ~{}MB, ~{}bytes/row)",
63            rows,
64            mem_estimate / 1024 / 1024,
65            mem_estimate / rows
66        );
67
68        let batch = self.table.take_batch();
69
70        self.writer.submit_batch(batch)?;
71
72        Ok(())
73    }
74
75    pub fn finish(mut self) -> Result<W> {
76        self.flush()?;
77        Ok(self.writer.finish()?.pop().expect("exactly one"))
78    }
79}