use std::io;
use crate::unified_pipeline::MemoryEstimate;
/// A single FASTQ record stored as one contiguous, owned byte buffer.
///
/// The name, sequence and quality are not stored separately; the accessor
/// methods slice them out of `data` using the offsets recorded by `from_slice`.
#[derive(Debug, Clone)]
pub struct FastqRecord {
    /// Raw bytes of the record, beginning with `@`; may or may not end with `\n`.
    data: Vec<u8>,
    /// Index in `data` of the newline terminating the header (`@name`) line.
    name_end: u32,
    /// Index in `data` of the newline terminating the sequence line.
    seq_end: u32,
    /// Index in `data` of the first byte after the separator (`+`) line's newline.
    qual_start: u32,
}
impl FastqRecord {
    /// Builds a record by copying the bytes of exactly one FASTQ entry.
    ///
    /// `data` must hold the four lines of a single record: an `@` header line,
    /// the sequence line, a separator line beginning with `+`, and the quality
    /// line. The newline after the quality line is optional.
    ///
    /// # Errors
    ///
    /// Returns an [`io::ErrorKind::InvalidData`] error if `data` does not start
    /// with `@`, has fewer than three newlines, has a separator line that does
    /// not start with `+`, contains anything after the quality line, has
    /// sequence and quality strings of different lengths, or is too large for
    /// its offsets to fit in a `u32`.
    pub fn from_slice(data: &[u8]) -> io::Result<Self> {
        if data.first() != Some(&b'@') {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "FASTQ record must start with @",
            ));
        }

        // The first three newlines terminate the header, sequence and separator lines.
        let mut newlines = data
            .iter()
            .enumerate()
            .filter(|&(_, &byte)| byte == b'\n')
            .map(|(i, _)| i);
        let (name_end, seq_end, plus_end) =
            match (newlines.next(), newlines.next(), newlines.next()) {
                (Some(name_end), Some(seq_end), Some(plus_end)) => (name_end, seq_end, plus_end),
                _ => {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "FASTQ record must have at least 3 internal newlines",
                    ));
                }
            };
        let qual_start = plus_end + 1;

        // `seq_end < plus_end`, so `seq_end + 1` is always in bounds.
        if data[seq_end + 1] != b'+' {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "FASTQ separator line must start with +",
            ));
        }

        // `quality_end` never returns less than `qual_start`, so this cannot
        // underflow even when `data` ends right after the separator line.
        let quality = &data[qual_start..Self::quality_end(data, qual_start)];
        // A newline inside the quality span means the slice holds extra lines
        // (e.g. a second record); reject it rather than expose it as quality bytes.
        if quality.contains(&b'\n') {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "FASTQ record has unexpected data after the quality line",
            ));
        }

        let seq_len = seq_end - (name_end + 1);
        let qual_len = quality.len();
        if seq_len != qual_len {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Sequence length ({seq_len}) != quality length ({qual_len})"),
            ));
        }

        // Offsets are stored as u32 to keep the record small; refuse oversized input.
        let to_u32 = |v: usize, field: &str| {
            u32::try_from(v).map_err(|_| {
                io::Error::new(io::ErrorKind::InvalidData, format!("{field} overflows u32"))
            })
        };
        Ok(Self {
            data: data.to_vec(),
            name_end: to_u32(name_end, "name_end")?,
            seq_end: to_u32(seq_end, "seq_end")?,
            qual_start: to_u32(qual_start, "qual_start")?,
        })
    }

    /// Returns the read name: the header line without the leading `@` or the newline.
    #[inline]
    #[must_use]
    pub fn name(&self) -> &[u8] {
        &self.data[1..self.name_end as usize]
    }

    /// Returns the sequence line without its terminating newline.
    #[inline]
    #[must_use]
    pub fn sequence(&self) -> &[u8] {
        &self.data[self.name_end as usize + 1..self.seq_end as usize]
    }

    /// Returns the quality line without its trailing newline, if any.
    #[inline]
    #[must_use]
    pub fn quality(&self) -> &[u8] {
        let qual_start = self.qual_start as usize;
        &self.data[qual_start..Self::quality_end(&self.data, qual_start)]
    }

    /// Returns the exclusive end offset of the quality string within `data`.
    ///
    /// The quality string runs from `qual_start` to the end of `data`, minus one
    /// trailing `\n`. That newline is only dropped when it lies inside the
    /// quality span. If `data` ends immediately after the separator line, the
    /// final newline belongs to that line, and the quality string is empty
    /// rather than having an end before its start.
    #[inline]
    fn quality_end(data: &[u8], qual_start: usize) -> usize {
        if data.len() > qual_start && data.last() == Some(&b'\n') {
            data.len() - 1
        } else {
            data.len()
        }
    }
}
impl MemoryEstimate for FastqRecord {
fn estimate_heap_size(&self) -> usize {
self.data.capacity()
}
}
/// Failure outcome of `parse_single_fastq_record`.
#[derive(Debug)]
enum FastqParseResult {
    /// The input ended before the record did. `parse_fastq_records` returns the
    /// remaining bytes as leftover instead of treating this as an error.
    Incomplete,
    /// The bytes are not a valid FASTQ record.
    Error(io::Error),
}
pub fn parse_fastq_records(data: &[u8]) -> io::Result<(Vec<FastqRecord>, Vec<u8>)> {
let mut records = Vec::new();
let mut pos = 0;
while pos < data.len() {
if data[pos] != b'@' {
while pos < data.len() && data[pos] != b'@' {
pos += 1;
}
if pos >= data.len() {
return Ok((records, Vec::new()));
}
}
match parse_single_fastq_record(&data[pos..]) {
Ok((record, consumed)) => {
records.push(record);
pos += consumed;
}
Err(FastqParseResult::Incomplete) => {
return Ok((records, data[pos..].to_vec()));
}
Err(FastqParseResult::Error(e)) => {
return Err(e);
}
}
}
Ok((records, Vec::new()))
}
/// Parses one FASTQ record from the start of `data`.
///
/// On success returns the record and the number of bytes consumed. That count
/// includes the newline terminating the quality line when one is present. When
/// the quality line is the last thing in `data` and has no newline, the record
/// counts as complete once its length reaches the sequence length.
///
/// # Errors
///
/// * [`FastqParseResult::Incomplete`] when `data` ends before the record does.
///   This covers an unterminated header, sequence or separator line, and an
///   unterminated quality line still shorter than the sequence (a chunk
///   boundary fell inside it).
/// * [`FastqParseResult::Error`] for malformed input:
///   * a missing leading `@`;
///   * a separator line not starting with `+`;
///   * a quality string whose length differs from the sequence length;
///   * any error reported by `FastqRecord::from_slice`.
fn parse_single_fastq_record(data: &[u8]) -> Result<(FastqRecord, usize), FastqParseResult> {
    if data.first() != Some(&b'@') {
        return Err(FastqParseResult::Error(io::Error::new(
            io::ErrorKind::InvalidData,
            "FASTQ record must start with @",
        )));
    }

    // Header line.
    let mut pos = find_newline(data).ok_or(FastqParseResult::Incomplete)? + 1;
    if pos >= data.len() {
        return Err(FastqParseResult::Incomplete);
    }

    // Sequence line.
    let seq_len = find_newline(&data[pos..]).ok_or(FastqParseResult::Incomplete)?;
    pos += seq_len + 1;
    if pos >= data.len() {
        return Err(FastqParseResult::Incomplete);
    }

    // Separator line.
    if data[pos] != b'+' {
        return Err(FastqParseResult::Error(io::Error::new(
            io::ErrorKind::InvalidData,
            "FASTQ separator line must start with +",
        )));
    }
    pos += find_newline(&data[pos..]).ok_or(FastqParseResult::Incomplete)? + 1;
    if pos >= data.len() {
        return Err(FastqParseResult::Incomplete);
    }

    // Quality line: may be the final line of `data` with no trailing newline.
    let qual_len = match find_newline(&data[pos..]) {
        Some(len) => {
            pos += len + 1;
            len
        }
        None => {
            let len = data.len() - pos;
            if len < seq_len {
                // The buffer was cut inside the quality line; ask for more bytes
                // instead of reporting a spurious length mismatch.
                return Err(FastqParseResult::Incomplete);
            }
            pos = data.len();
            len
        }
    };

    if seq_len != qual_len {
        return Err(FastqParseResult::Error(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("Sequence length ({seq_len}) != quality length ({qual_len})"),
        )));
    }

    let record = FastqRecord::from_slice(&data[..pos]).map_err(FastqParseResult::Error)?;
    Ok((record, pos))
}
/// Returns the index of the first `\n` byte in `data`, or `None` if there is none.
fn find_newline(data: &[u8]) -> Option<usize> {
    data.iter()
        .enumerate()
        .find(|&(_, &byte)| byte == b'\n')
        .map(|(idx, _)| idx)
}
/// Returns the read identifier from a FASTQ name with any mate suffix removed.
///
/// The identifier is everything before the first ASCII whitespace byte. The
/// rest of a header is a free-form comment, such as Illumina's `1:N:0:ATCACG`
/// after a space or tab-separated tags; splitting on any whitespace also drops
/// a stray `\r` from CRLF input. If the identifier then ends in one of `/1`,
/// `/2`, `.1`, `.2`, `_1`, `_2`, `:1` or `:2`, those two bytes are removed.
#[must_use]
pub fn strip_read_suffix(name: &[u8]) -> &[u8] {
    let name = name
        .iter()
        .position(u8::is_ascii_whitespace)
        .map_or(name, |end| &name[..end]);
    match name {
        [stem @ .., b'/' | b'.' | b'_' | b':', b'1' | b'2'] => stem,
        _ => name,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_single_fastq_record() {
        let data = b"@read1\nACGT\n+\nIIII\n";
        let (record, consumed) =
            parse_single_fastq_record(data).expect("parse single FASTQ record");
        assert_eq!(record.name(), b"read1");
        assert_eq!(record.sequence(), b"ACGT");
        assert_eq!(record.quality(), b"IIII");
        assert_eq!(consumed, data.len());
    }

    #[test]
    fn test_from_slice_accessors() {
        let record = FastqRecord::from_slice(b"@read1 comment\nACGT\n+read1\nIIII\n")
            .expect("valid FASTQ record");
        assert_eq!(record.name(), b"read1 comment");
        assert_eq!(record.sequence(), b"ACGT");
        assert_eq!(record.quality(), b"IIII");
    }

    #[test]
    fn test_from_slice_rejects_malformed() {
        assert!(FastqRecord::from_slice(b"").is_err());
        assert!(FastqRecord::from_slice(b"read1\nACGT\n+\nIIII\n").is_err());
        assert!(FastqRecord::from_slice(b"@read1\nACGT\n").is_err());
        assert!(FastqRecord::from_slice(b"@read1\nACGT\n-\nIIII\n").is_err());
        assert!(FastqRecord::from_slice(b"@read1\nACGT\n+\nIII\n").is_err());
    }

    #[test]
    fn test_parse_fastq_records_multiple() {
        let data = b"@read1\nACGT\n+\nIIII\n@read2\nTGCA\n+\nJJJJ\n";
        let (records, leftover) = parse_fastq_records(data).expect("failed to parse FASTQ records");
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].name(), b"read1");
        assert_eq!(records[1].name(), b"read2");
        assert!(leftover.is_empty());
    }

    #[test]
    fn test_parse_fastq_records_skips_leading_bytes() {
        let data = b"\n\n@read1\nACGT\n+\nIIII\n\n@read2\nTGCA\n+\nJJJJ";
        let (records, leftover) = parse_fastq_records(data).expect("failed to parse FASTQ records");
        assert_eq!(records.len(), 2);
        assert_eq!(records[1].quality(), b"JJJJ");
        assert!(leftover.is_empty());
    }

    #[test]
    fn test_parse_fastq_records_empty_input() {
        let (records, leftover) = parse_fastq_records(b"").expect("failed to parse FASTQ records");
        assert!(records.is_empty());
        assert!(leftover.is_empty());
    }

    #[test]
    fn test_parse_fastq_records_errors() {
        // Separator line missing.
        assert!(parse_fastq_records(b"@read1\nACGT\nIIII\n+\n").is_err());
        // Terminated quality line shorter than the sequence.
        assert!(parse_fastq_records(b"@read1\nACGT\n+\nIII\n").is_err());
    }

    #[test]
    fn test_parse_fastq_incomplete_record() {
        let data = b"@read1\nACGT\n+\n";
        let (records, leftover) = parse_fastq_records(data).expect("failed to parse FASTQ records");
        assert!(records.is_empty());
        assert_eq!(leftover, data);
    }

    #[test]
    fn test_parse_fastq_eof_without_trailing_newline() {
        let data = b"@read1\nACGT\n+\nIIII";
        let (records, leftover) = parse_fastq_records(data).expect("failed to parse FASTQ records");
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].name(), b"read1");
        assert_eq!(records[0].sequence(), b"ACGT");
        assert_eq!(records[0].quality(), b"IIII");
        assert!(leftover.is_empty());
    }

    #[test]
    fn test_parse_fastq_eof_no_newline_seq_qual_mismatch() {
        let data = b"@read1\nACGT\n+\nIII";
        // Either an error or an incomplete record held back as leftover is acceptable.
        match parse_fastq_records(data) {
            Err(_) => {}
            Ok((recs, leftover)) => assert!(recs.is_empty() && !leftover.is_empty()),
        }
    }

    #[test]
    fn test_parse_fastq_with_leftover() {
        let data = b"@read1\nACGT\n+\nIIII\n@read2\nTG";
        let (records, leftover) = parse_fastq_records(data).expect("failed to parse FASTQ records");
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].name(), b"read1");
        assert_eq!(leftover, b"@read2\nTG");
    }

    #[test]
    fn test_strip_read_suffix() {
        assert_eq!(strip_read_suffix(b"read1/1"), b"read1");
        assert_eq!(strip_read_suffix(b"read1/2"), b"read1");
        assert_eq!(strip_read_suffix(b"read1.1"), b"read1");
        assert_eq!(strip_read_suffix(b"read1.2"), b"read1");
        assert_eq!(strip_read_suffix(b"read1_1"), b"read1");
        assert_eq!(strip_read_suffix(b"read1_2"), b"read1");
        assert_eq!(strip_read_suffix(b"read1:1"), b"read1");
        assert_eq!(strip_read_suffix(b"read1:2"), b"read1");
        assert_eq!(strip_read_suffix(b"read1/1 1:N:0:ATCACG"), b"read1");
        assert_eq!(strip_read_suffix(b"read1 1:N:0:ATCACG"), b"read1");
        assert_eq!(strip_read_suffix(b"read1"), b"read1");
        assert_eq!(strip_read_suffix(b"a"), b"a");
        assert_eq!(strip_read_suffix(b""), b"" as &[u8]);
    }
}