1mod bgzf_copy;
21
22use std::fs::File;
23use std::io::{BufReader, Write};
24use std::path::{Path, PathBuf};
25
26use noodles::bam;
27use noodles::sam::Header;
28use rsomics_common::{Result, RsomicsError};
29use serde::Serialize;
30
31use bgzf_copy::{HeaderReader, HeaderStream, copy_records, write_bgzf};
32
/// Capacity, in bytes, of the `BufReader` placed in front of every input file (1 MiB).
const READ_BUFFER: usize = 1 << 20;
/// Capacity, in bytes, of the `BufWriter` placed in front of the output sink (1 MiB).
const WRITE_BUFFER: usize = 1 << 20;
39
40#[derive(Debug, Default, Clone, Serialize)]
41pub struct CatStats {
42 pub inputs: u64,
43}
44
/// Options controlling how the output header is produced.
#[derive(Clone, Debug)]
pub struct CatOpts {
    /// Optional path to a BAM file whose header is used as the base of the
    /// output header instead of the first input's header.
    pub header_file: Option<PathBuf>,
    /// When `true`, no `@PG` line is appended to the output header.
    pub no_pg: bool,
}
52
53fn read_header_bytes(path: &Path) -> Result<(Header, Vec<u8>)> {
58 let file = File::open(path)
59 .map_err(|e| RsomicsError::InvalidInput(format!("{}: {e}", path.display())))?;
60 let mut reader = BufReader::with_capacity(READ_BUFFER, file);
61 let mut state = HeaderReader::new();
62 let header = {
63 let stream = HeaderStream::new(&mut reader, &mut state);
64 let mut bam_reader = bam::io::Reader::from(stream);
65 bam_reader.read_header().map_err(RsomicsError::Io)?
66 };
67 let raw = state.into_consumed_header();
68 Ok((header, raw))
69}
70
71fn merge_read_groups(base: &mut Header, src: &Header) -> bool {
75 let mut added = false;
76 for (id, map) in src.read_groups() {
77 if !base.read_groups().contains_key(id) {
78 base.read_groups_mut().insert(id.clone(), map.clone());
79 added = true;
80 }
81 }
82 added
83}
84
85pub fn cat(inputs: &[PathBuf], output_path: Option<&Path>, opts: &CatOpts) -> Result<CatStats> {
86 if inputs.is_empty() {
87 return Err(RsomicsError::InvalidInput("no input BAM files".to_string()));
88 }
89
90 let (mut out_header, base_raw, mut verbatim) = match &opts.header_file {
93 Some(hf) => {
94 let (h, _raw) = read_header_bytes(hf)?;
95 (h, Vec::new(), false)
96 }
97 None => {
98 let (h, raw) = read_header_bytes(&inputs[0])?;
99 (h, raw, true)
100 }
101 };
102
103 for path in &inputs[1..] {
104 let (h, _raw) = read_header_bytes(path)?;
105 if merge_read_groups(&mut out_header, &h) {
106 verbatim = false;
107 }
108 }
109
110 let pg = (!opts.no_pg).then(pg_line);
111 if pg.is_some() {
112 verbatim = false;
113 }
114
115 match output_path {
116 Some(path) => {
117 let file = File::create(path).map_err(|e| {
118 RsomicsError::InvalidInput(format!("creating {}: {e}", path.display()))
119 })?;
120 let mut out = std::io::BufWriter::with_capacity(WRITE_BUFFER, file);
121 let stats = write_all(
122 inputs,
123 &mut out,
124 &out_header,
125 &base_raw,
126 verbatim,
127 pg.as_deref(),
128 )?;
129 out.flush().map_err(RsomicsError::Io)?;
130 Ok(stats)
131 }
132 None => {
133 let stdout = std::io::stdout();
134 let mut out = std::io::BufWriter::with_capacity(WRITE_BUFFER, stdout.lock());
135 let stats = write_all(
136 inputs,
137 &mut out,
138 &out_header,
139 &base_raw,
140 verbatim,
141 pg.as_deref(),
142 )?;
143 out.flush().map_err(RsomicsError::Io)?;
144 Ok(stats)
145 }
146 }
147}
148
149fn pg_line() -> String {
153 format!(
154 "@PG\tID:rsomics-bam-cat\tPN:rsomics-bam-cat\tVN:{}\n",
155 env!("CARGO_PKG_VERSION")
156 )
157}
158
159fn write_all<W: Write>(
160 inputs: &[PathBuf],
161 out: &mut W,
162 out_header: &Header,
163 base_raw: &[u8],
164 verbatim: bool,
165 pg: Option<&str>,
166) -> Result<CatStats> {
167 write_output_header(out, out_header, base_raw, verbatim, pg)?;
168
169 for path in inputs {
170 let file = File::open(path)
171 .map_err(|e| RsomicsError::InvalidInput(format!("{}: {e}", path.display())))?;
172 let mut reader = BufReader::with_capacity(READ_BUFFER, file);
173 let mut state = HeaderReader::new();
174 {
175 let stream = HeaderStream::new(&mut reader, &mut state);
176 let mut bam_reader = bam::io::Reader::from(stream);
177 bam_reader.read_header().map_err(RsomicsError::Io)?;
178 }
179 copy_records(state, &mut reader, out)?;
180 }
181
182 out.write_all(&bgzf_copy::BGZF_EOF)
184 .map_err(RsomicsError::Io)?;
185
186 Ok(CatStats {
187 inputs: inputs.len() as u64,
188 })
189}
190
191fn write_output_header<W: Write>(
196 out: &mut W,
197 out_header: &Header,
198 base_raw: &[u8],
199 verbatim: bool,
200 pg: Option<&str>,
201) -> Result<()> {
202 if verbatim {
203 write_bgzf(out, base_raw)?;
204 return Ok(());
205 }
206
207 let mut buf = Vec::new();
210 {
211 let mut hw = bam::io::Writer::new(&mut buf);
212 hw.write_header(out_header).map_err(RsomicsError::Io)?;
213 }
214 let raw = strip_bgzf_to_uncompressed(&buf)?;
215 let raw = match pg {
216 Some(line) => splice_pg(&raw, line)?,
217 None => raw,
218 };
219 write_bgzf(out, &raw)?;
220 Ok(())
221}
222
223fn strip_bgzf_to_uncompressed(bgzf: &[u8]) -> Result<Vec<u8>> {
227 use std::io::Read;
228 let mut out = Vec::new();
229 noodles::bgzf::io::Reader::new(bgzf)
230 .read_to_end(&mut out)
231 .map_err(RsomicsError::Io)?;
232 Ok(out)
233}
234
235fn splice_pg(raw: &[u8], pg: &str) -> Result<Vec<u8>> {
239 let l_text = u32::from_le_bytes(raw[4..8].try_into().unwrap()) as usize;
241 let text_start = 8;
242 let text_end = text_start + l_text;
243 let text = &raw[text_start..text_end];
244 let mut new_text = Vec::with_capacity(text.len() + pg.len());
245 new_text.extend_from_slice(text);
246 if !new_text.ends_with(b"\n") && !new_text.is_empty() {
247 new_text.push(b'\n');
248 }
249 new_text.extend_from_slice(pg.as_bytes());
250
251 let mut out = Vec::with_capacity(raw.len() + pg.len());
252 out.extend_from_slice(&raw[..4]); out.extend_from_slice(&u32::try_from(new_text.len()).unwrap().to_le_bytes());
254 out.extend_from_slice(&new_text);
255 out.extend_from_slice(&raw[text_end..]); Ok(out)
257}