use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use flate2::Compression;
use flate2::write::GzEncoder;
use crate::rat_enum::stream::records::encode_record;
/// Default number of buffered records (2^20) after which [`RunWriter`] spills a run.
pub const DEFAULT_BUFFER_RECORDS: usize = 1 << 20;

/// Buffers encoded records in memory and spills them to disk as sorted,
/// deduplicated, gzip-compressed "run" files.
///
/// The writer's `thread_id` is embedded in every file name it produces;
/// writers sharing an output directory should use distinct ids so their runs
/// do not overwrite each other.
pub struct RunWriter {
    /// Encoded records waiting to be flushed.
    buffer: Vec<Vec<u8>>,
    /// Buffer length at which `record` triggers an automatic flush.
    threshold: usize,
    /// Directory that receives run files.
    out_dir: PathBuf,
    /// Identifier embedded in run file names.
    thread_id: usize,
    /// Index of the next run file to write.
    run_counter: usize,
    /// Records written across all completed runs (after per-run dedup).
    total_records_emitted: u64,
}

impl std::fmt::Debug for RunWriter {
    // Hand-written rather than derived: the buffer can hold up to `threshold`
    // records (2^20 by default), so only its length is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RunWriter")
            .field("buffered_records", &self.buffer.len())
            .field("threshold", &self.threshold)
            .field("out_dir", &self.out_dir)
            .field("thread_id", &self.thread_id)
            .field("run_counter", &self.run_counter)
            .field("total_records_emitted", &self.total_records_emitted)
            .finish()
    }
}
impl RunWriter {
pub fn new(out_dir: impl Into<PathBuf>, thread_id: usize) -> Self {
Self::with_threshold(out_dir, thread_id, DEFAULT_BUFFER_RECORDS)
}
pub fn with_threshold(out_dir: impl Into<PathBuf>, thread_id: usize, threshold: usize) -> Self {
RunWriter {
buffer: Vec::with_capacity(threshold.min(1 << 14)),
threshold,
out_dir: out_dir.into(),
thread_id,
run_counter: 0,
total_records_emitted: 0,
}
}
pub fn record(&mut self, canonical: &[i8]) {
let mut rec = Vec::with_capacity(1 + canonical.len());
encode_record(canonical, &mut rec);
self.buffer.push(rec);
if self.buffer.len() >= self.threshold {
self.flush().expect("RunWriter flush");
}
}
pub fn flush(&mut self) -> std::io::Result<Option<PathBuf>> {
if self.buffer.is_empty() {
return Ok(None);
}
std::fs::create_dir_all(&self.out_dir)?;
self.buffer.sort();
self.buffer.dedup();
let path = self.out_dir.join(format!(
"run_t{:02}_r{:06}.bin",
self.thread_id, self.run_counter
));
let mut writer = GzEncoder::new(BufWriter::new(File::create(&path)?), Compression::fast());
for rec in &self.buffer {
writer.write_all(rec)?;
}
writer.finish()?.flush()?;
self.total_records_emitted += self.buffer.len() as u64;
self.run_counter += 1;
self.buffer.clear();
Ok(Some(path))
}
pub fn total_records_emitted(&self) -> u64 {
self.total_records_emitted
}
pub fn run_count(&self) -> usize {
self.run_counter
}
}
impl Drop for RunWriter {
    /// Flushes any records still buffered so they are not silently lost.
    ///
    /// `Drop` cannot report failure, so callers that need to handle I/O errors
    /// should call [`RunWriter::flush`] explicitly before the writer is dropped.
    fn drop(&mut self) {
        if let Err(e) = self.flush() {
            // Unflushed records are lost at this point; surface that in release
            // builds too. Errors writing the diagnostic itself are ignored because
            // panicking here (as `eprintln!` would) could abort the process.
            let _ = writeln!(
                std::io::stderr(),
                "RunWriter::drop (thread {}) flush failed: {e}",
                self.thread_id
            );
            // A panic while already unwinding (e.g. `record` panicked on a failed
            // flush and this drop retried it) aborts the process, so only escalate
            // to a debug assertion when no panic is in flight.
            if !std::thread::panicking() {
                debug_assert!(false, "RunWriter::drop flush failed: {e}");
            }
        }
    }
}
/// Lists the run files in `runs_dir`, sorted by path.
///
/// A run file is any directory entry whose name starts with `run_t` and ends
/// with `.bin`, matching the names produced by [`RunWriter::flush`]. Entries
/// whose names are not valid UTF-8 are ignored.
///
/// # Errors
///
/// Returns an error if `runs_dir` cannot be read, or if reading any individual
/// directory entry fails. A skipped entry could be a run file, and silently
/// dropping it would lose its records.
pub fn list_run_files(runs_dir: &Path) -> std::io::Result<Vec<PathBuf>> {
    let mut runs = Vec::new();
    for entry in std::fs::read_dir(runs_dir)? {
        let path = entry?.path();
        let is_run = path
            .file_name()
            .and_then(|name| name.to_str())
            .map_or(false, |name| name.starts_with("run_t") && name.ends_with(".bin"));
        if is_run {
            runs.push(path);
        }
    }
    runs.sort();
    Ok(runs)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rat_enum::stream::records::decode_record;
    use std::io::Read;

    /// Decompresses one run file and decodes every record in it, in file order.
    fn read_run(path: &Path) -> Vec<Vec<i8>> {
        let mut buf = Vec::new();
        flate2::read::GzDecoder::new(File::open(path).unwrap())
            .read_to_end(&mut buf)
            .unwrap();
        let mut decoded: Vec<Vec<i8>> = Vec::new();
        let mut rest: &[u8] = &buf;
        while let Some((tail, rec)) = decode_record(rest) {
            decoded.push(rec);
            rest = tail;
        }
        assert!(rest.is_empty(), "trailing bytes in run file");
        decoded
    }

    #[test]
    fn writes_sorted_run_with_local_dedup() {
        let dir = tempdir();
        let mut w = RunWriter::with_threshold(&dir, 0, 1000);
        w.record(&[1, 2, 3]);
        w.record(&[1, 2, 3]);
        w.record(&[1]);
        w.record(&[]);
        w.record(&[-1, 0]);
        w.record(&[1, 1]);
        w.flush().expect("flush");
        assert_eq!(w.total_records_emitted(), 5);
        assert_eq!(w.run_count(), 1);
        // Dropping with an empty buffer must not produce another run file.
        drop(w);

        let files = list_run_files(&dir).unwrap();
        assert_eq!(files.len(), 1);
        assert_eq!(
            read_run(&files[0]),
            vec![vec![], vec![1], vec![-1, 0], vec![1, 1], vec![1, 2, 3]]
        );
        std::fs::remove_dir_all(&dir).unwrap();
    }

    #[test]
    fn auto_flushes_when_buffer_full() {
        let dir = tempdir();
        let mut w = RunWriter::with_threshold(&dir, 7, 3);
        w.record(&[1]);
        w.record(&[2]);
        w.record(&[3]); // reaches the threshold and triggers an automatic flush
        w.record(&[4]);
        drop(w); // flushes the remaining record

        let files = list_run_files(&dir).unwrap();
        assert_eq!(files.len(), 2, "expected 2 run files (auto-flush + drop)");
        let names: Vec<&str> = files
            .iter()
            .map(|p| p.file_name().unwrap().to_str().unwrap())
            .collect();
        assert_eq!(names, ["run_t07_r000000.bin", "run_t07_r000001.bin"]);
        assert_eq!(read_run(&files[0]), vec![vec![1], vec![2], vec![3]]);
        assert_eq!(read_run(&files[1]), vec![vec![4]]);
        std::fs::remove_dir_all(&dir).unwrap();
    }

    /// Returns a fresh, empty per-test directory under the system temp dir.
    ///
    /// Any leftover directory with the same name (e.g. from an earlier run whose
    /// process id was reused) is removed first so stale run files cannot leak
    /// into the assertions.
    fn tempdir() -> PathBuf {
        use std::sync::atomic::{AtomicUsize, Ordering};
        static COUNTER: AtomicUsize = AtomicUsize::new(0);
        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let path = std::env::temp_dir().join(format!("rat_enum_runs_test_{pid}_{n}"));
        let _ = std::fs::remove_dir_all(&path);
        std::fs::create_dir_all(&path).unwrap();
        path
    }
}