use std::fs::File;
use std::io;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::path::Path;
#[cfg(feature = "bzip2")]
use bzip2::bufread::BzDecoder;
#[cfg(feature = "gzip")]
use flate2::bufread::MultiGzDecoder;
#[cfg(feature = "xz")]
use liblzma::read::XzDecoder;
use needletail::{parse_fastx_reader, FastxReader};
#[cfg(feature = "zstd")]
use zstd::stream::read::Decoder as ZstdDecoder;
#[cfg(feature = "alignment")]
use noodles_util::alignment;
#[derive(Debug, PartialEq, Copy, Clone, Default)]
enum CompressionFormat {
#[cfg(feature = "bzip2")]
Bzip2,
#[cfg(feature = "gzip")]
Gzip,
#[default]
None,
#[cfg(feature = "xz")]
Xz,
#[cfg(feature = "zstd")]
Zstd,
}
fn detect_compression_format<R: Read + Seek>(reader: &mut R) -> io::Result<CompressionFormat> {
let original_position = reader.stream_position()?;
reader.seek(SeekFrom::Start(0))?;
let mut magic = [0; 5];
let n = reader.read(&mut magic)?;
let magic = &magic[..n];
let format = match magic {
#[cfg(feature = "gzip")]
[0x1f, 0x8b, ..] => CompressionFormat::Gzip,
#[cfg(feature = "bzip2")]
[0x42, 0x5a, ..] => CompressionFormat::Bzip2,
#[cfg(feature = "zstd")]
[0x28, 0xb5, 0x2f, 0xfd, ..] => CompressionFormat::Zstd,
#[cfg(feature = "xz")]
[0xfd, 0x37, 0x7a, 0x58, 0x5a] => CompressionFormat::Xz,
_ => CompressionFormat::None,
};
reader.seek(SeekFrom::Start(original_position))?;
Ok(format)
}
pub(crate) enum SeqReader {
Fastx(Box<dyn FastxReader>),
#[cfg(feature = "alignment")]
Alignment(alignment::io::Reader<Box<dyn Read + Send>>),
}
impl SeqReader {
pub fn new<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let mut file = File::open(&path).map(BufReader::new)?;
let compression_format = detect_compression_format(&mut file)?;
let decompressed_reader: Box<dyn Read + Send> = match compression_format {
#[cfg(feature = "gzip")]
CompressionFormat::Gzip => Box::new(MultiGzDecoder::new(file)),
#[cfg(feature = "zstd")]
CompressionFormat::Zstd => Box::new(ZstdDecoder::new(file)?),
#[cfg(feature = "bzip2")]
CompressionFormat::Bzip2 => Box::new(BzDecoder::new(file)),
#[cfg(feature = "xz")]
CompressionFormat::Xz => Box::new(XzDecoder::new(file)),
CompressionFormat::None => Box::new(file),
};
let mut reader = decompressed_reader;
let mut magic = [0; 4];
let n = reader.read(&mut magic)?;
let magic_slice = &magic[..n];
let is_alignment = magic_slice.starts_with(b"BAM\x01")
|| magic_slice.starts_with(b"CRAM")
|| magic_slice.starts_with(b"@HD")
|| magic_slice.starts_with(b"@SQ")
|| magic_slice.starts_with(b"@RG");
let full_reader: Box<dyn Read + Send> =
Box::new(io::Cursor::new(magic_slice.to_vec()).chain(reader));
#[cfg(feature = "alignment")]
if is_alignment {
let aln_reader = alignment::io::Reader::new(full_reader)?;
return Ok(Self::Alignment(aln_reader));
}
if is_alignment && cfg!(not(feature = "alignment")) {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Alignment format detected but 'alignment' feature is disabled",
));
}
let fastx_reader = parse_fastx_reader(full_reader)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
Ok(Self::Fastx(fastx_reader))
}
}
pub(crate) fn count_records<P: AsRef<Path>>(path: P) -> io::Result<usize> {
let mut reader = SeqReader::new(path)?;
let mut count = 0;
match &mut reader {
SeqReader::Fastx(r) => {
while let Some(res) = r.next() {
res.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
count += 1;
}
}
#[cfg(feature = "alignment")]
SeqReader::Alignment(r) => {
let header = r.read_header()?;
for res in r.records(&header) {
res?;
count += 1;
}
}
}
if count == 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Is the file empty?",
));
}
Ok(count)
}
pub(crate) fn iter_records<P: AsRef<Path>>(
path: P,
mut callback: impl FnMut(&[u8], &[u8]) -> io::Result<()>,
) -> io::Result<()> {
let mut reader = SeqReader::new(path)?;
match &mut reader {
SeqReader::Fastx(r) => {
while let Some(res) = r.next() {
let rec = res.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
callback(rec.read_id(), &rec.seq())?;
}
}
#[cfg(feature = "alignment")]
SeqReader::Alignment(r) => {
let header = r.read_header()?;
let mut seq_buf = Vec::new();
for res in r.records(&header) {
let record = res?;
if !record.flags().unwrap_or_default().is_unmapped() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Mapped records are not supported. Only unaligned BAM/CRAM/SAM is allowed.",
));
}
let name = record.name().map(|n| n.as_ref()).unwrap_or_default();
let sequence = record.sequence();
seq_buf.clear();
for base in sequence.as_ref().iter() {
seq_buf.push(base);
}
callback(name, &seq_buf)?;
}
}
}
Ok(())
}
pub(crate) enum Message {
Data((Vec<u8>, Vec<u8>)),
}
pub(crate) trait FastqRecordExt {
fn read_id(&self) -> &[u8];
}
impl FastqRecordExt for needletail::parser::SequenceRecord<'_> {
fn read_id(&self) -> &[u8] {
let id = self.id();
id.split(|&x| x.is_ascii_whitespace())
.next()
.unwrap_or_default()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
#[test]
fn test_detect_gzip_format() {
let data = vec![
0x1f, 0x8b, 0x08, 0x08, 0x1c, 0x6b, 0xe2, 0x66, 0x00, 0x03, 0x74, 0x65, 0x78, 0x74,
0x2e, 0x74, 0x78, 0x74, 0x00, 0x4b, 0xcb, 0xcf, 0x57, 0x48, 0x4a, 0x2c, 0xe2, 0x02,
0x00, 0x27, 0xb4, 0xdd, 0x13, 0x08, 0x00, 0x00, 0x00,
];
let mut reader = Cursor::new(data);
let original_position = reader.position();
let format = detect_compression_format(&mut reader).unwrap();
assert_eq!(format, CompressionFormat::Gzip);
assert_eq!(reader.position(), original_position);
}
#[test]
fn test_detect_bzip2_format() {
let data = vec![
0x42, 0x5a, 0x68, 0x39, 0x31, 0x41, 0x59, 0x26, 0x53, 0x59, 0x7b, 0x6e, 0xa8, 0x38,
0x00, 0x00, 0x02, 0x51, 0x80, 0x00, 0x10, 0x40, 0x00, 0x31, 0x00, 0x90, 0x00, 0x20,
0x00, 0x22, 0x1a, 0x63, 0x50, 0x86, 0x00, 0x2c, 0x8c, 0x3c, 0x5d, 0xc9, 0x14, 0xe1,
0x42, 0x41, 0xed, 0xba, 0xa0, 0xe0,
];
let mut reader = Cursor::new(data);
let original_position = reader.position();
let format = detect_compression_format(&mut reader).unwrap();
assert_eq!(format, CompressionFormat::Bzip2);
assert_eq!(reader.position(), original_position);
}
#[test]
fn test_detect_zstd_format() {
let data = vec![
0x28, 0xb5, 0x2f, 0xfd, 0x24, 0x08, 0x41, 0x00, 0x00, 0x66, 0x6f, 0x6f, 0x20, 0x62,
0x61, 0x72, 0x0a, 0x37, 0x17, 0xa5, 0xec,
];
let mut reader = Cursor::new(data);
let original_position = reader.position();
let format = detect_compression_format(&mut reader).unwrap();
assert_eq!(format, CompressionFormat::Zstd);
assert_eq!(reader.position(), original_position);
}
#[test]
fn test_detect_xz_format() {
let data = vec![
0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00, 0x00, 0x04, 0xe6, 0xd6, 0xb4, 0x46, 0x02, 0x00,
0x21, 0x01, 0x16, 0x00, 0x00, 0x00, 0x74, 0x2f, 0xe5, 0xa3, 0x01, 0x00, 0x07, 0x66,
0x6f, 0x6f, 0x20, 0x62, 0x61, 0x72, 0x0a, 0x00, 0xfd, 0xbb, 0xfb, 0x3b, 0x8e, 0xcc,
0x32, 0x13, 0x00, 0x01, 0x20, 0x08, 0xbb, 0x19, 0xd9, 0xbb, 0x1f, 0xb6, 0xf3, 0x7d,
0x01, 0x00, 0x00, 0x00, 0x00, 0x04, 0x59, 0x5a,
];
let mut reader = Cursor::new(data);
let original_position = reader.position();
let format = detect_compression_format(&mut reader).unwrap();
assert_eq!(format, CompressionFormat::Xz);
assert_eq!(reader.position(), original_position);
}
#[test]
fn test_detect_none_format() {
let data = b"I'm not compressed";
let mut reader = Cursor::new(data);
let format = detect_compression_format(&mut reader).unwrap();
assert_eq!(format, CompressionFormat::None);
}
#[test]
fn test_detect_format_when_reader_is_part_way_through() {
let data = vec![
0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00, 0x00, 0x04, 0xe6, 0xd6, 0xb4, 0x46, 0x02, 0x00,
0x21, 0x01, 0x16, 0x00, 0x00, 0x00, 0x74, 0x2f, 0xe5, 0xa3, 0x01, 0x00, 0x07, 0x66,
0x6f, 0x6f, 0x20, 0x62, 0x61, 0x72, 0x0a, 0x00, 0xfd, 0xbb, 0xfb, 0x3b, 0x8e, 0xcc,
0x32, 0x13, 0x00, 0x01, 0x20, 0x08, 0xbb, 0x19, 0xd9, 0xbb, 0x1f, 0xb6, 0xf3, 0x7d,
0x01, 0x00, 0x00, 0x00, 0x00, 0x04, 0x59, 0x5a,
];
let mut reader = Cursor::new(data);
reader.seek(SeekFrom::Start(10)).unwrap();
let original_position = reader.position();
let format = detect_compression_format(&mut reader).unwrap();
assert_eq!(format, CompressionFormat::Xz);
assert_eq!(reader.position(), original_position);
}
#[test]
fn test_read_id_no_comment() {
let data = b"@SEQ_ID\nGATTA\n+\n!!!!!\n";
let reader = Cursor::new(data);
let mut fastx_reader = parse_fastx_reader(reader).unwrap();
let record = fastx_reader.next().unwrap().unwrap();
assert_eq!(record.read_id(), b"SEQ_ID");
}
#[test]
fn test_read_id_with_comment() {
let data = b"@SEQ_ID comment\nGATTA\n+\n!!!!!\n";
let reader = Cursor::new(data);
let mut fastx_reader = parse_fastx_reader(reader).unwrap();
let record = fastx_reader.next().unwrap().unwrap();
assert_eq!(record.read_id(), b"SEQ_ID");
}
#[test]
fn test_read_id_with_empty_comment() {
let data = b"@SEQ_ID \nGATTA\n+\n!!!!!\n";
let reader = Cursor::new(data);
let mut fastx_reader = parse_fastx_reader(reader).unwrap();
let record = fastx_reader.next().unwrap().unwrap();
assert_eq!(record.read_id(), b"SEQ_ID");
}
#[test]
fn test_read_id_with_multiple_spaces() {
let data = b"@SEQ_ID comment\nGATTA\n+\n!!!!!\n";
let reader = Cursor::new(data);
let mut fastx_reader = parse_fastx_reader(reader).unwrap();
let record = fastx_reader.next().unwrap().unwrap();
assert_eq!(record.read_id(), b"SEQ_ID");
}
#[test]
fn test_read_id_with_tabs() {
let data = b"@SEQ_ID\tst:Z:2024-06-05T11:34:21.517+00:00\tRG:Z:0e9626940687df5718807f8d3dcf3c2d2b2e49c6_dna_r10.4.1_e8.2_400bps_sup@v5.0.0_SQK-RBK114-96_barcode58\nGATTA\n+\n!!!!!\n";
let reader = Cursor::new(data);
let mut fastx_reader = parse_fastx_reader(reader).unwrap();
let record = fastx_reader.next().unwrap().unwrap();
assert_eq!(record.read_id(), b"SEQ_ID");
}
#[test]
#[cfg(feature = "alignment")]
fn test_detect_alignment_format() {
use std::io::Write;
let mut temp_file = tempfile::NamedTempFile::new().unwrap();
write!(
temp_file,
"@HD\tVN:1.6\tSO:unsorted\nREAD1\t4\t*\t0\t0\t*\t*\t0\t0\tGATTA\t!!!!!\n"
)
.unwrap();
let mut names = Vec::new();
let mut seqs = Vec::new();
iter_records(temp_file.path(), |id, seq| {
names.push(id.to_vec());
seqs.push(seq.to_vec());
Ok(())
})
.unwrap();
assert_eq!(names, vec![b"READ1"]);
assert_eq!(seqs, vec![b"GATTA"]);
}
#[test]
#[cfg(feature = "alignment")]
fn test_detect_mapped_alignment_errors() {
use std::io::Write;
let mut temp_file = tempfile::NamedTempFile::new().unwrap();
write!(
temp_file,
"@HD\tVN:1.6\tSO:unsorted\n@SQ\tSN:chr1\tLN:1000\nREAD1\t0\tchr1\t1\t0\t5M\t*\t0\t0\tGATTA\t!!!!!\n"
)
.unwrap();
let result = iter_records(temp_file.path(), |_, _| Ok(()));
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Mapped records are not supported"));
}
}