use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use super::{CompressionType, FastqWriter, OwnedRecord};
use crate::cli::{SplitConfig, SplitMode};
/// Writes single-end FASTQ records across a sequence of numbered chunk files.
///
/// Chunk boundaries come either from a target number of files (`SplitMode::ByFile`)
/// or from a line budget per file (`SplitMode::ByLines`).
pub struct SplitWriter {
    /// Path the chunk file names are derived from (directory, stem and compression).
    base_path: PathBuf,
    /// Split mode plus the zero-padding width used when numbering chunks.
    config: SplitConfig,
    /// Compression applied to every chunk, inferred from `base_path`.
    compression: CompressionType,
    /// Per-chunk record quota precomputed for `ByFile` mode; `None` in `ByLines` mode.
    records_per_file: Option<usize>,
    /// Zero-based index of the next chunk to open (equals the number of chunks opened).
    current_file_idx: usize,
    /// Writer for the chunk currently being filled, if any.
    current_writer: Option<FastqWriter>,
    /// Records written into the currently open chunk.
    records_in_current_file: usize,
    /// Records written across all chunks.
    total_records_written: usize,
}
impl SplitWriter {
    /// Creates a writer that spreads records over numbered chunk files derived from `base_path`.
    ///
    /// Chunk files are created lazily on the first write, next to `base_path`, and named
    /// `<stem>_<NNNN>.fastq[.gz]`. Compression is chosen via `CompressionType::from_path`.
    ///
    /// In `SplitMode::ByFile(n)` mode every chunk receives at most `ceil(total / n)` records.
    /// If more than `total_records` records are written, extra chunks beyond `n` are opened.
    ///
    /// # Errors
    ///
    /// Returns an error in `ByFile` mode when `total_records` is `None` or `n` is zero.
    pub fn new(
        base_path: impl AsRef<Path>,
        config: SplitConfig,
        total_records: Option<usize>,
    ) -> Result<Self> {
        let base_path = base_path.as_ref().to_path_buf();
        let compression = CompressionType::from_path(&base_path);
        let records_per_file = match config.mode {
            SplitMode::ByFile(n_files) => {
                let total = total_records.ok_or_else(|| {
                    anyhow::anyhow!("total_records required for ByFile split mode")
                })?;
                if n_files == 0 {
                    return Err(anyhow::anyhow!(
                        "ByFile split mode requires at least one output file"
                    ));
                }
                // Ceiling division without the `total + n_files - 1` overflow. Clamp to 1 so a
                // zero total still yields one record per file instead of a zero quota.
                let per_file = total / n_files + usize::from(total % n_files != 0);
                Some(per_file.max(1))
            }
            SplitMode::ByLines(_) => None,
        };
        Ok(Self {
            base_path,
            config,
            compression,
            current_file_idx: 0,
            current_writer: None,
            records_in_current_file: 0,
            total_records_written: 0,
            records_per_file,
        })
    }

    /// Writes one record, rolling over to the next chunk when the current one is full.
    ///
    /// # Errors
    ///
    /// Returns an error if a chunk cannot be created, flushed, or written to.
    pub fn write_record(&mut self, record: &OwnedRecord) -> Result<()> {
        if self.current_writer.is_none() || self.should_switch_file() {
            self.switch_to_next_file()?;
        }
        let writer = self
            .current_writer
            .as_mut()
            .expect("switch_to_next_file installs a writer on success");
        writer
            .write_record(record)
            .context("Failed to write record")?;
        self.records_in_current_file += 1;
        self.total_records_written += 1;
        Ok(())
    }

    /// Writes every record in `records`, stopping at the first error.
    pub fn write_batch(&mut self, records: &[OwnedRecord]) -> Result<()> {
        records
            .iter()
            .try_for_each(|record| self.write_record(record))
    }

    /// Returns `true` once the current chunk has reached its record quota.
    fn should_switch_file(&self) -> bool {
        let limit = match self.config.mode {
            SplitMode::ByFile(_) => match self.records_per_file {
                Some(per_file) => per_file,
                None => return false,
            },
            // Four lines per record are assumed. A budget below four lines still
            // places one record per file.
            SplitMode::ByLines(lines) => (lines / 4).max(1),
        };
        self.records_in_current_file >= limit
    }

    /// Closes the current chunk (if any) and opens the next numbered one.
    fn switch_to_next_file(&mut self) -> Result<()> {
        self.close_current_file()?;
        let file_path = self.generate_file_path(self.current_file_idx);
        let writer = FastqWriter::new(&file_path, self.compression)
            .with_context(|| format!("Failed to create split file: {}", file_path.display()))?;
        self.current_writer = Some(writer);
        self.current_file_idx += 1;
        self.records_in_current_file = 0;
        Ok(())
    }

    /// Flushes and drops the current chunk writer, if one is open.
    fn close_current_file(&mut self) -> Result<()> {
        if let Some(mut writer) = self.current_writer.take() {
            writer.flush().context("Failed to flush split file")?;
        }
        Ok(())
    }

    /// Base name shared by every chunk file.
    ///
    /// `Path::file_stem` strips only the last extension, so `reads.fastq.gz` would
    /// otherwise yield `reads.fastq`. A leftover `.fastq`/`.fq` is stripped as well.
    /// Falls back to `output` when `base_path` has no file name.
    fn output_stem(&self) -> &Path {
        let stem = match self.base_path.file_stem() {
            Some(stem) => Path::new(stem),
            None => return Path::new("output"),
        };
        let has_fastq_ext = matches!(
            stem.extension().and_then(|ext| ext.to_str()),
            Some(ext) if ext.eq_ignore_ascii_case("fastq") || ext.eq_ignore_ascii_case("fq")
        );
        match stem.file_stem() {
            Some(inner) if has_fastq_ext => Path::new(inner),
            _ => stem,
        }
    }

    /// Path of chunk `idx` (zero-based). Chunks are numbered from 1 and zero-padded
    /// to `config.prefix_digits`.
    fn generate_file_path(&self, idx: usize) -> PathBuf {
        let suffix = format!("{:0width$}", idx + 1, width = self.config.prefix_digits);
        let parent = self.base_path.parent().unwrap_or(Path::new(""));
        let extension = match self.compression {
            CompressionType::Gzip | CompressionType::ParallelGzip => "fastq.gz",
            CompressionType::None => "fastq",
        };
        // Build the name as an OsString so non-UTF-8 stems are preserved verbatim.
        let mut file_name = self.output_stem().as_os_str().to_os_string();
        file_name.push(format!("_{}.{}", suffix, extension));
        parent.join(file_name)
    }

    /// Flushes and closes the open chunk. A later write opens a new chunk rather than
    /// reopening this one.
    pub fn finish(&mut self) -> Result<()> {
        self.close_current_file()
    }

    /// Total number of records written across all chunks.
    pub fn total_records_written(&self) -> usize {
        self.total_records_written
    }

    /// Number of chunk files opened so far.
    pub fn num_files_created(&self) -> usize {
        self.current_file_idx
    }

    /// Paths of every chunk file opened so far, in creation order.
    pub fn get_output_files(&self) -> Vec<PathBuf> {
        (0..self.current_file_idx)
            .map(|idx| self.generate_file_path(idx))
            .collect()
    }
}
impl Drop for SplitWriter {
    /// Best-effort flush of the open chunk when the writer goes out of scope.
    ///
    /// `drop` cannot report failures, so any error from `finish` is discarded here.
    /// Call `finish` explicitly to observe flush errors.
    fn drop(&mut self) {
        let _flush_outcome = self.finish();
    }
}
/// Writes paired-end FASTQ records into numbered chunks, each made of an R1 and an R2 file.
///
/// Both mates of a pair always land in the same chunk index. Chunk boundaries follow
/// the same `SplitMode` rules as the single-end writer, counted in pairs.
pub struct PairedSplitWriter {
    /// Path the chunk file names are derived from (directory, stem and compression).
    base_path: PathBuf,
    /// Split mode plus the zero-padding width used when numbering chunks.
    config: SplitConfig,
    /// Compression applied to every chunk file, inferred from `base_path`.
    compression: CompressionType,
    /// Per-chunk pair quota precomputed for `ByFile` mode; `None` in `ByLines` mode.
    records_per_file: Option<usize>,
    /// Zero-based index of the next chunk to open (equals the number of chunks opened).
    current_file_idx: usize,
    /// Writer for the R1 file of the open chunk, if any.
    current_r1_writer: Option<FastqWriter>,
    /// Writer for the R2 file of the open chunk, if any.
    current_r2_writer: Option<FastqWriter>,
    /// Pairs written into the currently open chunk.
    records_in_current_file: usize,
    /// Pairs written across all chunks.
    total_records_written: usize,
}
impl PairedSplitWriter {
    /// Creates a paired writer whose chunks are `<stem>_<NNNN>.R1.fastq[.gz]` and
    /// `<stem>_<NNNN>.R2.fastq[.gz]` next to `base_path`.
    ///
    /// Chunk files are created lazily on the first write. Compression is chosen via
    /// `CompressionType::from_path`. In `SplitMode::ByFile(n)` mode every chunk receives
    /// at most `ceil(total / n)` pairs. Writing more than `total_records` pairs opens extra chunks.
    ///
    /// # Errors
    ///
    /// Returns an error in `ByFile` mode when `total_records` is `None` or `n` is zero.
    pub fn new(
        base_path: impl AsRef<Path>,
        config: SplitConfig,
        total_records: Option<usize>,
    ) -> Result<Self> {
        let base_path = base_path.as_ref().to_path_buf();
        let compression = CompressionType::from_path(&base_path);
        let records_per_file = match config.mode {
            SplitMode::ByFile(n_files) => {
                let total = total_records.ok_or_else(|| {
                    anyhow::anyhow!("total_records required for ByFile split mode")
                })?;
                if n_files == 0 {
                    return Err(anyhow::anyhow!(
                        "ByFile split mode requires at least one output file"
                    ));
                }
                // Ceiling division without the `total + n_files - 1` overflow. Clamp to 1 so a
                // zero total still yields one pair per chunk instead of a zero quota.
                let per_file = total / n_files + usize::from(total % n_files != 0);
                Some(per_file.max(1))
            }
            SplitMode::ByLines(_) => None,
        };
        Ok(Self {
            base_path,
            config,
            compression,
            current_file_idx: 0,
            current_r1_writer: None,
            current_r2_writer: None,
            records_in_current_file: 0,
            total_records_written: 0,
            records_per_file,
        })
    }

    /// Writes one mate pair, rolling over to the next chunk when the current one is full.
    ///
    /// # Errors
    ///
    /// Returns an error if a chunk cannot be created, flushed, or written to. If the R2
    /// write fails after R1 succeeded, the two files of this chunk are out of sync.
    pub fn write_pair(&mut self, r1: &OwnedRecord, r2: &OwnedRecord) -> Result<()> {
        let needs_new_chunk = self.current_r1_writer.is_none()
            || self.current_r2_writer.is_none()
            || self.should_switch_file();
        if needs_new_chunk {
            self.switch_to_next_file()?;
        }
        match (&mut self.current_r1_writer, &mut self.current_r2_writer) {
            (Some(r1_writer), Some(r2_writer)) => {
                r1_writer
                    .write_record(r1)
                    .context("Failed to write R1 record")?;
                r2_writer
                    .write_record(r2)
                    .context("Failed to write R2 record")?;
            }
            _ => unreachable!("switch_to_next_file installs both mate writers on success"),
        }
        self.records_in_current_file += 1;
        self.total_records_written += 1;
        Ok(())
    }

    /// Writes every pair in `pairs`, stopping at the first error.
    pub fn write_batch(&mut self, pairs: &[(OwnedRecord, OwnedRecord)]) -> Result<()> {
        pairs.iter().try_for_each(|(r1, r2)| self.write_pair(r1, r2))
    }

    /// Returns `true` once the current chunk has reached its pair quota.
    fn should_switch_file(&self) -> bool {
        let limit = match self.config.mode {
            SplitMode::ByFile(_) => match self.records_per_file {
                Some(per_file) => per_file,
                None => return false,
            },
            // Four lines per record are assumed. A budget below four lines still
            // places one pair per chunk.
            SplitMode::ByLines(lines) => (lines / 4).max(1),
        };
        self.records_in_current_file >= limit
    }

    /// Closes the current chunk (if any) and opens the R1/R2 files of the next one.
    fn switch_to_next_file(&mut self) -> Result<()> {
        self.close_current_files()?;
        let (r1_path, r2_path) = self.generate_file_paths(self.current_file_idx);
        let r1_writer = FastqWriter::new(&r1_path, self.compression)
            .with_context(|| format!("Failed to create split file: {}", r1_path.display()))?;
        let r2_writer = FastqWriter::new(&r2_path, self.compression)
            .with_context(|| format!("Failed to create split file: {}", r2_path.display()))?;
        self.current_r1_writer = Some(r1_writer);
        self.current_r2_writer = Some(r2_writer);
        self.current_file_idx += 1;
        self.records_in_current_file = 0;
        Ok(())
    }

    /// Flushes and drops both mate writers, if open.
    ///
    /// Both writers are always closed, even if the R1 flush fails. The first error
    /// encountered is returned.
    fn close_current_files(&mut self) -> Result<()> {
        let r1_result = match self.current_r1_writer.take() {
            Some(mut writer) => writer.flush().context("Failed to flush R1 split file"),
            None => Ok(()),
        };
        let r2_result = match self.current_r2_writer.take() {
            Some(mut writer) => writer.flush().context("Failed to flush R2 split file"),
            None => Ok(()),
        };
        r1_result.and(r2_result)
    }

    /// Base name shared by every chunk file.
    ///
    /// `Path::file_stem` strips only the last extension, so `reads.fastq.gz` would
    /// otherwise yield `reads.fastq`. A leftover `.fastq`/`.fq` is stripped as well.
    /// Falls back to `output` when `base_path` has no file name.
    fn output_stem(&self) -> &Path {
        let stem = match self.base_path.file_stem() {
            Some(stem) => Path::new(stem),
            None => return Path::new("output"),
        };
        let has_fastq_ext = matches!(
            stem.extension().and_then(|ext| ext.to_str()),
            Some(ext) if ext.eq_ignore_ascii_case("fastq") || ext.eq_ignore_ascii_case("fq")
        );
        match stem.file_stem() {
            Some(inner) if has_fastq_ext => Path::new(inner),
            _ => stem,
        }
    }

    /// R1 and R2 paths of chunk `idx` (zero-based). Chunks are numbered from 1 and
    /// zero-padded to `config.prefix_digits`.
    fn generate_file_paths(&self, idx: usize) -> (PathBuf, PathBuf) {
        let suffix = format!("{:0width$}", idx + 1, width = self.config.prefix_digits);
        let parent = self.base_path.parent().unwrap_or(Path::new(""));
        let extension = match self.compression {
            CompressionType::Gzip | CompressionType::ParallelGzip => "fastq.gz",
            CompressionType::None => "fastq",
        };
        let stem = self.output_stem().as_os_str();
        // Build names as OsStrings so non-UTF-8 stems are preserved verbatim.
        let mate_path = |mate: &str| {
            let mut file_name = stem.to_os_string();
            file_name.push(format!("_{}.{}.{}", suffix, mate, extension));
            parent.join(file_name)
        };
        (mate_path("R1"), mate_path("R2"))
    }

    /// Flushes and closes the open chunk. A later write opens a new chunk rather than
    /// reopening this one.
    pub fn finish(&mut self) -> Result<()> {
        self.close_current_files()
    }

    /// Total number of pairs written across all chunks.
    pub fn total_records_written(&self) -> usize {
        self.total_records_written
    }

    /// Number of chunks opened so far; each chunk consists of an R1 and an R2 file.
    pub fn num_files_created(&self) -> usize {
        self.current_file_idx
    }

    /// Paths of every file opened so far: R1 then R2 for each chunk, in creation order.
    pub fn get_output_files(&self) -> Vec<PathBuf> {
        (0..self.current_file_idx)
            .flat_map(|idx| {
                let (r1, r2) = self.generate_file_paths(idx);
                vec![r1, r2]
            })
            .collect()
    }
}
impl Drop for PairedSplitWriter {
    /// Best-effort flush of both mate files of the open chunk on scope exit.
    ///
    /// `drop` cannot report failures, so any error from `finish` is discarded here.
    /// Call `finish` explicitly to observe flush errors.
    fn drop(&mut self) {
        let _flush_outcome = self.finish();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cli::SplitMode;
    use tempfile::tempdir;

    /// Builds a record with a constant high quality string matching `seq`'s length.
    fn make_record(name: &str, seq: &str) -> OwnedRecord {
        let qual = vec![b'I'; seq.len()];
        OwnedRecord::new(name.as_bytes().to_vec(), seq.as_bytes().to_vec(), qual)
    }

    #[test]
    fn test_split_writer_by_file() {
        let dir = tempdir().unwrap();
        let base_path = dir.path().join("output");
        let config = SplitConfig {
            mode: SplitMode::ByFile(2),
            prefix_digits: 4,
        };
        let mut writer = SplitWriter::new(&base_path, config, Some(10)).unwrap();
        for i in 0..10 {
            let record = make_record(&format!("read{}", i), "ACGTACGTACGT");
            writer.write_record(&record).unwrap();
        }
        writer.finish().unwrap();
        assert_eq!(writer.num_files_created(), 2);
        assert_eq!(writer.total_records_written(), 10);
        assert!(dir.path().join("output_0001.fastq").exists());
        assert!(dir.path().join("output_0002.fastq").exists());
    }

    #[test]
    fn test_split_writer_by_lines() {
        let dir = tempdir().unwrap();
        let base_path = dir.path().join("output");
        let config = SplitConfig {
            mode: SplitMode::ByLines(8),
            prefix_digits: 3,
        };
        let mut writer = SplitWriter::new(&base_path, config, None).unwrap();
        for i in 0..6 {
            let record = make_record(&format!("read{}", i), "ACGTACGTACGT");
            writer.write_record(&record).unwrap();
        }
        writer.finish().unwrap();
        assert_eq!(writer.num_files_created(), 3);
        assert_eq!(writer.total_records_written(), 6);
        assert!(dir.path().join("output_001.fastq").exists());
        assert!(dir.path().join("output_002.fastq").exists());
        assert!(dir.path().join("output_003.fastq").exists());
    }

    #[test]
    fn test_split_writer_by_lines_below_one_record() {
        // A budget smaller than one 4-line record still puts one record in each file.
        let dir = tempdir().unwrap();
        let base_path = dir.path().join("output");
        let config = SplitConfig {
            mode: SplitMode::ByLines(2),
            prefix_digits: 2,
        };
        let mut writer = SplitWriter::new(&base_path, config, None).unwrap();
        for i in 0..3 {
            let record = make_record(&format!("read{}", i), "ACGT");
            writer.write_record(&record).unwrap();
        }
        writer.finish().unwrap();
        assert_eq!(writer.num_files_created(), 3);
        assert_eq!(writer.total_records_written(), 3);
        assert!(dir.path().join("output_01.fastq").exists());
        assert!(dir.path().join("output_02.fastq").exists());
        assert!(dir.path().join("output_03.fastq").exists());
    }

    #[test]
    fn test_split_writer_strips_fastq_extension() {
        let dir = tempdir().unwrap();
        let base_path = dir.path().join("output.fastq");
        let config = SplitConfig {
            mode: SplitMode::ByFile(1),
            prefix_digits: 1,
        };
        let mut writer = SplitWriter::new(&base_path, config, Some(2)).unwrap();
        for i in 0..2 {
            let record = make_record(&format!("read{}", i), "ACGT");
            writer.write_record(&record).unwrap();
        }
        writer.finish().unwrap();
        assert_eq!(writer.num_files_created(), 1);
        assert!(dir.path().join("output_1.fastq").exists());
    }

    #[test]
    fn test_by_file_requires_total_records() {
        let dir = tempdir().unwrap();
        let single_config = SplitConfig {
            mode: SplitMode::ByFile(2),
            prefix_digits: 4,
        };
        assert!(SplitWriter::new(dir.path().join("output"), single_config, None).is_err());
        let paired_config = SplitConfig {
            mode: SplitMode::ByFile(2),
            prefix_digits: 4,
        };
        assert!(PairedSplitWriter::new(dir.path().join("output"), paired_config, None).is_err());
    }

    #[test]
    fn test_paired_split_writer() {
        let dir = tempdir().unwrap();
        let base_path = dir.path().join("output");
        let config = SplitConfig {
            mode: SplitMode::ByFile(2),
            prefix_digits: 4,
        };
        let mut writer = PairedSplitWriter::new(&base_path, config, Some(6)).unwrap();
        for i in 0..6 {
            let r1 = make_record(&format!("read{}/1", i), "ACGTACGTACGT");
            let r2 = make_record(&format!("read{}/2", i), "TGCATGCATGCA");
            writer.write_pair(&r1, &r2).unwrap();
        }
        writer.finish().unwrap();
        assert_eq!(writer.num_files_created(), 2);
        assert_eq!(writer.total_records_written(), 6);
        assert!(dir.path().join("output_0001.R1.fastq").exists());
        assert!(dir.path().join("output_0001.R2.fastq").exists());
        assert!(dir.path().join("output_0002.R1.fastq").exists());
        assert!(dir.path().join("output_0002.R2.fastq").exists());

        let expected: Vec<PathBuf> = [
            "output_0001.R1.fastq",
            "output_0001.R2.fastq",
            "output_0002.R1.fastq",
            "output_0002.R2.fastq",
        ]
        .iter()
        .map(|name| dir.path().join(name))
        .collect();
        assert_eq!(writer.get_output_files(), expected);
    }

    #[test]
    fn test_write_batch() {
        let dir = tempdir().unwrap();
        let base_path = dir.path().join("output");
        let config = SplitConfig {
            mode: SplitMode::ByLines(8),
            prefix_digits: 4,
        };
        let mut writer = SplitWriter::new(&base_path, config, None).unwrap();
        let records: Vec<_> = (0..4)
            .map(|i| make_record(&format!("read{}", i), "ACGTACGT"))
            .collect();
        writer.write_batch(&records).unwrap();
        writer.finish().unwrap();
        assert_eq!(writer.total_records_written(), 4);
        // 8 lines per file = 2 records per file.
        assert_eq!(writer.num_files_created(), 2);
        let outputs = writer.get_output_files();
        assert_eq!(
            outputs,
            vec![
                dir.path().join("output_0001.fastq"),
                dir.path().join("output_0002.fastq"),
            ]
        );
        assert!(outputs.iter().all(|path| path.exists()));
    }
}