sql_splitter/writer/
mod.rs1use ahash::AHashMap;
4use std::fs::{self, File};
5use std::io::{BufWriter, Write};
6use std::path::{Path, PathBuf};
7
/// Capacity, in bytes, of the in-memory buffer placed in front of every
/// output file (256 KiB).
pub const WRITER_BUFFER_SIZE: usize = 256 * 1024;

/// Default number of statements a table writer accepts before it forces a
/// flush of its buffer to the underlying file.
pub const STMT_BUFFER_COUNT: usize = 100;
12
/// Buffered writer for the output file of a single table.
///
/// Statements are appended through a [`BufWriter`]; the writer keeps track of
/// how many statements were written since the last flush so it can flush
/// periodically.
pub struct TableWriter {
    /// Buffered handle to the output file.
    writer: BufWriter<File>,
    /// Statements written since the last flush.
    write_count: usize,
    /// Number of statements after which a flush is forced.
    max_stmt_buffer: usize,
}
19
20impl TableWriter {
21 pub fn new(filename: &Path) -> std::io::Result<Self> {
23 let file = File::create(filename)?;
24 let writer = BufWriter::with_capacity(WRITER_BUFFER_SIZE, file);
25
26 Ok(Self {
27 writer,
28 write_count: 0,
29 max_stmt_buffer: STMT_BUFFER_COUNT,
30 })
31 }
32
33 pub fn write_statement(&mut self, stmt: &[u8]) -> std::io::Result<()> {
35 self.writer.write_all(stmt)?;
36 self.writer.write_all(b"\n")?;
37
38 self.write_count += 1;
39 if self.write_count >= self.max_stmt_buffer {
40 self.write_count = 0;
41 self.writer.flush()?;
42 }
43
44 Ok(())
45 }
46
47 pub fn write_statement_with_suffix(
49 &mut self,
50 stmt: &[u8],
51 suffix: &[u8],
52 ) -> std::io::Result<()> {
53 self.writer.write_all(stmt)?;
54 self.writer.write_all(suffix)?;
55 self.writer.write_all(b"\n")?;
56
57 self.write_count += 1;
58 if self.write_count >= self.max_stmt_buffer {
59 self.write_count = 0;
60 self.writer.flush()?;
61 }
62
63 Ok(())
64 }
65
66 pub fn flush(&mut self) -> std::io::Result<()> {
68 self.write_count = 0;
69 self.writer.flush()
70 }
71}
72
73pub struct WriterPool {
75 output_dir: PathBuf,
76 writers: AHashMap<String, TableWriter>,
77}
78
79impl WriterPool {
80 pub fn new(output_dir: PathBuf) -> Self {
82 Self {
83 output_dir,
84 writers: AHashMap::new(),
85 }
86 }
87
88 pub fn ensure_output_dir(&self) -> std::io::Result<()> {
90 fs::create_dir_all(&self.output_dir)
91 }
92
93 pub fn get_writer(&mut self, table_name: &str) -> std::io::Result<&mut TableWriter> {
95 use std::collections::hash_map::Entry;
96
97 match self.writers.entry(table_name.to_string()) {
99 Entry::Occupied(entry) => Ok(entry.into_mut()),
100 Entry::Vacant(entry) => {
101 let filename = self.output_dir.join(format!("{}.sql", table_name));
102 let writer = TableWriter::new(&filename)?;
103 Ok(entry.insert(writer))
104 }
105 }
106 }
107
108 pub fn write_statement(&mut self, table_name: &str, stmt: &[u8]) -> std::io::Result<()> {
110 let writer = self.get_writer(table_name)?;
111 writer.write_statement(stmt)
112 }
113
114 pub fn write_statement_with_suffix(
116 &mut self,
117 table_name: &str,
118 stmt: &[u8],
119 suffix: &[u8],
120 ) -> std::io::Result<()> {
121 let writer = self.get_writer(table_name)?;
122 writer.write_statement_with_suffix(stmt, suffix)
123 }
124
125 pub fn close_all(&mut self) -> std::io::Result<()> {
127 for (_, writer) in self.writers.iter_mut() {
128 writer.flush()?;
129 }
130 Ok(())
131 }
132}