use std::fs::File;
use std::io::{BufRead, BufReader, Read};
use std::path::Path;
use anyhow::{Context, Result};
use flate2::read::MultiGzDecoder;
use super::pool::ReadPool;
use super::OwnedRecord;
/// Capacity in bytes (1 MiB) of the buffered readers that FASTQ input is read through.
const BUFFER_SIZE: usize = 1 << 20;
/// Capacity in bytes (256 KiB) of the buffer between a compressed file and the gzip decoder.
const FILE_BUFFER_SIZE: usize = 256 << 10;
/// Reports whether `path` should be treated as gzip-compressed.
///
/// The decision is purely name-based: the lossily-decoded path, lowercased, must end in
/// `.gz` or `.gzip`. File contents are never inspected.
fn is_gzipped(path: &Path) -> bool {
    let lowered = path.to_string_lossy().to_lowercase();
    [".gz", ".gzip"]
        .iter()
        .any(|&suffix| lowered.ends_with(suffix))
}
/// Rewrites an MGI/BGI-style read ID into a colon-delimited form.
///
/// `<flowcell>L<lane>C<column>R<row>[/<mate>]` becomes
/// `<flowcell>:L<lane>:C<column>:R<row>[ <mate>]`, e.g.
/// `V300012345L1C001R0010000001/1` -> `V300012345:L1:C001:R0010000001 1`.
///
/// Only the first whitespace-delimited token of `name` is parsed. Everything from the first
/// ASCII whitespace onward (such as a FASTQ comment) is copied through unchanged.
///
/// The `R`, `C` and `L` markers are located by scanning backwards from the end of that token:
/// the last `R`, then the last `C` before it, then the last `L` before that. This way,
/// flowcell IDs that themselves contain those letters (e.g. BGISEQ `CL…` flowcells) are not
/// split in the wrong place. If the token has no `L…C…R` sequence, `name` is returned unchanged.
#[inline]
fn convert_mgi_id(name: &[u8]) -> Vec<u8> {
    let id_len = name
        .iter()
        .position(u8::is_ascii_whitespace)
        .unwrap_or(name.len());
    let (id, comment) = name.split_at(id_len);

    // Scan right-to-left so letters inside the flowcell prefix are never mistaken for markers.
    let markers = id.iter().rposition(|&b| b == b'R').and_then(|r_pos| {
        let c_pos = id[..r_pos].iter().rposition(|&b| b == b'C')?;
        let l_pos = id[..c_pos].iter().rposition(|&b| b == b'L')?;
        Some((l_pos, c_pos, r_pos))
    });
    let (l_pos, c_pos, r_pos) = match markers {
        Some(positions) => positions,
        None => return name.to_vec(),
    };

    // Three ':' separators plus at most a '/' -> ' ' swap (same length).
    let mut result = Vec::with_capacity(name.len() + 3);
    result.extend_from_slice(&id[..l_pos]);
    result.push(b':');
    result.extend_from_slice(&id[l_pos..c_pos]);
    result.push(b':');
    result.extend_from_slice(&id[c_pos..r_pos]);
    result.push(b':');

    // A `/<mate>` suffix after the row number becomes ` <mate>`.
    let row = &id[r_pos..];
    match row.iter().position(|&b| b == b'/') {
        Some(slash) => {
            result.extend_from_slice(&row[..slash]);
            result.push(b' ');
            result.extend_from_slice(&row[slash + 1..]);
        }
        None => result.extend_from_slice(row),
    }
    result.extend_from_slice(comment);
    result
}
pub struct DirectFastqReader {
reader: BufReader<Box<dyn Read + Send>>,
finished: bool,
line_buf: Vec<u8>,
fix_mgi_id: bool,
}
impl DirectFastqReader {
pub fn new(path: &Path) -> Result<Self> {
let file = File::open(path)
.with_context(|| format!("Failed to open file: {}", path.display()))?;
let reader: Box<dyn Read + Send> = if is_gzipped(path) {
Box::new(BufReader::with_capacity(
BUFFER_SIZE,
MultiGzDecoder::new(BufReader::with_capacity(FILE_BUFFER_SIZE, file)),
))
} else {
Box::new(BufReader::with_capacity(BUFFER_SIZE, file))
};
Ok(Self {
reader: BufReader::with_capacity(BUFFER_SIZE, reader),
finished: false,
line_buf: Vec::with_capacity(512),
fix_mgi_id: false,
})
}
pub fn with_mgi_id_conversion(mut self, convert: bool) -> Self {
self.fix_mgi_id = convert;
self
}
#[inline]
pub fn read_into(&mut self, record: &mut OwnedRecord) -> Result<bool> {
if self.finished {
return Ok(false);
}
self.line_buf.clear();
if self.reader.read_until(b'\n', &mut self.line_buf)? == 0 {
self.finished = true;
return Ok(false);
}
record.name.clear();
let name_start = if self.line_buf.first() == Some(&b'@') { 1 } else { 0 };
let name_end = self.line_buf.len().saturating_sub(1);
let name_end = if name_end > 0 && self.line_buf.get(name_end - 1) == Some(&b'\r') {
name_end - 1
} else {
name_end
};
if self.fix_mgi_id {
let converted = convert_mgi_id(&self.line_buf[name_start..name_end]);
record.name.extend_from_slice(&converted);
} else {
record.name.extend_from_slice(&self.line_buf[name_start..name_end]);
}
record.seq.clear();
if self.reader.read_until(b'\n', &mut record.seq)? == 0 {
self.finished = true;
return Ok(false);
}
if record.seq.last() == Some(&b'\n') {
record.seq.pop();
}
if record.seq.last() == Some(&b'\r') {
record.seq.pop();
}
self.line_buf.clear();
if self.reader.read_until(b'\n', &mut self.line_buf)? == 0 {
self.finished = true;
return Ok(false);
}
record.qual.clear();
if self.reader.read_until(b'\n', &mut record.qual)? == 0 {
self.finished = true;
return Ok(false);
}
if record.qual.last() == Some(&b'\n') {
record.qual.pop();
}
if record.qual.last() == Some(&b'\r') {
record.qual.pop();
}
Ok(true)
}
pub fn read_batch_pooled(
&mut self,
batch_size: usize,
pool: &mut ReadPool,
) -> Result<Vec<OwnedRecord>> {
if self.finished {
return Ok(Vec::new());
}
let mut records = Vec::with_capacity(batch_size);
while records.len() < batch_size {
let mut record = pool.acquire();
if !self.read_into(&mut record)? {
pool.release(record);
break;
}
records.push(record);
}
Ok(records)
}
pub fn read_batch(&mut self, batch_size: usize) -> Result<Vec<OwnedRecord>> {
if self.finished {
return Ok(Vec::new());
}
let mut records = Vec::with_capacity(batch_size);
while records.len() < batch_size {
let mut record = OwnedRecord::with_capacity(256);
if !self.read_into(&mut record)? {
break;
}
records.push(record);
}
Ok(records)
}
#[inline]
pub fn is_finished(&self) -> bool {
self.finished
}
}
/// Reads paired-end FASTQ files (R1 and R2) in lockstep, yielding one record from each
/// file per pair.
pub struct DirectPairedFastqReader {
    /// Reader for the R1 file.
    pub reader1: DirectFastqReader,
    /// Reader for the R2 file.
    pub reader2: DirectFastqReader,
}

impl DirectPairedFastqReader {
    /// Opens both mate files.
    ///
    /// # Errors
    /// Returns an error naming the R1 or R2 path if either file cannot be opened.
    pub fn new(path1: &Path, path2: &Path) -> Result<Self> {
        let reader1 = DirectFastqReader::new(path1)
            .with_context(|| format!("Failed to open R1 file: {}", path1.display()))?;
        let reader2 = DirectFastqReader::new(path2)
            .with_context(|| format!("Failed to open R2 file: {}", path2.display()))?;
        Ok(Self { reader1, reader2 })
    }

    /// Enables or disables MGI read-ID conversion on both mate readers.
    pub fn with_mgi_id_conversion(mut self, convert: bool) -> Self {
        self.reader1.fix_mgi_id = convert;
        self.reader2.fix_mgi_id = convert;
        self
    }

    /// Reads the next R1 and R2 records into `r1` and `r2`.
    ///
    /// Returns `Ok(true)` when both files yielded a record, and `Ok(false)` when both are
    /// exhausted.
    ///
    /// # Errors
    /// Propagates read errors. Also errors when one file runs out of records before the other.
    fn read_pair_into(&mut self, r1: &mut OwnedRecord, r2: &mut OwnedRecord) -> Result<bool> {
        let has_r1 = self.reader1.read_into(r1)?;
        let has_r2 = self.reader2.read_into(r2)?;
        match (has_r1, has_r2) {
            (true, true) => Ok(true),
            (false, false) => Ok(false),
            (true, false) => Err(anyhow::anyhow!("R1 file has more records than R2 file")),
            (false, true) => Err(anyhow::anyhow!("R2 file has more records than R1 file")),
        }
    }

    /// Reads up to `batch_size` pairs, drawing R1 buffers from `pool1` and R2 buffers from
    /// `pool2`.
    ///
    /// Returns fewer pairs than requested only when both files are exhausted. Unused
    /// buffers go back to their pools.
    ///
    /// # Errors
    /// Propagates read errors and R1/R2 record-count mismatches. Before returning an
    /// error, every buffer taken for the batch is returned to its pool.
    pub fn read_batch_pooled(
        &mut self,
        batch_size: usize,
        pool1: &mut ReadPool,
        pool2: &mut ReadPool,
    ) -> Result<Vec<(OwnedRecord, OwnedRecord)>> {
        let mut pairs = Vec::with_capacity(batch_size);
        while pairs.len() < batch_size {
            let mut r1 = pool1.acquire();
            let mut r2 = pool2.acquire();
            match self.read_pair_into(&mut r1, &mut r2) {
                Ok(true) => pairs.push((r1, r2)),
                Ok(false) => {
                    pool1.release(r1);
                    pool2.release(r2);
                    break;
                }
                Err(err) => {
                    pool1.release(r1);
                    pool2.release(r2);
                    for (done1, done2) in pairs {
                        pool1.release(done1);
                        pool2.release(done2);
                    }
                    return Err(err);
                }
            }
        }
        Ok(pairs)
    }

    /// Reads up to `batch_size` pairs into freshly allocated buffers.
    ///
    /// Returns fewer pairs than requested only when both files are exhausted.
    ///
    /// # Errors
    /// Propagates read errors and R1/R2 record-count mismatches.
    pub fn read_batch(&mut self, batch_size: usize) -> Result<Vec<(OwnedRecord, OwnedRecord)>> {
        let mut pairs = Vec::with_capacity(batch_size);
        while pairs.len() < batch_size {
            let mut r1 = OwnedRecord::with_capacity(256);
            let mut r2 = OwnedRecord::with_capacity(256);
            if !self.read_pair_into(&mut r1, &mut r2)? {
                break;
            }
            pairs.push((r1, r2));
        }
        Ok(pairs)
    }

    /// Returns `true` once both mate readers have reached the end of input.
    #[inline]
    pub fn is_finished(&self) -> bool {
        self.reader1.is_finished() && self.reader2.is_finished()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Two single-end records with LF line endings.
    const SAMPLE_FASTQ: &[u8] = b"@read1\nACGTACGT\n+\nIIIIIIII\n@read2\nTGCATGCA\n+\nHHHHHHHH\n";
    /// The same two records with CRLF line endings.
    const SAMPLE_FASTQ_CRLF: &[u8] =
        b"@read1\r\nACGTACGT\r\n+\r\nIIIIIIII\r\n@read2\r\nTGCATGCA\r\n+\r\nHHHHHHHH\r\n";

    // R1 / R2 mate fixtures holding one or two records each.
    const R1_ONE: &[u8] = b"@read1/1\nAAAA\n+\nIIII\n";
    const R1_TWO: &[u8] = b"@read1/1\nAAAA\n+\nIIII\n@read2/1\nCCCC\n+\nIIII\n";
    const R2_ONE: &[u8] = b"@read1/2\nTTTT\n+\nIIII\n";
    const R2_TWO: &[u8] = b"@read1/2\nTTTT\n+\nIIII\n@read2/2\nGGGG\n+\nIIII\n";

    /// Writes `contents` verbatim into a fresh `.fastq` temp file.
    fn create_temp_fastq(contents: &[u8]) -> NamedTempFile {
        let mut tmp = NamedTempFile::with_suffix(".fastq").unwrap();
        tmp.write_all(contents).unwrap();
        tmp.flush().unwrap();
        tmp
    }

    /// Gzip-compresses `contents` into a fresh `.fastq.gz` temp file.
    fn create_temp_fastq_gz(contents: &[u8]) -> NamedTempFile {
        use flate2::write::GzEncoder;
        use flate2::Compression;
        let mut tmp = NamedTempFile::with_suffix(".fastq.gz").unwrap();
        let mut encoder = GzEncoder::new(&mut tmp, Compression::default());
        encoder.write_all(contents).unwrap();
        encoder.finish().unwrap();
        tmp.flush().unwrap();
        tmp
    }

    /// Opens a paired reader over two temp files; the files are returned so they outlive it.
    fn open_paired(
        r1: &[u8],
        r2: &[u8],
    ) -> (NamedTempFile, NamedTempFile, DirectPairedFastqReader) {
        let file1 = create_temp_fastq(r1);
        let file2 = create_temp_fastq(r2);
        let reader = DirectPairedFastqReader::new(file1.path(), file2.path()).unwrap();
        (file1, file2, reader)
    }

    /// Reads the next record and checks its name, sequence and quality.
    fn assert_next(
        reader: &mut DirectFastqReader,
        record: &mut OwnedRecord,
        name: &[u8],
        seq: &[u8],
        qual: &[u8],
    ) {
        assert!(reader.read_into(record).unwrap());
        assert_eq!(record.name, name);
        assert_eq!(record.seq, seq);
        assert_eq!(record.qual, qual);
    }

    #[test]
    fn test_direct_reader_basic() {
        let tmp = create_temp_fastq(SAMPLE_FASTQ);
        let mut reader = DirectFastqReader::new(tmp.path()).unwrap();
        let mut record = OwnedRecord::with_capacity(256);
        assert_next(&mut reader, &mut record, b"read1", b"ACGTACGT", b"IIIIIIII");
        assert_next(&mut reader, &mut record, b"read2", b"TGCATGCA", b"HHHHHHHH");
        assert!(!reader.read_into(&mut record).unwrap());
        assert!(reader.is_finished());
    }

    #[test]
    fn test_direct_reader_crlf() {
        let tmp = create_temp_fastq(SAMPLE_FASTQ_CRLF);
        let mut reader = DirectFastqReader::new(tmp.path()).unwrap();
        let mut record = OwnedRecord::with_capacity(256);
        assert_next(&mut reader, &mut record, b"read1", b"ACGTACGT", b"IIIIIIII");
        assert_next(&mut reader, &mut record, b"read2", b"TGCATGCA", b"HHHHHHHH");
    }

    #[test]
    fn test_direct_reader_gzipped() {
        let tmp = create_temp_fastq_gz(SAMPLE_FASTQ);
        let mut reader = DirectFastqReader::new(tmp.path()).unwrap();
        let mut record = OwnedRecord::with_capacity(256);
        assert_next(&mut reader, &mut record, b"read1", b"ACGTACGT", b"IIIIIIII");
        assert!(reader.read_into(&mut record).unwrap());
        assert_eq!(record.name, b"read2");
    }

    #[test]
    fn test_direct_reader_pooled() {
        let tmp = create_temp_fastq(SAMPLE_FASTQ);
        let mut reader = DirectFastqReader::new(tmp.path()).unwrap();
        let mut pool = ReadPool::new(256);

        let records = reader.read_batch_pooled(10, &mut pool).unwrap();
        assert_eq!(records.len(), 2);
        let (first, second) = (&records[0], &records[1]);
        assert_eq!(first.name, b"read1");
        assert_eq!(first.seq, b"ACGTACGT");
        assert_eq!(second.name, b"read2");
        assert_eq!(second.seq, b"TGCATGCA");

        assert!(reader.read_batch_pooled(10, &mut pool).unwrap().is_empty());
    }

    #[test]
    fn test_direct_reader_batch() {
        let tmp = create_temp_fastq(SAMPLE_FASTQ);
        let mut reader = DirectFastqReader::new(tmp.path()).unwrap();

        let first_batch = reader.read_batch(1).unwrap();
        assert_eq!(first_batch.len(), 1);
        assert_eq!(first_batch[0].name, b"read1");

        let second_batch = reader.read_batch(10).unwrap();
        assert_eq!(second_batch.len(), 1);
        assert_eq!(second_batch[0].name, b"read2");

        assert!(reader.read_batch(10).unwrap().is_empty());
    }

    #[test]
    fn test_direct_paired_reader_basic() {
        let (_file1, _file2, mut reader) = open_paired(R1_TWO, R2_TWO);
        let (mut pool1, mut pool2) = (ReadPool::new(256), ReadPool::new(256));

        let pairs = reader.read_batch_pooled(10, &mut pool1, &mut pool2).unwrap();
        assert_eq!(pairs.len(), 2);
        let (first, second) = (&pairs[0], &pairs[1]);
        assert_eq!(first.0.seq, b"AAAA");
        assert_eq!(first.1.seq, b"TTTT");
        assert_eq!(second.0.seq, b"CCCC");
        assert_eq!(second.1.seq, b"GGGG");
        assert!(reader.is_finished());
    }

    #[test]
    fn test_direct_paired_reader_mismatch_r1_longer() {
        let (_file1, _file2, mut reader) = open_paired(R1_TWO, R2_ONE);
        let (mut pool1, mut pool2) = (ReadPool::new(256), ReadPool::new(256));

        let outcome = reader.read_batch_pooled(10, &mut pool1, &mut pool2);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("R1 file has more records"));
    }

    #[test]
    fn test_direct_paired_reader_mismatch_r2_longer() {
        let (_file1, _file2, mut reader) = open_paired(R1_ONE, R2_TWO);
        let (mut pool1, mut pool2) = (ReadPool::new(256), ReadPool::new(256));

        let outcome = reader.read_batch_pooled(10, &mut pool1, &mut pool2);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("R2 file has more records"));
    }

    #[test]
    fn test_direct_paired_reader_batch_no_pool() {
        let (_file1, _file2, mut reader) = open_paired(R1_ONE, R2_ONE);

        let pairs = reader.read_batch(10).unwrap();
        assert_eq!(pairs.len(), 1);
        assert_eq!(pairs[0].0.seq, b"AAAA");
        assert_eq!(pairs[0].1.seq, b"TTTT");
    }

    #[test]
    fn test_is_gzipped_detection() {
        for name in &["file.fastq.gz", "file.fq.gz", "file.FASTQ.GZ", "file.gzip"] {
            assert!(is_gzipped(Path::new(name)), "expected gzip: {}", name);
        }
        for name in &["file.fastq", "file.fq"] {
            assert!(!is_gzipped(Path::new(name)), "expected plain: {}", name);
        }
    }

    #[test]
    fn test_pool_memory_reuse() {
        let tmp = create_temp_fastq(SAMPLE_FASTQ);
        let mut reader = DirectFastqReader::new(tmp.path()).unwrap();
        let mut pool = ReadPool::new(256);

        pool.prefill(5);
        assert_eq!(pool.len(), 5);

        let records = reader.read_batch_pooled(2, &mut pool).unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(pool.len(), 3);

        pool.release_batch(records);
        assert_eq!(pool.len(), 5);
    }
}