use std::fs::{self, File};
use std::io::{BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use crate::error::{ProcessingError, ProcessingResult};
/// Spills `(kmer, count)` records to numbered binary files
/// (`overflow_NNNN.dat`) inside a temporary directory.
///
/// Records are appended through a buffered writer, and the active file is
/// rotated every `records_per_file` records. `read_all` streams every matching
/// file found in the directory back.
#[derive(Debug)]
pub struct DiskOverflow {
    /// Directory holding the overflow files; created by `new` if missing.
    temp_dir: PathBuf,
    /// Buffered writer for the file currently being appended to, or `None`
    /// before the first write and after each rotation/close.
    current_writer: Option<BufWriter<File>>,
    /// Index used to name the next file that gets created (`overflow_{:04}.dat`).
    current_file: u32,
    /// Number of records stored through this instance. Records in files that
    /// were already present in `temp_dir` are not counted.
    kmer_count: u64,
    /// Records written to one file before it is closed and a new one started.
    records_per_file: usize,
}
impl DiskOverflow {
pub fn new<P: AsRef<Path>>(temp_dir: P) -> ProcessingResult<Self> {
let temp_dir = temp_dir.as_ref().to_path_buf();
fs::create_dir_all(&temp_dir)?;
Ok(Self {
temp_dir,
current_writer: None,
current_file: 0,
kmer_count: 0,
records_per_file: 1_000_000, })
}
pub fn store(&mut self, kmer_encoded: u64, count: u32) -> ProcessingResult<()> {
if self.current_writer.is_none() {
self.new_file()?;
}
if let Some(ref mut writer) = self.current_writer {
writer.write_all(&kmer_encoded.to_le_bytes())?;
writer.write_all(&count.to_le_bytes())?;
self.kmer_count += 1;
if self.kmer_count.is_multiple_of(self.records_per_file as u64) {
self.close_current_file()?;
self.current_file += 1;
}
}
Ok(())
}
fn new_file(&mut self) -> ProcessingResult<()> {
let file_path = self
.temp_dir
.join(format!("overflow_{:04}.dat", self.current_file));
let file = File::create(&file_path)?;
self.current_writer = Some(BufWriter::new(file));
Ok(())
}
fn close_current_file(&mut self) -> ProcessingResult<()> {
if let Some(mut writer) = self.current_writer.take() {
writer.flush()?;
writer.get_mut().sync_all()?;
}
Ok(())
}
pub fn close(&mut self) -> ProcessingResult<()> {
self.close_current_file()?;
Ok(())
}
pub fn kmer_count(&self) -> u64 {
self.kmer_count
}
pub fn size_bytes(&self) -> u64 {
self.kmer_count * 12
}
pub fn read_all(&self) -> impl Iterator<Item = ProcessingResult<(u64, u32)>> {
let files = self.list_overflow_files();
OverflowFileReader::new(files)
}
fn list_overflow_files(&self) -> Vec<PathBuf> {
let mut files = Vec::new();
if let Ok(entries) = fs::read_dir(&self.temp_dir) {
for entry in entries.flatten() {
let path = entry.path();
if let Some(name) = path.file_name() {
if let Some(name_str) = name.to_str() {
if name_str.starts_with("overflow_") && name_str.ends_with(".dat") {
files.push(path);
}
}
}
}
}
files.sort(); files
}
}
impl Drop for DiskOverflow {
fn drop(&mut self) {
let _ = self.close();
}
}
/// Iterator that streams `(kmer, count)` records from a sequence of overflow
/// files, in the order the paths were supplied.
///
/// Each record is 12 bytes: a little-endian `u64` k-mer followed by a
/// little-endian `u32` count.
struct OverflowFileReader {
    /// Paths that have not been opened yet.
    remaining: std::vec::IntoIter<PathBuf>,
    /// Buffered reader for the file currently being drained, if any.
    current_reader: Option<std::io::BufReader<File>>,
}

impl OverflowFileReader {
    /// Size in bytes of one on-disk record.
    const RECORD_SIZE: usize = 12;

    /// Creates a reader that visits `files` in order. No file is opened until
    /// the first call to `next`.
    fn new(files: Vec<PathBuf>) -> Self {
        Self {
            remaining: files.into_iter(),
            current_reader: None,
        }
    }

    /// Opens the next pending file, replacing the current reader.
    ///
    /// Returns `Ok(false)` once every file has been consumed. The path is
    /// taken off the queue *before* it is opened. A file that fails to open is
    /// therefore reported once and then skipped, instead of being retried on
    /// every later `next` call (which would yield errors forever).
    fn advance_to_next_file(&mut self) -> ProcessingResult<bool> {
        self.current_reader = None;
        let Some(path) = self.remaining.next() else {
            return Ok(false);
        };
        let file = File::open(&path)?;
        self.current_reader = Some(std::io::BufReader::new(file));
        Ok(true)
    }

    /// Reads up to `buf.len()` bytes, stopping early only at end of file.
    ///
    /// Returns the number of bytes read. `0` means a clean end of file, and a
    /// value below `buf.len()` means the file ends in the middle of a record.
    /// `ErrorKind::Interrupted` is retried, as `read_exact` does.
    fn fill_record(reader: &mut impl Read, buf: &mut [u8]) -> std::io::Result<usize> {
        let mut filled = 0;
        while filled < buf.len() {
            match reader.read(&mut buf[filled..]) {
                Ok(0) => break,
                Ok(n) => filled += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => return Err(e),
            }
        }
        Ok(filled)
    }
}

impl Iterator for OverflowFileReader {
    type Item = ProcessingResult<(u64, u32)>;

    /// Yields the next record, moving on to the following file at each clean
    /// end of file.
    ///
    /// After yielding an error, the offending file is abandoned. Repeated calls
    /// therefore always make progress, and iteration ends once every file has
    /// been visited.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if self.current_reader.is_none() {
                match self.advance_to_next_file() {
                    Ok(true) => {}
                    Ok(false) => return None,
                    Err(e) => {
                        return Some(Err(ProcessingError::with_context(
                            "Failed to open overflow file",
                            e,
                        )));
                    }
                }
            }
            let reader = self.current_reader.as_mut()?;

            let mut record = [0u8; OverflowFileReader::RECORD_SIZE];
            match Self::fill_record(reader, &mut record) {
                Ok(0) => {
                    // Clean end of this file: loop around to open the next one.
                    self.current_reader = None;
                }
                Ok(n) if n == record.len() => {
                    let mut kmer_bytes = [0u8; 8];
                    kmer_bytes.copy_from_slice(&record[..8]);
                    let mut count_bytes = [0u8; 4];
                    count_bytes.copy_from_slice(&record[8..]);
                    return Some(Ok((
                        u64::from_le_bytes(kmer_bytes),
                        u32::from_le_bytes(count_bytes),
                    )));
                }
                Ok(n) => {
                    // The file ends mid-record (e.g. an interrupted write).
                    // Report it rather than silently dropping the partial data.
                    self.current_reader = None;
                    let truncated = std::io::Error::new(
                        std::io::ErrorKind::UnexpectedEof,
                        format!("truncated overflow record: {n} of {} bytes", record.len()),
                    );
                    return Some(Err(ProcessingError::with_context(
                        "Error reading overflow file",
                        truncated,
                    )));
                }
                Err(e) => {
                    // Abandon this file so the next call advances instead of
                    // hitting the same failing read again.
                    self.current_reader = None;
                    return Some(Err(ProcessingError::with_context(
                        "Error reading overflow file",
                        e,
                    )));
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Records stored by the tests below, in insertion order.
    const SAMPLE_RECORDS: [(u64, u32); 2] = [(0x12345678, 10), (0x87654321, 20)];

    /// Opens a `DiskOverflow` rooted at `dir`, panicking if setup fails.
    fn open_overflow(dir: &TempDir) -> DiskOverflow {
        DiskOverflow::new(dir.path()).expect("failed to create DiskOverflow")
    }

    #[test]
    fn test_disk_overflow_basic() {
        let scratch = TempDir::new().expect("failed to create temp dir");
        let mut overflow = open_overflow(&scratch);
        for &(kmer, count) in &SAMPLE_RECORDS {
            overflow.store(kmer, count).expect("store failed");
        }
        assert_eq!(overflow.kmer_count(), SAMPLE_RECORDS.len() as u64);
        overflow.close().expect("close failed");
    }

    #[test]
    fn test_disk_overflow_read_back() {
        let scratch = TempDir::new().expect("failed to create temp dir");
        let mut writer = open_overflow(&scratch);
        for &(kmer, count) in &SAMPLE_RECORDS {
            writer.store(kmer, count).expect("store failed");
        }
        writer.close().expect("close failed");

        let reader = open_overflow(&scratch);
        let pairs: Vec<(u64, u32)> = reader
            .read_all()
            .map(|record| record.expect("read failed"))
            .collect();
        assert_eq!(pairs, SAMPLE_RECORDS);
    }

    #[test]
    fn test_overflow_size() {
        let scratch = TempDir::new().expect("failed to create temp dir");
        let mut overflow = open_overflow(&scratch);
        assert_eq!(overflow.size_bytes(), 0);
        for (stored, &(kmer, count)) in SAMPLE_RECORDS.iter().enumerate() {
            overflow.store(kmer, count).expect("store failed");
            assert_eq!(overflow.size_bytes(), (stored as u64 + 1) * 12);
        }
    }
}