sql_splitter/writer/
mod.rs1use ahash::AHashMap;
2use std::fs::{self, File};
3use std::io::{BufWriter, Write};
4use std::path::{Path, PathBuf};
5
6pub const WRITER_BUFFER_SIZE: usize = 256 * 1024;
7pub const STMT_BUFFER_COUNT: usize = 100;
8
9pub struct TableWriter {
10 writer: BufWriter<File>,
11 write_count: usize,
12 max_stmt_buffer: usize,
13}
14
15impl TableWriter {
16 pub fn new(filename: &Path) -> std::io::Result<Self> {
17 let file = File::create(filename)?;
18 let writer = BufWriter::with_capacity(WRITER_BUFFER_SIZE, file);
19
20 Ok(Self {
21 writer,
22 write_count: 0,
23 max_stmt_buffer: STMT_BUFFER_COUNT,
24 })
25 }
26
27 pub fn write_statement(&mut self, stmt: &[u8]) -> std::io::Result<()> {
28 self.writer.write_all(stmt)?;
29 self.writer.write_all(b"\n")?;
30
31 self.write_count += 1;
32 if self.write_count >= self.max_stmt_buffer {
33 self.write_count = 0;
34 self.writer.flush()?;
35 }
36
37 Ok(())
38 }
39
40 pub fn flush(&mut self) -> std::io::Result<()> {
41 self.write_count = 0;
42 self.writer.flush()
43 }
44}
45
46pub struct WriterPool {
47 output_dir: PathBuf,
48 writers: AHashMap<String, TableWriter>,
49}
50
51impl WriterPool {
52 pub fn new(output_dir: PathBuf) -> Self {
53 Self {
54 output_dir,
55 writers: AHashMap::new(),
56 }
57 }
58
59 pub fn ensure_output_dir(&self) -> std::io::Result<()> {
60 fs::create_dir_all(&self.output_dir)
61 }
62
63 pub fn get_writer(&mut self, table_name: &str) -> std::io::Result<&mut TableWriter> {
64 if !self.writers.contains_key(table_name) {
65 let filename = self.output_dir.join(format!("{}.sql", table_name));
66 let writer = TableWriter::new(&filename)?;
67 self.writers.insert(table_name.to_string(), writer);
68 }
69
70 Ok(self.writers.get_mut(table_name).unwrap())
71 }
72
73 pub fn write_statement(&mut self, table_name: &str, stmt: &[u8]) -> std::io::Result<()> {
74 let writer = self.get_writer(table_name)?;
75 writer.write_statement(stmt)
76 }
77
78 pub fn close_all(&mut self) -> std::io::Result<()> {
79 for (_, writer) in self.writers.iter_mut() {
80 writer.flush()?;
81 }
82 Ok(())
83 }
84}