// fgumi_lib/bam_io.rs
1//! BAM file I/O utilities.
2//!
3//! This module provides common utilities for creating BAM readers and writers with consistent
4//! error handling and header management.
5//!
6//! # Threading Model
7//!
8//! BAM files use BGZF compression, which can be parallelized for both reading and writing:
9//!
10//! - **Single-threaded**: Use `threads=1` (lower overhead, good for small files)
11//! - **Multi-threaded**: Use `threads>1` (higher throughput for large files)
12//!
13//! Multi-threaded I/O is beneficial for:
14//! - Large BAM files where decompression/compression is a bottleneck
15//! - Systems with fast storage (SSD/NVMe) where I/O isn't the limiting factor
16//! - Pipelines that process many records in parallel
17
18use anyhow::{Context, Result};
19use noodles::bam::bai;
20// Use old VirtualPosition for compatibility with noodles_csi::Chunk
21use noodles::bgzf::VirtualPosition;
22use noodles::core::Position;
23use noodles::sam::Header;
24// Use noodles_bgzf for standard BGZF types
25use noodles_bgzf::io::{MultithreadedReader, Reader as BgzfReader, Writer as BgzfWriter};
26// Use vendored MultithreadedWriter with position tracking
27// (until https://github.com/zaeleus/noodles/pull/371 merges)
28use crate::vendored::{BlockInfoRx, MultithreadedWriter, MultithreadedWriterBuilder};
29// Use RawBamReader for raw byte access
30// (until https://github.com/zaeleus/noodles/pull/373 merges)
31use fgumi_raw_bam::RawBamReader;
32// Use bgzf crate for CompressionLevel
33use bgzf::CompressionLevel;
34use noodles_csi::binning_index::Indexer;
35use noodles_csi::binning_index::index::reference_sequence::bin::Chunk;
36use noodles_csi::binning_index::index::reference_sequence::index::LinearIndex;
37use std::collections::HashMap;
38use std::fs::File;
39use std::io::{self, BufRead, Read, Write};
40use std::num::NonZero;
41use std::path::Path;
42
/// Maximum uncompressed BGZF block size (64 KiB minus 256 bytes reserved for
/// compression overhead). Used during index resolution: a record whose end
/// offset exceeds this within a block is treated as spilling into the next block.
const MAX_BLOCK_SIZE: usize = 65280;
45
/// Enum wrapping single-threaded and multi-threaded BGZF readers.
///
/// This allows functions to accept either reader type through a unified
/// interface; both variants implement [`Read`] and [`BufRead`] via the
/// delegating impls below.
pub enum BgzfReaderEnum {
    /// Single-threaded BGZF reader (lower overhead for small files)
    SingleThreaded(BgzfReader<File>),
    /// Multi-threaded BGZF reader (noodles built-in threading)
    MultiThreaded(MultithreadedReader<File>),
}
55
56impl Read for BgzfReaderEnum {
57    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
58        match self {
59            BgzfReaderEnum::SingleThreaded(r) => r.read(buf),
60            BgzfReaderEnum::MultiThreaded(r) => r.read(buf),
61        }
62    }
63}
64
65impl BufRead for BgzfReaderEnum {
66    fn fill_buf(&mut self) -> io::Result<&[u8]> {
67        match self {
68            BgzfReaderEnum::SingleThreaded(r) => r.fill_buf(),
69            BgzfReaderEnum::MultiThreaded(r) => r.fill_buf(),
70        }
71    }
72
73    fn consume(&mut self, amt: usize) {
74        match self {
75            BgzfReaderEnum::SingleThreaded(r) => r.consume(amt),
76            BgzfReaderEnum::MultiThreaded(r) => r.consume(amt),
77        }
78    }
79}
80
/// Type alias for a BAM reader that supports both single and multi-threaded
/// BGZF decompression, selected at construction time via [`BgzfReaderEnum`].
pub type BamReaderAuto = noodles::bam::io::Reader<BgzfReaderEnum>;
83
/// Enum wrapping single-threaded and multi-threaded BGZF writers.
///
/// The inner sink is `Box<dyn Write + Send>` so that both file and stdout
/// output are supported through the same type.
pub enum BgzfWriterEnum {
    /// Single-threaded BGZF writer
    SingleThreaded(BgzfWriter<Box<dyn Write + Send>>),
    /// Multi-threaded BGZF writer
    MultiThreaded(MultithreadedWriter<Box<dyn Write + Send>>),
}
93
94impl Write for BgzfWriterEnum {
95    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
96        match self {
97            BgzfWriterEnum::SingleThreaded(w) => w.write(buf),
98            BgzfWriterEnum::MultiThreaded(w) => w.write(buf),
99        }
100    }
101
102    fn flush(&mut self) -> io::Result<()> {
103        match self {
104            BgzfWriterEnum::SingleThreaded(w) => w.flush(),
105            BgzfWriterEnum::MultiThreaded(w) => w.flush(),
106        }
107    }
108}
109
110impl BgzfWriterEnum {
111    /// Finish writing and close the writer properly.
112    /// This is especially important for multi-threaded writers to ensure
113    /// all data is flushed and the EOF marker is written.
114    ///
115    /// # Errors
116    /// Returns an error if flushing or finalizing the writer fails.
117    pub fn finish(self) -> io::Result<()> {
118        match self {
119            BgzfWriterEnum::SingleThreaded(mut w) => {
120                w.flush()?;
121                // Single-threaded writer writes EOF on drop
122                Ok(())
123            }
124            BgzfWriterEnum::MultiThreaded(mut w) => {
125                w.finish().map_err(|e| io::Error::other(e.to_string()))?;
126                Ok(())
127            }
128        }
129    }
130}
131
/// Type alias for a BAM writer that supports both single and multi-threaded
/// BGZF compression, selected at construction time via [`BgzfWriterEnum`].
pub type BamWriter = noodles::bam::io::Writer<BgzfWriterEnum>;
134
135/// Create a [`BgzfReaderEnum`] from a file, selecting single- or multi-threaded based on `threads`.
136fn make_bgzf_reader(file: File, threads: usize) -> BgzfReaderEnum {
137    if threads > 1 {
138        let worker_count = NonZero::new(threads).expect("threads > 1 checked above");
139        BgzfReaderEnum::MultiThreaded(MultithreadedReader::with_worker_count(worker_count, file))
140    } else {
141        BgzfReaderEnum::SingleThreaded(BgzfReader::new(file))
142    }
143}
144
145/// Create a [`BgzfWriterEnum`] from a writer, selecting single- or multi-threaded based on
146/// `threads`.
147#[allow(clippy::cast_possible_truncation)]
148fn make_bgzf_writer(
149    output: Box<dyn Write + Send>,
150    threads: usize,
151    compression_level: u32,
152) -> BgzfWriterEnum {
153    if threads > 1 {
154        let worker_count = NonZero::new(threads).expect("threads > 1 checked above");
155        let mut builder = MultithreadedWriterBuilder::default().set_worker_count(worker_count);
156
157        if let Ok(cl) = CompressionLevel::new(compression_level as u8) {
158            builder = builder.set_compression_level(cl);
159        }
160
161        BgzfWriterEnum::MultiThreaded(builder.build_from_writer(output))
162    } else {
163        let level = noodles_bgzf::io::writer::CompressionLevel::new(compression_level as u8)
164            .unwrap_or_default();
165        let writer = noodles_bgzf::io::writer::Builder::default()
166            .set_compression_level(level)
167            .build_from_writer(output);
168        BgzfWriterEnum::SingleThreaded(writer)
169    }
170}
171
172/// Write a BAM header (magic, SAM text, reference sequences) to any writer.
173///
174/// Shared implementation used by both [`RawBamWriter`] and [`IndexingBamWriter`].
175///
176/// Note: we use a manual implementation rather than delegating to
177/// `noodles::bam::io::Writer::write_header` because the noodles encoder
178/// produces subtly different output that causes read-back failures with
179/// some writer backends (e.g. `MultithreadedWriter`).
180#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
181fn write_bam_header(writer: &mut impl Write, header: &Header) -> io::Result<()> {
182    // BAM magic
183    writer.write_all(fgumi_raw_bam::BAM_MAGIC)?;
184
185    // Header text (SAM header serialized using noodles)
186    let mut sam_writer = noodles::sam::io::Writer::new(Vec::new());
187    sam_writer.write_header(header)?;
188    let header_bytes = sam_writer.into_inner();
189    let l_text = header_bytes.len() as i32;
190    writer.write_all(&l_text.to_le_bytes())?;
191    writer.write_all(&header_bytes)?;
192
193    // Reference sequences
194    let n_ref = header.reference_sequences().len() as i32;
195    writer.write_all(&n_ref.to_le_bytes())?;
196
197    for (name, map) in header.reference_sequences() {
198        // l_name: length of name + null terminator
199        let l_name = (name.len() + 1) as u32;
200        writer.write_all(&l_name.to_le_bytes())?;
201        writer.write_all(name)?;
202        writer.write_all(&[0u8])?; // null terminator
203
204        // l_ref: reference length
205        let l_ref = map.length().get() as i32;
206        writer.write_all(&l_ref.to_le_bytes())?;
207    }
208
209    Ok(())
210}
211
/// Raw BAM writer for writing raw record bytes directly.
///
/// This bypasses the noodles Record encoding path for maximum performance
/// when you already have raw BAM bytes. Writes records as:
/// - 4-byte `block_size` (little-endian)
/// - raw BAM record bytes
pub struct RawBamWriter {
    // Underlying BGZF writer (single- or multi-threaded, file or stdout).
    inner: BgzfWriterEnum,
}
221
222impl RawBamWriter {
223    /// Create a new raw BAM writer from a BGZF writer.
224    #[must_use]
225    pub fn new(inner: BgzfWriterEnum) -> Self {
226        Self { inner }
227    }
228
229    /// Write the BAM header.
230    ///
231    /// # Errors
232    /// Returns an error if writing to the underlying writer fails.
233    pub fn write_header(&mut self, header: &Header) -> io::Result<()> {
234        write_bam_header(&mut self.inner, header)
235    }
236
237    /// Write a raw BAM record.
238    ///
239    /// The bytes should be the raw BAM record data (without the 4-byte `block_size` prefix).
240    ///
241    /// # Errors
242    /// Returns an error if writing to the underlying writer fails.
243    #[inline]
244    #[allow(clippy::cast_possible_truncation)]
245    pub fn write_raw_record(&mut self, record_bytes: &[u8]) -> io::Result<()> {
246        use std::io::Write;
247
248        // Write block_size (4 bytes, little-endian)
249        let block_size = record_bytes.len() as u32;
250        self.inner.write_all(&block_size.to_le_bytes())?;
251
252        // Write raw record bytes
253        self.inner.write_all(record_bytes)
254    }
255
256    /// Write pre-formatted BAM record bytes directly to the BGZF stream.
257    ///
258    /// Unlike `write_raw_record`, this does NOT add a length prefix — the data
259    /// must already contain properly formatted BAM records (4-byte LE size + record bytes).
260    /// Used by the pipeline for bulk secondary output.
261    ///
262    /// # Errors
263    /// Returns an error if writing to the underlying writer fails.
264    pub fn write_raw_bytes(&mut self, data: &[u8]) -> io::Result<()> {
265        use std::io::Write;
266        self.inner.write_all(data)
267    }
268
269    /// Finish writing and close the writer.
270    ///
271    /// # Errors
272    /// Returns an error if finalizing the writer fails.
273    pub fn finish(self) -> io::Result<()> {
274        self.inner.finish()
275    }
276}
277
278/// Create a raw BAM writer for writing raw record bytes directly.
279///
280/// This is more efficient than `create_bam_writer` when you already have raw BAM bytes
281/// because it bypasses the noodles Record encoding path.
282///
283/// # Errors
284/// Returns an error if the file cannot be created or the header cannot be written.
285///
286/// # Panics
287/// Panics if `threads > 1` but `NonZero::new` fails (should not happen).
288pub fn create_raw_bam_writer<P: AsRef<Path>>(
289    path: P,
290    header: &Header,
291    threads: usize,
292    compression_level: u32,
293) -> Result<RawBamWriter> {
294    let path_ref = path.as_ref();
295    let output = open_output_writer(path_ref)?;
296
297    let bgzf_writer = make_bgzf_writer(output, threads, compression_level);
298
299    let mut writer = RawBamWriter::new(bgzf_writer);
300    writer
301        .write_header(header)
302        .with_context(|| format!("Failed to write header to: {}", path_ref.display()))?;
303    Ok(writer)
304}
305
/// Cached index entry awaiting block position resolution.
///
/// When using multi-threaded compression, we don't know the exact compressed
/// position until the block is actually written. This struct caches the
/// information needed to build the index entry once the position is known.
#[derive(Debug)]
struct CachedIndexEntry {
    /// BGZF block number this record starts in.
    block_number: u64,
    /// Uncompressed offset of the record start within that block.
    offset_in_block: usize,
    /// Length of record in bytes (including 4-byte size prefix).
    record_len: usize,
    /// Alignment context: (reference ID, start, end, mapped flag).
    /// `None` for unmapped records, which the indexer counts separately.
    alignment_context: Option<(usize, Position, Position, bool)>,
}
322
/// BAM writer that builds an index incrementally during write.
///
/// This writer generates a BAI (BAM index) file alongside the BAM output by tracking
/// virtual file positions and alignment information for each record. The index is
/// built incrementally as records are written, avoiding a second pass through the file.
///
/// Uses multi-threaded BGZF compression with htslib-style block caching for accurate
/// position tracking. Index entries are cached until their corresponding blocks are
/// written, then resolved with the actual compressed positions.
///
/// # Usage
///
/// ```ignore
/// let mut writer = create_indexing_bam_writer(output, &header, 6, 4)?;  // compression=6, threads=4
/// for record in records {
///     writer.write_raw_record(&record)?;
/// }
/// let index = writer.finish()?;  // Returns the BAI index
/// ```
pub struct IndexingBamWriter {
    // Position-tracking multi-threaded BGZF writer.
    inner: MultithreadedWriter<File>,
    // Channel delivering (block number, compressed start) for completed blocks.
    block_info_rx: BlockInfoRx,
    // Incremental BAI indexer fed as entries are resolved.
    indexer: Indexer<LinearIndex>,
    // Number of reference sequences, needed to finalize the index.
    num_refs: usize,
    // Block-aware caching (htslib-style)
    // Entries whose compressed block positions are not yet known.
    entry_cache: Vec<CachedIndexEntry>,
    // block number -> compressed start offset, for completed blocks.
    block_positions: HashMap<u64, u64>,
    // Block number currently being filled by the writer.
    current_block_number: u64,
    // Uncompressed offset within the current block.
    current_block_offset: usize,
}
353
impl IndexingBamWriter {
    /// Create a new indexing BAM writer.
    ///
    /// Snapshots the writer's current block number and intra-block offset so
    /// that the first record's virtual position can be computed later.
    fn new(inner: MultithreadedWriter<File>, num_refs: usize) -> Self {
        // The vendored MultithreadedWriter exposes a block-info channel;
        // the unwrap encodes the invariant that it is always present.
        let block_info_rx = inner.block_info_receiver().unwrap().clone();
        Self {
            current_block_number: inner.current_block_number(),
            current_block_offset: inner.buffer_offset(),
            inner,
            block_info_rx,
            indexer: Indexer::default(),
            num_refs,
            entry_cache: Vec::new(),
            block_positions: HashMap::new(),
        }
    }

    /// Write the BAM header and re-sync position tracking afterwards.
    fn write_header(&mut self, header: &Header) -> io::Result<()> {
        write_bam_header(&mut self.inner, header)?;

        // Flush header to ensure it's in its own block(s)
        self.inner.flush()?;

        // Update position tracking after header
        self.current_block_number = self.inner.current_block_number();
        self.current_block_offset = self.inner.buffer_offset();

        Ok(())
    }

    /// Write a raw BAM record and update the index.
    ///
    /// Extracts alignment information from raw bytes for indexing:
    /// - Reference ID (tid)
    /// - Alignment start position
    /// - Alignment end position (calculated from CIGAR)
    /// - Mapped/unmapped status
    ///
    /// # Errors
    /// Returns an error if writing the record or flushing blocks fails.
    #[inline]
    #[allow(clippy::cast_possible_truncation)]
    pub fn write_raw_record(&mut self, record_bytes: &[u8]) -> io::Result<()> {
        // Capture position BEFORE write: this is where the record's chunk starts.
        let block_number = self.current_block_number;
        let offset_in_block = self.current_block_offset;

        // Write record (4-byte size prefix + record bytes)
        let block_size = record_bytes.len() as u32;
        self.inner.write_all(&block_size.to_le_bytes())?;
        self.inner.write_all(record_bytes)?;

        let record_len = 4 + record_bytes.len();
        self.current_block_offset += record_len;

        // Check if a new block was started (buffer was flushed during the write).
        let new_block_number = self.inner.current_block_number();
        if new_block_number > self.current_block_number {
            // Block boundary crossed - re-sync tracking from the writer, since
            // our locally accumulated offset no longer matches the new block.
            self.current_block_number = new_block_number;
            self.current_block_offset = self.inner.buffer_offset();
        }

        // Cache entry for later resolution, once the block's compressed start
        // position arrives on the block-info channel.
        let alignment_context = Self::extract_alignment_context(record_bytes);
        self.entry_cache.push(CachedIndexEntry {
            block_number,
            offset_in_block,
            record_len,
            alignment_context,
        });

        // Process any completed blocks
        self.flush_completed_blocks()?;

        Ok(())
    }

    /// Write a record buffer to the BAM file and update the index.
    ///
    /// This method encodes the record buffer to raw bytes using noodles' encoder,
    /// then writes it. Useful for the non-fast sorting path that uses decoded records.
    ///
    /// # Errors
    /// Returns an error if encoding or writing the record fails.
    pub fn write_record_buf(
        &mut self,
        header: &Header,
        record: &noodles::sam::alignment::record_buf::RecordBuf,
    ) -> io::Result<()> {
        use crate::vendored::bam_codec::encode_record_buf;

        // Encode record to raw bytes
        let mut buf = Vec::new();
        encode_record_buf(&mut buf, header, record)?;

        // Write using raw record method
        self.write_raw_record(&buf)
    }

    /// Process block completion notifications and resolve cached index entries.
    #[allow(clippy::cast_possible_truncation)]
    fn flush_completed_blocks(&mut self) -> io::Result<()> {
        // Drain block info from receiver (non-blocking: worker threads publish
        // compressed start positions as blocks are written out).
        while let Ok(info) = self.block_info_rx.try_recv() {
            self.block_positions.insert(info.block_number, info.compressed_start);
        }

        // Flush cached entries for completed blocks.
        // NOTE(review): Vec::remove is O(n), so this loop is O(n^2) in the
        // worst case; acceptable while the cache stays small because blocks
        // resolve promptly — revisit if the cache can grow large.
        let mut i = 0;
        while i < self.entry_cache.len() {
            let entry = &self.entry_cache[i];

            if let Some(&block_start) = self.block_positions.get(&entry.block_number) {
                // Block is complete - calculate virtual positions
                // (compressed block start << 16 | uncompressed offset).
                let start_vpos =
                    VirtualPosition::try_from((block_start, entry.offset_in_block as u16))
                        .unwrap_or(VirtualPosition::MIN);

                // End position: might be in same block or next
                let end_offset = entry.offset_in_block + entry.record_len;
                let end_vpos = if end_offset <= MAX_BLOCK_SIZE {
                    VirtualPosition::try_from((block_start, end_offset as u16))
                        .unwrap_or(VirtualPosition::MIN)
                } else {
                    // Record spans block boundary - need next block position
                    let next_block = entry.block_number + 1;
                    if let Some(&next_start) = self.block_positions.get(&next_block) {
                        let overflow = end_offset - MAX_BLOCK_SIZE;
                        VirtualPosition::try_from((next_start, overflow as u16))
                            .unwrap_or(VirtualPosition::MIN)
                    } else {
                        // Next block not ready yet - skip this entry for now;
                        // it stays cached and is retried on the next call.
                        i += 1;
                        continue;
                    }
                };

                let chunk = Chunk::new(start_vpos, end_vpos);
                if let Some(ctx) = entry.alignment_context {
                    self.indexer.add_record(Some(ctx), chunk).map_err(io::Error::other)?;
                } else {
                    // Unmapped record: the indexer tracks these in its
                    // unplaced/unmapped count rather than a reference bin.
                    self.indexer.add_record(None, chunk).map_err(io::Error::other)?;
                }

                self.entry_cache.remove(i);
            } else {
                i += 1;
            }
        }

        Ok(())
    }

    /// Extract alignment context from raw BAM bytes.
    ///
    /// Returns `Some((ref_id, start, end, is_mapped))` for mapped reads,
    /// or `None` for unmapped reads (needed for unmapped record counting).
    #[inline]
    #[allow(clippy::cast_sign_loss, clippy::cast_possible_wrap)]
    fn extract_alignment_context(bam: &[u8]) -> Option<(usize, Position, Position, bool)> {
        // Extract fields from the fixed-layout prefix of the BAM record.
        let tid = fgumi_raw_bam::ref_id(bam);
        let pos = fgumi_raw_bam::pos(bam);
        let flags = fgumi_raw_bam::flags(bam);

        let is_unmapped = (flags & fgumi_raw_bam::flags::UNMAPPED) != 0;

        // Unmapped reads: return None (indexer handles them specially)
        if tid < 0 || is_unmapped {
            return None;
        }

        // Calculate alignment end from CIGAR using byte-safe access
        // (doesn't require 4-byte alignment like get_cigar_ops)
        let ref_len = fgumi_raw_bam::reference_length_from_raw_bam(bam);

        // BAM positions are 0-based, Position is 1-based.
        // NOTE(review): if ref_len is 0 the end conversion yields 0, which
        // fails Position::try_from and drops the record via `?` — presumably
        // intentional for zero-span alignments; confirm against the indexer.
        let start = Position::try_from((pos + 1) as usize).ok()?;
        let end = Position::try_from((pos + ref_len) as usize).ok()?;

        Some((tid as usize, start, end, true))
    }

    /// Finish writing, flush the BGZF stream, and return the index.
    ///
    /// # Errors
    /// Returns an error if flushing, finalizing, or building the index fails.
    pub fn finish(mut self) -> io::Result<bai::Index> {
        // Flush any remaining data
        self.inner.flush()?;

        // Process all remaining blocks.
        // Bounded wait (100 x 10 ms, ~1 s max) for the writer thread to publish
        // the final block positions; each iteration retries entry resolution.
        for _ in 0..100 {
            self.flush_completed_blocks()?;
            if self.entry_cache.is_empty() {
                break;
            }
            std::thread::sleep(std::time::Duration::from_millis(10));
        }

        // Finish the writer (writes EOF marker)
        let _ = self.inner.finish().map_err(|e| io::Error::other(e.to_string()))?;

        // Final flush of any remaining entries: finishing the writer drains the
        // last block notifications.
        self.flush_completed_blocks()?;

        // All entries should be flushed now; anything left means block info was
        // lost and the index would be incomplete, so fail loudly.
        if !self.entry_cache.is_empty() {
            return Err(io::Error::other(format!(
                "Unflushed index entries remain: {} entries",
                self.entry_cache.len()
            )));
        }

        // Build the final index
        let index = self.indexer.build(self.num_refs);
        Ok(index)
    }
}
576
577/// Create an indexing BAM writer that builds a BAI index during write.
578///
579/// This writer generates the BAM index incrementally as records are written,
580/// avoiding a second pass through the file. The index can be written after
581/// calling `finish()`.
582///
583/// Uses multi-threaded BGZF compression with htslib-style block caching for
584/// accurate virtual position tracking. Index entries are cached until their
585/// corresponding blocks are written, then resolved with actual positions.
586///
587/// # Arguments
588/// * `path` - Path for the output BAM file
589/// * `header` - SAM header (must have coordinate sort order)
590/// * `compression_level` - BGZF compression level (1-12)
591/// * `threads` - Number of compression threads (minimum 1)
592///
593/// # Returns
594/// An `IndexingBamWriter` that will generate a BAI index on `finish()`.
595///
596/// # Errors
597/// Returns an error if the file cannot be created or the header cannot be written.
598///
599/// # Panics
600/// Panics if `NonZero::new` fails on the thread count (should not happen).
601pub fn create_indexing_bam_writer<P: AsRef<Path>>(
602    path: P,
603    header: &Header,
604    compression_level: u32,
605    threads: usize,
606) -> Result<IndexingBamWriter> {
607    let path_ref = path.as_ref();
608    if is_stdout_path(path_ref) {
609        anyhow::bail!(
610            "Cannot create an indexing BAM writer for stdout (indexing requires a seekable file)"
611        );
612    }
613    let output_file = File::create(path_ref)
614        .with_context(|| format!("Failed to create output BAM: {}", path_ref.display()))?;
615
616    // Use multi-threaded writer with block position tracking
617    let worker_count = NonZero::new(threads.max(1)).expect("threads.max(1) >= 1");
618    let mut builder = MultithreadedWriterBuilder::default().set_worker_count(worker_count);
619
620    #[allow(clippy::cast_possible_truncation)]
621    if let Ok(cl) = CompressionLevel::new(compression_level as u8) {
622        builder = builder.set_compression_level(cl);
623    }
624
625    let bgzf_writer = builder.build_from_writer(output_file);
626
627    let num_refs = header.reference_sequences().len();
628    let mut writer = IndexingBamWriter::new(bgzf_writer, num_refs);
629    writer
630        .write_header(header)
631        .with_context(|| format!("Failed to write header to: {}", path_ref.display()))?;
632
633    Ok(writer)
634}
635
636/// Write a BAI index to a file.
637///
638/// # Arguments
639/// * `path` - Path for the output BAI file (typically `output.bam.bai`)
640/// * `index` - The BAI index to write
641///
642/// # Errors
643/// Returns an error if the file cannot be created or writing the index fails.
644pub fn write_bai_index<P: AsRef<Path>>(path: P, index: &bai::Index) -> Result<()> {
645    let path_ref = path.as_ref();
646    let file = File::create(path_ref)
647        .with_context(|| format!("Failed to create index file: {}", path_ref.display()))?;
648    let mut writer = bai::io::Writer::new(file);
649    writer
650        .write_index(index)
651        .with_context(|| format!("Failed to write index to: {}", path_ref.display()))?;
652    Ok(())
653}
654
655/// Create a BAM reader and read its header.
656///
657/// # Arguments
658/// * `path` - Path to the input BAM file
659/// * `threads` - Number of decompression threads (1 = single-threaded)
660///
661/// # Threading
662/// - `threads <= 1`: Single-threaded (lower overhead, good for small files)
663/// - `threads > 1`: Multi-threaded (higher throughput for large files)
664///
665/// # Returns
666/// A tuple of (BAM reader, header)
667///
668/// # Errors
669/// Returns an error if the file cannot be opened or the header cannot be read
670///
671/// # Panics
672/// Panics if `threads > 1` but `NonZero::new` fails (should not happen).
673///
674/// # Example
675/// ```no_run
676/// use fgumi_lib::bam_io::create_bam_reader;
677/// use std::path::Path;
678///
679/// // Single-threaded
680/// let (mut reader, header) = create_bam_reader(Path::new("input.bam"), 1).unwrap();
681///
682/// // Multi-threaded with 4 decompression threads
683/// let (mut reader, header) = create_bam_reader(Path::new("input.bam"), 4).unwrap();
684/// ```
685pub fn create_bam_reader<P: AsRef<Path>>(
686    path: P,
687    threads: usize,
688) -> Result<(BamReaderAuto, Header)> {
689    let path_ref = path.as_ref();
690    let file = File::open(path_ref)
691        .with_context(|| format!("Failed to open input BAM: {}", path_ref.display()))?;
692
693    let bgzf_reader = make_bgzf_reader(file, threads);
694
695    let mut reader = noodles::bam::io::Reader::from(bgzf_reader);
696    let header = reader
697        .read_header()
698        .with_context(|| format!("Failed to read header from: {}", path_ref.display()))?;
699
700    Ok((reader, header))
701}
702
/// Type alias for a raw BAM reader that supports both single and multi-threaded
/// BGZF decompression via [`BgzfReaderEnum`].
pub type RawBamReaderAuto = RawBamReader<BgzfReaderEnum>;
705
706/// Create a raw BAM reader that yields raw bytes instead of noodles Record.
707///
708/// This is used by the raw sorting pipeline for high-performance byte-level access
709/// without going through noodles' Record parsing.
710///
711/// # Arguments
712/// * `path` - Path to the input BAM file
713/// * `threads` - Number of threads for BGZF decompression (1 = single-threaded)
714///
715/// # Returns
716/// A tuple of (`raw_reader`, `header`)
717///
718/// # Errors
719/// Returns an error if the file cannot be opened or the header cannot be read
720///
721/// # Panics
722/// Panics if `threads > 1` but `NonZero::new` fails (should not happen).
723pub fn create_raw_bam_reader<P: AsRef<Path>>(
724    path: P,
725    threads: usize,
726) -> Result<(RawBamReaderAuto, Header)> {
727    let path_ref = path.as_ref();
728    let file = File::open(path_ref)
729        .with_context(|| format!("Failed to open input BAM: {}", path_ref.display()))?;
730
731    let bgzf_reader = make_bgzf_reader(file, threads);
732
733    // Use noodles to read the header, then extract the BGZF reader
734    let mut noodles_reader = noodles::bam::io::Reader::from(bgzf_reader);
735    let header = noodles_reader
736        .read_header()
737        .with_context(|| format!("Failed to read header from: {}", path_ref.display()))?;
738
739    // Get back the BGZF reader (header has been consumed)
740    let bgzf_reader = noodles_reader.into_inner();
741
742    // Wrap in our raw reader
743    let raw_reader = RawBamReader::new(bgzf_reader);
744
745    Ok((raw_reader, header))
746}
747
748/// Create a BAM writer and write the header in one operation
749///
750/// # Arguments
751/// * `path` - Path for the output BAM file
752/// * `header` - SAM header to write
753/// * `threads` - Number of threads for BGZF compression (1 = single-threaded)
754/// * `compression_level` - Compression level (1-12)
755///
756/// # Returns
757/// A BAM writer ready for writing records
758///
759/// # Errors
760/// Returns an error if the file cannot be created or the header cannot be written
761///
762/// # Panics
763/// Panics if `threads > 1` but `NonZero::new` fails (should not happen).
764///
765/// # Example
766/// ```no_run
767/// use fgumi_lib::bam_io::create_bam_writer;
768/// use noodles::sam::Header;
769/// use std::path::Path;
770///
771/// let header = Header::default();
772/// // Single-threaded with fast compression
773/// let mut writer = create_bam_writer(Path::new("output.bam"), &header, 1, 1).unwrap();
774/// // Multi-threaded with 4 compression threads
775/// let mut writer = create_bam_writer(Path::new("output.bam"), &header, 4, 1).unwrap();
776/// ```
777pub fn create_bam_writer<P: AsRef<Path>>(
778    path: P,
779    header: &Header,
780    threads: usize,
781    compression_level: u32,
782) -> Result<BamWriter> {
783    let path_ref = path.as_ref();
784    let output = open_output_writer(path_ref)?;
785
786    let bgzf_writer = make_bgzf_writer(output, threads, compression_level);
787
788    let mut writer = noodles::bam::io::Writer::from(bgzf_writer);
789    writer
790        .write_header(header)
791        .with_context(|| format!("Failed to write header to: {}", path_ref.display()))?;
792    Ok(writer)
793}
794
795/// Create an optional BAM writer for rejects or filtered reads
796///
797/// This is a convenience function for commands that optionally output rejected/filtered reads.
798/// If the path is `None`, returns `None`. If the path is `Some`, creates a writer and writes
799/// the header.
800///
801/// # Arguments
802/// * `path` - Optional path for the output BAM file
803/// * `header` - SAM header to write
804/// * `threads` - Number of threads for BGZF compression (1 = single-threaded)
805/// * `compression_level` - Compression level (1-12)
806///
807/// # Returns
808/// An optional BAM writer, or None if path is None
809///
810/// # Errors
811/// Returns an error if the file cannot be created or the header cannot be written
812///
813/// # Example
814/// ```no_run
815/// use fgumi_lib::bam_io::create_optional_bam_writer;
816/// use noodles::sam::Header;
817/// use std::path::PathBuf;
818///
819/// let header = Header::default();
820/// let rejects = Some(PathBuf::from("rejects.bam"));
821/// let mut writer = create_optional_bam_writer(rejects, &header, 1, 1).unwrap();
822/// ```
823pub fn create_optional_bam_writer<P: AsRef<Path>>(
824    path: Option<P>,
825    header: &Header,
826    threads: usize,
827    compression_level: u32,
828) -> Result<Option<BamWriter>> {
829    match path {
830        Some(p) => Ok(Some(create_bam_writer(p, header, threads, compression_level)?)),
831        None => Ok(None),
832    }
833}
834
/// Check if a path refers to stdin.
///
/// Recognizes the conventional "-" placeholder as well as "/dev/stdin".
///
/// # Example
/// ```
/// use fgumi_lib::bam_io::is_stdin_path;
/// use std::path::Path;
///
/// assert!(is_stdin_path(Path::new("-")));
/// assert!(is_stdin_path(Path::new("/dev/stdin")));
/// assert!(!is_stdin_path(Path::new("input.bam")));
/// ```
pub fn is_stdin_path<P: AsRef<Path>>(path: P) -> bool {
    // to_string_lossy keeps the comparison working even for non-UTF-8 paths.
    matches!(path.as_ref().to_string_lossy().as_ref(), "-" | "/dev/stdin")
}
852
/// Check if a path refers to stdout.
///
/// Recognizes the conventional "-" placeholder as well as "/dev/stdout".
///
/// # Example
/// ```
/// use fgumi_lib::bam_io::is_stdout_path;
/// use std::path::Path;
///
/// assert!(is_stdout_path(Path::new("-")));
/// assert!(is_stdout_path(Path::new("/dev/stdout")));
/// assert!(!is_stdout_path(Path::new("output.bam")));
/// ```
pub fn is_stdout_path<P: AsRef<Path>>(path: P) -> bool {
    // to_string_lossy keeps the comparison working even for non-UTF-8 paths.
    matches!(path.as_ref().to_string_lossy().as_ref(), "-" | "/dev/stdout")
}
870
871/// Open an output writer for a path, supporting stdout via "-" or "/dev/stdout".
872fn open_output_writer<P: AsRef<Path>>(path: P) -> Result<Box<dyn Write + Send>> {
873    let path_ref = path.as_ref();
874    if is_stdout_path(path_ref) {
875        Ok(Box::new(std::io::stdout()))
876    } else {
877        let file = File::create(path_ref)
878            .with_context(|| format!("Failed to create output BAM: {}", path_ref.display()))?;
879        Ok(Box::new(file))
880    }
881}
882
/// A reader that records every byte pulled through it.
///
/// Used while parsing the BAM header from stdin so the consumed raw bytes
/// can later be replayed to the pipeline.
struct TeeReader<R> {
    inner: R,
    buffer: Vec<u8>,
}

impl<R: Read> TeeReader<R> {
    /// Wrap `inner` with an initially-empty capture buffer.
    fn new(inner: R) -> Self {
        TeeReader { inner, buffer: Vec::new() }
    }

    /// Tear the tee apart, yielding the captured bytes and the wrapped reader.
    fn into_parts(self) -> (Vec<u8>, R) {
        let TeeReader { inner, buffer } = self;
        (buffer, inner)
    }
}

impl<R: Read> Read for TeeReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Forward the read, then mirror exactly the bytes that were produced.
        let count = self.inner.read(buf)?;
        self.buffer.extend_from_slice(&buf[..count]);
        Ok(count)
    }
}
910
/// A reader that chains buffered data with a remaining stream.
///
/// First yields all bytes from the buffer, then reads from the inner reader.
pub struct ChainedReader<R> {
    buffer: io::Cursor<Vec<u8>>,
    inner: R,
    buffer_exhausted: bool,
}

impl<R: Read> ChainedReader<R> {
    /// Create a new chained reader from buffered data and an inner reader.
    pub fn new(buffer: Vec<u8>, inner: R) -> Self {
        Self { buffer: io::Cursor::new(buffer), inner, buffer_exhausted: false }
    }
}

impl<R: Read> Read for ChainedReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Guard against zero-length destinations: `Cursor::read` returns
        // Ok(0) for an empty `buf` even when buffered bytes remain, which
        // previously flipped `buffer_exhausted` and silently dropped the
        // buffered prefix on subsequent reads.
        if buf.is_empty() {
            return Ok(0);
        }
        if !self.buffer_exhausted {
            let n = self.buffer.read(buf)?;
            if n > 0 {
                return Ok(n);
            }
            // Buffer is genuinely drained; all further reads hit `inner`.
            self.buffer_exhausted = true;
        }
        self.inner.read(buf)
    }
}

impl<R: Read + Send> ChainedReader<R> {
    /// Box this reader as a trait object for use with the pipeline.
    pub fn into_boxed(self) -> Box<dyn Read + Send>
    where
        R: 'static,
    {
        Box::new(self)
    }
}
949
950/// Create a raw byte reader and header for pipeline use, supporting both files and stdin.
951///
952/// This function is designed for commands that need to pass a reader to the pipeline.
953/// Unlike `create_bam_reader`, this returns a raw byte reader (not a BAM reader) that
954/// can be passed directly to `run_bam_pipeline_*_from_reader` functions.
955///
956/// For files: Opens the file, reads the header, seeks back to start, returns the file.
957/// For stdin: Buffers all bytes read while parsing header, returns a chained reader
958///            that first yields the buffered bytes then continues from stdin.
959///
960/// # Arguments
961/// * `path` - Path to the input BAM file, or "-" / "/dev/stdin" for stdin
962///
963/// # Returns
964/// A tuple of (boxed reader, header). The reader is positioned at the start of the file
965/// (including the header bytes) so the pipeline can parse the header again.
966///
967/// # Errors
968/// Returns an error if the file cannot be opened or the header cannot be read
969///
970/// # Example
971/// ```no_run
972/// use fgumi_lib::bam_io::create_bam_reader_for_pipeline;
973/// use std::path::Path;
974///
975/// // From file
976/// let (reader, header) = create_bam_reader_for_pipeline(Path::new("input.bam")).unwrap();
977///
978/// // From stdin
979/// let (reader, header) = create_bam_reader_for_pipeline(Path::new("-")).unwrap();
980/// ```
981pub fn create_bam_reader_for_pipeline<P: AsRef<Path>>(
982    path: P,
983) -> Result<(Box<dyn Read + Send>, Header)> {
984    use std::io::{Seek, SeekFrom};
985
986    let path_ref = path.as_ref();
987
988    if is_stdin_path(path_ref) {
989        // Read from stdin with buffering
990        let stdin = io::stdin();
991        let tee_reader = TeeReader::new(stdin);
992        let bgzf_reader = BgzfReader::new(tee_reader);
993        let mut bam_reader = noodles::bam::io::Reader::from(bgzf_reader);
994
995        // Read header (this consumes and buffers the raw bytes)
996        let header =
997            bam_reader.read_header().with_context(|| "Failed to read header from stdin")?;
998
999        // Get the buffered bytes and remaining stdin
1000        let bgzf_reader = bam_reader.into_inner();
1001        let tee_reader = bgzf_reader.into_inner();
1002        let (buffered_bytes, stdin) = tee_reader.into_parts();
1003
1004        // Create a chained reader: buffered bytes first, then remaining stdin
1005        let chained = ChainedReader::new(buffered_bytes, stdin);
1006
1007        Ok((Box::new(chained), header))
1008    } else {
1009        // Read from file - we can seek back
1010        let mut file = File::open(path_ref)
1011            .with_context(|| format!("Failed to open input BAM: {}", path_ref.display()))?;
1012
1013        // Read header using noodles
1014        let bgzf_reader = BgzfReader::new(&file);
1015        let mut bam_reader = noodles::bam::io::Reader::from(bgzf_reader);
1016        let header = bam_reader
1017            .read_header()
1018            .with_context(|| format!("Failed to read header from: {}", path_ref.display()))?;
1019
1020        // Seek file back to start
1021        file.seek(SeekFrom::Start(0))
1022            .with_context(|| format!("Failed to seek in file: {}", path_ref.display()))?;
1023
1024        Ok((Box::new(file), header))
1025    }
1026}
1027
#[cfg(test)]
mod tests {
    use super::*;
    use noodles::sam::header::record::value::{Map, map::ReferenceSequence};
    use std::num::NonZeroUsize;
    use tempfile::NamedTempFile;

    /// Build a minimal header with a single 100 bp reference sequence ("chr1").
    fn create_test_header() -> Header {
        let mut builder = Header::builder();
        let ref_seq = Map::<ReferenceSequence>::new(
            NonZeroUsize::new(100).expect("100 is non-zero constant"),
        );
        builder = builder.add_reference_sequence(b"chr1", ref_seq);
        builder.build()
    }

    #[test]
    fn test_create_bam_reader_nonexistent_file() {
        let result = create_bam_reader("/nonexistent/file.bam", 1);
        assert!(result.is_err());
        if let Err(e) = result {
            let err_msg = e.to_string();
            assert!(err_msg.contains("Failed to open input BAM"));
        }
    }

    #[test]
    fn test_create_bam_writer() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let header = create_test_header();

        let writer = create_bam_writer(temp_file.path(), &header, 1, 6);
        assert!(writer.is_ok());

        Ok(())
    }

    #[test]
    fn test_create_bam_writer_multithreaded() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let header = create_test_header();

        let writer = create_bam_writer(temp_file.path(), &header, 4, 6);
        assert!(writer.is_ok());

        Ok(())
    }

    #[test]
    fn test_create_bam_writer_invalid_path() {
        let header = create_test_header();
        let result = create_bam_writer("/invalid/path/output.bam", &header, 1, 6);
        assert!(result.is_err());
        if let Err(e) = result {
            let err_msg = e.to_string();
            assert!(err_msg.contains("Failed to create output BAM"));
        }
    }

    #[test]
    fn test_create_optional_bam_writer_some() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let header = create_test_header();

        let writer = create_optional_bam_writer(Some(temp_file.path()), &header, 1, 6)?;
        assert!(writer.is_some());

        Ok(())
    }

    #[test]
    fn test_create_optional_bam_writer_none() -> Result<()> {
        let header = create_test_header();

        let writer = create_optional_bam_writer(None::<&str>, &header, 1, 6)?;
        assert!(writer.is_none());

        Ok(())
    }

    #[test]
    fn test_create_optional_bam_writer_invalid_path() {
        let header = create_test_header();
        let result = create_optional_bam_writer(Some("/invalid/path/output.bam"), &header, 1, 6);
        assert!(result.is_err());
    }

    /// Round-trip an empty (header-only) BAM through the single-threaded paths.
    #[test]
    fn test_roundtrip_write_and_read() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let header = create_test_header();

        // Write (single-threaded)
        {
            let _writer = create_bam_writer(temp_file.path(), &header, 1, 6)?;
            // Writer is dropped, file is written
        }

        // Read (single-threaded)
        let (mut reader, read_header) = create_bam_reader(temp_file.path(), 1)?;

        // Verify header has our reference sequence
        assert_eq!(read_header.reference_sequences().len(), 1);

        // Verify we can iterate (even though there are no records)
        let records: Result<Vec<_>, _> = reader.records().collect();
        assert!(records.is_ok());

        Ok(())
    }

    /// Same round-trip as above but exercising the multi-threaded BGZF paths.
    #[test]
    fn test_roundtrip_write_and_read_multithreaded() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let header = create_test_header();

        // Write (multi-threaded)
        {
            let _writer = create_bam_writer(temp_file.path(), &header, 4, 6)?;
            // Writer is dropped, file is written
        }

        // Read (multi-threaded)
        let (mut reader, read_header) = create_bam_reader(temp_file.path(), 4)?;

        // Verify header has our reference sequence
        assert_eq!(read_header.reference_sequences().len(), 1);

        // Verify we can iterate (even though there are no records)
        let records: Result<Vec<_>, _> = reader.records().collect();
        assert!(records.is_ok());

        Ok(())
    }

    #[test]
    fn test_bgzf_writer_flush_single_threaded() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
        let mut writer = BgzfWriterEnum::SingleThreaded(BgzfWriter::new(output_file));

        // Write some data and flush
        writer.write_all(b"test data")?;
        let result = writer.flush();
        assert!(result.is_ok());

        Ok(())
    }

    #[test]
    fn test_bgzf_writer_flush_multithreaded() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
        let worker_count = NonZero::new(2).expect("2 is non-zero");
        let compression_level = CompressionLevel::new(6).expect("valid compression level");
        let mut writer = BgzfWriterEnum::MultiThreaded(MultithreadedWriter::with_worker_count(
            worker_count,
            output_file,
            compression_level,
        ));

        // Write some data and flush
        writer.write_all(b"test data")?;
        let result = writer.flush();
        assert!(result.is_ok());

        Ok(())
    }

    #[test]
    fn test_bgzf_writer_finish_single_threaded() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
        let mut writer = BgzfWriterEnum::SingleThreaded(BgzfWriter::new(output_file));

        // Write some data
        writer.write_all(b"test data")?;

        // Finish the writer - this should flush and close properly
        let result = writer.finish();
        assert!(result.is_ok());

        Ok(())
    }

    #[test]
    fn test_bgzf_writer_finish_multithreaded() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
        let worker_count = NonZero::new(2).expect("2 is non-zero");
        let compression_level = CompressionLevel::new(6).expect("valid compression level");
        let mut writer = BgzfWriterEnum::MultiThreaded(MultithreadedWriter::with_worker_count(
            worker_count,
            output_file,
            compression_level,
        ));

        // Write some data
        writer.write_all(b"test data")?;

        // Finish the writer - this ensures proper finalization
        let result = writer.finish();
        assert!(result.is_ok());

        Ok(())
    }

    #[test]
    fn test_bgzf_writer_write_single_threaded() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
        let mut writer = BgzfWriterEnum::SingleThreaded(BgzfWriter::new(output_file));

        // Test writing via the Write trait
        let bytes_written = writer.write(b"test")?;
        assert_eq!(bytes_written, 4);

        Ok(())
    }

    #[test]
    fn test_bgzf_writer_write_multithreaded() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
        let worker_count = NonZero::new(2).expect("2 is non-zero");
        let compression_level = CompressionLevel::new(6).expect("valid compression level");
        let mut writer = BgzfWriterEnum::MultiThreaded(MultithreadedWriter::with_worker_count(
            worker_count,
            output_file,
            compression_level,
        ));

        // Test writing via the Write trait
        let bytes_written = writer.write(b"test")?;
        assert_eq!(bytes_written, 4);

        Ok(())
    }

    #[test]
    fn test_is_stdin_path() {
        // Test stdin paths
        assert!(is_stdin_path("-"));
        assert!(is_stdin_path("/dev/stdin"));
        assert!(is_stdin_path(Path::new("-")));
        assert!(is_stdin_path(Path::new("/dev/stdin")));

        // Test non-stdin paths
        assert!(!is_stdin_path("input.bam"));
        assert!(!is_stdin_path("/path/to/file.bam"));
        assert!(!is_stdin_path(""));
        assert!(!is_stdin_path("/dev/null"));
    }

    #[test]
    fn test_create_bam_reader_for_pipeline_from_file() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let header = create_test_header();

        // Write a BAM file first
        {
            let _writer = create_bam_writer(temp_file.path(), &header, 1, 6)?;
        }

        // Read using create_bam_reader_for_pipeline
        let (mut reader, read_header) = create_bam_reader_for_pipeline(temp_file.path())?;
        assert_eq!(read_header.reference_sequences().len(), 1);

        // The reader returns raw bytes, verify we can read something
        let mut buf = [0u8; 16];
        let n = reader.read(&mut buf)?;
        assert!(n > 0, "Should read some bytes from the file");

        Ok(())
    }

    #[test]
    fn test_create_bam_reader_for_pipeline_nonexistent_file() {
        let result = create_bam_reader_for_pipeline("/nonexistent/file.bam");
        assert!(result.is_err());
        if let Err(e) = result {
            let err_msg = e.to_string();
            assert!(err_msg.contains("Failed to open input BAM"));
        }
    }

    /// Buffered bytes must be yielded before the inner stream.
    #[test]
    fn test_chained_reader() {
        let buffer = vec![1, 2, 3, 4, 5];
        let remaining = io::Cursor::new(vec![6, 7, 8, 9, 10]);
        let mut chained = ChainedReader::new(buffer, remaining);

        let mut result = Vec::new();
        chained.read_to_end(&mut result).unwrap();

        assert_eq!(result, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    }

    #[test]
    fn test_chained_reader_empty_buffer() {
        let buffer = vec![];
        let remaining = io::Cursor::new(vec![1, 2, 3]);
        let mut chained = ChainedReader::new(buffer, remaining);

        let mut result = Vec::new();
        chained.read_to_end(&mut result).unwrap();

        assert_eq!(result, vec![1, 2, 3]);
    }

    #[test]
    fn test_chained_reader_empty_remaining() {
        let buffer = vec![1, 2, 3];
        let remaining = io::Cursor::new(vec![]);
        let mut chained = ChainedReader::new(buffer, remaining);

        let mut result = Vec::new();
        chained.read_to_end(&mut result).unwrap();

        assert_eq!(result, vec![1, 2, 3]);
    }

    /// Create a minimal BAM record for testing.
    ///
    /// Creates a mapped record at the given reference and position with a simple CIGAR.
    /// The read name is padded to ensure CIGAR alignment (the read name length must make
    /// the CIGAR start at a 4-byte aligned offset from the record start).
    #[allow(clippy::cast_possible_truncation)]
    fn create_test_bam_record(ref_id: i32, pos: i32, read_name: &[u8]) -> Vec<u8> {
        // BAM record structure (without block_size prefix):
        // refID (4), pos (4), l_read_name (1), mapq (1), bin (2), n_cigar_op (2),
        // flag (2), l_seq (4), next_refID (4), next_pos (4), tlen (4),
        // read_name (l_read_name), cigar (n_cigar_op * 4), seq ((l_seq+1)/2), qual (l_seq)
        //
        // Fixed header is 32 bytes. CIGAR starts at 32 + l_read_name.
        // For CIGAR to be 4-byte aligned, l_read_name must be a multiple of 4.

        // Calculate padding needed to make l_read_name a multiple of 4
        let name_with_null = read_name.len() + 1;
        let padding = (4 - (name_with_null % 4)) % 4;
        let l_read_name = (name_with_null + padding) as u8;

        let mapq: u8 = 60;
        let bin: u16 = 4681; // bin for small coordinates
        let n_cigar_op: u16 = 1;
        let flag: u16 = 0; // mapped, forward strand
        let l_seq: u32 = 10;
        let next_ref_id: i32 = -1;
        let next_pos: i32 = -1;
        let tlen: i32 = 0;

        let mut record = Vec::new();

        // Core fields
        record.extend_from_slice(&ref_id.to_le_bytes());
        record.extend_from_slice(&pos.to_le_bytes());
        record.push(l_read_name);
        record.push(mapq);
        record.extend_from_slice(&bin.to_le_bytes());
        record.extend_from_slice(&n_cigar_op.to_le_bytes());
        record.extend_from_slice(&flag.to_le_bytes());
        record.extend_from_slice(&l_seq.to_le_bytes());
        record.extend_from_slice(&next_ref_id.to_le_bytes());
        record.extend_from_slice(&next_pos.to_le_bytes());
        record.extend_from_slice(&tlen.to_le_bytes());

        // Read name + null terminator + padding (null bytes)
        record.extend_from_slice(read_name);
        record.push(0); // null terminator
        record.extend(std::iter::repeat_n(0u8, padding));

        // CIGAR: 10M = (10 << 4) = 160
        let cigar_op: u32 = 10 << 4; // 10M
        record.extend_from_slice(&cigar_op.to_le_bytes());

        // Sequence: 10 bases = 5 bytes (4-bit encoded)
        // AAAAAAAAAA = 0x11 0x11 0x11 0x11 0x11
        record.extend_from_slice(&[0x11, 0x11, 0x11, 0x11, 0x11]);

        // Quality: 10 bytes of qual 30
        record.extend_from_slice(&[30u8; 10]);

        record
    }

    #[test]
    fn test_create_indexing_bam_writer() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let header = create_test_header();

        let writer = create_indexing_bam_writer(temp_file.path(), &header, 6, 2);
        assert!(writer.is_ok());

        // Just finish without writing records
        let writer = writer.unwrap();
        let index = writer.finish();
        assert!(index.is_ok());

        Ok(())
    }

    #[test]
    fn test_indexing_bam_writer_with_records() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let header = create_test_header();

        let mut writer = create_indexing_bam_writer(temp_file.path(), &header, 6, 2)?;

        // Write some test records
        for i in 0..100 {
            let record = create_test_bam_record(0, i * 100, format!("read{i}").as_bytes());
            writer.write_raw_record(&record)?;
        }

        let index = writer.finish()?;

        // Verify index has reference sequences
        assert!(!index.reference_sequences().is_empty());

        Ok(())
    }

    #[test]
    fn test_indexing_bam_writer_produces_valid_bam() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let header = create_test_header();

        // Write BAM with index
        {
            let mut writer = create_indexing_bam_writer(temp_file.path(), &header, 6, 2)?;

            for i in 0..10 {
                let record = create_test_bam_record(0, i * 100, format!("read{i}").as_bytes());
                writer.write_raw_record(&record)?;
            }

            let _index = writer.finish()?;
        }

        // Read back and verify
        let (mut reader, read_header) = create_bam_reader(temp_file.path(), 1)?;
        assert_eq!(read_header.reference_sequences().len(), 1);

        let records: Vec<_> = reader.records().collect();
        assert_eq!(records.len(), 10);

        Ok(())
    }

    #[test]
    fn test_indexing_bam_writer_multithreaded() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let header = create_test_header();

        let mut writer = create_indexing_bam_writer(temp_file.path(), &header, 6, 4)?;

        // Write enough records to potentially trigger multiple blocks
        for i in 0..1000 {
            let record = create_test_bam_record(0, i * 10, format!("read{i:04}").as_bytes());
            writer.write_raw_record(&record)?;
        }

        let index = writer.finish()?;

        // Verify index has reference sequences with bins
        assert!(!index.reference_sequences().is_empty());

        Ok(())
    }

    #[test]
    fn test_indexing_bam_writer_unmapped_records() -> Result<()> {
        let temp_file = NamedTempFile::new()?;
        let header = create_test_header();

        let mut writer = create_indexing_bam_writer(temp_file.path(), &header, 6, 2)?;

        // Write unmapped record (ref_id = -1)
        let record = create_test_bam_record(-1, -1, b"unmapped_read");
        writer.write_raw_record(&record)?;

        // Write mapped record
        let record = create_test_bam_record(0, 100, b"mapped_read");
        writer.write_raw_record(&record)?;

        let index = writer.finish()?;
        assert!(!index.reference_sequences().is_empty());

        Ok(())
    }

    #[test]
    fn test_write_raw_bytes() -> Result<()> {
        let temp = tempfile::NamedTempFile::new()?;
        let header = noodles::sam::Header::default();
        let mut writer = create_raw_bam_writer(temp.path(), &header, 1, 1)?;

        // Write pre-formatted BAM record bytes (4-byte LE length + record)
        let record_bytes = vec![1, 2, 3, 4, 5];
        let mut formatted = Vec::new();
        #[allow(clippy::cast_possible_truncation)]
        let len = record_bytes.len() as u32;
        formatted.extend_from_slice(&len.to_le_bytes());
        formatted.extend_from_slice(&record_bytes);

        writer.write_raw_bytes(&formatted)?;
        writer.finish()?;

        // Verify file is non-empty (contains header + data + EOF)
        assert!(temp.path().metadata()?.len() > 0);
        Ok(())
    }

    #[test]
    fn test_is_stdout_path() {
        assert!(is_stdout_path("-"));
        assert!(is_stdout_path("/dev/stdout"));
        assert!(is_stdout_path(Path::new("-")));
        assert!(is_stdout_path(Path::new("/dev/stdout")));

        assert!(!is_stdout_path("output.bam"));
        assert!(!is_stdout_path("/path/to/file.bam"));
        assert!(!is_stdout_path(""));
        assert!(!is_stdout_path("/dev/null"));
    }

    /// An indexing writer cannot target stdout (the index needs seekable output).
    #[test]
    fn test_indexing_writer_rejects_stdout() {
        let header = Header::default();
        let result = create_indexing_bam_writer("-", &header, 6, 2);
        assert!(result.is_err());
        let err = result.err().unwrap();
        assert!(err.to_string().contains("stdout"));
    }
}