use anyhow::{Context, Result};
use log::{debug, info};
use noodles::bam;
use noodles::bgzf;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};
const BGZF_MAGIC: &[u8; 4] = &[0x1f, 0x8b, 0x08, 0x04];
const BGZF_MAX_BLOCK_SIZE: usize = 65536;
const PHRED_OFFSET: u8 = 33;
fn is_valid_bgzf_header(buffer: &[u8], i: usize) -> bool {
if i + 16 > buffer.len() {
return false;
}
if &buffer[i..i + 4] != BGZF_MAGIC.as_slice() {
return false;
}
if buffer[i + 12] != 0x42 || buffer[i + 13] != 0x43 {
return false;
}
if buffer[i + 14] != 0x02 || buffer[i + 15] != 0x00 {
return false;
}
true
}
fn find_next_bgzf_block<R>(
reader: &mut R,
approximate_offset: u64,
buffer: &mut Vec<u8>,
) -> Result<Option<u64>>
where
R: Read + Seek,
{
if approximate_offset == 0 {
return Ok(Some(0));
}
reader
.seek(SeekFrom::Start(approximate_offset))
.with_context(|| format!("Failed to seek to offset {approximate_offset}"))?;
let required_size = BGZF_MAX_BLOCK_SIZE + 15;
buffer.resize(required_size, 0);
let mut bytes_read = 0;
while bytes_read < buffer.len() {
let n = reader.read(&mut buffer[bytes_read..])?;
if n == 0 {
if bytes_read == 0 {
info!("Reached EOF at offset {approximate_offset} without finding BGZF block");
return Ok(None);
}
break; }
bytes_read += n;
}
for i in 0..bytes_read {
if is_valid_bgzf_header(buffer, i) {
let block_offset = approximate_offset + i as u64;
debug!("Found BGZF block signature at offset {block_offset}");
return Ok(Some(block_offset));
}
}
if bytes_read < buffer.len() {
return Ok(None);
}
anyhow::bail!(
"Scanned {bytes_read} bytes from offset {approximate_offset} without finding a BGZF block."
);
}
fn write_fastq_to(record: &bam::Record, output: &mut dyn Write) -> std::io::Result<()> {
let name = record
.name()
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "Missing read name"))?;
output.write_all(b"@")?;
output.write_all(name.as_ref())?;
let flags = record.flags();
let is_paired = flags.is_segmented();
if is_paired {
if flags.is_last_segment() {
output.write_all(b"/2")?;
} else {
output.write_all(b"/1")?;
}
}
let data = record.data();
for result in data.iter() {
let (tag, value) =
result.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
if tag.as_ref() == b"BC" {
use noodles::sam::alignment::record::data::field::Value;
if let Value::String(s) = value {
let filter_flag = if flags.is_qc_fail() { 'Y' } else { 'N' };
let bc_str = std::str::from_utf8(s.as_ref()).unwrap_or("INVALID");
if is_paired {
let read_num = if flags.is_last_segment() { 2 } else { 1 };
write!(output, " {read_num}:{filter_flag}:0:{bc_str}")?;
} else {
write!(output, " 0:{filter_flag}:0:{bc_str}")?;
}
}
break;
}
}
output.write_all(b"\n")?;
let sequence = record.sequence();
for base in sequence.iter() {
output.write_all(&[base])?;
}
output.write_all(b"\n+\n")?;
let quality_scores = record.quality_scores();
for score in quality_scores.iter() {
output.write_all(&[score + PHRED_OFFSET])?;
}
output.write_all(b"\n")?;
Ok(())
}
fn is_valid_record_start(buf: &[u8], len: usize) -> bool {
if len < 36 {
return false;
}
let block_size = i32::from_le_bytes(buf[0..4].try_into().unwrap());
if !(32..=100_000_000).contains(&block_size) {
return false;
}
let ref_id = i32::from_le_bytes(buf[4..8].try_into().unwrap());
if ref_id < -1 {
return false;
}
let pos = i32::from_le_bytes(buf[8..12].try_into().unwrap());
if pos < -1 {
return false;
}
let l_read_name = buf[12];
if l_read_name < 2 {
return false;
}
let n_cigar_op = u16::from_le_bytes(buf[16..18].try_into().unwrap());
let l_seq = i32::from_le_bytes(buf[20..24].try_into().unwrap());
if l_seq < 0 {
return false;
}
if len >= 36 + l_read_name as usize && buf[36 + l_read_name as usize - 1] != 0 {
return false;
}
let min_size = 32
+ i64::from(l_read_name)
+ i64::from(n_cigar_op) * 4
+ (i64::from(l_seq) + 1) / 2
+ i64::from(l_seq);
if i64::from(block_size) < min_size {
return false;
}
true
}
fn find_first_record_in_block<R>(
reader: &mut R,
block_start: u64,
buffer: &mut Vec<u8>,
) -> Result<Option<u64>>
where
R: Read + Seek,
{
reader.seek(SeekFrom::Start(block_start))?;
let mut reader = flate2::read::GzDecoder::new(reader);
buffer.resize(BGZF_MAX_BLOCK_SIZE, 0);
let bytes_read = std::io::Read::read(&mut reader, buffer)?;
if bytes_read < 36 {
return Ok(None);
}
for i in 0..bytes_read - 36 {
if is_valid_record_start(&buffer[i..], bytes_read - i) {
return Ok(Some(i as u64));
}
}
Ok(None)
}
fn validate_block<R>(
reader: &mut R,
aligned_start: u64,
buffer: &mut Vec<u8>,
) -> Result<Option<u64>>
where
R: Read + Seek,
{
reader.seek(SeekFrom::Start(aligned_start))?;
match find_first_record_in_block(reader, aligned_start, buffer) {
Ok(Some(offset)) => {
let offset_u16 =
u16::try_from(offset).map_err(|e| anyhow::anyhow!("Invalid offset: {e}"))?;
let new_virtual_pos = bgzf::VirtualPosition::try_from((aligned_start, offset_u16))
.map_err(|e| anyhow::anyhow!("Invalid virtual position: {e}"))?;
Ok(Some(u64::from(new_virtual_pos)))
}
Ok(None) | Err(_) => Ok(None), }
}
fn process_records<R>(
reader: &mut bam::io::Reader<bgzf::io::Reader<R>>,
start_aligned_offset: u64,
end_offset: u64,
output: &mut dyn Write,
) -> Result<usize>
where
R: Read + Seek,
{
let mut read_count = 0;
let mut record = bam::Record::default();
let mut blocks_processed = 0;
let mut prev_block_offset = start_aligned_offset;
let mut prev_was_read1: Option<bool> = None;
loop {
let virtual_pos = reader.get_ref().virtual_position();
let current_block_offset = virtual_pos.compressed();
if current_block_offset != prev_block_offset {
blocks_processed += 1;
prev_block_offset = current_block_offset;
if current_block_offset >= end_offset {
if prev_was_read1 == Some(true) {
debug!(
"Reached end_offset at block {current_block_offset}, but need to read mate for previous read1"
);
} else {
debug!("Reached end of processing region at block {current_block_offset}");
break;
}
} else {
debug!("Processing block at offset {current_block_offset}");
}
}
match reader.read_record(&mut record) {
Ok(0) => break, Ok(_) => {}
Err(e) => {
if current_block_offset >= end_offset {
debug!("Reached end of processing region at block {current_block_offset}");
break;
}
return Err(anyhow::Error::new(e)).context("Failed to read BAM record");
}
}
let flags = record.flags();
let current_is_read1 = flags.is_first_segment();
if flags.is_segmented() {
if read_count == 0 && !current_is_read1 {
debug!(
"Skipping read2 at start of processing region, it was handled in the previous region"
);
continue;
}
if prev_was_read1 == Some(true) && current_is_read1 {
anyhow::bail!(
"Found consecutive R1 reads ({}). Input BAM must be collated.",
String::from_utf8_lossy(record.name().unwrap_or_default().as_ref())
);
}
if prev_was_read1 == Some(false) && !current_is_read1 {
anyhow::bail!(
"Found R2 read ({}) without preceding R1. Input BAM must be collated.",
String::from_utf8_lossy(record.name().unwrap_or_default().as_ref())
);
}
prev_was_read1 = Some(current_is_read1);
} else {
prev_was_read1 = Some(false);
}
write_fastq_to(&record, output).context("Failed to write FASTQ output")?;
read_count += 1;
if read_count % 100_000 == 0 {
info!("Processed {read_count} reads...");
}
if current_block_offset >= end_offset {
let can_stop = if flags.is_segmented() {
!current_is_read1 } else {
true };
if can_stop {
debug!("Completed beyond end_offset, stopping");
break;
}
}
}
info!("Completed: {blocks_processed} blocks, {read_count} reads");
Ok(read_count)
}
pub fn process_blocks(
input_path: &str,
start_offset: u64,
end_offset: u64,
output: &mut dyn Write,
) -> Result<usize> {
let file = File::open(input_path).with_context(|| format!("Failed to open {input_path}"))?;
let mut reader = std::io::BufReader::new(file);
let mut current_offset = start_offset;
let mut total_reads = 0;
let mut buffer = vec![0u8; BGZF_MAX_BLOCK_SIZE + 15];
loop {
let Some(block_start) = find_next_bgzf_block(&mut reader, current_offset, &mut buffer)?
else {
break;
};
if block_start >= end_offset {
break;
}
if let Some(virtual_pos) = validate_block(&mut reader, block_start, &mut buffer)? {
let aligned_start = virtual_pos >> 16;
info!(
"Processing byte range: {start_offset} to {end_offset} (aligned to block at {aligned_start})"
);
reader.seek(SeekFrom::Start(aligned_start))?;
let mut bgzf_reader = bgzf::io::Reader::new(&mut reader);
bgzf_reader.seek(bgzf::VirtualPosition::from(virtual_pos))?;
let mut bam_reader = bam::io::Reader::from(bgzf_reader);
let count = process_records(&mut bam_reader, aligned_start, end_offset, output)?;
total_reads += count;
break;
}
debug!("Invalid block at {block_start}, continuing search");
current_offset = block_start + 1;
}
Ok(total_reads)
}