// fgumi_lib/batched_sam_reader.rs
//! Adaptive buffered SAM reader that grows based on observed batch sizes.
//!
//! # Motivation
//!
//! When piping SAM output from `bwa mem` via stdin, the output arrives in batches
//! determined by bwa's `-K` parameter (chunk size in bases). Each batch contains
//! reads totaling at least `-K` bases, with an even number of records to keep
//! read pairs together. The batch sizes in bytes vary significantly based on
//! read length, alignment complexity, and optional tag content.
//!
//! A fixed-size buffer is problematic:
//! - Too small: causes excessive I/O syscalls and potential stalls waiting for data
//! - Too large: wastes memory, especially for small input files
//!
//! This reader solves the problem by starting with a reasonable initial buffer (64MB)
//! and growing adaptively based on observed batch sizes. It never shrinks, avoiding
//! repeated allocations when batch sizes fluctuate.
//!
//! # How it works
//!
//! The reader tracks bwa's batch boundaries by counting bases and records. When a
//! batch boundary is detected (bases >= `chunk_size` AND record count is even), it
//! measures the batch's byte size and grows the buffer if needed. This ensures the
//! buffer can always hold at least one complete batch, enabling efficient streaming
//! without blocking on partial batches.
use std::io::{self, BufRead, BufReader, Read};

// I/O happens in fixed 64MB slices; the parse buffer also starts at 64MB and
// grows with GROWTH_FACTOR headroom whenever an observed batch outgrows it.
const READ_CHUNK_SIZE: usize = 64 * 1024 * 1024; // 64MB I/O chunks
const INITIAL_BUFFER_SIZE: usize = 64 * 1024 * 1024; // 64MB initial buffer
const GROWTH_FACTOR: f64 = 1.25;
32
/// Adaptive buffered reader for SAM input from bwa mem.
///
/// Reads bytes into a growable buffer, tracks batch boundaries like bwa,
/// and grows the buffer based on observed batch sizes. Never shrinks.
pub struct BatchedSamReader<R: Read> {
    // Underlying source, wrapped so reads come in READ_CHUNK_SIZE slices.
    inner: BufReader<R>,
    // Growable scratch buffer holding raw SAM bytes; never shrinks.
    buffer: Vec<u8>,
    buffer_len: usize, // Valid bytes in buffer
    parse_pos: usize,  // Current parse position (invariant: <= buffer_len)

    // Batch tracking (mirrors bwa logic)
    chunk_size: u64, // BWA -K value: minimum bases per batch
    bases_this_batch: u64,
    records_this_batch: usize,
    // parse_pos at which the current batch began; rebased during compaction.
    bytes_at_batch_start: usize,

    // Statistics
    batches_completed: usize,
    total_bytes_read: usize,
}
53
54impl<R: Read> BatchedSamReader<R> {
55    /// Create a new reader with the given bwa chunk size (-K value).
56    pub fn new(reader: R, chunk_size: u64) -> Self {
57        Self {
58            inner: BufReader::with_capacity(READ_CHUNK_SIZE, reader),
59            buffer: vec![0u8; INITIAL_BUFFER_SIZE],
60            buffer_len: 0,
61            parse_pos: 0,
62            chunk_size,
63            bases_this_batch: 0,
64            records_this_batch: 0,
65            bytes_at_batch_start: 0,
66            batches_completed: 0,
67            total_bytes_read: 0,
68        }
69    }
70
71    /// Fill buffer by reading chunks until buffer is full or EOF.
72    ///
73    /// Handles edge cases:
74    /// - Grows buffer proactively if unparsed data > 50% of capacity
75    /// - Preserves partial records at end of buffer
76    /// - Never loses data
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if the underlying reader fails.
81    pub fn fill_buffer(&mut self) -> io::Result<bool> {
82        let unparsed_len = self.buffer_len.saturating_sub(self.parse_pos);
83
84        // Proactive growth: if unparsed data > 50% of buffer, grow first
85        if unparsed_len > self.buffer.len() / 2 {
86            let new_size = self.buffer.len() * 2;
87            log::info!(
88                "Growing buffer (proactive): {}MB -> {}MB (unparsed: {}MB)",
89                self.buffer.len() / (1024 * 1024),
90                new_size / (1024 * 1024),
91                unparsed_len / (1024 * 1024)
92            );
93            self.buffer.resize(new_size, 0);
94        }
95
96        // Compact: move unparsed data to front
97        if self.parse_pos > 0 && unparsed_len > 0 {
98            self.buffer.copy_within(self.parse_pos..self.buffer_len, 0);
99        }
100        self.buffer_len = unparsed_len;
101        self.bytes_at_batch_start = self.bytes_at_batch_start.saturating_sub(self.parse_pos);
102        self.parse_pos = 0;
103
104        // Read chunks until buffer is full or EOF
105        while self.buffer_len < self.buffer.len() {
106            let space = self.buffer.len() - self.buffer_len;
107            let to_read = std::cmp::min(space, READ_CHUNK_SIZE);
108
109            let n =
110                self.inner.read(&mut self.buffer[self.buffer_len..self.buffer_len + to_read])?;
111            if n == 0 {
112                // EOF - return true if there's still data to parse
113                return Ok(self.buffer_len > 0);
114            }
115            self.buffer_len += n;
116            self.total_bytes_read += n;
117        }
118        Ok(true)
119    }
120
121    /// Get current buffer slice for parsing.
122    pub fn buffer(&self) -> &[u8] {
123        &self.buffer[self.parse_pos..self.buffer_len]
124    }
125
126    /// Check if we've hit a batch boundary (like bwa).
127    fn at_batch_boundary(&self) -> bool {
128        self.bases_this_batch >= self.chunk_size && self.records_this_batch.is_multiple_of(2)
129    }
130
131    /// Called after parsing a SAM record.
132    ///
133    /// Tracks bases and records for batch detection. When a batch boundary
134    /// is reached, may grow the buffer based on observed batch size.
135    ///
136    /// Returns true if the buffer was grown.
137    pub fn record_parsed(&mut self, seq_len: usize, bytes_consumed: usize) -> bool {
138        self.bases_this_batch += seq_len as u64;
139        self.records_this_batch += 1;
140        self.parse_pos += bytes_consumed;
141
142        if self.at_batch_boundary() {
143            let bytes_this_batch = self.parse_pos - self.bytes_at_batch_start;
144            let grew = self.maybe_grow(bytes_this_batch);
145
146            // Reset for next batch
147            self.batches_completed += 1;
148            self.bases_this_batch = 0;
149            self.records_this_batch = 0;
150            self.bytes_at_batch_start = self.parse_pos;
151
152            grew
153        } else {
154            false
155        }
156    }
157
    /// Advance parse position without tracking as a record (e.g., for headers).
    ///
    /// NOTE(review): bytes skipped here still fall inside the span measured at
    /// the next batch boundary (`bytes_at_batch_start` is not moved), so header
    /// bytes slightly inflate the first batch's measured size — confirm this is
    /// intended.
    pub fn advance(&mut self, bytes: usize) {
        self.parse_pos += bytes;
    }
162
163    /// Grow buffer if needed based on batch size. Never shrinks.
164    #[allow(clippy::cast_possible_truncation, clippy::cast_precision_loss, clippy::cast_sign_loss)]
165    fn maybe_grow(&mut self, bytes_this_batch: usize) -> bool {
166        let needed = ((bytes_this_batch as f64) * GROWTH_FACTOR) as usize;
167        if needed > self.buffer.len() {
168            // Round up to next power of 2 for efficiency
169            let new_size = needed.next_power_of_two();
170            log::info!(
171                "Growing buffer (batch {}): {}MB -> {}MB ({} bytes for {} bases)",
172                self.batches_completed + 1,
173                self.buffer.len() / (1024 * 1024),
174                new_size / (1024 * 1024),
175                bytes_this_batch,
176                self.bases_this_batch
177            );
178            self.buffer.resize(new_size, 0);
179            true
180        } else {
181            false
182        }
183    }
184
    /// Get current buffer capacity in bytes (the full allocated length,
    /// independent of how many bytes are currently valid).
    pub fn capacity(&self) -> usize {
        self.buffer.len()
    }
189
    /// Get number of completed batches detected via bwa-style boundaries.
    pub fn batches_completed(&self) -> usize {
        self.batches_completed
    }
194
    /// Get total bytes read from input so far (accumulated by both the
    /// batch-aware fill path and the `Read`/`BufRead` path).
    pub fn total_bytes_read(&self) -> usize {
        self.total_bytes_read
    }
199
    /// Get the configured chunk size (bwa -K value), in bases per batch.
    pub fn chunk_size(&self) -> u64 {
        self.chunk_size
    }
204
205    /// Set the buffer for testing purposes.
206    #[cfg(test)]
207    pub fn set_buffer(&mut self, buffer: Vec<u8>) {
208        self.buffer = buffer;
209    }
210
211    /// Internal `fill_buf` that handles buffer management for `BufRead`.
212    ///
213    /// When used via `BufRead`, we can't track SAM record boundaries,
214    /// but we still grow the buffer proactively when it's nearly full.
215    fn fill_buf_internal(&mut self) -> io::Result<&[u8]> {
216        // Return existing data if available
217        if self.parse_pos < self.buffer_len {
218            return Ok(&self.buffer[self.parse_pos..self.buffer_len]);
219        }
220
221        // Compact and refill
222        self.parse_pos = 0;
223        self.buffer_len = 0;
224
225        // Read new data
226        let bytes_read = self.inner.read(&mut self.buffer)?;
227        self.buffer_len = bytes_read;
228        self.total_bytes_read += bytes_read;
229
230        // Proactive growth: if buffer was mostly filled (>75%), grow for next time
231        if bytes_read >= self.buffer.len() * 3 / 4 {
232            let new_size = (self.buffer.len() * 2).min(1024 * 1024 * 1024); // Cap at 1GB
233            if new_size > self.buffer.len() {
234                log::info!(
235                    "Growing buffer (BufRead): {}MB -> {}MB (read: {}MB)",
236                    self.buffer.len() / (1024 * 1024),
237                    new_size / (1024 * 1024),
238                    bytes_read / (1024 * 1024),
239                );
240                self.buffer.resize(new_size, 0);
241            }
242        }
243
244        Ok(&self.buffer[self.parse_pos..self.buffer_len])
245    }
246}
247
248/// Implement `Read` trait for compatibility with standard I/O.
249impl<R: Read> Read for BatchedSamReader<R> {
250    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
251        let available = self.fill_buf_internal()?;
252        if available.is_empty() {
253            return Ok(0);
254        }
255        let to_copy = buf.len().min(available.len());
256        buf[..to_copy].copy_from_slice(&available[..to_copy]);
257        self.parse_pos += to_copy;
258        Ok(to_copy)
259    }
260}
261
262/// Implement `BufRead` trait for use with SAM readers (e.g., noodles).
263///
264/// When used via `BufRead`, batch tracking is not active since we don't
265/// have visibility into SAM record boundaries. The buffer still grows
266/// proactively based on fill ratio, similar to `GrowableReader`.
267impl<R: Read> BufRead for BatchedSamReader<R> {
268    fn fill_buf(&mut self) -> io::Result<&[u8]> {
269        self.fill_buf_internal()
270    }
271
272    fn consume(&mut self, amt: usize) {
273        self.parse_pos = (self.parse_pos + amt).min(self.buffer_len);
274    }
275}
276
277#[cfg(test)]
278mod tests {
279    use super::*;
280    use std::io::Cursor;
281
282    /// Create a fake SAM record line with given sequence length.
283    fn make_sam_record(name: &str, seq_len: usize) -> String {
284        let seq = "A".repeat(seq_len);
285        let qual = "I".repeat(seq_len);
286        format!("{name}\t0\t*\t0\t0\t*\t*\t0\t0\t{seq}\t{qual}\n")
287    }
288
289    /// Create paired SAM records (R1 and R2).
290    fn make_read_pair(name: &str, seq_len: usize) -> String {
291        let r1 = make_sam_record(&format!("{name}/1"), seq_len);
292        let r2 = make_sam_record(&format!("{name}/2"), seq_len);
293        format!("{r1}{r2}")
294    }
295
296    #[test]
297    fn test_basic_reading() {
298        let data = make_read_pair("read1", 100);
299        let cursor = Cursor::new(data.as_bytes().to_vec());
300
301        let mut reader = BatchedSamReader::new(cursor, 1000);
302        assert!(reader.fill_buffer().unwrap());
303        assert!(!reader.buffer().is_empty());
304    }
305
306    #[test]
307    fn test_batch_boundary_detection() {
308        // Create 10 pairs of 100bp reads = 2000 bases total
309        // With chunk_size=500, should hit boundary after ~3 pairs (600 bases)
310        let mut data = String::new();
311        for i in 0..10 {
312            data.push_str(&make_read_pair(&format!("read{i}"), 100));
313        }
314
315        let cursor = Cursor::new(data.as_bytes().to_vec());
316        let mut reader = BatchedSamReader::new(cursor, 500);
317
318        reader.fill_buffer().unwrap();
319
320        let mut batches = 0;
321        let mut records = 0;
322
323        // Parse records and count batches
324        while !reader.buffer().is_empty() {
325            if let Some(line_end) = reader.buffer().iter().position(|&b| b == b'\n') {
326                let line = &reader.buffer()[..=line_end];
327                // Simple seq length extraction (field 10)
328                let seq_len = line.split(|&b| b == b'\t').nth(9).map_or(0, <[u8]>::len);
329
330                if reader.record_parsed(seq_len, line_end + 1) {
331                    // Buffer grew - this shouldn't happen with small data
332                }
333                records += 1;
334
335                if reader.batches_completed() > batches {
336                    batches = reader.batches_completed();
337                }
338            } else {
339                break;
340            }
341        }
342
343        assert_eq!(records, 20); // 10 pairs = 20 records
344        assert!(batches >= 3); // 2000 bases / 500 chunk = 4 batches (but pairs must be even)
345    }
346
347    #[test]
348    fn test_buffer_growth_on_large_batch() {
349        // Create data larger than initial 64MB buffer equivalent
350        // Use small initial buffer for testing
351        let mut data = String::new();
352        // Each pair is ~250 bytes, create enough for 100KB
353        for i in 0..200 {
354            data.push_str(&make_read_pair(&format!("read{i}"), 100));
355        }
356
357        let cursor = Cursor::new(data.as_bytes().to_vec());
358
359        // Use tiny initial buffer to force growth
360        let mut reader = BatchedSamReader::new(cursor, 50000); // 50K bases per batch
361
362        // Manually set small initial buffer for testing
363        reader.set_buffer(vec![0u8; 1024]); // 1KB initial
364
365        reader.fill_buffer().unwrap();
366        let initial_capacity = reader.capacity();
367
368        // Parse until we hit a batch boundary that triggers growth
369        let mut grew = false;
370        while !reader.buffer().is_empty() {
371            if let Some(line_end) = reader.buffer().iter().position(|&b| b == b'\n') {
372                let seq_len = 100; // We know our records have 100bp
373                if reader.record_parsed(seq_len, line_end + 1) {
374                    grew = true;
375                }
376            } else if !reader.fill_buffer().unwrap() {
377                break;
378            }
379        }
380
381        // Buffer should have grown
382        assert!(reader.capacity() >= initial_capacity);
383        // Note: grew might not be true if proactive growth happened instead
384        let _ = grew;
385    }
386
387    #[test]
388    fn test_never_shrinks() {
389        // First batch large, second batch small - buffer should not shrink
390        let mut data = String::new();
391
392        // Large batch: 100 pairs of 150bp = 30000 bases
393        for i in 0..100 {
394            data.push_str(&make_read_pair(&format!("large{i}"), 150));
395        }
396
397        // Small batch: 10 pairs of 50bp = 1000 bases
398        for i in 0..10 {
399            data.push_str(&make_read_pair(&format!("small{i}"), 50));
400        }
401
402        let cursor = Cursor::new(data.as_bytes().to_vec());
403        let mut reader = BatchedSamReader::new(cursor, 10000);
404
405        // Small initial buffer
406        reader.set_buffer(vec![0u8; 4096]);
407
408        reader.fill_buffer().unwrap();
409
410        let mut max_capacity = reader.capacity();
411
412        while !reader.buffer().is_empty() {
413            if let Some(line_end) = reader.buffer().iter().position(|&b| b == b'\n') {
414                let line = &reader.buffer()[..=line_end];
415                let seq_len = line.split(|&b| b == b'\t').nth(9).map_or(0, <[u8]>::len);
416
417                reader.record_parsed(seq_len, line_end + 1);
418
419                // Track max capacity
420                if reader.capacity() > max_capacity {
421                    max_capacity = reader.capacity();
422                }
423
424                // Verify never shrinks
425                assert!(reader.capacity() >= max_capacity);
426            } else if !reader.fill_buffer().unwrap() {
427                break;
428            }
429        }
430
431        // Final capacity should be >= max seen
432        assert!(reader.capacity() >= max_capacity);
433    }
434
435    #[test]
436    fn test_partial_line_preserved() {
437        // Create data where a line will be split across buffer boundaries
438        let record = make_sam_record("splitme", 100);
439
440        // Put half of record at end of "first chunk"
441        let split_point = record.len() / 2;
442        let first_half = &record[..split_point];
443        let second_half = &record[split_point..];
444
445        // We need to simulate partial reads, so use a custom reader
446        #[allow(clippy::items_after_statements)]
447        struct PartialReader {
448            chunks: Vec<Vec<u8>>,
449            idx: usize,
450        }
451
452        #[allow(clippy::items_after_statements)]
453        impl Read for PartialReader {
454            fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
455                if self.idx >= self.chunks.len() {
456                    return Ok(0);
457                }
458                let chunk = &self.chunks[self.idx];
459                let to_copy = std::cmp::min(buf.len(), chunk.len());
460                buf[..to_copy].copy_from_slice(&chunk[..to_copy]);
461                self.idx += 1;
462                Ok(to_copy)
463            }
464        }
465
466        let partial_reader = PartialReader {
467            chunks: vec![first_half.as_bytes().to_vec(), second_half.as_bytes().to_vec()],
468            idx: 0,
469        };
470
471        let mut reader = BatchedSamReader::new(partial_reader, 1000);
472        reader.set_buffer(vec![0u8; 1024]); // Small buffer
473
474        // First fill gets partial line
475        reader.fill_buffer().unwrap();
476
477        // Should not find complete line yet if buffer is small enough
478        // After second fill, should have complete line
479        let mut found_complete = false;
480        for _ in 0..3 {
481            if reader.buffer().contains(&b'\n') {
482                found_complete = true;
483                break;
484            }
485            if !reader.fill_buffer().unwrap() {
486                break;
487            }
488        }
489
490        assert!(found_complete, "Should eventually find complete line");
491    }
492
493    #[test]
494    fn test_proactive_growth() {
495        // Test that buffer grows proactively when unparsed > 50%
496        let mut data = String::new();
497        for i in 0..100 {
498            data.push_str(&make_read_pair(&format!("read{i}"), 100));
499        }
500
501        let cursor = Cursor::new(data.as_bytes().to_vec());
502        let mut reader = BatchedSamReader::new(cursor, 1_000_000); // Large chunk to avoid batch triggers
503
504        // Tiny buffer to force proactive growth
505        reader.set_buffer(vec![0u8; 512]);
506        let initial = reader.capacity();
507
508        reader.fill_buffer().unwrap();
509
510        // Don't parse anything - just refill
511        // Second fill should trigger proactive growth if data > 50% of buffer
512        reader.fill_buffer().unwrap();
513
514        // Buffer should have grown proactively
515        assert!(
516            reader.capacity() > initial,
517            "Buffer should grow proactively: {} vs {}",
518            reader.capacity(),
519            initial
520        );
521    }
522
523    #[test]
524    fn test_empty_input() {
525        let cursor = Cursor::new(Vec::<u8>::new());
526        let mut reader = BatchedSamReader::new(cursor, 1000);
527
528        // Should return false (no data)
529        assert!(!reader.fill_buffer().unwrap());
530        assert!(reader.buffer().is_empty());
531    }
532
533    #[test]
534    fn test_header_lines_with_advance() {
535        let mut data = String::new();
536        data.push_str("@HD\tVN:1.6\n");
537        data.push_str("@SQ\tSN:chr1\tLN:1000\n");
538        data.push_str(&make_read_pair("read1", 100));
539
540        let cursor = Cursor::new(data.as_bytes().to_vec());
541        let mut reader = BatchedSamReader::new(cursor, 1000);
542
543        reader.fill_buffer().unwrap();
544
545        let mut alignment_records = 0;
546
547        while !reader.buffer().is_empty() {
548            if let Some(line_end) = reader.buffer().iter().position(|&b| b == b'\n') {
549                let line = &reader.buffer()[..=line_end];
550
551                if line.starts_with(b"@") {
552                    // Header line - advance without counting as record
553                    reader.advance(line_end + 1);
554                } else {
555                    // Alignment record
556                    let seq_len = line.split(|&b| b == b'\t').nth(9).map_or(0, <[u8]>::len);
557                    reader.record_parsed(seq_len, line_end + 1);
558                    alignment_records += 1;
559                }
560            } else {
561                break;
562            }
563        }
564
565        assert_eq!(alignment_records, 2); // One pair
566    }
567
568    // Tests for BufRead implementation
569
570    #[test]
571    fn test_bufread_basic_read() {
572        let data = b"Hello, World!";
573        let cursor = Cursor::new(data.to_vec());
574
575        let mut reader = BatchedSamReader::new(cursor, 1000);
576        let mut buf = vec![0u8; 1024];
577
578        let n = reader.read(&mut buf).unwrap();
579        assert_eq!(n, 13);
580        assert_eq!(&buf[..n], b"Hello, World!");
581    }
582
583    #[test]
584    fn test_bufread_fill_buf() {
585        let data = b"Line 1\nLine 2\nLine 3\n";
586        let cursor = Cursor::new(data.to_vec());
587
588        let mut reader = BatchedSamReader::new(cursor, 1000);
589
590        // Read lines using BufRead
591        let mut line = String::new();
592        reader.read_line(&mut line).unwrap();
593        assert_eq!(line, "Line 1\n");
594
595        line.clear();
596        reader.read_line(&mut line).unwrap();
597        assert_eq!(line, "Line 2\n");
598    }
599
600    #[test]
601    fn test_bufread_consume() {
602        let data = b"ABCDEFGHIJ";
603        let cursor = Cursor::new(data.to_vec());
604
605        let mut reader = BatchedSamReader::new(cursor, 1000);
606
607        // Fill buffer
608        let buf = reader.fill_buf().unwrap();
609        assert_eq!(buf, b"ABCDEFGHIJ");
610
611        // Consume 5 bytes
612        reader.consume(5);
613
614        // Remaining should be FGHIJ
615        let buf = reader.fill_buf().unwrap();
616        assert_eq!(buf, b"FGHIJ");
617    }
618
619    #[test]
620    fn test_bufread_growth_on_large_batch() {
621        // Data that fills >75% of buffer
622        let data = vec![0xAA; 52 * 1024];
623        let cursor = Cursor::new(data);
624
625        let mut reader = BatchedSamReader::new(cursor, 1000);
626        reader.set_buffer(vec![0u8; 64 * 1024]); // 64KB buffer
627
628        let mut buf = vec![0u8; 64 * 1024];
629        let _ = reader.read(&mut buf).unwrap();
630
631        // Should have grown to 128KB (doubled)
632        assert_eq!(reader.capacity(), 128 * 1024);
633    }
634
635    #[test]
636    fn test_bufread_no_growth_on_small_batch() {
637        // Data that fills <75% of buffer
638        let data = vec![0xAA; 16 * 1024];
639        let cursor = Cursor::new(data);
640
641        let mut reader = BatchedSamReader::new(cursor, 1000);
642        reader.set_buffer(vec![0u8; 64 * 1024]); // 64KB buffer
643
644        let mut buf = vec![0u8; 64 * 1024];
645        let _ = reader.read(&mut buf).unwrap();
646
647        // Should NOT have grown
648        assert_eq!(reader.capacity(), 64 * 1024);
649    }
650
651    #[test]
652    fn test_bufread_empty_input() {
653        let data: Vec<u8> = vec![];
654        let cursor = Cursor::new(data);
655
656        let mut reader = BatchedSamReader::new(cursor, 1000);
657        let mut buf = vec![0u8; 1024];
658
659        let n = reader.read(&mut buf).unwrap();
660        assert_eq!(n, 0);
661    }
662}