use std::io::{self, BufRead, BufReader, Read};
/// Capacity of the `BufReader` around the source, and the largest single `read`
/// request issued while refilling the parse buffer (64 MiB).
const READ_CHUNK_SIZE: usize = 64 << 20;
/// Size of the parse buffer when a reader is created with `new` (64 MiB).
const INITIAL_BUFFER_SIZE: usize = 64 << 20;
/// Head-room multiplier applied to a completed batch's byte size when deciding
/// whether the parse buffer must grow.
const GROWTH_FACTOR: f64 = 1.25;
/// Reader that buffers SAM text and groups parsed records into batches.
///
/// A batch ends once at least `chunk_size` bases and an even number of records have
/// been parsed. The parse buffer grows when a completed batch would not fit in it.
///
/// The struct itself places no bound on `R`; the `impl` blocks require `R: Read`.
pub struct BatchedSamReader<R> {
    /// Buffered source that the parse buffer is refilled from.
    inner: BufReader<R>,
    /// Parse buffer; only `buffer[parse_pos..buffer_len]` holds unconsumed data.
    buffer: Vec<u8>,
    /// Number of valid bytes at the front of `buffer`.
    buffer_len: usize,
    /// Offset of the first unconsumed byte in `buffer`.
    parse_pos: usize,
    /// Base count at which a batch may end.
    chunk_size: u64,
    /// Bases accumulated in the current batch.
    bases_this_batch: u64,
    /// Records accumulated in the current batch.
    records_this_batch: usize,
    /// Marker for where the current batch began in `buffer`; shifted when the
    /// buffer is compacted.
    bytes_at_batch_start: usize,
    /// Number of batches completed so far.
    batches_completed: usize,
    /// Total bytes pulled from `inner`.
    total_bytes_read: usize,
}
impl<R: Read> BatchedSamReader<R> {
    /// Creates a batching reader over `reader`.
    ///
    /// `chunk_size` is the base count after which a batch may end (see
    /// `record_parsed`). The underlying `BufReader` gets `READ_CHUNK_SIZE` bytes of
    /// capacity, and the parse buffer starts as `INITIAL_BUFFER_SIZE` zeroed bytes.
    pub fn new(reader: R, chunk_size: u64) -> Self {
        Self {
            inner: BufReader::with_capacity(READ_CHUNK_SIZE, reader),
            buffer: vec![0u8; INITIAL_BUFFER_SIZE],
            buffer_len: 0,
            parse_pos: 0,
            chunk_size,
            bases_this_batch: 0,
            records_this_batch: 0,
            bytes_at_batch_start: 0,
            batches_completed: 0,
            total_bytes_read: 0,
        }
    }

    /// Moves unparsed bytes to the front of the buffer and refills the rest.
    ///
    /// If more than half of the buffer is still unparsed, the buffer is doubled first.
    /// This guarantees that at least half of it is free for new data after compaction.
    /// Reading continues until the buffer is full or the source reports EOF.
    ///
    /// Returns `Ok(false)` when the source is exhausted and no unparsed bytes remain;
    /// otherwise returns `Ok(true)`.
    ///
    /// A trailing fragment with no line terminator keeps this returning `true` at EOF.
    /// Callers must therefore not loop on the return value alone.
    ///
    /// # Errors
    /// Propagates any I/O error from the source. `ErrorKind::Interrupted` is not
    /// propagated; that read is retried.
    pub fn fill_buffer(&mut self) -> io::Result<bool> {
        let unparsed_len = self.buffer_len.saturating_sub(self.parse_pos);
        if unparsed_len > self.buffer.len() / 2 {
            let new_size = self.buffer.len() * 2;
            log::info!(
                "Growing buffer (proactive): {}MB -> {}MB (unparsed: {}MB)",
                self.buffer.len() / (1024 * 1024),
                new_size / (1024 * 1024),
                unparsed_len / (1024 * 1024)
            );
            self.buffer.resize(new_size, 0);
        }
        if self.parse_pos > 0 && unparsed_len > 0 {
            self.buffer.copy_within(self.parse_pos..self.buffer_len, 0);
        }
        self.buffer_len = unparsed_len;
        // Compaction discards the first `parse_pos` bytes. The current batch may have
        // started inside that prefix, so its start marker is shifted by the same amount
        // with wrapping arithmetic. It may "go below zero"; `record_parsed` undoes the
        // shift with `wrapping_sub`. That keeps the byte count exact for a batch
        // spanning several refills, where clamping to 0 would undercount it.
        self.bytes_at_batch_start = self.bytes_at_batch_start.wrapping_sub(self.parse_pos);
        self.parse_pos = 0;
        while self.buffer_len < self.buffer.len() {
            let space = self.buffer.len() - self.buffer_len;
            let to_read = space.min(READ_CHUNK_SIZE);
            let dst = &mut self.buffer[self.buffer_len..self.buffer_len + to_read];
            let n = match self.inner.read(dst) {
                Ok(n) => n,
                // Interrupted reads transfer no data and are meant to be retried.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            };
            if n == 0 {
                return Ok(self.buffer_len > 0);
            }
            self.buffer_len += n;
            self.total_bytes_read += n;
        }
        Ok(true)
    }

    /// Returns the bytes that have been read but not yet parsed or skipped.
    pub fn buffer(&self) -> &[u8] {
        &self.buffer[self.parse_pos..self.buffer_len]
    }

    /// Returns whether the current batch may end here.
    ///
    /// That requires at least `chunk_size` bases and an even number of records.
    /// Presumably the even count keeps paired records (R1/R2) in the same batch, which
    /// assumes mates are adjacent in the input — TODO confirm with callers.
    fn at_batch_boundary(&self) -> bool {
        self.bases_this_batch >= self.chunk_size && self.records_this_batch.is_multiple_of(2)
    }

    /// Accounts for one parsed record and advances past it.
    ///
    /// The record contributes `seq_len` bases and occupies `bytes_consumed` bytes. If
    /// it completes a batch, the batch counters are reset and the buffer may grow so a
    /// batch of the same byte size fits. Returns `true` only if the buffer grew.
    pub fn record_parsed(&mut self, seq_len: usize, bytes_consumed: usize) -> bool {
        self.bases_this_batch += seq_len as u64;
        self.records_this_batch += 1;
        self.parse_pos += bytes_consumed;
        if !self.at_batch_boundary() {
            return false;
        }
        // Buffer compaction may have shifted `bytes_at_batch_start` below zero
        // (wrapped). `wrapping_sub` therefore yields the batch's full byte count,
        // including bytes that earlier refills compacted away.
        let bytes_this_batch = self.parse_pos.wrapping_sub(self.bytes_at_batch_start);
        let grew = self.maybe_grow(bytes_this_batch);
        self.batches_completed += 1;
        self.bases_this_batch = 0;
        self.records_this_batch = 0;
        self.bytes_at_batch_start = self.parse_pos;
        grew
    }

    /// Skips `bytes` of buffered input without counting it toward the current batch.
    ///
    /// For example, the tests use this for `@` header lines.
    pub fn advance(&mut self, bytes: usize) {
        self.parse_pos += bytes;
    }

    /// Grows the buffer when a batch of `bytes_this_batch` bytes would not fit.
    ///
    /// The batch size is padded by `GROWTH_FACTOR` and rounded up to a power of two.
    /// Returns `true` if the buffer was resized.
    // The float round-trip only scales a byte count by GROWTH_FACTOR; `as usize`
    // saturates, so the truncation/sign/precision casts are acceptable here.
    #[allow(clippy::cast_possible_truncation, clippy::cast_precision_loss, clippy::cast_sign_loss)]
    fn maybe_grow(&mut self, bytes_this_batch: usize) -> bool {
        let needed = ((bytes_this_batch as f64) * GROWTH_FACTOR) as usize;
        if needed <= self.buffer.len() {
            return false;
        }
        // Fall back to the exact size if rounding up to a power of two would overflow.
        let new_size = needed.checked_next_power_of_two().unwrap_or(needed);
        log::info!(
            "Growing buffer (batch {}): {}MB -> {}MB ({} bytes for {} bases)",
            self.batches_completed + 1,
            self.buffer.len() / (1024 * 1024),
            new_size / (1024 * 1024),
            bytes_this_batch,
            self.bases_this_batch
        );
        self.buffer.resize(new_size, 0);
        true
    }

    /// Current size of the parse buffer in bytes.
    pub fn capacity(&self) -> usize {
        self.buffer.len()
    }

    /// Number of batches completed so far.
    pub fn batches_completed(&self) -> usize {
        self.batches_completed
    }

    /// Total number of bytes read from the underlying source.
    pub fn total_bytes_read(&self) -> usize {
        self.total_bytes_read
    }

    /// Base count at which a batch may end.
    pub fn chunk_size(&self) -> u64 {
        self.chunk_size
    }

    /// Test-only: replaces the parse buffer. Its length becomes the new capacity.
    #[cfg(test)]
    pub fn set_buffer(&mut self, buffer: Vec<u8>) {
        self.buffer = buffer;
    }

    /// Backs the `Read`/`BufRead` impls and returns the unconsumed buffered bytes.
    ///
    /// Once everything buffered has been consumed, it performs exactly one read from
    /// the source into the start of the buffer. An empty slice therefore means EOF.
    fn fill_buf_internal(&mut self) -> io::Result<&[u8]> {
        /// Upper bound for growth triggered through the `BufRead` path (1 GiB).
        const MAX_BUFREAD_BUFFER_SIZE: usize = 1024 * 1024 * 1024;

        if self.parse_pos < self.buffer_len {
            return Ok(&self.buffer[self.parse_pos..self.buffer_len]);
        }
        // Restart at the front of the buffer. The batch-start marker is shifted the
        // same way `fill_buffer` shifts it when compacting.
        self.bytes_at_batch_start = self.bytes_at_batch_start.wrapping_sub(self.parse_pos);
        self.parse_pos = 0;
        self.buffer_len = 0;
        let bytes_read = loop {
            match self.inner.read(&mut self.buffer) {
                Ok(n) => break n,
                // Interrupted reads transfer no data and are meant to be retried.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
                Err(e) => return Err(e),
            }
        };
        self.buffer_len = bytes_read;
        self.total_bytes_read += bytes_read;
        // A read filling at least three quarters of the buffer doubles it (capped), so
        // later reads can pull more per call.
        if bytes_read >= self.buffer.len() * 3 / 4 {
            let new_size = (self.buffer.len() * 2).min(MAX_BUFREAD_BUFFER_SIZE);
            if new_size > self.buffer.len() {
                log::info!(
                    "Growing buffer (BufRead): {}MB -> {}MB (read: {}MB)",
                    self.buffer.len() / (1024 * 1024),
                    new_size / (1024 * 1024),
                    bytes_read / (1024 * 1024),
                );
                self.buffer.resize(new_size, 0);
            }
        }
        Ok(&self.buffer[self.parse_pos..self.buffer_len])
    }
}
impl<R: Read> Read for BatchedSamReader<R> {
    /// Copies as many buffered bytes as fit into `buf`.
    ///
    /// If the internal buffer has been fully consumed, it is refilled from the source
    /// first. Returns `Ok(0)` at end of input or when `buf` is empty.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let copied = {
            let pending = self.fill_buf_internal()?;
            let count = pending.len().min(buf.len());
            buf[..count].copy_from_slice(&pending[..count]);
            count
        };
        self.parse_pos += copied;
        Ok(copied)
    }
}
impl<R: Read> BufRead for BatchedSamReader<R> {
    /// Returns the unconsumed part of the internal buffer.
    ///
    /// Reads from the source once if everything buffered has already been consumed.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        self.fill_buf_internal()
    }

    /// Marks `amt` bytes as consumed, never moving past the end of the buffered data.
    fn consume(&mut self, amt: usize) {
        let target = self.parse_pos + amt;
        self.parse_pos = if target > self.buffer_len {
            self.buffer_len
        } else {
            target
        };
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Builds one unmapped SAM line whose SEQ and QUAL fields are `seq_len` long.
    fn make_sam_record(name: &str, seq_len: usize) -> String {
        let seq = "A".repeat(seq_len);
        let qual = "I".repeat(seq_len);
        format!("{name}\t0\t*\t0\t0\t*\t*\t0\t0\t{seq}\t{qual}\n")
    }

    /// Builds two consecutive SAM lines named `{name}/1` and `{name}/2`.
    fn make_read_pair(name: &str, seq_len: usize) -> String {
        let r1 = make_sam_record(&format!("{name}/1"), seq_len);
        let r2 = make_sam_record(&format!("{name}/2"), seq_len);
        format!("{r1}{r2}")
    }

    /// Length of the tenth tab-separated field (SEQ) of a SAM line, or 0 if absent.
    fn seq_len_of(line: &[u8]) -> usize {
        line.split(|&b| b == b'\t').nth(9).map_or(0, <[u8]>::len)
    }

    #[test]
    fn test_basic_reading() {
        let data = make_read_pair("read1", 100);
        let cursor = Cursor::new(data.as_bytes().to_vec());
        let mut reader = BatchedSamReader::new(cursor, 1000);
        assert!(reader.fill_buffer().expect("fill_buffer should succeed"));
        assert!(!reader.buffer().is_empty());
    }

    #[test]
    fn test_batch_boundary_detection() {
        let data: String = (0..10)
            .map(|i| make_read_pair(&format!("read{i}"), 100))
            .collect();
        let mut reader = BatchedSamReader::new(Cursor::new(data.into_bytes()), 500);
        reader.fill_buffer().expect("fill_buffer should succeed");
        let mut records = 0;
        while let Some(line_end) = reader.buffer().iter().position(|&b| b == b'\n') {
            let seq_len = seq_len_of(&reader.buffer()[..=line_end]);
            reader.record_parsed(seq_len, line_end + 1);
            records += 1;
        }
        // 20 records of 100 bases with a 500-base target: a batch closes every 6
        // records (first even count at or above 500 bases), i.e. 3 full batches.
        assert_eq!(records, 20);
        assert!(reader.batches_completed() >= 3);
    }

    #[test]
    fn test_batch_waits_for_even_record_count() {
        let data = make_read_pair("pair", 100);
        let mut reader = BatchedSamReader::new(Cursor::new(data.into_bytes()), 100);
        reader.fill_buffer().expect("fill_buffer should succeed");

        let first_end = reader
            .buffer()
            .iter()
            .position(|&b| b == b'\n')
            .expect("first record present");
        reader.record_parsed(100, first_end + 1);
        // Enough bases, but an odd record count: the batch must stay open.
        assert_eq!(reader.batches_completed(), 0);

        let second_end = reader
            .buffer()
            .iter()
            .position(|&b| b == b'\n')
            .expect("second record present");
        reader.record_parsed(100, second_end + 1);
        assert_eq!(reader.batches_completed(), 1);
    }

    #[test]
    fn test_buffer_growth_on_large_batch() {
        let data: String = (0..200)
            .map(|i| make_read_pair(&format!("read{i}"), 100))
            .collect();
        let mut reader = BatchedSamReader::new(Cursor::new(data.into_bytes()), 50000);
        reader.set_buffer(vec![0u8; 1024]);
        reader.fill_buffer().expect("fill_buffer should succeed");
        let initial_capacity = reader.capacity();
        // Drain the whole input: refill whenever no complete line is buffered. The data
        // ends with '\n', so fill_buffer eventually reports EOF with nothing left.
        loop {
            if let Some(line_end) = reader.buffer().iter().position(|&b| b == b'\n') {
                reader.record_parsed(100, line_end + 1);
            } else if !reader.fill_buffer().expect("fill_buffer should succeed") {
                break;
            }
        }
        assert!(reader.capacity() >= initial_capacity);
    }

    #[test]
    fn test_never_shrinks() {
        let mut data: String = (0..100)
            .map(|i| make_read_pair(&format!("large{i}"), 150))
            .collect();
        for i in 0..10 {
            data.push_str(&make_read_pair(&format!("small{i}"), 50));
        }
        let mut reader = BatchedSamReader::new(Cursor::new(data.into_bytes()), 10000);
        reader.set_buffer(vec![0u8; 4096]);
        reader.fill_buffer().expect("fill_buffer should succeed");
        let mut max_capacity = reader.capacity();
        loop {
            if let Some(line_end) = reader.buffer().iter().position(|&b| b == b'\n') {
                let seq_len = seq_len_of(&reader.buffer()[..=line_end]);
                reader.record_parsed(seq_len, line_end + 1);
                assert!(reader.capacity() >= max_capacity);
                max_capacity = max_capacity.max(reader.capacity());
            } else if !reader.fill_buffer().expect("fill_buffer should succeed") {
                break;
            }
        }
        assert!(reader.capacity() >= max_capacity);
    }

    #[test]
    fn test_partial_line_preserved() {
        /// Source that hands out each stored chunk in a separate `read` call.
        struct PartialReader {
            chunks: Vec<Vec<u8>>,
            idx: usize,
        }

        impl Read for PartialReader {
            fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
                let Some(chunk) = self.chunks.get(self.idx) else {
                    return Ok(0);
                };
                let to_copy = buf.len().min(chunk.len());
                buf[..to_copy].copy_from_slice(&chunk[..to_copy]);
                self.idx += 1;
                Ok(to_copy)
            }
        }

        let record = make_sam_record("splitme", 100);
        let (first_half, second_half) = record.split_at(record.len() / 2);
        let partial_reader = PartialReader {
            chunks: vec![first_half.as_bytes().to_vec(), second_half.as_bytes().to_vec()],
            idx: 0,
        };
        let mut reader = BatchedSamReader::new(partial_reader, 1000);
        reader.set_buffer(vec![0u8; 1024]);
        reader.fill_buffer().expect("fill_buffer should succeed");
        let mut found_complete = false;
        for _ in 0..3 {
            if reader.buffer().contains(&b'\n') {
                found_complete = true;
                break;
            }
            if !reader.fill_buffer().expect("fill_buffer should succeed") {
                break;
            }
        }
        assert!(found_complete, "Should eventually find complete line");
    }

    #[test]
    fn test_proactive_growth() {
        let data: String = (0..100)
            .map(|i| make_read_pair(&format!("read{i}"), 100))
            .collect();
        let mut reader = BatchedSamReader::new(Cursor::new(data.into_bytes()), 1_000_000);
        reader.set_buffer(vec![0u8; 512]);
        let initial = reader.capacity();
        reader.fill_buffer().expect("fill_buffer should succeed");
        // Nothing was parsed, so the whole buffer is unparsed and must be doubled.
        reader.fill_buffer().expect("fill_buffer should succeed");
        assert!(
            reader.capacity() > initial,
            "Buffer should grow proactively: {} vs {}",
            reader.capacity(),
            initial
        );
    }

    #[test]
    fn test_empty_input() {
        let cursor = Cursor::new(Vec::<u8>::new());
        let mut reader = BatchedSamReader::new(cursor, 1000);
        assert!(!reader.fill_buffer().expect("fill_buffer should succeed"));
        assert!(reader.buffer().is_empty());
    }

    #[test]
    fn test_header_lines_with_advance() {
        let mut data = String::new();
        data.push_str("@HD\tVN:1.6\n");
        data.push_str("@SQ\tSN:chr1\tLN:1000\n");
        data.push_str(&make_read_pair("read1", 100));
        let mut reader = BatchedSamReader::new(Cursor::new(data.into_bytes()), 1000);
        reader.fill_buffer().expect("fill_buffer should succeed");
        let mut alignment_records = 0;
        while let Some(line_end) = reader.buffer().iter().position(|&b| b == b'\n') {
            let line = &reader.buffer()[..=line_end];
            if line.starts_with(b"@") {
                reader.advance(line_end + 1);
            } else {
                let seq_len = seq_len_of(line);
                reader.record_parsed(seq_len, line_end + 1);
                alignment_records += 1;
            }
        }
        // Only the two alignment lines count; the two header lines are skipped.
        assert_eq!(alignment_records, 2);
    }

    #[test]
    fn test_bufread_basic_read() {
        let data = b"Hello, World!";
        let cursor = Cursor::new(data.to_vec());
        let mut reader = BatchedSamReader::new(cursor, 1000);
        let mut buf = vec![0u8; 1024];
        let n = reader.read(&mut buf).expect("read should succeed");
        assert_eq!(n, 13);
        assert_eq!(&buf[..n], b"Hello, World!");
    }

    #[test]
    fn test_bufread_fill_buf() {
        let data = b"Line 1\nLine 2\nLine 3\n";
        let cursor = Cursor::new(data.to_vec());
        let mut reader = BatchedSamReader::new(cursor, 1000);
        let mut line = String::new();
        reader.read_line(&mut line).expect("read_line should succeed");
        assert_eq!(line, "Line 1\n");
        line.clear();
        reader.read_line(&mut line).expect("read_line should succeed");
        assert_eq!(line, "Line 2\n");
    }

    #[test]
    fn test_bufread_consume() {
        let data = b"ABCDEFGHIJ";
        let cursor = Cursor::new(data.to_vec());
        let mut reader = BatchedSamReader::new(cursor, 1000);
        let buf = reader.fill_buf().expect("fill_buf should succeed");
        assert_eq!(buf, b"ABCDEFGHIJ");
        reader.consume(5);
        let buf = reader.fill_buf().expect("fill_buf should succeed");
        assert_eq!(buf, b"FGHIJ");
    }

    #[test]
    fn test_bufread_consume_clamps_past_end() {
        let cursor = Cursor::new(b"ABC".to_vec());
        let mut reader = BatchedSamReader::new(cursor, 1000);
        assert_eq!(reader.fill_buf().expect("fill_buf should succeed"), b"ABC");
        // Over-consuming is clamped to the buffered data instead of skipping ahead.
        reader.consume(10);
        assert!(reader.fill_buf().expect("fill_buf should succeed").is_empty());
    }

    #[test]
    fn test_bufread_growth_on_large_batch() {
        let data = vec![0xAA; 52 * 1024];
        let cursor = Cursor::new(data);
        let mut reader = BatchedSamReader::new(cursor, 1000);
        reader.set_buffer(vec![0u8; 64 * 1024]);
        let mut buf = vec![0u8; 64 * 1024];
        let _ = reader.read(&mut buf).expect("read should succeed");
        // 52 KiB >= 3/4 of 64 KiB, so the buffer doubles.
        assert_eq!(reader.capacity(), 128 * 1024);
    }

    #[test]
    fn test_bufread_no_growth_on_small_batch() {
        let data = vec![0xAA; 16 * 1024];
        let cursor = Cursor::new(data);
        let mut reader = BatchedSamReader::new(cursor, 1000);
        reader.set_buffer(vec![0u8; 64 * 1024]);
        let mut buf = vec![0u8; 64 * 1024];
        let _ = reader.read(&mut buf).expect("read should succeed");
        assert_eq!(reader.capacity(), 64 * 1024);
    }

    #[test]
    fn test_bufread_empty_input() {
        let data: Vec<u8> = vec![];
        let cursor = Cursor::new(data);
        let mut reader = BatchedSamReader::new(cursor, 1000);
        let mut buf = vec![0u8; 1024];
        let n = reader.read(&mut buf).expect("read should succeed");
        assert_eq!(n, 0);
    }
}