Skip to main content

sql_splitter/writer/mod.rs

1use ahash::AHashMap;
2use std::fs::{self, File};
3use std::io::{BufWriter, Write};
4use std::path::{Path, PathBuf};
5
/// Capacity, in bytes, of the `BufWriter` wrapped around each per-table output file.
pub const WRITER_BUFFER_SIZE: usize = 256 * 1024;
/// Number of statements a [`TableWriter`] accepts before it forces an explicit flush.
pub const STMT_BUFFER_COUNT: usize = 100;

/// Buffered writer for a single table's output file.
///
/// Every statement is written followed by a `\n`. In addition to the
/// `BufWriter`'s own capacity-driven flushing, the writer flushes explicitly
/// once `max_stmt_buffer` statements have been written since the last flush.
pub struct TableWriter {
    /// Buffered handle to the output file.
    writer: BufWriter<File>,
    /// Statements written since the last flush.
    write_count: usize,
    /// Statement count at which an automatic flush is triggered.
    max_stmt_buffer: usize,
}

impl TableWriter {
    /// Creates (or truncates) `filename` with the default buffer size
    /// ([`WRITER_BUFFER_SIZE`]) and flush cadence ([`STMT_BUFFER_COUNT`]).
    ///
    /// # Errors
    /// Returns any error from [`File::create`].
    pub fn new(filename: &Path) -> std::io::Result<Self> {
        Self::with_limits(filename, WRITER_BUFFER_SIZE, STMT_BUFFER_COUNT)
    }

    /// Creates (or truncates) `filename` with an explicit buffer capacity and
    /// flush cadence.
    ///
    /// `max_stmt_buffer` is the number of statements after which the writer
    /// flushes on its own; values of `0` or `1` flush after every statement.
    ///
    /// # Errors
    /// Returns any error from [`File::create`].
    pub fn with_limits(
        filename: &Path,
        buffer_size: usize,
        max_stmt_buffer: usize,
    ) -> std::io::Result<Self> {
        let file = File::create(filename)?;
        Ok(Self {
            writer: BufWriter::with_capacity(buffer_size, file),
            write_count: 0,
            max_stmt_buffer,
        })
    }

    /// Writes `stmt` followed by a newline.
    ///
    /// # Errors
    /// Returns any I/O error from writing or from an automatic flush.
    pub fn write_statement(&mut self, stmt: &[u8]) -> std::io::Result<()> {
        self.writer.write_all(stmt)?;
        self.writer.write_all(b"\n")?;
        self.record_statement()
    }

    /// Writes `stmt`, then `suffix`, then a newline, as one logical statement.
    ///
    /// # Errors
    /// Returns any I/O error from writing or from an automatic flush.
    pub fn write_statement_with_suffix(
        &mut self,
        stmt: &[u8],
        suffix: &[u8],
    ) -> std::io::Result<()> {
        self.writer.write_all(stmt)?;
        self.writer.write_all(suffix)?;
        self.writer.write_all(b"\n")?;
        self.record_statement()
    }

    /// Flushes buffered bytes to the file and resets the statement counter.
    ///
    /// # Errors
    /// Returns any error from flushing the underlying `BufWriter`.
    pub fn flush(&mut self) -> std::io::Result<()> {
        self.write_count = 0;
        self.writer.flush()
    }

    /// Counts one completed statement and flushes once the threshold is reached.
    ///
    /// Shared by both `write_statement*` methods so the flush cadence is
    /// defined in exactly one place.
    fn record_statement(&mut self) -> std::io::Result<()> {
        self.write_count += 1;
        if self.write_count >= self.max_stmt_buffer {
            self.flush()?;
        }
        Ok(())
    }
}
63
64pub struct WriterPool {
65    output_dir: PathBuf,
66    writers: AHashMap<String, TableWriter>,
67}
68
69impl WriterPool {
70    pub fn new(output_dir: PathBuf) -> Self {
71        Self {
72            output_dir,
73            writers: AHashMap::new(),
74        }
75    }
76
77    pub fn ensure_output_dir(&self) -> std::io::Result<()> {
78        fs::create_dir_all(&self.output_dir)
79    }
80
81    pub fn get_writer(&mut self, table_name: &str) -> std::io::Result<&mut TableWriter> {
82        use std::collections::hash_map::Entry;
83
84        // Use entry API to avoid separate contains_key + get_mut (eliminates unwrap)
85        match self.writers.entry(table_name.to_string()) {
86            Entry::Occupied(entry) => Ok(entry.into_mut()),
87            Entry::Vacant(entry) => {
88                let filename = self.output_dir.join(format!("{}.sql", table_name));
89                let writer = TableWriter::new(&filename)?;
90                Ok(entry.insert(writer))
91            }
92        }
93    }
94
95    pub fn write_statement(&mut self, table_name: &str, stmt: &[u8]) -> std::io::Result<()> {
96        let writer = self.get_writer(table_name)?;
97        writer.write_statement(stmt)
98    }
99
100    pub fn write_statement_with_suffix(
101        &mut self,
102        table_name: &str,
103        stmt: &[u8],
104        suffix: &[u8],
105    ) -> std::io::Result<()> {
106        let writer = self.get_writer(table_name)?;
107        writer.write_statement_with_suffix(stmt, suffix)
108    }
109
110    pub fn close_all(&mut self) -> std::io::Result<()> {
111        for (_, writer) in self.writers.iter_mut() {
112            writer.flush()?;
113        }
114        Ok(())
115    }
116}