1use anyhow::{Context, Result};
19use noodles::bam::bai;
20use noodles::bgzf::VirtualPosition;
22use noodles::core::Position;
23use noodles::sam::Header;
24use noodles_bgzf::io::{MultithreadedReader, Reader as BgzfReader, Writer as BgzfWriter};
26use crate::vendored::{BlockInfoRx, MultithreadedWriter, MultithreadedWriterBuilder};
29use fgumi_raw_bam::RawBamReader;
32use bgzf::CompressionLevel;
34use noodles_csi::binning_index::Indexer;
35use noodles_csi::binning_index::index::reference_sequence::bin::Chunk;
36use noodles_csi::binning_index::index::reference_sequence::index::LinearIndex;
37use std::collections::HashMap;
38use std::fs::File;
39use std::io::{self, BufRead, Read, Write};
40use std::num::NonZero;
41use std::path::Path;
42
43const MAX_BLOCK_SIZE: usize = 65280;
45
46pub enum BgzfReaderEnum {
50 SingleThreaded(BgzfReader<File>),
52 MultiThreaded(MultithreadedReader<File>),
54}
55
56impl Read for BgzfReaderEnum {
57 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
58 match self {
59 BgzfReaderEnum::SingleThreaded(r) => r.read(buf),
60 BgzfReaderEnum::MultiThreaded(r) => r.read(buf),
61 }
62 }
63}
64
65impl BufRead for BgzfReaderEnum {
66 fn fill_buf(&mut self) -> io::Result<&[u8]> {
67 match self {
68 BgzfReaderEnum::SingleThreaded(r) => r.fill_buf(),
69 BgzfReaderEnum::MultiThreaded(r) => r.fill_buf(),
70 }
71 }
72
73 fn consume(&mut self, amt: usize) {
74 match self {
75 BgzfReaderEnum::SingleThreaded(r) => r.consume(amt),
76 BgzfReaderEnum::MultiThreaded(r) => r.consume(amt),
77 }
78 }
79}
80
81pub type BamReaderAuto = noodles::bam::io::Reader<BgzfReaderEnum>;
83
84pub enum BgzfWriterEnum {
88 SingleThreaded(BgzfWriter<Box<dyn Write + Send>>),
90 MultiThreaded(MultithreadedWriter<Box<dyn Write + Send>>),
92}
93
94impl Write for BgzfWriterEnum {
95 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
96 match self {
97 BgzfWriterEnum::SingleThreaded(w) => w.write(buf),
98 BgzfWriterEnum::MultiThreaded(w) => w.write(buf),
99 }
100 }
101
102 fn flush(&mut self) -> io::Result<()> {
103 match self {
104 BgzfWriterEnum::SingleThreaded(w) => w.flush(),
105 BgzfWriterEnum::MultiThreaded(w) => w.flush(),
106 }
107 }
108}
109
110impl BgzfWriterEnum {
111 pub fn finish(self) -> io::Result<()> {
118 match self {
119 BgzfWriterEnum::SingleThreaded(mut w) => {
120 w.flush()?;
121 Ok(())
123 }
124 BgzfWriterEnum::MultiThreaded(mut w) => {
125 w.finish().map_err(|e| io::Error::other(e.to_string()))?;
126 Ok(())
127 }
128 }
129 }
130}
131
132pub type BamWriter = noodles::bam::io::Writer<BgzfWriterEnum>;
134
135fn make_bgzf_reader(file: File, threads: usize) -> BgzfReaderEnum {
137 if threads > 1 {
138 let worker_count = NonZero::new(threads).expect("threads > 1 checked above");
139 BgzfReaderEnum::MultiThreaded(MultithreadedReader::with_worker_count(worker_count, file))
140 } else {
141 BgzfReaderEnum::SingleThreaded(BgzfReader::new(file))
142 }
143}
144
145#[allow(clippy::cast_possible_truncation)]
148fn make_bgzf_writer(
149 output: Box<dyn Write + Send>,
150 threads: usize,
151 compression_level: u32,
152) -> BgzfWriterEnum {
153 if threads > 1 {
154 let worker_count = NonZero::new(threads).expect("threads > 1 checked above");
155 let mut builder = MultithreadedWriterBuilder::default().set_worker_count(worker_count);
156
157 if let Ok(cl) = CompressionLevel::new(compression_level as u8) {
158 builder = builder.set_compression_level(cl);
159 }
160
161 BgzfWriterEnum::MultiThreaded(builder.build_from_writer(output))
162 } else {
163 let level = noodles_bgzf::io::writer::CompressionLevel::new(compression_level as u8)
164 .unwrap_or_default();
165 let writer = noodles_bgzf::io::writer::Builder::default()
166 .set_compression_level(level)
167 .build_from_writer(output);
168 BgzfWriterEnum::SingleThreaded(writer)
169 }
170}
171
172#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
181fn write_bam_header(writer: &mut impl Write, header: &Header) -> io::Result<()> {
182 writer.write_all(fgumi_raw_bam::BAM_MAGIC)?;
184
185 let mut sam_writer = noodles::sam::io::Writer::new(Vec::new());
187 sam_writer.write_header(header)?;
188 let header_bytes = sam_writer.into_inner();
189 let l_text = header_bytes.len() as i32;
190 writer.write_all(&l_text.to_le_bytes())?;
191 writer.write_all(&header_bytes)?;
192
193 let n_ref = header.reference_sequences().len() as i32;
195 writer.write_all(&n_ref.to_le_bytes())?;
196
197 for (name, map) in header.reference_sequences() {
198 let l_name = (name.len() + 1) as u32;
200 writer.write_all(&l_name.to_le_bytes())?;
201 writer.write_all(name)?;
202 writer.write_all(&[0u8])?; let l_ref = map.length().get() as i32;
206 writer.write_all(&l_ref.to_le_bytes())?;
207 }
208
209 Ok(())
210}
211
212pub struct RawBamWriter {
219 inner: BgzfWriterEnum,
220}
221
222impl RawBamWriter {
223 #[must_use]
225 pub fn new(inner: BgzfWriterEnum) -> Self {
226 Self { inner }
227 }
228
229 pub fn write_header(&mut self, header: &Header) -> io::Result<()> {
234 write_bam_header(&mut self.inner, header)
235 }
236
237 #[inline]
244 #[allow(clippy::cast_possible_truncation)]
245 pub fn write_raw_record(&mut self, record_bytes: &[u8]) -> io::Result<()> {
246 use std::io::Write;
247
248 let block_size = record_bytes.len() as u32;
250 self.inner.write_all(&block_size.to_le_bytes())?;
251
252 self.inner.write_all(record_bytes)
254 }
255
256 pub fn write_raw_bytes(&mut self, data: &[u8]) -> io::Result<()> {
265 use std::io::Write;
266 self.inner.write_all(data)
267 }
268
269 pub fn finish(self) -> io::Result<()> {
274 self.inner.finish()
275 }
276}
277
278pub fn create_raw_bam_writer<P: AsRef<Path>>(
289 path: P,
290 header: &Header,
291 threads: usize,
292 compression_level: u32,
293) -> Result<RawBamWriter> {
294 let path_ref = path.as_ref();
295 let output = open_output_writer(path_ref)?;
296
297 let bgzf_writer = make_bgzf_writer(output, threads, compression_level);
298
299 let mut writer = RawBamWriter::new(bgzf_writer);
300 writer
301 .write_header(header)
302 .with_context(|| format!("Failed to write header to: {}", path_ref.display()))?;
303 Ok(writer)
304}
305
306#[derive(Debug)]
312struct CachedIndexEntry {
313 block_number: u64,
315 offset_in_block: usize,
317 record_len: usize,
319 alignment_context: Option<(usize, Position, Position, bool)>,
321}
322
323pub struct IndexingBamWriter {
343 inner: MultithreadedWriter<File>,
344 block_info_rx: BlockInfoRx,
345 indexer: Indexer<LinearIndex>,
346 num_refs: usize,
347 entry_cache: Vec<CachedIndexEntry>,
349 block_positions: HashMap<u64, u64>,
350 current_block_number: u64,
351 current_block_offset: usize,
352}
353
354impl IndexingBamWriter {
355 fn new(inner: MultithreadedWriter<File>, num_refs: usize) -> Self {
357 let block_info_rx = inner.block_info_receiver().unwrap().clone();
358 Self {
359 current_block_number: inner.current_block_number(),
360 current_block_offset: inner.buffer_offset(),
361 inner,
362 block_info_rx,
363 indexer: Indexer::default(),
364 num_refs,
365 entry_cache: Vec::new(),
366 block_positions: HashMap::new(),
367 }
368 }
369
370 fn write_header(&mut self, header: &Header) -> io::Result<()> {
372 write_bam_header(&mut self.inner, header)?;
373
374 self.inner.flush()?;
376
377 self.current_block_number = self.inner.current_block_number();
379 self.current_block_offset = self.inner.buffer_offset();
380
381 Ok(())
382 }
383
384 #[inline]
395 #[allow(clippy::cast_possible_truncation)]
396 pub fn write_raw_record(&mut self, record_bytes: &[u8]) -> io::Result<()> {
397 let block_number = self.current_block_number;
399 let offset_in_block = self.current_block_offset;
400
401 let block_size = record_bytes.len() as u32;
403 self.inner.write_all(&block_size.to_le_bytes())?;
404 self.inner.write_all(record_bytes)?;
405
406 let record_len = 4 + record_bytes.len();
407 self.current_block_offset += record_len;
408
409 let new_block_number = self.inner.current_block_number();
411 if new_block_number > self.current_block_number {
412 self.current_block_number = new_block_number;
414 self.current_block_offset = self.inner.buffer_offset();
415 }
416
417 let alignment_context = Self::extract_alignment_context(record_bytes);
419 self.entry_cache.push(CachedIndexEntry {
420 block_number,
421 offset_in_block,
422 record_len,
423 alignment_context,
424 });
425
426 self.flush_completed_blocks()?;
428
429 Ok(())
430 }
431
432 pub fn write_record_buf(
440 &mut self,
441 header: &Header,
442 record: &noodles::sam::alignment::record_buf::RecordBuf,
443 ) -> io::Result<()> {
444 use crate::vendored::bam_codec::encode_record_buf;
445
446 let mut buf = Vec::new();
448 encode_record_buf(&mut buf, header, record)?;
449
450 self.write_raw_record(&buf)
452 }
453
454 #[allow(clippy::cast_possible_truncation)]
456 fn flush_completed_blocks(&mut self) -> io::Result<()> {
457 while let Ok(info) = self.block_info_rx.try_recv() {
459 self.block_positions.insert(info.block_number, info.compressed_start);
460 }
461
462 let mut i = 0;
464 while i < self.entry_cache.len() {
465 let entry = &self.entry_cache[i];
466
467 if let Some(&block_start) = self.block_positions.get(&entry.block_number) {
468 let start_vpos =
470 VirtualPosition::try_from((block_start, entry.offset_in_block as u16))
471 .unwrap_or(VirtualPosition::MIN);
472
473 let end_offset = entry.offset_in_block + entry.record_len;
475 let end_vpos = if end_offset <= MAX_BLOCK_SIZE {
476 VirtualPosition::try_from((block_start, end_offset as u16))
477 .unwrap_or(VirtualPosition::MIN)
478 } else {
479 let next_block = entry.block_number + 1;
481 if let Some(&next_start) = self.block_positions.get(&next_block) {
482 let overflow = end_offset - MAX_BLOCK_SIZE;
483 VirtualPosition::try_from((next_start, overflow as u16))
484 .unwrap_or(VirtualPosition::MIN)
485 } else {
486 i += 1;
488 continue;
489 }
490 };
491
492 let chunk = Chunk::new(start_vpos, end_vpos);
493 if let Some(ctx) = entry.alignment_context {
494 self.indexer.add_record(Some(ctx), chunk).map_err(io::Error::other)?;
495 } else {
496 self.indexer.add_record(None, chunk).map_err(io::Error::other)?;
498 }
499
500 self.entry_cache.remove(i);
501 } else {
502 i += 1;
503 }
504 }
505
506 Ok(())
507 }
508
509 #[inline]
514 #[allow(clippy::cast_sign_loss, clippy::cast_possible_wrap)]
515 fn extract_alignment_context(bam: &[u8]) -> Option<(usize, Position, Position, bool)> {
516 let tid = fgumi_raw_bam::ref_id(bam);
518 let pos = fgumi_raw_bam::pos(bam);
519 let flags = fgumi_raw_bam::flags(bam);
520
521 let is_unmapped = (flags & fgumi_raw_bam::flags::UNMAPPED) != 0;
522
523 if tid < 0 || is_unmapped {
525 return None;
526 }
527
528 let ref_len = fgumi_raw_bam::reference_length_from_raw_bam(bam);
531
532 let start = Position::try_from((pos + 1) as usize).ok()?;
534 let end = Position::try_from((pos + ref_len) as usize).ok()?;
535
536 Some((tid as usize, start, end, true))
537 }
538
539 pub fn finish(mut self) -> io::Result<bai::Index> {
544 self.inner.flush()?;
546
547 for _ in 0..100 {
550 self.flush_completed_blocks()?;
551 if self.entry_cache.is_empty() {
552 break;
553 }
554 std::thread::sleep(std::time::Duration::from_millis(10));
555 }
556
557 let _ = self.inner.finish().map_err(|e| io::Error::other(e.to_string()))?;
559
560 self.flush_completed_blocks()?;
562
563 if !self.entry_cache.is_empty() {
565 return Err(io::Error::other(format!(
566 "Unflushed index entries remain: {} entries",
567 self.entry_cache.len()
568 )));
569 }
570
571 let index = self.indexer.build(self.num_refs);
573 Ok(index)
574 }
575}
576
577pub fn create_indexing_bam_writer<P: AsRef<Path>>(
602 path: P,
603 header: &Header,
604 compression_level: u32,
605 threads: usize,
606) -> Result<IndexingBamWriter> {
607 let path_ref = path.as_ref();
608 if is_stdout_path(path_ref) {
609 anyhow::bail!(
610 "Cannot create an indexing BAM writer for stdout (indexing requires a seekable file)"
611 );
612 }
613 let output_file = File::create(path_ref)
614 .with_context(|| format!("Failed to create output BAM: {}", path_ref.display()))?;
615
616 let worker_count = NonZero::new(threads.max(1)).expect("threads.max(1) >= 1");
618 let mut builder = MultithreadedWriterBuilder::default().set_worker_count(worker_count);
619
620 #[allow(clippy::cast_possible_truncation)]
621 if let Ok(cl) = CompressionLevel::new(compression_level as u8) {
622 builder = builder.set_compression_level(cl);
623 }
624
625 let bgzf_writer = builder.build_from_writer(output_file);
626
627 let num_refs = header.reference_sequences().len();
628 let mut writer = IndexingBamWriter::new(bgzf_writer, num_refs);
629 writer
630 .write_header(header)
631 .with_context(|| format!("Failed to write header to: {}", path_ref.display()))?;
632
633 Ok(writer)
634}
635
636pub fn write_bai_index<P: AsRef<Path>>(path: P, index: &bai::Index) -> Result<()> {
645 let path_ref = path.as_ref();
646 let file = File::create(path_ref)
647 .with_context(|| format!("Failed to create index file: {}", path_ref.display()))?;
648 let mut writer = bai::io::Writer::new(file);
649 writer
650 .write_index(index)
651 .with_context(|| format!("Failed to write index to: {}", path_ref.display()))?;
652 Ok(())
653}
654
655pub fn create_bam_reader<P: AsRef<Path>>(
686 path: P,
687 threads: usize,
688) -> Result<(BamReaderAuto, Header)> {
689 let path_ref = path.as_ref();
690 let file = File::open(path_ref)
691 .with_context(|| format!("Failed to open input BAM: {}", path_ref.display()))?;
692
693 let bgzf_reader = make_bgzf_reader(file, threads);
694
695 let mut reader = noodles::bam::io::Reader::from(bgzf_reader);
696 let header = reader
697 .read_header()
698 .with_context(|| format!("Failed to read header from: {}", path_ref.display()))?;
699
700 Ok((reader, header))
701}
702
703pub type RawBamReaderAuto = RawBamReader<BgzfReaderEnum>;
705
706pub fn create_raw_bam_reader<P: AsRef<Path>>(
724 path: P,
725 threads: usize,
726) -> Result<(RawBamReaderAuto, Header)> {
727 let path_ref = path.as_ref();
728 let file = File::open(path_ref)
729 .with_context(|| format!("Failed to open input BAM: {}", path_ref.display()))?;
730
731 let bgzf_reader = make_bgzf_reader(file, threads);
732
733 let mut noodles_reader = noodles::bam::io::Reader::from(bgzf_reader);
735 let header = noodles_reader
736 .read_header()
737 .with_context(|| format!("Failed to read header from: {}", path_ref.display()))?;
738
739 let bgzf_reader = noodles_reader.into_inner();
741
742 let raw_reader = RawBamReader::new(bgzf_reader);
744
745 Ok((raw_reader, header))
746}
747
748pub fn create_bam_writer<P: AsRef<Path>>(
778 path: P,
779 header: &Header,
780 threads: usize,
781 compression_level: u32,
782) -> Result<BamWriter> {
783 let path_ref = path.as_ref();
784 let output = open_output_writer(path_ref)?;
785
786 let bgzf_writer = make_bgzf_writer(output, threads, compression_level);
787
788 let mut writer = noodles::bam::io::Writer::from(bgzf_writer);
789 writer
790 .write_header(header)
791 .with_context(|| format!("Failed to write header to: {}", path_ref.display()))?;
792 Ok(writer)
793}
794
795pub fn create_optional_bam_writer<P: AsRef<Path>>(
824 path: Option<P>,
825 header: &Header,
826 threads: usize,
827 compression_level: u32,
828) -> Result<Option<BamWriter>> {
829 match path {
830 Some(p) => Ok(Some(create_bam_writer(p, header, threads, compression_level)?)),
831 None => Ok(None),
832 }
833}
834
835pub fn is_stdin_path<P: AsRef<Path>>(path: P) -> bool {
849 let path_str = path.as_ref().to_string_lossy();
850 path_str == "-" || path_str == "/dev/stdin"
851}
852
853pub fn is_stdout_path<P: AsRef<Path>>(path: P) -> bool {
867 let path_str = path.as_ref().to_string_lossy();
868 path_str == "-" || path_str == "/dev/stdout"
869}
870
871fn open_output_writer<P: AsRef<Path>>(path: P) -> Result<Box<dyn Write + Send>> {
873 let path_ref = path.as_ref();
874 if is_stdout_path(path_ref) {
875 Ok(Box::new(std::io::stdout()))
876 } else {
877 let file = File::create(path_ref)
878 .with_context(|| format!("Failed to create output BAM: {}", path_ref.display()))?;
879 Ok(Box::new(file))
880 }
881}
882
883struct TeeReader<R> {
888 inner: R,
889 buffer: Vec<u8>,
890}
891
892impl<R: Read> TeeReader<R> {
893 fn new(inner: R) -> Self {
894 Self { inner, buffer: Vec::new() }
895 }
896
897 fn into_parts(self) -> (Vec<u8>, R) {
899 (self.buffer, self.inner)
900 }
901}
902
903impl<R: Read> Read for TeeReader<R> {
904 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
905 let n = self.inner.read(buf)?;
906 self.buffer.extend_from_slice(&buf[..n]);
907 Ok(n)
908 }
909}
910
911pub struct ChainedReader<R> {
915 buffer: io::Cursor<Vec<u8>>,
916 inner: R,
917 buffer_exhausted: bool,
918}
919
920impl<R: Read> ChainedReader<R> {
921 pub fn new(buffer: Vec<u8>, inner: R) -> Self {
923 Self { buffer: io::Cursor::new(buffer), inner, buffer_exhausted: false }
924 }
925}
926
927impl<R: Read> Read for ChainedReader<R> {
928 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
929 if !self.buffer_exhausted {
930 let n = self.buffer.read(buf)?;
931 if n > 0 {
932 return Ok(n);
933 }
934 self.buffer_exhausted = true;
935 }
936 self.inner.read(buf)
937 }
938}
939
940impl<R: Read + Send> ChainedReader<R> {
941 pub fn into_boxed(self) -> Box<dyn Read + Send>
943 where
944 R: 'static,
945 {
946 Box::new(self)
947 }
948}
949
950pub fn create_bam_reader_for_pipeline<P: AsRef<Path>>(
982 path: P,
983) -> Result<(Box<dyn Read + Send>, Header)> {
984 use std::io::{Seek, SeekFrom};
985
986 let path_ref = path.as_ref();
987
988 if is_stdin_path(path_ref) {
989 let stdin = io::stdin();
991 let tee_reader = TeeReader::new(stdin);
992 let bgzf_reader = BgzfReader::new(tee_reader);
993 let mut bam_reader = noodles::bam::io::Reader::from(bgzf_reader);
994
995 let header =
997 bam_reader.read_header().with_context(|| "Failed to read header from stdin")?;
998
999 let bgzf_reader = bam_reader.into_inner();
1001 let tee_reader = bgzf_reader.into_inner();
1002 let (buffered_bytes, stdin) = tee_reader.into_parts();
1003
1004 let chained = ChainedReader::new(buffered_bytes, stdin);
1006
1007 Ok((Box::new(chained), header))
1008 } else {
1009 let mut file = File::open(path_ref)
1011 .with_context(|| format!("Failed to open input BAM: {}", path_ref.display()))?;
1012
1013 let bgzf_reader = BgzfReader::new(&file);
1015 let mut bam_reader = noodles::bam::io::Reader::from(bgzf_reader);
1016 let header = bam_reader
1017 .read_header()
1018 .with_context(|| format!("Failed to read header from: {}", path_ref.display()))?;
1019
1020 file.seek(SeekFrom::Start(0))
1022 .with_context(|| format!("Failed to seek in file: {}", path_ref.display()))?;
1023
1024 Ok((Box::new(file), header))
1025 }
1026}
1027
1028#[cfg(test)]
1029mod tests {
1030 use super::*;
1031 use noodles::sam::header::record::value::{Map, map::ReferenceSequence};
1032 use std::num::NonZeroUsize;
1033 use tempfile::NamedTempFile;
1034
1035 fn create_test_header() -> Header {
1036 let mut builder = Header::builder();
1037 let ref_seq = Map::<ReferenceSequence>::new(
1038 NonZeroUsize::new(100).expect("100 is non-zero constant"),
1039 );
1040 builder = builder.add_reference_sequence(b"chr1", ref_seq);
1041 builder.build()
1042 }
1043
1044 #[test]
1045 fn test_create_bam_reader_nonexistent_file() {
1046 let result = create_bam_reader("/nonexistent/file.bam", 1);
1047 assert!(result.is_err());
1048 if let Err(e) = result {
1049 let err_msg = e.to_string();
1050 assert!(err_msg.contains("Failed to open input BAM"));
1051 }
1052 }
1053
1054 #[test]
1055 fn test_create_bam_writer() -> Result<()> {
1056 let temp_file = NamedTempFile::new()?;
1057 let header = create_test_header();
1058
1059 let writer = create_bam_writer(temp_file.path(), &header, 1, 6);
1060 assert!(writer.is_ok());
1061
1062 Ok(())
1063 }
1064
1065 #[test]
1066 fn test_create_bam_writer_multithreaded() -> Result<()> {
1067 let temp_file = NamedTempFile::new()?;
1068 let header = create_test_header();
1069
1070 let writer = create_bam_writer(temp_file.path(), &header, 4, 6);
1071 assert!(writer.is_ok());
1072
1073 Ok(())
1074 }
1075
1076 #[test]
1077 fn test_create_bam_writer_invalid_path() {
1078 let header = create_test_header();
1079 let result = create_bam_writer("/invalid/path/output.bam", &header, 1, 6);
1080 assert!(result.is_err());
1081 if let Err(e) = result {
1082 let err_msg = e.to_string();
1083 assert!(err_msg.contains("Failed to create output BAM"));
1084 }
1085 }
1086
1087 #[test]
1088 fn test_create_optional_bam_writer_some() -> Result<()> {
1089 let temp_file = NamedTempFile::new()?;
1090 let header = create_test_header();
1091
1092 let writer = create_optional_bam_writer(Some(temp_file.path()), &header, 1, 6)?;
1093 assert!(writer.is_some());
1094
1095 Ok(())
1096 }
1097
1098 #[test]
1099 fn test_create_optional_bam_writer_none() -> Result<()> {
1100 let header = create_test_header();
1101
1102 let writer = create_optional_bam_writer(None::<&str>, &header, 1, 6)?;
1103 assert!(writer.is_none());
1104
1105 Ok(())
1106 }
1107
1108 #[test]
1109 fn test_create_optional_bam_writer_invalid_path() {
1110 let header = create_test_header();
1111 let result = create_optional_bam_writer(Some("/invalid/path/output.bam"), &header, 1, 6);
1112 assert!(result.is_err());
1113 }
1114
1115 #[test]
1116 fn test_roundtrip_write_and_read() -> Result<()> {
1117 let temp_file = NamedTempFile::new()?;
1118 let header = create_test_header();
1119
1120 {
1122 let _writer = create_bam_writer(temp_file.path(), &header, 1, 6)?;
1123 }
1125
1126 let (mut reader, read_header) = create_bam_reader(temp_file.path(), 1)?;
1128
1129 assert_eq!(read_header.reference_sequences().len(), 1);
1131
1132 let records: Result<Vec<_>, _> = reader.records().collect();
1134 assert!(records.is_ok());
1135
1136 Ok(())
1137 }
1138
1139 #[test]
1140 fn test_roundtrip_write_and_read_multithreaded() -> Result<()> {
1141 let temp_file = NamedTempFile::new()?;
1142 let header = create_test_header();
1143
1144 {
1146 let _writer = create_bam_writer(temp_file.path(), &header, 4, 6)?;
1147 }
1149
1150 let (mut reader, read_header) = create_bam_reader(temp_file.path(), 4)?;
1152
1153 assert_eq!(read_header.reference_sequences().len(), 1);
1155
1156 let records: Result<Vec<_>, _> = reader.records().collect();
1158 assert!(records.is_ok());
1159
1160 Ok(())
1161 }
1162
1163 #[test]
1164 fn test_bgzf_writer_flush_single_threaded() -> Result<()> {
1165 let temp_file = NamedTempFile::new()?;
1166 let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
1167 let mut writer = BgzfWriterEnum::SingleThreaded(BgzfWriter::new(output_file));
1168
1169 writer.write_all(b"test data")?;
1171 let result = writer.flush();
1172 assert!(result.is_ok());
1173
1174 Ok(())
1175 }
1176
1177 #[test]
1178 fn test_bgzf_writer_flush_multithreaded() -> Result<()> {
1179 let temp_file = NamedTempFile::new()?;
1180 let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
1181 let worker_count = NonZero::new(2).expect("2 is non-zero");
1182 let compression_level = CompressionLevel::new(6).expect("valid compression level");
1183 let mut writer = BgzfWriterEnum::MultiThreaded(MultithreadedWriter::with_worker_count(
1184 worker_count,
1185 output_file,
1186 compression_level,
1187 ));
1188
1189 writer.write_all(b"test data")?;
1191 let result = writer.flush();
1192 assert!(result.is_ok());
1193
1194 Ok(())
1195 }
1196
1197 #[test]
1198 fn test_bgzf_writer_finish_single_threaded() -> Result<()> {
1199 let temp_file = NamedTempFile::new()?;
1200 let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
1201 let mut writer = BgzfWriterEnum::SingleThreaded(BgzfWriter::new(output_file));
1202
1203 writer.write_all(b"test data")?;
1205
1206 let result = writer.finish();
1208 assert!(result.is_ok());
1209
1210 Ok(())
1211 }
1212
1213 #[test]
1214 fn test_bgzf_writer_finish_multithreaded() -> Result<()> {
1215 let temp_file = NamedTempFile::new()?;
1216 let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
1217 let worker_count = NonZero::new(2).expect("2 is non-zero");
1218 let compression_level = CompressionLevel::new(6).expect("valid compression level");
1219 let mut writer = BgzfWriterEnum::MultiThreaded(MultithreadedWriter::with_worker_count(
1220 worker_count,
1221 output_file,
1222 compression_level,
1223 ));
1224
1225 writer.write_all(b"test data")?;
1227
1228 let result = writer.finish();
1230 assert!(result.is_ok());
1231
1232 Ok(())
1233 }
1234
1235 #[test]
1236 fn test_bgzf_writer_write_single_threaded() -> Result<()> {
1237 let temp_file = NamedTempFile::new()?;
1238 let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
1239 let mut writer = BgzfWriterEnum::SingleThreaded(BgzfWriter::new(output_file));
1240
1241 let bytes_written = writer.write(b"test")?;
1243 assert_eq!(bytes_written, 4);
1244
1245 Ok(())
1246 }
1247
1248 #[test]
1249 fn test_bgzf_writer_write_multithreaded() -> Result<()> {
1250 let temp_file = NamedTempFile::new()?;
1251 let output_file: Box<dyn Write + Send> = Box::new(File::create(temp_file.path())?);
1252 let worker_count = NonZero::new(2).expect("2 is non-zero");
1253 let compression_level = CompressionLevel::new(6).expect("valid compression level");
1254 let mut writer = BgzfWriterEnum::MultiThreaded(MultithreadedWriter::with_worker_count(
1255 worker_count,
1256 output_file,
1257 compression_level,
1258 ));
1259
1260 let bytes_written = writer.write(b"test")?;
1262 assert_eq!(bytes_written, 4);
1263
1264 Ok(())
1265 }
1266
1267 #[test]
1268 fn test_is_stdin_path() {
1269 assert!(is_stdin_path("-"));
1271 assert!(is_stdin_path("/dev/stdin"));
1272 assert!(is_stdin_path(Path::new("-")));
1273 assert!(is_stdin_path(Path::new("/dev/stdin")));
1274
1275 assert!(!is_stdin_path("input.bam"));
1277 assert!(!is_stdin_path("/path/to/file.bam"));
1278 assert!(!is_stdin_path(""));
1279 assert!(!is_stdin_path("/dev/null"));
1280 }
1281
1282 #[test]
1283 fn test_create_bam_reader_for_pipeline_from_file() -> Result<()> {
1284 let temp_file = NamedTempFile::new()?;
1285 let header = create_test_header();
1286
1287 {
1289 let _writer = create_bam_writer(temp_file.path(), &header, 1, 6)?;
1290 }
1291
1292 let (mut reader, read_header) = create_bam_reader_for_pipeline(temp_file.path())?;
1294 assert_eq!(read_header.reference_sequences().len(), 1);
1295
1296 let mut buf = [0u8; 16];
1298 let n = reader.read(&mut buf)?;
1299 assert!(n > 0, "Should read some bytes from the file");
1300
1301 Ok(())
1302 }
1303
1304 #[test]
1305 fn test_create_bam_reader_for_pipeline_nonexistent_file() {
1306 let result = create_bam_reader_for_pipeline("/nonexistent/file.bam");
1307 assert!(result.is_err());
1308 if let Err(e) = result {
1309 let err_msg = e.to_string();
1310 assert!(err_msg.contains("Failed to open input BAM"));
1311 }
1312 }
1313
1314 #[test]
1315 fn test_chained_reader() {
1316 let buffer = vec![1, 2, 3, 4, 5];
1317 let remaining = io::Cursor::new(vec![6, 7, 8, 9, 10]);
1318 let mut chained = ChainedReader::new(buffer, remaining);
1319
1320 let mut result = Vec::new();
1321 chained.read_to_end(&mut result).unwrap();
1322
1323 assert_eq!(result, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1324 }
1325
1326 #[test]
1327 fn test_chained_reader_empty_buffer() {
1328 let buffer = vec![];
1329 let remaining = io::Cursor::new(vec![1, 2, 3]);
1330 let mut chained = ChainedReader::new(buffer, remaining);
1331
1332 let mut result = Vec::new();
1333 chained.read_to_end(&mut result).unwrap();
1334
1335 assert_eq!(result, vec![1, 2, 3]);
1336 }
1337
1338 #[test]
1339 fn test_chained_reader_empty_remaining() {
1340 let buffer = vec![1, 2, 3];
1341 let remaining = io::Cursor::new(vec![]);
1342 let mut chained = ChainedReader::new(buffer, remaining);
1343
1344 let mut result = Vec::new();
1345 chained.read_to_end(&mut result).unwrap();
1346
1347 assert_eq!(result, vec![1, 2, 3]);
1348 }
1349
1350 #[allow(clippy::cast_possible_truncation)]
1356 fn create_test_bam_record(ref_id: i32, pos: i32, read_name: &[u8]) -> Vec<u8> {
1357 let name_with_null = read_name.len() + 1;
1367 let padding = (4 - (name_with_null % 4)) % 4;
1368 let l_read_name = (name_with_null + padding) as u8;
1369
1370 let mapq: u8 = 60;
1371 let bin: u16 = 4681; let n_cigar_op: u16 = 1;
1373 let flag: u16 = 0; let l_seq: u32 = 10;
1375 let next_ref_id: i32 = -1;
1376 let next_pos: i32 = -1;
1377 let tlen: i32 = 0;
1378
1379 let mut record = Vec::new();
1380
1381 record.extend_from_slice(&ref_id.to_le_bytes());
1383 record.extend_from_slice(&pos.to_le_bytes());
1384 record.push(l_read_name);
1385 record.push(mapq);
1386 record.extend_from_slice(&bin.to_le_bytes());
1387 record.extend_from_slice(&n_cigar_op.to_le_bytes());
1388 record.extend_from_slice(&flag.to_le_bytes());
1389 record.extend_from_slice(&l_seq.to_le_bytes());
1390 record.extend_from_slice(&next_ref_id.to_le_bytes());
1391 record.extend_from_slice(&next_pos.to_le_bytes());
1392 record.extend_from_slice(&tlen.to_le_bytes());
1393
1394 record.extend_from_slice(read_name);
1396 record.push(0); record.extend(std::iter::repeat_n(0u8, padding));
1398
1399 let cigar_op: u32 = 10 << 4; record.extend_from_slice(&cigar_op.to_le_bytes());
1402
1403 record.extend_from_slice(&[0x11, 0x11, 0x11, 0x11, 0x11]);
1406
1407 record.extend_from_slice(&[30u8; 10]);
1409
1410 record
1411 }
1412
1413 #[test]
1414 fn test_create_indexing_bam_writer() -> Result<()> {
1415 let temp_file = NamedTempFile::new()?;
1416 let header = create_test_header();
1417
1418 let writer = create_indexing_bam_writer(temp_file.path(), &header, 6, 2);
1419 assert!(writer.is_ok());
1420
1421 let writer = writer.unwrap();
1423 let index = writer.finish();
1424 assert!(index.is_ok());
1425
1426 Ok(())
1427 }
1428
1429 #[test]
1430 fn test_indexing_bam_writer_with_records() -> Result<()> {
1431 let temp_file = NamedTempFile::new()?;
1432 let header = create_test_header();
1433
1434 let mut writer = create_indexing_bam_writer(temp_file.path(), &header, 6, 2)?;
1435
1436 for i in 0..100 {
1438 let record = create_test_bam_record(0, i * 100, format!("read{i}").as_bytes());
1439 writer.write_raw_record(&record)?;
1440 }
1441
1442 let index = writer.finish()?;
1443
1444 assert!(!index.reference_sequences().is_empty());
1446
1447 Ok(())
1448 }
1449
1450 #[test]
1451 fn test_indexing_bam_writer_produces_valid_bam() -> Result<()> {
1452 let temp_file = NamedTempFile::new()?;
1453 let header = create_test_header();
1454
1455 {
1457 let mut writer = create_indexing_bam_writer(temp_file.path(), &header, 6, 2)?;
1458
1459 for i in 0..10 {
1460 let record = create_test_bam_record(0, i * 100, format!("read{i}").as_bytes());
1461 writer.write_raw_record(&record)?;
1462 }
1463
1464 let _index = writer.finish()?;
1465 }
1466
1467 let (mut reader, read_header) = create_bam_reader(temp_file.path(), 1)?;
1469 assert_eq!(read_header.reference_sequences().len(), 1);
1470
1471 let records: Vec<_> = reader.records().collect();
1472 assert_eq!(records.len(), 10);
1473
1474 Ok(())
1475 }
1476
1477 #[test]
1478 fn test_indexing_bam_writer_multithreaded() -> Result<()> {
1479 let temp_file = NamedTempFile::new()?;
1480 let header = create_test_header();
1481
1482 let mut writer = create_indexing_bam_writer(temp_file.path(), &header, 6, 4)?;
1483
1484 for i in 0..1000 {
1486 let record = create_test_bam_record(0, i * 10, format!("read{i:04}").as_bytes());
1487 writer.write_raw_record(&record)?;
1488 }
1489
1490 let index = writer.finish()?;
1491
1492 assert!(!index.reference_sequences().is_empty());
1494
1495 Ok(())
1496 }
1497
1498 #[test]
1499 fn test_indexing_bam_writer_unmapped_records() -> Result<()> {
1500 let temp_file = NamedTempFile::new()?;
1501 let header = create_test_header();
1502
1503 let mut writer = create_indexing_bam_writer(temp_file.path(), &header, 6, 2)?;
1504
1505 let record = create_test_bam_record(-1, -1, b"unmapped_read");
1507 writer.write_raw_record(&record)?;
1508
1509 let record = create_test_bam_record(0, 100, b"mapped_read");
1511 writer.write_raw_record(&record)?;
1512
1513 let index = writer.finish()?;
1514 assert!(!index.reference_sequences().is_empty());
1515
1516 Ok(())
1517 }
1518
1519 #[test]
1520 fn test_write_raw_bytes() -> Result<()> {
1521 let temp = tempfile::NamedTempFile::new()?;
1522 let header = noodles::sam::Header::default();
1523 let mut writer = create_raw_bam_writer(temp.path(), &header, 1, 1)?;
1524
1525 let record_bytes = vec![1, 2, 3, 4, 5];
1527 let mut formatted = Vec::new();
1528 #[allow(clippy::cast_possible_truncation)]
1529 let len = record_bytes.len() as u32;
1530 formatted.extend_from_slice(&len.to_le_bytes());
1531 formatted.extend_from_slice(&record_bytes);
1532
1533 writer.write_raw_bytes(&formatted)?;
1534 writer.finish()?;
1535
1536 assert!(temp.path().metadata()?.len() > 0);
1538 Ok(())
1539 }
1540
1541 #[test]
1542 fn test_is_stdout_path() {
1543 assert!(is_stdout_path("-"));
1544 assert!(is_stdout_path("/dev/stdout"));
1545 assert!(is_stdout_path(Path::new("-")));
1546 assert!(is_stdout_path(Path::new("/dev/stdout")));
1547
1548 assert!(!is_stdout_path("output.bam"));
1549 assert!(!is_stdout_path("/path/to/file.bam"));
1550 assert!(!is_stdout_path(""));
1551 assert!(!is_stdout_path("/dev/null"));
1552 }
1553
1554 #[test]
1555 fn test_indexing_writer_rejects_stdout() {
1556 let header = Header::default();
1557 let result = create_indexing_bam_writer("-", &header, 6, 2);
1558 assert!(result.is_err());
1559 let err = result.err().unwrap();
1560 assert!(err.to_string().contains("stdout"));
1561 }
1562}