1use std::io::{self, BufRead, BufReader, Read};
28
/// Maximum bytes requested from the inner reader per `read` call (64 MiB).
const READ_CHUNK_SIZE: usize = 64 * 1024 * 1024;
/// Starting size of the parse buffer (64 MiB); the buffer only ever grows.
const INITIAL_BUFFER_SIZE: usize = 64 * 1024 * 1024;
/// Headroom multiplier applied to a completed batch's byte size when
/// deciding whether the buffer must grow to hold a whole batch.
const GROWTH_FACTOR: f64 = 1.25;
32
/// Buffered reader over SAM-format text that groups parsed records into
/// batches of roughly `chunk_size` bases, growing (never shrinking) its
/// internal buffer so an entire batch stays resident at once.
///
/// Also implements `Read`/`BufRead` as an independent pass-through mode
/// (see `fill_buf_internal`).
pub struct BatchedSamReader<R: Read> {
    /// Underlying byte source, wrapped for large chunked reads.
    inner: BufReader<R>,
    /// Growable parse buffer; only `buffer[..buffer_len]` holds valid data.
    buffer: Vec<u8>,
    /// Number of valid bytes currently in `buffer`.
    buffer_len: usize,
    /// Offset of the first byte not yet consumed by parsing.
    parse_pos: usize,
    /// Target number of bases per batch.
    chunk_size: u64,
    /// Bases accumulated in the current (incomplete) batch.
    bases_this_batch: u64,
    /// Records accumulated in the current (incomplete) batch.
    records_this_batch: usize,
    /// Buffer offset where the current batch began (rebased on compaction).
    bytes_at_batch_start: usize,

    /// Count of batches closed so far.
    batches_completed: usize,
    /// Total bytes ever pulled from `inner` (monotonic).
    total_bytes_read: usize,
}
53
impl<R: Read> BatchedSamReader<R> {
    /// Creates a reader over `reader` that closes a batch after roughly
    /// `chunk_size` bases (see `at_batch_boundary` for the exact rule).
    pub fn new(reader: R, chunk_size: u64) -> Self {
        Self {
            inner: BufReader::with_capacity(READ_CHUNK_SIZE, reader),
            buffer: vec![0u8; INITIAL_BUFFER_SIZE],
            buffer_len: 0,
            parse_pos: 0,
            chunk_size,
            bases_this_batch: 0,
            records_this_batch: 0,
            bytes_at_batch_start: 0,
            batches_completed: 0,
            total_bytes_read: 0,
        }
    }

    /// Compacts any unparsed tail to the front of the buffer, then reads from
    /// the inner source until the buffer is full or the source is exhausted.
    ///
    /// Returns `Ok(false)` only at EOF with no unparsed bytes left.
    /// NOTE(review): at EOF with a leftover partial line this returns
    /// `Ok(true)` with the same bytes every call — callers must detect lack
    /// of progress themselves or they can loop forever.
    pub fn fill_buffer(&mut self) -> io::Result<bool> {
        let unparsed_len = self.buffer_len.saturating_sub(self.parse_pos);

        // Proactive growth: if more than half the buffer is still unparsed,
        // double it BEFORE compacting so the upcoming reads have real room.
        if unparsed_len > self.buffer.len() / 2 {
            let new_size = self.buffer.len() * 2;
            log::info!(
                "Growing buffer (proactive): {}MB -> {}MB (unparsed: {}MB)",
                self.buffer.len() / (1024 * 1024),
                new_size / (1024 * 1024),
                unparsed_len / (1024 * 1024)
            );
            self.buffer.resize(new_size, 0);
        }

        // Slide the unparsed tail down to offset 0 (overlap-safe copy).
        if self.parse_pos > 0 && unparsed_len > 0 {
            self.buffer.copy_within(self.parse_pos..self.buffer_len, 0);
        }
        self.buffer_len = unparsed_len;
        // Rebase the batch-start marker by the same shift; saturating_sub
        // guards the (normally impossible) case parse_pos > batch start.
        self.bytes_at_batch_start = self.bytes_at_batch_start.saturating_sub(self.parse_pos);
        self.parse_pos = 0;

        // Fill the remainder of the buffer in READ_CHUNK_SIZE gulps.
        while self.buffer_len < self.buffer.len() {
            let space = self.buffer.len() - self.buffer_len;
            let to_read = std::cmp::min(space, READ_CHUNK_SIZE);

            let n =
                self.inner.read(&mut self.buffer[self.buffer_len..self.buffer_len + to_read])?;
            if n == 0 {
                // EOF: report whether anything (old or new) remains to parse.
                return Ok(self.buffer_len > 0);
            }
            self.buffer_len += n;
            self.total_bytes_read += n;
        }
        Ok(true)
    }

    /// The not-yet-parsed portion of the buffer.
    pub fn buffer(&self) -> &[u8] {
        &self.buffer[self.parse_pos..self.buffer_len]
    }

    /// A batch closes once `chunk_size` bases were seen AND the record count
    /// is even — presumably so R1/R2 mate pairs are never split across
    /// batches (see the paired-record tests); TODO confirm intent.
    fn at_batch_boundary(&self) -> bool {
        self.bases_this_batch >= self.chunk_size && self.records_this_batch.is_multiple_of(2)
    }

    /// Accounts for one parsed record of `seq_len` bases occupying
    /// `bytes_consumed` buffer bytes, advancing the parse cursor.
    ///
    /// When this record closes a batch, the buffer may be resized to fit a
    /// whole batch comfortably; the return value reports that resize
    /// (presumably so callers know any borrowed slices are stale — confirm
    /// against call sites). Returns `false` otherwise.
    pub fn record_parsed(&mut self, seq_len: usize, bytes_consumed: usize) -> bool {
        self.bases_this_batch += seq_len as u64;
        self.records_this_batch += 1;
        self.parse_pos += bytes_consumed;

        if self.at_batch_boundary() {
            let bytes_this_batch = self.parse_pos - self.bytes_at_batch_start;
            let grew = self.maybe_grow(bytes_this_batch);

            self.batches_completed += 1;
            self.bases_this_batch = 0;
            self.records_this_batch = 0;
            self.bytes_at_batch_start = self.parse_pos;

            grew
        } else {
            false
        }
    }

    /// Skips `bytes` of buffer without batch accounting (e.g. header lines).
    pub fn advance(&mut self, bytes: usize) {
        self.parse_pos += bytes;
    }

    #[allow(clippy::cast_possible_truncation, clippy::cast_precision_loss, clippy::cast_sign_loss)]
    /// Grows the buffer to the next power of two at or above
    /// `bytes_this_batch * GROWTH_FACTOR` when the just-finished batch no
    /// longer fits with headroom. Returns whether a resize happened.
    /// The buffer never shrinks.
    fn maybe_grow(&mut self, bytes_this_batch: usize) -> bool {
        let needed = ((bytes_this_batch as f64) * GROWTH_FACTOR) as usize;
        if needed > self.buffer.len() {
            let new_size = needed.next_power_of_two();
            log::info!(
                "Growing buffer (batch {}): {}MB -> {}MB ({} bytes for {} bases)",
                self.batches_completed + 1,
                self.buffer.len() / (1024 * 1024),
                new_size / (1024 * 1024),
                bytes_this_batch,
                self.bases_this_batch
            );
            self.buffer.resize(new_size, 0);
            true
        } else {
            false
        }
    }

    /// Current buffer size in bytes.
    pub fn capacity(&self) -> usize {
        self.buffer.len()
    }

    /// Number of batches closed so far.
    pub fn batches_completed(&self) -> usize {
        self.batches_completed
    }

    /// Total bytes ever pulled from the inner source.
    pub fn total_bytes_read(&self) -> usize {
        self.total_bytes_read
    }

    /// The configured per-batch base target.
    pub fn chunk_size(&self) -> u64 {
        self.chunk_size
    }

    #[cfg(test)]
    /// Test hook: swaps in a (typically tiny) buffer to exercise growth.
    pub fn set_buffer(&mut self, buffer: Vec<u8>) {
        self.buffer = buffer;
    }

    /// `BufRead`-style refill backing the `Read`/`BufRead` impls: returns the
    /// unread slice, reading a fresh block from the inner source once the
    /// buffer is drained. Grows the buffer (doubling, capped at 1 GiB) when a
    /// read fills >= 3/4 of it, so later reads can take bigger gulps.
    ///
    /// NOTE(review): this path resets `parse_pos`/`buffer_len` without any
    /// batch bookkeeping — mixing it with `fill_buffer`/`record_parsed` on
    /// the same reader instance does not appear to be supported.
    fn fill_buf_internal(&mut self) -> io::Result<&[u8]> {
        // Serve remaining buffered bytes first.
        if self.parse_pos < self.buffer_len {
            return Ok(&self.buffer[self.parse_pos..self.buffer_len]);
        }

        self.parse_pos = 0;
        self.buffer_len = 0;

        let bytes_read = self.inner.read(&mut self.buffer)?;
        self.buffer_len = bytes_read;
        self.total_bytes_read += bytes_read;

        // Grow AFTER the read: the larger capacity benefits the next refill.
        if bytes_read >= self.buffer.len() * 3 / 4 {
            let new_size = (self.buffer.len() * 2).min(1024 * 1024 * 1024);
            if new_size > self.buffer.len() {
                log::info!(
                    "Growing buffer (BufRead): {}MB -> {}MB (read: {}MB)",
                    self.buffer.len() / (1024 * 1024),
                    new_size / (1024 * 1024),
                    bytes_read / (1024 * 1024),
                );
                self.buffer.resize(new_size, 0);
            }
        }

        Ok(&self.buffer[self.parse_pos..self.buffer_len])
    }
}
247
248impl<R: Read> Read for BatchedSamReader<R> {
250 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
251 let available = self.fill_buf_internal()?;
252 if available.is_empty() {
253 return Ok(0);
254 }
255 let to_copy = buf.len().min(available.len());
256 buf[..to_copy].copy_from_slice(&available[..to_copy]);
257 self.parse_pos += to_copy;
258 Ok(to_copy)
259 }
260}
261
262impl<R: Read> BufRead for BatchedSamReader<R> {
268 fn fill_buf(&mut self) -> io::Result<&[u8]> {
269 self.fill_buf_internal()
270 }
271
272 fn consume(&mut self, amt: usize) {
273 self.parse_pos = (self.parse_pos + amt).min(self.buffer_len);
274 }
275}
276
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Builds one minimal 11-field SAM line: seq/qual are `seq_len` bases.
    fn make_sam_record(name: &str, seq_len: usize) -> String {
        let seq = "A".repeat(seq_len);
        let qual = "I".repeat(seq_len);
        format!("{name}\t0\t*\t0\t0\t*\t*\t0\t0\t{seq}\t{qual}\n")
    }

    /// Builds an R1/R2 pair of records sharing a base name.
    fn make_read_pair(name: &str, seq_len: usize) -> String {
        let r1 = make_sam_record(&format!("{name}/1"), seq_len);
        let r2 = make_sam_record(&format!("{name}/2"), seq_len);
        format!("{r1}{r2}")
    }

    #[test]
    fn test_basic_reading() {
        let data = make_read_pair("read1", 100);
        let cursor = Cursor::new(data.as_bytes().to_vec());

        let mut reader = BatchedSamReader::new(cursor, 1000);
        assert!(reader.fill_buffer().unwrap());
        assert!(!reader.buffer().is_empty());
    }

    #[test]
    fn test_batch_boundary_detection() {
        // 10 pairs x 100 bases = 20 records; chunk_size 500 closes a batch
        // every 6 records (600 bases, even count) -> 3 full batches.
        let mut data = String::new();
        for i in 0..10 {
            data.push_str(&make_read_pair(&format!("read{i}"), 100));
        }

        let cursor = Cursor::new(data.as_bytes().to_vec());
        let mut reader = BatchedSamReader::new(cursor, 500);

        reader.fill_buffer().unwrap();

        let mut batches = 0;
        let mut records = 0;

        while !reader.buffer().is_empty() {
            if let Some(line_end) = reader.buffer().iter().position(|&b| b == b'\n') {
                let line = &reader.buffer()[..=line_end];
                // Field 10 (0-based index 9) is SEQ.
                let seq_len = line.split(|&b| b == b'\t').nth(9).map_or(0, <[u8]>::len);

                if reader.record_parsed(seq_len, line_end + 1) {
                    // Growth return value deliberately ignored here.
                }
                records += 1;

                if reader.batches_completed() > batches {
                    batches = reader.batches_completed();
                }
            } else {
                break;
            }
        }

        assert_eq!(records, 20);
        assert!(batches >= 3);
    }

    #[test]
    fn test_buffer_growth_on_large_batch() {
        // Huge chunk_size + tiny buffer: the batch cannot fit, forcing
        // proactive growth through repeated fill_buffer calls.
        let mut data = String::new();
        for i in 0..200 {
            data.push_str(&make_read_pair(&format!("read{i}"), 100));
        }

        let cursor = Cursor::new(data.as_bytes().to_vec());

        let mut reader = BatchedSamReader::new(cursor, 50000);
        reader.set_buffer(vec![0u8; 1024]);
        reader.fill_buffer().unwrap();
        let initial_capacity = reader.capacity();

        let mut grew = false;
        while !reader.buffer().is_empty() {
            if let Some(line_end) = reader.buffer().iter().position(|&b| b == b'\n') {
                let seq_len = 100;
                if reader.record_parsed(seq_len, line_end + 1) {
                    grew = true;
                }
            } else if !reader.fill_buffer().unwrap() {
                break;
            }
        }

        assert!(reader.capacity() >= initial_capacity);
        // Whether a batch-close resize fired is data-dependent; only
        // monotonic capacity is asserted above.
        let _ = grew;
    }

    #[test]
    fn test_never_shrinks() {
        let mut data = String::new();

        // Large records first so the buffer grows early...
        for i in 0..100 {
            data.push_str(&make_read_pair(&format!("large{i}"), 150));
        }

        // ...then small records, which must NOT shrink it back.
        for i in 0..10 {
            data.push_str(&make_read_pair(&format!("small{i}"), 50));
        }

        let cursor = Cursor::new(data.as_bytes().to_vec());
        let mut reader = BatchedSamReader::new(cursor, 10000);

        reader.set_buffer(vec![0u8; 4096]);

        reader.fill_buffer().unwrap();

        let mut max_capacity = reader.capacity();

        while !reader.buffer().is_empty() {
            if let Some(line_end) = reader.buffer().iter().position(|&b| b == b'\n') {
                let line = &reader.buffer()[..=line_end];
                let seq_len = line.split(|&b| b == b'\t').nth(9).map_or(0, <[u8]>::len);

                reader.record_parsed(seq_len, line_end + 1);

                if reader.capacity() > max_capacity {
                    max_capacity = reader.capacity();
                }

                // Capacity must never drop below the high-water mark.
                assert!(reader.capacity() >= max_capacity);
            } else if !reader.fill_buffer().unwrap() {
                break;
            }
        }

        assert!(reader.capacity() >= max_capacity);
    }

    #[test]
    fn test_partial_line_preserved() {
        // A record delivered in two halves must survive a refill intact.
        let record = make_sam_record("splitme", 100);

        let split_point = record.len() / 2;
        let first_half = &record[..split_point];
        let second_half = &record[split_point..];

        #[allow(clippy::items_after_statements)]
        // Reader that returns one pre-cut chunk per read() call.
        struct PartialReader {
            chunks: Vec<Vec<u8>>,
            idx: usize,
        }

        #[allow(clippy::items_after_statements)]
        impl Read for PartialReader {
            fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
                if self.idx >= self.chunks.len() {
                    return Ok(0);
                }
                let chunk = &self.chunks[self.idx];
                // NOTE: any chunk tail beyond buf.len() is dropped; fine here
                // because the test buffer (1024) exceeds both halves.
                let to_copy = std::cmp::min(buf.len(), chunk.len());
                buf[..to_copy].copy_from_slice(&chunk[..to_copy]);
                self.idx += 1;
                Ok(to_copy)
            }
        }

        let partial_reader = PartialReader {
            chunks: vec![first_half.as_bytes().to_vec(), second_half.as_bytes().to_vec()],
            idx: 0,
        };

        let mut reader = BatchedSamReader::new(partial_reader, 1000);
        reader.set_buffer(vec![0u8; 1024]);
        reader.fill_buffer().unwrap();

        let mut found_complete = false;
        for _ in 0..3 {
            if reader.buffer().contains(&b'\n') {
                found_complete = true;
                break;
            }
            if !reader.fill_buffer().unwrap() {
                break;
            }
        }

        assert!(found_complete, "Should eventually find complete line");
    }

    #[test]
    fn test_proactive_growth() {
        let mut data = String::new();
        for i in 0..100 {
            data.push_str(&make_read_pair(&format!("read{i}"), 100));
        }

        let cursor = Cursor::new(data.as_bytes().to_vec());
        // Huge chunk_size so no batch ever closes; only proactive growth can fire.
        let mut reader = BatchedSamReader::new(cursor, 1_000_000);
        reader.set_buffer(vec![0u8; 512]);
        let initial = reader.capacity();

        reader.fill_buffer().unwrap();

        // Second fill sees the full 512B unparsed (> half) -> doubles buffer.
        reader.fill_buffer().unwrap();

        assert!(
            reader.capacity() > initial,
            "Buffer should grow proactively: {} vs {}",
            reader.capacity(),
            initial
        );
    }

    #[test]
    fn test_empty_input() {
        let cursor = Cursor::new(Vec::<u8>::new());
        let mut reader = BatchedSamReader::new(cursor, 1000);

        assert!(!reader.fill_buffer().unwrap());
        assert!(reader.buffer().is_empty());
    }

    #[test]
    fn test_header_lines_with_advance() {
        // '@'-prefixed header lines are skipped via advance(), leaving only
        // the two alignment records to be counted.
        let mut data = String::new();
        data.push_str("@HD\tVN:1.6\n");
        data.push_str("@SQ\tSN:chr1\tLN:1000\n");
        data.push_str(&make_read_pair("read1", 100));

        let cursor = Cursor::new(data.as_bytes().to_vec());
        let mut reader = BatchedSamReader::new(cursor, 1000);

        reader.fill_buffer().unwrap();

        let mut alignment_records = 0;

        while !reader.buffer().is_empty() {
            if let Some(line_end) = reader.buffer().iter().position(|&b| b == b'\n') {
                let line = &reader.buffer()[..=line_end];

                if line.starts_with(b"@") {
                    reader.advance(line_end + 1);
                } else {
                    let seq_len = line.split(|&b| b == b'\t').nth(9).map_or(0, <[u8]>::len);
                    reader.record_parsed(seq_len, line_end + 1);
                    alignment_records += 1;
                }
            } else {
                break;
            }
        }

        assert_eq!(alignment_records, 2);
    }

    #[test]
    fn test_bufread_basic_read() {
        let data = b"Hello, World!";
        let cursor = Cursor::new(data.to_vec());

        let mut reader = BatchedSamReader::new(cursor, 1000);
        let mut buf = vec![0u8; 1024];

        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 13);
        assert_eq!(&buf[..n], b"Hello, World!");
    }

    #[test]
    fn test_bufread_fill_buf() {
        let data = b"Line 1\nLine 2\nLine 3\n";
        let cursor = Cursor::new(data.to_vec());

        let mut reader = BatchedSamReader::new(cursor, 1000);

        // read_line exercises fill_buf + consume through the BufRead impl.
        let mut line = String::new();
        reader.read_line(&mut line).unwrap();
        assert_eq!(line, "Line 1\n");

        line.clear();
        reader.read_line(&mut line).unwrap();
        assert_eq!(line, "Line 2\n");
    }

    #[test]
    fn test_bufread_consume() {
        let data = b"ABCDEFGHIJ";
        let cursor = Cursor::new(data.to_vec());

        let mut reader = BatchedSamReader::new(cursor, 1000);

        let buf = reader.fill_buf().unwrap();
        assert_eq!(buf, b"ABCDEFGHIJ");

        reader.consume(5);

        let buf = reader.fill_buf().unwrap();
        assert_eq!(buf, b"FGHIJ");
    }

    #[test]
    fn test_bufread_growth_on_large_batch() {
        // 52KB read into a 64KB buffer hits the >= 3/4 threshold (48KB),
        // so capacity should double to 128KB.
        let data = vec![0xAA; 52 * 1024];
        let cursor = Cursor::new(data);

        let mut reader = BatchedSamReader::new(cursor, 1000);
        reader.set_buffer(vec![0u8; 64 * 1024]);
        let mut buf = vec![0u8; 64 * 1024];
        let _ = reader.read(&mut buf).unwrap();

        assert_eq!(reader.capacity(), 128 * 1024);
    }

    #[test]
    fn test_bufread_no_growth_on_small_batch() {
        // 16KB read is under the 48KB threshold: no growth expected.
        let data = vec![0xAA; 16 * 1024];
        let cursor = Cursor::new(data);

        let mut reader = BatchedSamReader::new(cursor, 1000);
        reader.set_buffer(vec![0u8; 64 * 1024]);
        let mut buf = vec![0u8; 64 * 1024];
        let _ = reader.read(&mut buf).unwrap();

        assert_eq!(reader.capacity(), 64 * 1024);
    }

    #[test]
    fn test_bufread_empty_input() {
        let data: Vec<u8> = vec![];
        let cursor = Cursor::new(data);

        let mut reader = BatchedSamReader::new(cursor, 1000);
        let mut buf = vec![0u8; 1024];

        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 0);
    }
}