Skip to main content

rsomics_bam_cat/
lib.rs

1//! `samtools cat` port: concatenate BAMs by copying compressed BGZF blocks
2//! verbatim.
3//!
4//! cat is IO-bound by design — samtools never inflates the alignment records, it
5//! moves the compressed gzip members byte-for-byte (`bgzf_raw_read` /
6//! `bgzf_raw_write`, bam_cat.c). The only re-framed bytes are the single output
7//! header. This port does the same via [`bgzf_copy`]: each input's header is
8//! inflated and dropped, one merged header is written once, and every alignment
9//! frame is copied raw. So the per-record deflate that the
10//! [`rsomics_bamio::raw`] edit path pays (and that loses to samtools) never
11//! happens here.
12//!
13//! Header rule (bam_cat.c `cat_check_merge_hdr`): the first file's header is the
14//! base; @RG lines present in later files but not the base are appended; @SQ and
15//! everything else come from the first file unchanged. When no @RG merge is
16//! needed — the dominant case of concatenating shards of one alignment — the
17//! first header's raw uncompressed bytes are re-emitted verbatim, so the output
18//! header is byte-identical to samtools `cat --no-PG`.
19
20mod bgzf_copy;
21
22use std::fs::File;
23use std::io::{BufReader, Write};
24use std::path::{Path, PathBuf};
25
26use noodles::bam;
27use noodles::sam::Header;
28use rsomics_common::{Result, RsomicsError};
29use serde::Serialize;
30
31use bgzf_copy::{HeaderReader, HeaderStream, copy_records, write_bgzf};
32
33/// Buffer size for the raw input stream under the frame reader: one ~256 KiB
34/// refill amortises the per-frame (~64 KiB) reads down to fewer syscalls than
35/// samtools' default BGZF read path, which is the lever for a same-machine win
36/// on an otherwise IO-bound copy.
37const READ_BUFFER: usize = 1024 * 1024;
38const WRITE_BUFFER: usize = 1024 * 1024;
39
40#[derive(Debug, Default, Clone, Serialize)]
41pub struct CatStats {
42    pub inputs: u64,
43}
44
45#[derive(Debug, Clone)]
46pub struct CatOpts {
47    /// Header source file (`-h`): use this header instead of the first input's.
48    pub header_file: Option<PathBuf>,
49    /// Omit the @PG line (`--no-PG`). Always set in compat runs.
50    pub no_pg: bool,
51}
52
53/// Read just the uncompressed BAM header from `path`, returning the parsed
54/// [`Header`] and the raw header bytes (everything before the first alignment
55/// record: `magic l_text text n_ref refs`). The raw bytes let the writer
56/// re-emit the first file's header verbatim when no @RG merge is needed.
57fn read_header_bytes(path: &Path) -> Result<(Header, Vec<u8>)> {
58    let file = File::open(path)
59        .map_err(|e| RsomicsError::InvalidInput(format!("{}: {e}", path.display())))?;
60    let mut reader = BufReader::with_capacity(READ_BUFFER, file);
61    let mut state = HeaderReader::new();
62    let header = {
63        let stream = HeaderStream::new(&mut reader, &mut state);
64        let mut bam_reader = bam::io::Reader::from(stream);
65        bam_reader.read_header().map_err(RsomicsError::Io)?
66    };
67    let raw = state.into_consumed_header();
68    Ok((header, raw))
69}
70
71/// bam_cat.c `cat_check_merge_hdr`: append @RG lines from `src` that the base
72/// header does not already carry (matched by ID). Returns whether any line was
73/// added — when nothing is added the base header is emitted verbatim.
74fn merge_read_groups(base: &mut Header, src: &Header) -> bool {
75    let mut added = false;
76    for (id, map) in src.read_groups() {
77        if !base.read_groups().contains_key(id) {
78            base.read_groups_mut().insert(id.clone(), map.clone());
79            added = true;
80        }
81    }
82    added
83}
84
85pub fn cat(inputs: &[PathBuf], output_path: Option<&Path>, opts: &CatOpts) -> Result<CatStats> {
86    if inputs.is_empty() {
87        return Err(RsomicsError::InvalidInput("no input BAM files".to_string()));
88    }
89
90    // Build the output header: from -h if given, else the first input's header.
91    // RG lines from every input are merged in; @SQ etc. stay as the base's.
92    let (mut out_header, base_raw, mut verbatim) = match &opts.header_file {
93        Some(hf) => {
94            let (h, _raw) = read_header_bytes(hf)?;
95            (h, Vec::new(), false)
96        }
97        None => {
98            let (h, raw) = read_header_bytes(&inputs[0])?;
99            (h, raw, true)
100        }
101    };
102
103    for path in &inputs[1..] {
104        let (h, _raw) = read_header_bytes(path)?;
105        if merge_read_groups(&mut out_header, &h) {
106            verbatim = false;
107        }
108    }
109
110    let pg = (!opts.no_pg).then(pg_line);
111    if pg.is_some() {
112        verbatim = false;
113    }
114
115    match output_path {
116        Some(path) => {
117            let file = File::create(path).map_err(|e| {
118                RsomicsError::InvalidInput(format!("creating {}: {e}", path.display()))
119            })?;
120            let mut out = std::io::BufWriter::with_capacity(WRITE_BUFFER, file);
121            let stats = write_all(
122                inputs,
123                &mut out,
124                &out_header,
125                &base_raw,
126                verbatim,
127                pg.as_deref(),
128            )?;
129            out.flush().map_err(RsomicsError::Io)?;
130            Ok(stats)
131        }
132        None => {
133            let stdout = std::io::stdout();
134            let mut out = std::io::BufWriter::with_capacity(WRITE_BUFFER, stdout.lock());
135            let stats = write_all(
136                inputs,
137                &mut out,
138                &out_header,
139                &base_raw,
140                verbatim,
141                pg.as_deref(),
142            )?;
143            out.flush().map_err(RsomicsError::Io)?;
144            Ok(stats)
145        }
146    }
147}
148
149/// samtools writes a `@PG ID:samtools PN:samtools` line; we record our own tool
150/// so a downstream reader sees the provenance. The exact text differs from
151/// samtools (different program), so compat runs pass `--no-PG`.
152fn pg_line() -> String {
153    format!(
154        "@PG\tID:rsomics-bam-cat\tPN:rsomics-bam-cat\tVN:{}\n",
155        env!("CARGO_PKG_VERSION")
156    )
157}
158
159fn write_all<W: Write>(
160    inputs: &[PathBuf],
161    out: &mut W,
162    out_header: &Header,
163    base_raw: &[u8],
164    verbatim: bool,
165    pg: Option<&str>,
166) -> Result<CatStats> {
167    write_output_header(out, out_header, base_raw, verbatim, pg)?;
168
169    for path in inputs {
170        let file = File::open(path)
171            .map_err(|e| RsomicsError::InvalidInput(format!("{}: {e}", path.display())))?;
172        let mut reader = BufReader::with_capacity(READ_BUFFER, file);
173        let mut state = HeaderReader::new();
174        {
175            let stream = HeaderStream::new(&mut reader, &mut state);
176            let mut bam_reader = bam::io::Reader::from(stream);
177            bam_reader.read_header().map_err(RsomicsError::Io)?;
178        }
179        copy_records(state, &mut reader, out)?;
180    }
181
182    // One trailing BGZF EOF marker for the whole concatenation.
183    out.write_all(&bgzf_copy::BGZF_EOF)
184        .map_err(RsomicsError::Io)?;
185
186    Ok(CatStats {
187        inputs: inputs.len() as u64,
188    })
189}
190
191/// Emit the output BAM header as one or more BGZF blocks. When `verbatim` (no
192/// @RG merge, no @PG, header straight from the first input) the first file's
193/// raw header bytes are re-framed unchanged — byte-identical to samtools
194/// `cat --no-PG`. Otherwise the merged header is serialised via the BAM writer.
195fn write_output_header<W: Write>(
196    out: &mut W,
197    out_header: &Header,
198    base_raw: &[u8],
199    verbatim: bool,
200    pg: Option<&str>,
201) -> Result<()> {
202    if verbatim {
203        write_bgzf(out, base_raw)?;
204        return Ok(());
205    }
206
207    // Serialise the (possibly @RG-merged) header into BAM header bytes, then
208    // optionally splice the @PG line in before re-framing as BGZF.
209    let mut buf = Vec::new();
210    {
211        let mut hw = bam::io::Writer::new(&mut buf);
212        hw.write_header(out_header).map_err(RsomicsError::Io)?;
213    }
214    let raw = strip_bgzf_to_uncompressed(&buf)?;
215    let raw = match pg {
216        Some(line) => splice_pg(&raw, line)?,
217        None => raw,
218    };
219    write_bgzf(out, &raw)?;
220    Ok(())
221}
222
223/// The BAM writer emits a BGZF stream (header block(s) + EOF). Inflate it back
224/// to the raw uncompressed header bytes so it can be re-framed alongside our
225/// own block writer. Header-sized, so the inflate cost is negligible.
226fn strip_bgzf_to_uncompressed(bgzf: &[u8]) -> Result<Vec<u8>> {
227    use std::io::Read;
228    let mut out = Vec::new();
229    noodles::bgzf::io::Reader::new(bgzf)
230        .read_to_end(&mut out)
231        .map_err(RsomicsError::Io)?;
232    Ok(out)
233}
234
235/// Insert `pg` (a full `@PG…\n` line) into the raw header text, after the last
236/// existing header line and before the binary `n_ref`/refs block. Mirrors
237/// htslib appending @PG to the text region.
238fn splice_pg(raw: &[u8], pg: &str) -> Result<Vec<u8>> {
239    // Layout: magic(4) l_text(4) text(l_text) n_ref(4) refs…
240    let l_text = u32::from_le_bytes(raw[4..8].try_into().unwrap()) as usize;
241    let text_start = 8;
242    let text_end = text_start + l_text;
243    let text = &raw[text_start..text_end];
244    let mut new_text = Vec::with_capacity(text.len() + pg.len());
245    new_text.extend_from_slice(text);
246    if !new_text.ends_with(b"\n") && !new_text.is_empty() {
247        new_text.push(b'\n');
248    }
249    new_text.extend_from_slice(pg.as_bytes());
250
251    let mut out = Vec::with_capacity(raw.len() + pg.len());
252    out.extend_from_slice(&raw[..4]); // magic
253    out.extend_from_slice(&u32::try_from(new_text.len()).unwrap().to_le_bytes());
254    out.extend_from_slice(&new_text);
255    out.extend_from_slice(&raw[text_end..]); // n_ref + refs
256    Ok(out)
257}