use super::parallel_gzip_writer::{ParallelGzipConfig, ParallelGzipWriter};
use anyhow::{Context, Result};
use flate2::Compression;
use flate2::write::GzEncoder;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::Path;
/// Writer that emits FASTQ records into a gzip-compressed file.
///
/// Build one with `FastqWriter::new` (single-threaded compression) or
/// `FastqWriter::with_threads` (parallel compression when more than one
/// thread is requested), append records with `write_record`, and call
/// `finish` to complete the gzip stream.
pub struct FastqWriter {
/// Compression backend chosen when the writer was constructed.
inner: FastqWriterInner,
}
/// Compression backend behind a `FastqWriter`.
enum FastqWriterInner {
/// `flate2` gzip encoder over a buffered file; selected when at most one
/// thread is requested.
SingleThreaded(GzEncoder<BufWriter<File>>),
/// Project-local parallel gzip writer; selected when more than one thread
/// is requested.
MultiThreaded(ParallelGzipWriter),
}
impl FastqWriter {
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref();
let file =
File::create(path).with_context(|| format!("Failed to create {}", path.display()))?;
let buf = BufWriter::new(file);
let gz = GzEncoder::new(buf, Compression::default());
Ok(Self { inner: FastqWriterInner::SingleThreaded(gz) })
}
pub fn with_threads<P: AsRef<Path>>(path: P, threads: usize) -> Result<Self> {
let path = path.as_ref();
let file =
File::create(path).with_context(|| format!("Failed to create {}", path.display()))?;
if threads <= 1 {
let buf = BufWriter::new(file);
let gz = GzEncoder::new(buf, Compression::default());
Ok(Self { inner: FastqWriterInner::SingleThreaded(gz) })
} else {
let config = ParallelGzipConfig::with_threads(threads);
let writer = ParallelGzipWriter::new(file, &config)
.with_context(|| "Failed to create parallel gzip writer")?;
Ok(Self { inner: FastqWriterInner::MultiThreaded(writer) })
}
}
pub fn write_record(&mut self, name: &str, seq: &[u8], qual: &[u8]) -> Result<()> {
match &mut self.inner {
FastqWriterInner::SingleThreaded(writer) => write_record_to(writer, name, seq, qual),
FastqWriterInner::MultiThreaded(writer) => write_record_to(writer, name, seq, qual),
}
}
pub fn finish(self) -> Result<()> {
match self.inner {
FastqWriterInner::SingleThreaded(writer) => {
writer.finish().context("Failed to finish gzip stream")?;
}
FastqWriterInner::MultiThreaded(writer) => {
writer.finish().context("Failed to finish parallel gzip stream")?;
}
}
Ok(())
}
}
/// Serializes one four-line FASTQ record and writes it to `writer` in a single call.
///
/// The emitted bytes are `@name\n`, `seq\n`, `+\n`, and the encoded quality
/// line followed by `\n`. Each value in `qual` is offset by 33 to land in
/// printable ASCII (presumably Phred+33 — confirm with callers) and clamped to
/// 126 (`~`) so large scores never leave the printable range.
///
/// The record is assembled in memory first so the compressor behind `writer`
/// receives one large write instead of one call per quality byte.
///
/// # Errors
///
/// Returns an error if the underlying writer fails.
fn write_record_to<W: Write>(writer: &mut W, name: &str, seq: &[u8], qual: &[u8]) -> Result<()> {
    // Fixed overhead: '@', '+', and four newlines.
    let mut record = Vec::with_capacity(name.len() + seq.len() + qual.len() + 6);

    record.push(b'@');
    record.extend_from_slice(name.as_bytes());
    record.push(b'\n');

    record.extend_from_slice(seq);
    record.extend_from_slice(b"\n+\n");

    record.extend(qual.iter().map(|&q| q.saturating_add(33).min(126)));
    record.push(b'\n');

    writer.write_all(&record)?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read;
    use tempfile::NamedTempFile;

    /// Decompresses the gzip file at `path` and returns its contents as text.
    ///
    /// `MultiGzDecoder` is used for every test so that output consisting of
    /// several concatenated gzip members is read in full; a single-member
    /// stream decodes the same way.
    fn read_gzip_text(path: &Path) -> Result<String> {
        let file = File::open(path)?;
        let mut decoder = flate2::read::MultiGzDecoder::new(file);
        let mut content = String::new();
        decoder.read_to_string(&mut content)?;
        Ok(content)
    }

    /// A single record round-trips through the single-threaded writer.
    #[test]
    fn test_write_single_record() -> Result<()> {
        let temp = NamedTempFile::new()?;
        let path = temp.path();

        let mut writer = FastqWriter::new(path)?;
        writer.write_record("read1", b"ACGT", &[30, 30, 30, 30])?;
        writer.finish()?;

        let content = read_gzip_text(path)?;
        assert!(content.contains("@read1"));
        assert!(content.contains("ACGT"));
        // Quality 30 + 33 = 63 = '?'.
        assert!(content.contains("????"));
        Ok(())
    }

    /// Consecutive records all appear in the output.
    #[test]
    fn test_write_multiple_records() -> Result<()> {
        let temp = NamedTempFile::new()?;
        let path = temp.path();

        let mut writer = FastqWriter::new(path)?;
        writer.write_record("read1", b"AAAA", &[10, 20, 30, 40])?;
        writer.write_record("read2", b"CCCC", &[40, 30, 20, 10])?;
        writer.finish()?;

        let content = read_gzip_text(path)?;
        assert!(content.contains("@read1"));
        assert!(content.contains("@read2"));
        assert!(content.contains("AAAA"));
        assert!(content.contains("CCCC"));
        Ok(())
    }

    /// Raw quality values are offset by 33 on the fourth line of the record.
    #[test]
    fn test_quality_encoding() -> Result<()> {
        let temp = NamedTempFile::new()?;
        let path = temp.path();

        let mut writer = FastqWriter::new(path)?;
        writer.write_record("test", b"ACGT", &[0, 10, 30, 41])?;
        writer.finish()?;

        let content = read_gzip_text(path)?;
        let lines: Vec<&str> = content.lines().collect();
        let qual_line = lines[3];
        assert_eq!(qual_line, "!+?J");
        Ok(())
    }

    /// The parallel backend produces output that decodes to the written records.
    #[test]
    fn test_multi_threaded_writer() -> Result<()> {
        let temp = NamedTempFile::new()?;
        let path = temp.path();

        let mut writer = FastqWriter::with_threads(path, 4)?;
        writer.write_record("read1", b"ACGT", &[30, 30, 30, 30])?;
        writer.write_record("read2", b"TGCA", &[35, 35, 35, 35])?;
        writer.finish()?;

        let content = read_gzip_text(path)?;
        assert!(content.contains("@read1"));
        assert!(content.contains("@read2"));
        assert!(content.contains("ACGT"));
        assert!(content.contains("TGCA"));
        Ok(())
    }

    /// A large write through the parallel backend keeps both the first and last record.
    #[test]
    fn test_multi_threaded_large_output() -> Result<()> {
        let temp = NamedTempFile::new()?;
        let path = temp.path();

        // Identical for every record, so build them once outside the loop.
        let seq = b"ACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGT";
        let qual = vec![30u8; seq.len()];

        let mut writer = FastqWriter::with_threads(path, 4)?;
        for i in 0..10000 {
            let name = format!("read{i:05}");
            writer.write_record(&name, seq, &qual)?;
        }
        writer.finish()?;

        let content = read_gzip_text(path)?;
        assert!(content.contains("@read00000"));
        assert!(content.contains("@read09999"));
        Ok(())
    }
}