sql_splitter/writer/
mod.rs1use ahash::AHashMap;
2use std::fs::{self, File};
3use std::io::{BufWriter, Write};
4use std::path::{Path, PathBuf};
5
6pub const WRITER_BUFFER_SIZE: usize = 256 * 1024;
7pub const STMT_BUFFER_COUNT: usize = 100;
8
9pub struct TableWriter {
10 writer: BufWriter<File>,
11 write_count: usize,
12 max_stmt_buffer: usize,
13}
14
15impl TableWriter {
16 pub fn new(filename: &Path) -> std::io::Result<Self> {
17 let file = File::create(filename)?;
18 let writer = BufWriter::with_capacity(WRITER_BUFFER_SIZE, file);
19
20 Ok(Self {
21 writer,
22 write_count: 0,
23 max_stmt_buffer: STMT_BUFFER_COUNT,
24 })
25 }
26
27 pub fn write_statement(&mut self, stmt: &[u8]) -> std::io::Result<()> {
28 self.writer.write_all(stmt)?;
29 self.writer.write_all(b"\n")?;
30
31 self.write_count += 1;
32 if self.write_count >= self.max_stmt_buffer {
33 self.write_count = 0;
34 self.writer.flush()?;
35 }
36
37 Ok(())
38 }
39
40 pub fn write_statement_with_suffix(
41 &mut self,
42 stmt: &[u8],
43 suffix: &[u8],
44 ) -> std::io::Result<()> {
45 self.writer.write_all(stmt)?;
46 self.writer.write_all(suffix)?;
47 self.writer.write_all(b"\n")?;
48
49 self.write_count += 1;
50 if self.write_count >= self.max_stmt_buffer {
51 self.write_count = 0;
52 self.writer.flush()?;
53 }
54
55 Ok(())
56 }
57
58 pub fn flush(&mut self) -> std::io::Result<()> {
59 self.write_count = 0;
60 self.writer.flush()
61 }
62}
63
64pub struct WriterPool {
65 output_dir: PathBuf,
66 writers: AHashMap<String, TableWriter>,
67}
68
69impl WriterPool {
70 pub fn new(output_dir: PathBuf) -> Self {
71 Self {
72 output_dir,
73 writers: AHashMap::new(),
74 }
75 }
76
77 pub fn ensure_output_dir(&self) -> std::io::Result<()> {
78 fs::create_dir_all(&self.output_dir)
79 }
80
81 pub fn get_writer(&mut self, table_name: &str) -> std::io::Result<&mut TableWriter> {
82 use std::collections::hash_map::Entry;
83
84 match self.writers.entry(table_name.to_string()) {
86 Entry::Occupied(entry) => Ok(entry.into_mut()),
87 Entry::Vacant(entry) => {
88 let filename = self.output_dir.join(format!("{}.sql", table_name));
89 let writer = TableWriter::new(&filename)?;
90 Ok(entry.insert(writer))
91 }
92 }
93 }
94
95 pub fn write_statement(&mut self, table_name: &str, stmt: &[u8]) -> std::io::Result<()> {
96 let writer = self.get_writer(table_name)?;
97 writer.write_statement(stmt)
98 }
99
100 pub fn write_statement_with_suffix(
101 &mut self,
102 table_name: &str,
103 stmt: &[u8],
104 suffix: &[u8],
105 ) -> std::io::Result<()> {
106 let writer = self.get_writer(table_name)?;
107 writer.write_statement_with_suffix(stmt, suffix)
108 }
109
110 pub fn close_all(&mut self) -> std::io::Result<()> {
111 for (_, writer) in self.writers.iter_mut() {
112 writer.flush()?;
113 }
114 Ok(())
115 }
116}