1use crate::bam_io::create_raw_bam_reader;
21use crate::sort::inline_buffer::{RecordBuffer, TemplateKey, TemplateRecordBuffer};
22use crate::sort::keys::SortOrder;
23use crate::sort::radix::{heap_make, heap_sift_down};
24use crate::sort::read_ahead::RawReadAheadReader;
25use anyhow::Result;
26use bstr::BString;
27use crossbeam_channel::{Receiver, bounded};
28use log::info;
29use noodles::sam::Header;
30use noodles::sam::header::record::value::Map;
31use noodles::sam::header::record::value::map::header::tag as header_tag;
32use noodles::sam::header::record::value::map::read_group::tag as rg_tag;
33use noodles_bgzf::io::{
34 MultithreadedWriter, Reader as BgzfReader, Writer as BgzfWriter, multithreaded_writer,
35 writer::CompressionLevel,
36};
37use std::cmp::Ordering;
38use std::collections::HashMap;
39use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
40use std::num::NonZero;
41use std::path::{Path, PathBuf};
42use std::thread::{self, JoinHandle};
43use tempfile::TempDir;
44
/// Maps read-group IDs to small integer library ordinals, plus a fixed-seed
/// hasher for stable query-name hashing.
pub struct LibraryLookup {
    // Read-group ID (raw bytes) -> library ordinal; 0 means "no/unknown library".
    rg_to_ordinal: HashMap<Vec<u8>, u32>,
    // Seeded with constants (see `from_header`), so hashes are stable across runs.
    hasher: ahash::RandomState,
}
59
60impl LibraryLookup {
61 #[must_use]
63 #[allow(clippy::cast_possible_truncation)]
64 pub fn from_header(header: &Header) -> Self {
65 let mut libraries: Vec<String> = header
67 .read_groups()
68 .iter()
69 .filter_map(|(_, rg)| {
70 rg.other_fields().get(&rg_tag::LIBRARY).map(std::string::ToString::to_string)
71 })
72 .collect();
73
74 libraries.sort();
76 libraries.dedup();
77
78 let mut lib_to_ordinal: HashMap<String, u32> = HashMap::new();
81 lib_to_ordinal.insert(String::new(), 0);
82 for (i, lib) in libraries.iter().enumerate() {
83 lib_to_ordinal.insert(lib.clone(), (i + 1) as u32);
84 }
85
86 let rg_to_ordinal: HashMap<Vec<u8>, u32> = header
88 .read_groups()
89 .iter()
90 .map(|(id, rg)| {
91 let lib = rg
92 .other_fields()
93 .get(&rg_tag::LIBRARY)
94 .map(std::string::ToString::to_string)
95 .unwrap_or_default();
96 let ordinal = *lib_to_ordinal.get(&lib).unwrap_or(&0);
97 (id.to_vec(), ordinal)
98 })
99 .collect();
100
101 let hasher = ahash::RandomState::with_seeds(
102 0x517c_c1b7_2722_0a95,
103 0x1234_5678_90ab_cdef,
104 0xfedc_ba98_7654_3210,
105 0x0123_4567_89ab_cdef,
106 );
107
108 Self { rg_to_ordinal, hasher }
109 }
110
111 #[inline]
113 #[must_use]
114 pub fn hash_name(&self, name: &[u8]) -> u64 {
115 self.hasher.hash_one(name)
116 }
117
118 #[inline]
120 #[must_use]
121 pub fn get_ordinal(&self, bam: &[u8]) -> u32 {
122 fgumi_raw_bam::tags::find_string_tag_in_record(bam, b"RG")
123 .and_then(|rg| self.rg_to_ordinal.get(rg))
124 .copied()
125 .unwrap_or(0)
126 }
127}
128
/// Capacity of the bounded channel each chunk-reader thread fills during a
/// merge: up to this many (key, record) pairs are prefetched per chunk.
const MERGE_PREFETCH_SIZE: usize = 1024;

/// Default ceiling on simultaneously kept temp chunk files before a
/// consolidation pass merges some of them together.
const DEFAULT_MAX_TEMP_FILES: usize = 64;

use crate::sort::keys::RawSortKey;
use std::marker::PhantomData;
146
/// Backing writer for a temp chunk file, chosen from compression level and
/// thread count (see `GenericKeyedChunkWriter::create`).
enum ChunkWriterInner {
    /// Level 0: plain buffered file writer, no compression.
    Raw(BufWriter<std::fs::File>),
    /// BGZF compression performed on the calling thread.
    SingleThreaded(BgzfWriter<BufWriter<std::fs::File>>),
    /// BGZF compression spread over worker threads.
    MultiThreaded(MultithreadedWriter<BufWriter<std::fs::File>>),
}
156
157impl Write for ChunkWriterInner {
158 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
159 match self {
160 ChunkWriterInner::Raw(w) => w.write(buf),
161 ChunkWriterInner::SingleThreaded(w) => w.write(buf),
162 ChunkWriterInner::MultiThreaded(w) => w.write(buf),
163 }
164 }
165
166 fn flush(&mut self) -> std::io::Result<()> {
167 match self {
168 ChunkWriterInner::Raw(w) => w.flush(),
169 ChunkWriterInner::SingleThreaded(w) => w.flush(),
170 ChunkWriterInner::MultiThreaded(w) => w.flush(),
171 }
172 }
173}
174
175impl ChunkWriterInner {
176 fn finish(self) -> Result<()> {
177 match self {
178 ChunkWriterInner::Raw(mut w) => {
179 w.flush()?;
180 Ok(())
181 }
182 ChunkWriterInner::SingleThreaded(w) => {
183 w.finish()?;
184 Ok(())
185 }
186 ChunkWriterInner::MultiThreaded(mut w) => {
187 w.finish()?;
188 Ok(())
189 }
190 }
191 }
192}
193
/// Writes (key, record) pairs to a temp chunk file, serializing the sort key
/// ahead of each record so merges can compare keys without re-parsing records.
pub struct GenericKeyedChunkWriter<K: RawSortKey> {
    writer: ChunkWriterInner,
    // Ties this writer to one key type without storing a key.
    _marker: PhantomData<K>,
}
202
203impl<K: RawSortKey> GenericKeyedChunkWriter<K> {
204 pub fn create(path: &Path, compression_level: u32, threads: usize) -> Result<Self> {
218 let file = std::fs::File::create(path)?;
219 let buf = BufWriter::with_capacity(256 * 1024, file);
220
221 let writer = if compression_level == 0 {
222 ChunkWriterInner::Raw(buf)
223 } else if threads > 1 {
224 let worker_count = NonZero::new(threads).expect("threads > 1");
226 let mut builder =
227 multithreaded_writer::Builder::default().set_worker_count(worker_count);
228 #[allow(clippy::cast_possible_truncation)]
229 if let Some(level) = CompressionLevel::new(compression_level as u8) {
230 builder = builder.set_compression_level(level);
231 }
232 ChunkWriterInner::MultiThreaded(builder.build_from_writer(buf))
233 } else {
234 #[allow(clippy::cast_possible_truncation)]
236 let level = CompressionLevel::new(compression_level as u8)
237 .unwrap_or_else(|| CompressionLevel::new(6).unwrap());
238 let writer = noodles_bgzf::io::writer::Builder::default()
239 .set_compression_level(level)
240 .build_from_writer(buf);
241 ChunkWriterInner::SingleThreaded(writer)
242 };
243
244 Ok(Self { writer, _marker: PhantomData })
245 }
246
247 #[inline]
253 #[allow(clippy::cast_possible_truncation)]
254 pub fn write_record(&mut self, key: &K, record: &[u8]) -> Result<()> {
255 key.write_to(&mut self.writer)?;
256 self.writer.write_all(&(record.len() as u32).to_le_bytes())?;
257 self.writer.write_all(record)?;
258 Ok(())
259 }
260
261 pub fn finish(self) -> Result<()> {
267 self.writer.finish()
268 }
269}
270
/// Streams (key, record) pairs back from a temp chunk file.
///
/// A background thread decodes the file (raw or BGZF, detected from the gzip
/// magic) and pushes entries through a bounded channel.
pub struct GenericKeyedChunkReader<K: RawSortKey + 'static> {
    // Receives prefetched entries; a `None` message signals EOF or a read error.
    receiver: Receiver<Option<(K, Vec<u8>)>>,
    // Keeps the reader thread's handle owned; it is never joined explicitly.
    _handle: JoinHandle<()>,
}
279
impl<K: RawSortKey + 'static> GenericKeyedChunkReader<K> {
    /// Opens a keyed chunk file and spawns a background thread that prefetches
    /// up to `MERGE_PREFETCH_SIZE` (key, record) pairs into a bounded channel.
    ///
    /// # Errors
    ///
    /// This constructor itself does not fail on I/O; file-open and decode
    /// errors are logged by the background thread and surface to the consumer
    /// as a premature end-of-stream (`None`).
    pub fn open(path: &Path) -> Result<Self> {
        let (tx, rx) = bounded(MERGE_PREFETCH_SIZE);
        let path = path.to_path_buf();

        let handle = thread::spawn(move || {
            let file = match std::fs::File::open(&path) {
                Ok(f) => f,
                Err(e) => {
                    log::error!("Failed to open keyed chunk {}: {}", path.display(), e);
                    let _ = tx.send(None);
                    return;
                }
            };
            let mut buf_reader = BufReader::with_capacity(256 * 1024, file);

            // Sniff the first two bytes for the gzip magic (BGZF is gzip-framed).
            let mut magic = [0u8; 2];
            let is_compressed = if buf_reader.read_exact(&mut magic).is_ok() {
                magic == [0x1f, 0x8b]
            } else {
                false
            };

            // Rewind so the chosen decoder sees the file from the start.
            if buf_reader.seek(SeekFrom::Start(0)).is_err() {
                log::error!("Failed to seek in keyed chunk {}", path.display());
                let _ = tx.send(None);
                return;
            }

            if is_compressed {
                let bgzf_reader = BgzfReader::new(buf_reader);
                Self::read_records(bgzf_reader, tx);
            } else {
                Self::read_records(buf_reader, tx);
            }
        });

        Ok(Self { receiver: rx, _handle: handle })
    }

    /// Reads (key, length, record) frames from `reader` until EOF or error,
    /// forwarding each entry over `tx`; always terminates the stream with a
    /// final `None` (unless the receiver is already gone).
    #[allow(clippy::needless_pass_by_value)]
    fn read_records<R: Read>(mut reader: R, tx: crossbeam_channel::Sender<Option<(K, Vec<u8>)>>) {
        loop {
            let key = match K::read_from(&mut reader) {
                Ok(k) => k,
                // Clean EOF at a frame boundary: normal end of stream.
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    let _ = tx.send(None);
                    break;
                }
                Err(e) => {
                    log::error!("Error reading keyed chunk key: {e}");
                    let _ = tx.send(None);
                    break;
                }
            };

            // Little-endian u32 record length, as written by the chunk writer.
            let mut len_buf = [0u8; 4];
            if reader.read_exact(&mut len_buf).is_err() {
                log::error!("Error reading keyed chunk length");
                let _ = tx.send(None);
                break;
            }
            let len = u32::from_le_bytes(len_buf) as usize;

            let mut record = vec![0u8; len];
            if reader.read_exact(&mut record).is_err() {
                log::error!("Error reading keyed chunk record");
                let _ = tx.send(None);
                break;
            }

            // Receiver dropped: stop reading; no terminator needed.
            if tx.send(Some((key, record))).is_err() {
                break;
            }
        }
    }

    /// Returns the next prefetched entry, or `None` at end-of-stream (or if
    /// the background thread terminated on an error).
    pub fn next_record(&mut self) -> Option<(K, Vec<u8>)> {
        match self.receiver.recv() {
            Ok(Some(entry)) => Some(entry),
            Ok(None) | Err(_) => None,
        }
    }
}
379
/// External merge sorter operating on raw BAM record bytes.
///
/// Records are buffered in memory up to `memory_limit`; overflow is spilled
/// to keyed chunk files in a temp directory and k-way merged into the output.
pub struct RawExternalSorter {
    // Ordering to produce: coordinate, queryname, or template-coordinate.
    sort_order: SortOrder,
    // Approximate in-memory buffer budget (bytes) before spilling a chunk.
    memory_limit: usize,
    // Optional temp-file directory override — default behavior lives in
    // `create_temp_dir` (not shown here); TODO confirm.
    temp_dir: Option<PathBuf>,
    // Worker threads used for sorting and BGZF compression.
    threads: usize,
    // BGZF compression level for the final output.
    output_compression: u32,
    // Compression level for temp chunks (0 = uncompressed).
    temp_compression: u32,
    // When true, write a .bai index alongside the output (coordinate path).
    write_index: bool,
    // Optional (version, command line) appended to the header as a @PG record.
    pg_info: Option<(String, String)>,
    // Cap on kept temp chunk files before consolidation; 0 disables it.
    max_temp_files: usize,
}
405
406impl RawExternalSorter {
407 #[must_use]
409 pub fn new(sort_order: SortOrder) -> Self {
410 Self {
411 sort_order,
412 memory_limit: 512 * 1024 * 1024, temp_dir: None,
414 threads: 1,
415 output_compression: 6,
416 temp_compression: 1, write_index: false,
418 pg_info: None,
419 max_temp_files: DEFAULT_MAX_TEMP_FILES,
420 }
421 }
422
423 #[must_use]
425 pub fn memory_limit(mut self, limit: usize) -> Self {
426 self.memory_limit = limit;
427 self
428 }
429
430 #[must_use]
432 pub fn temp_dir(mut self, path: PathBuf) -> Self {
433 self.temp_dir = Some(path);
434 self
435 }
436
437 #[must_use]
439 pub fn threads(mut self, threads: usize) -> Self {
440 self.threads = threads;
441 self
442 }
443
444 #[must_use]
446 pub fn output_compression(mut self, level: u32) -> Self {
447 self.output_compression = level;
448 self
449 }
450
451 #[must_use]
457 pub fn temp_compression(mut self, level: u32) -> Self {
458 self.temp_compression = level;
459 self
460 }
461
462 #[must_use]
468 pub fn write_index(mut self, enabled: bool) -> Self {
469 self.write_index = enabled;
470 self
471 }
472
473 #[must_use]
475 pub fn pg_info(mut self, version: String, command_line: String) -> Self {
476 self.pg_info = Some((version, command_line));
477 self
478 }
479
480 #[must_use]
486 pub fn max_temp_files(mut self, max: usize) -> Self {
487 self.max_temp_files = max;
488 self
489 }
490
491 fn maybe_consolidate_temp_files<K: RawSortKey + Default + 'static>(
496 &self,
497 chunk_files: &mut Vec<PathBuf>,
498 temp_path: &Path,
499 consolidation_count: &mut usize,
500 ) -> Result<()> {
501 struct HeapEntry<K> {
502 key: K,
503 record: Vec<u8>,
504 reader_idx: usize,
505 }
506
507 if self.max_temp_files == 0 || chunk_files.len() < self.max_temp_files {
508 return Ok(());
509 }
510
511 if self.max_temp_files < 2 {
513 return Ok(());
514 }
515
516 let merge_count = (self.max_temp_files / 2).max(2).min(chunk_files.len());
518 let files_to_merge: Vec<PathBuf> = chunk_files.drain(..merge_count).collect();
519
520 info!(
521 "Consolidating {} temp files into 1 (total was {})...",
522 merge_count,
523 merge_count + chunk_files.len()
524 );
525
526 let merged_path = temp_path.join(format!("merged_{:04}.keyed", *consolidation_count));
528 *consolidation_count += 1;
529
530 let mut readers: Vec<GenericKeyedChunkReader<K>> = files_to_merge
532 .iter()
533 .map(|p| GenericKeyedChunkReader::<K>::open(p))
534 .collect::<Result<Vec<_>>>()?;
535
536 let mut writer = GenericKeyedChunkWriter::<K>::create(
538 &merged_path,
539 self.temp_compression,
540 self.threads,
541 )?;
542
543 let mut heap: Vec<HeapEntry<K>> = Vec::with_capacity(readers.len());
545 for (reader_idx, reader) in readers.iter_mut().enumerate() {
546 if let Some((key, record)) = reader.next_record() {
547 heap.push(HeapEntry { key, record, reader_idx });
548 }
549 }
550
551 let mut heap_size = heap.len();
552 if heap_size == 0 {
553 writer.finish()?;
554 chunk_files.insert(0, merged_path);
556 for path in &files_to_merge {
558 let _ = std::fs::remove_file(path);
559 }
560 return Ok(());
561 }
562
563 let lt = |a: &HeapEntry<K>, b: &HeapEntry<K>| -> bool {
565 a.key.cmp(&b.key).then_with(|| a.reader_idx.cmp(&b.reader_idx)) == Ordering::Greater
566 };
567
568 heap_make(&mut heap, <);
569
570 while heap_size > 0 {
572 let reader_idx = heap[0].reader_idx;
573 let key = std::mem::take(&mut heap[0].key);
574 let record = std::mem::take(&mut heap[0].record);
575
576 writer.write_record(&key, &record)?;
577
578 if let Some((next_key, next_record)) = readers[reader_idx].next_record() {
579 heap[0].key = next_key;
580 heap[0].record = next_record;
581 heap_sift_down(&mut heap, 0, heap_size, <);
582 } else {
583 heap_size -= 1;
584 if heap_size > 0 {
585 heap.swap(0, heap_size);
586 heap_sift_down(&mut heap, 0, heap_size, <);
587 }
588 }
589 }
590
591 writer.finish()?;
592
593 chunk_files.insert(0, merged_path);
596
597 for path in &files_to_merge {
599 let _ = std::fs::remove_file(path);
600 }
601
602 info!("Consolidation complete, {} temp files remain", chunk_files.len());
603
604 Ok(())
605 }
606
607 pub fn sort(&self, input: &Path, output: &Path) -> Result<RawSortStats> {
613 info!("Starting raw-bytes sort with order: {:?}", self.sort_order);
614 info!("Memory limit: {} MB", self.memory_limit / (1024 * 1024));
615 info!("Threads: {}", self.threads);
616
617 let (reader, header) = create_raw_bam_reader(input, self.threads)?;
619
620 let header = if let Some((ref version, ref command_line)) = self.pg_info {
622 crate::header::add_pg_record(header, version, command_line)?
623 } else {
624 header
625 };
626
627 let temp_dir = self.create_temp_dir()?;
629 let temp_path = temp_dir.path();
630
631 match self.sort_order {
633 SortOrder::Coordinate => self.sort_coordinate(reader, &header, output, temp_path),
634 SortOrder::Queryname => self.sort_queryname(reader, &header, output, temp_path),
635 SortOrder::TemplateCoordinate => {
636 self.sort_template_coordinate(reader, &header, output, temp_path)
637 }
638 }
639 }
640
    /// Coordinate-sort entry point: dispatches to the indexing path when a
    /// .bai index was requested, otherwise to the plain optimized path.
    fn sort_coordinate(
        &self,
        reader: crate::bam_io::RawBamReaderAuto,
        header: &Header,
        output: &Path,
        temp_path: &Path,
    ) -> Result<RawSortStats> {
        if self.write_index {
            self.sort_coordinate_with_index(reader, header, output, temp_path)
        } else {
            self.sort_coordinate_optimized(reader, header, output, temp_path)
        }
    }
655
    /// Coordinate sort without index generation.
    ///
    /// Phase 1 buffers records in an inline `RecordBuffer`; whenever the
    /// memory budget is hit the buffer is sorted and spilled to a keyed
    /// chunk file. Phase 2 either writes the (single, in-memory) sorted
    /// buffer directly, or k-way merges all chunks plus the final buffer.
    #[allow(clippy::cast_possible_truncation)]
    fn sort_coordinate_optimized(
        &self,
        reader: crate::bam_io::RawBamReaderAuto,
        header: &Header,
        output: &Path,
        temp_path: &Path,
    ) -> Result<RawSortStats> {
        use crate::sort::keys::RawCoordinateKey;

        let mut stats = RawSortStats::default();

        let nref = header.reference_sequences().len() as u32;

        // Rough sizing heuristic: ~200 bytes per record — TODO confirm.
        let estimated_records = self.memory_limit / 200;

        let mut chunk_files: Vec<PathBuf> = Vec::new();
        let mut buffer = RecordBuffer::with_capacity(estimated_records, self.memory_limit, nref);
        let mut consolidation_count = 0usize;

        let read_ahead = RawReadAheadReader::new(reader);

        info!("Phase 1: Reading and sorting chunks (inline buffer, keyed output)...");

        for record in read_ahead {
            stats.total_records += 1;

            buffer.push_coordinate(record.as_ref());

            // Budget exceeded: sort the buffer and spill it as a keyed chunk.
            if buffer.memory_usage() >= self.memory_limit {
                let chunk_path = temp_path.join(format!("chunk_{:04}.keyed", chunk_files.len()));

                if self.threads > 1 {
                    buffer.par_sort();
                } else {
                    buffer.sort();
                }

                let mut writer = GenericKeyedChunkWriter::<RawCoordinateKey>::create(
                    &chunk_path,
                    self.temp_compression,
                    self.threads,
                )?;
                for r in buffer.refs() {
                    let key = RawCoordinateKey { sort_key: r.sort_key };
                    let record_bytes = buffer.get_record(r);
                    writer.write_record(&key, record_bytes)?;
                }
                writer.finish()?;

                stats.chunks_written += 1;
                chunk_files.push(chunk_path);

                // Keep the number of open temp files bounded.
                self.maybe_consolidate_temp_files::<RawCoordinateKey>(
                    &mut chunk_files,
                    temp_path,
                    &mut consolidation_count,
                )?;

                buffer.clear();
            }
        }

        info!("Read {} records total", stats.total_records);

        if chunk_files.is_empty() {
            info!("All records fit in memory, performing in-memory sort");

            if self.threads > 1 {
                buffer.par_sort();
            } else {
                buffer.sort();
            }

            let output_header = self.create_output_header(header);
            let mut writer = crate::bam_io::create_raw_bam_writer(
                output,
                &output_header,
                self.threads,
                self.output_compression,
            )?;

            for record_bytes in buffer.iter_sorted() {
                writer.write_raw_record(record_bytes)?;
            }
            writer.finish()?;
        } else {
            // Sort the leftover tail so it can join the merge as a source.
            if self.threads > 1 {
                buffer.par_sort();
            } else {
                buffer.sort();
            }

            let memory_keyed: Vec<(RawCoordinateKey, Vec<u8>)> = if buffer.is_empty() {
                Vec::new()
            } else {
                buffer
                    .refs()
                    .iter()
                    .map(|r| {
                        let key = RawCoordinateKey { sort_key: r.sort_key };
                        let record_bytes = buffer.get_record(r).to_vec();
                        (key, record_bytes)
                    })
                    .collect()
            };

            info!(
                "Phase 2: Merging {} chunks (keyed O(1) comparisons)...",
                chunk_files.len() + usize::from(!memory_keyed.is_empty())
            );

            self.merge_chunks_generic::<RawCoordinateKey>(
                &chunk_files,
                memory_keyed,
                header,
                output,
            )?;
        }

        stats.output_records = stats.total_records;
        info!("Sort complete: {} records processed", stats.total_records);

        Ok(stats)
    }
796
    /// Coordinate sort that also produces a .bai index next to the output.
    ///
    /// Mirrors `sort_coordinate_optimized` for Phase 1 (buffer, sort, spill);
    /// Phase 2 routes output through an indexing writer and writes the
    /// resulting index to `<output>.bam.bai`.
    #[allow(clippy::cast_possible_truncation)]
    fn sort_coordinate_with_index(
        &self,
        reader: crate::bam_io::RawBamReaderAuto,
        header: &Header,
        output: &Path,
        temp_path: &Path,
    ) -> Result<RawSortStats> {
        use crate::bam_io::{create_indexing_bam_writer, write_bai_index};
        use crate::sort::keys::RawCoordinateKey;

        info!("Indexing enabled: will write BAM index alongside output");

        let mut stats = RawSortStats::default();
        let nref = header.reference_sequences().len() as u32;
        // Rough sizing heuristic: ~200 bytes per record — TODO confirm.
        let estimated_records = self.memory_limit / 200;

        let mut chunk_files: Vec<PathBuf> = Vec::new();
        let mut buffer = RecordBuffer::with_capacity(estimated_records, self.memory_limit, nref);
        let mut consolidation_count = 0usize;
        let read_ahead = RawReadAheadReader::new(reader);

        info!("Phase 1: Reading and sorting chunks (inline buffer, keyed output)...");

        for record in read_ahead {
            stats.total_records += 1;
            buffer.push_coordinate(record.as_ref());

            // Budget exceeded: sort the buffer and spill it as a keyed chunk.
            if buffer.memory_usage() >= self.memory_limit {
                let chunk_path = temp_path.join(format!("chunk_{:04}.keyed", chunk_files.len()));

                if self.threads > 1 {
                    buffer.par_sort();
                } else {
                    buffer.sort();
                }

                let mut writer = GenericKeyedChunkWriter::<RawCoordinateKey>::create(
                    &chunk_path,
                    self.temp_compression,
                    self.threads,
                )?;
                for r in buffer.refs() {
                    let key = RawCoordinateKey { sort_key: r.sort_key };
                    let record_bytes = buffer.get_record(r);
                    writer.write_record(&key, record_bytes)?;
                }
                writer.finish()?;

                stats.chunks_written += 1;
                chunk_files.push(chunk_path);

                // Keep the number of open temp files bounded.
                self.maybe_consolidate_temp_files::<RawCoordinateKey>(
                    &mut chunk_files,
                    temp_path,
                    &mut consolidation_count,
                )?;

                buffer.clear();
            }
        }

        info!("Read {} records total", stats.total_records);

        let output_header = self.create_output_header(header);

        if chunk_files.is_empty() {
            info!("All records fit in memory, performing in-memory sort");

            if self.threads > 1 {
                buffer.par_sort();
            } else {
                buffer.sort();
            }

            let mut writer = create_indexing_bam_writer(
                output,
                &output_header,
                self.output_compression,
                self.threads,
            )?;

            for record_bytes in buffer.iter_sorted() {
                writer.write_raw_record(record_bytes)?;
            }

            // The indexing writer yields the accumulated index on finish.
            let index = writer.finish()?;

            let index_path = output.with_extension("bam.bai");
            write_bai_index(&index_path, &index)?;
            info!("Wrote BAM index: {}", index_path.display());
        } else {
            // Sort the leftover tail so it can join the merge as a source.
            if self.threads > 1 {
                buffer.par_sort();
            } else {
                buffer.sort();
            }

            let memory_keyed: Vec<(RawCoordinateKey, Vec<u8>)> = if buffer.is_empty() {
                Vec::new()
            } else {
                buffer
                    .refs()
                    .iter()
                    .map(|r| {
                        let key = RawCoordinateKey { sort_key: r.sort_key };
                        let record_bytes = buffer.get_record(r).to_vec();
                        (key, record_bytes)
                    })
                    .collect()
            };

            info!(
                "Phase 2: Merging {} chunks with index generation...",
                chunk_files.len() + usize::from(!memory_keyed.is_empty())
            );

            let index = self.merge_chunks_with_index::<RawCoordinateKey>(
                &chunk_files,
                memory_keyed,
                header,
                output,
            )?;

            let index_path = output.with_extension("bam.bai");
            write_bai_index(&index_path, &index)?;
            info!("Wrote BAM index: {}", index_path.display());
        }

        stats.output_records = stats.total_records;
        info!("Sort complete: {} records processed", stats.total_records);

        Ok(stats)
    }
943
    /// Queryname sort: keys are extracted per record and kept alongside the
    /// record bytes in a plain `Vec`, which is sorted and spilled to keyed
    /// chunks when the memory budget is exceeded, then merged in Phase 2.
    fn sort_queryname(
        &self,
        reader: crate::bam_io::RawBamReaderAuto,
        header: &Header,
        output: &Path,
        temp_path: &Path,
    ) -> Result<RawSortStats> {
        use crate::sort::keys::{RawQuerynameKey, SortContext};

        let mut stats = RawSortStats::default();
        let ctx = SortContext::from_header(header);

        // Rough sizing heuristic: ~300 bytes per entry — TODO confirm.
        let estimated_records = self.memory_limit / 300;

        let mut chunk_files: Vec<PathBuf> = Vec::new();
        let mut entries: Vec<(RawQuerynameKey, Vec<u8>)> = Vec::with_capacity(estimated_records);
        let mut memory_used = 0usize;
        let mut consolidation_count = 0usize;

        let read_ahead = RawReadAheadReader::new(reader);

        info!("Phase 1: Reading and sorting chunks (keyed output)...");

        for record in read_ahead {
            stats.total_records += 1;

            let bam_bytes = record.as_ref();
            let key = RawQuerynameKey::extract(bam_bytes, &ctx);

            // Accounts for record bytes plus the key's owned name buffer.
            let record_size = bam_bytes.len() + key.name.len() + 2;
            memory_used += record_size;

            entries.push((key, bam_bytes.to_vec()));

            // Budget exceeded: sort the entries and spill them as a chunk.
            if memory_used >= self.memory_limit {
                let chunk_path = temp_path.join(format!("chunk_{:04}.keyed", chunk_files.len()));

                if self.threads > 1 {
                    use rayon::prelude::*;
                    entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
                } else {
                    entries.sort_unstable_by(|a, b| a.0.cmp(&b.0));
                }

                let mut writer = GenericKeyedChunkWriter::<RawQuerynameKey>::create(
                    &chunk_path,
                    self.temp_compression,
                    self.threads,
                )?;
                for (key, record) in entries.drain(..) {
                    writer.write_record(&key, &record)?;
                }
                writer.finish()?;

                stats.chunks_written += 1;
                chunk_files.push(chunk_path);

                // Keep the number of open temp files bounded.
                self.maybe_consolidate_temp_files::<RawQuerynameKey>(
                    &mut chunk_files,
                    temp_path,
                    &mut consolidation_count,
                )?;

                memory_used = 0;
            }
        }

        info!("Read {} records total", stats.total_records);

        if chunk_files.is_empty() {
            info!("All records fit in memory, performing in-memory sort");

            if self.threads > 1 {
                use rayon::prelude::*;
                entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
            } else {
                entries.sort_unstable_by(|a, b| a.0.cmp(&b.0));
            }

            let output_header = self.create_output_header(header);
            let mut writer = crate::bam_io::create_raw_bam_writer(
                output,
                &output_header,
                self.threads,
                self.output_compression,
            )?;

            for (_key, record) in entries {
                writer.write_raw_record(&record)?;
            }
            writer.finish()?;
        } else {
            // Sort the leftover tail so it can join the merge as a source.
            if self.threads > 1 {
                use rayon::prelude::*;
                entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
            } else {
                entries.sort_unstable_by(|a, b| a.0.cmp(&b.0));
            }

            info!(
                "Phase 2: Merging {} chunks (keyed comparisons)...",
                chunk_files.len() + usize::from(!entries.is_empty())
            );

            self.merge_chunks_generic::<RawQuerynameKey>(&chunk_files, entries, header, output)?;
        }

        stats.output_records = stats.total_records;
        info!("Sort complete: {} records processed", stats.total_records);

        Ok(stats)
    }
1070
    /// Template-coordinate sort: keys incorporate library ordinals resolved
    /// via `LibraryLookup`, records are buffered in a `TemplateRecordBuffer`,
    /// and overflow chunks are merged by `merge_chunks_keyed` in Phase 2.
    fn sort_template_coordinate(
        &self,
        reader: crate::bam_io::RawBamReaderAuto,
        header: &Header,
        output: &Path,
        temp_path: &Path,
    ) -> Result<RawSortStats> {
        let mut stats = RawSortStats::default();

        let lib_lookup = LibraryLookup::from_header(header);

        // Sizing heuristics; 338 bytes/record and 86% data fraction are
        // empirical estimates — TODO confirm where these numbers come from.
        let bytes_per_record = 338;
        let estimated_records = self.memory_limit / bytes_per_record;
        let estimated_data_bytes = self.memory_limit * 86 / 100;

        let mut chunk_files: Vec<PathBuf> = Vec::new();
        let mut buffer =
            TemplateRecordBuffer::with_capacity(estimated_records, estimated_data_bytes);
        let mut consolidation_count = 0usize;

        let read_ahead = RawReadAheadReader::new(reader);

        info!("Phase 1: Reading and sorting chunks (inline buffer)...");

        for record in read_ahead {
            stats.total_records += 1;

            let bam_bytes = record.as_ref();
            let key = extract_template_key_inline(bam_bytes, &lib_lookup);
            buffer.push(bam_bytes, key);

            // Budget exceeded: sort the buffer and spill it as a keyed chunk.
            if buffer.memory_usage() >= self.memory_limit {
                let chunk_path = temp_path.join(format!("chunk_{:04}.keyed", chunk_files.len()));

                if self.threads > 1 {
                    buffer.par_sort();
                } else {
                    buffer.sort();
                }

                let mut keyed_writer = GenericKeyedChunkWriter::<TemplateKey>::create(
                    &chunk_path,
                    self.temp_compression,
                    self.threads,
                )?;
                for (key, record) in buffer.iter_sorted_keyed() {
                    keyed_writer.write_record(&key, record)?;
                }
                keyed_writer.finish()?;

                stats.chunks_written += 1;
                chunk_files.push(chunk_path);

                // Keep the number of open temp files bounded.
                self.maybe_consolidate_temp_files::<TemplateKey>(
                    &mut chunk_files,
                    temp_path,
                    &mut consolidation_count,
                )?;

                buffer.clear();
            }
        }

        info!("Read {} records total", stats.total_records);

        if chunk_files.is_empty() {
            info!("All records fit in memory, performing in-memory sort");

            if self.threads > 1 {
                buffer.par_sort();
            } else {
                buffer.sort();
            }

            let output_header = self.create_output_header(header);
            let mut writer = crate::bam_io::create_raw_bam_writer(
                output,
                &output_header,
                self.threads,
                self.output_compression,
            )?;

            for record_bytes in buffer.iter_sorted() {
                writer.write_raw_record(record_bytes)?;
            }
            writer.finish()?;
        } else {
            // Sort the leftover tail so it can join the merge as a source.
            if self.threads > 1 {
                buffer.par_sort();
            } else {
                buffer.sort();
            }

            let memory_keyed: Vec<(TemplateKey, Vec<u8>)> = if buffer.is_empty() {
                Vec::new()
            } else {
                buffer.iter_sorted_keyed().map(|(k, r)| (k, r.to_vec())).collect()
            };

            info!(
                "Phase 2: Merging {} chunks...",
                chunk_files.len() + usize::from(!memory_keyed.is_empty())
            );

            self.merge_chunks_keyed(&chunk_files, memory_keyed, header, output)?;
        }

        stats.output_records = stats.total_records;
        info!("Sort complete: {} records processed", stats.total_records);

        Ok(stats)
    }
1208
1209 fn merge_chunks_keyed(
1215 &self,
1216 chunk_files: &[PathBuf],
1217 memory_keyed: Vec<(TemplateKey, Vec<u8>)>,
1218 header: &Header,
1219 output: &Path,
1220 ) -> Result<u64> {
1221 enum KeyedChunkSource {
1223 Disk(GenericKeyedChunkReader<TemplateKey>),
1225 Memory { records: Vec<(TemplateKey, Vec<u8>)>, idx: usize },
1227 }
1228
1229 impl KeyedChunkSource {
1230 fn next_record(&mut self) -> Option<(TemplateKey, Vec<u8>)> {
1231 match self {
1232 KeyedChunkSource::Disk(reader) => reader.next_record(),
1233 KeyedChunkSource::Memory { records, idx } => {
1234 if *idx < records.len() {
1235 let dummy = (TemplateKey::zeroed(), Vec::new());
1237 let entry = std::mem::replace(&mut records[*idx], dummy);
1238 *idx += 1;
1239 Some(entry)
1240 } else {
1241 None
1242 }
1243 }
1244 }
1245 }
1246 }
1247
1248 struct KeyedHeapEntry {
1250 key: TemplateKey,
1251 record: Vec<u8>,
1252 chunk_idx: usize,
1253 }
1254
1255 const OUTPUT_BUFFER_SIZE: usize = 2048;
1257
1258 let num_disk = chunk_files.len();
1259 let has_memory = !memory_keyed.is_empty();
1260 info!("Merging from {} files and {} in-memory blocks...", num_disk, i32::from(has_memory));
1261
1262 let output_header = self.create_output_header(header);
1264
1265 let mut sources: Vec<KeyedChunkSource> = Vec::with_capacity(num_disk + 1);
1267
1268 for path in chunk_files {
1270 sources
1271 .push(KeyedChunkSource::Disk(GenericKeyedChunkReader::<TemplateKey>::open(path)?));
1272 }
1273
1274 if has_memory {
1276 sources.push(KeyedChunkSource::Memory { records: memory_keyed, idx: 0 });
1277 }
1278
1279 let mut heap: Vec<KeyedHeapEntry> = Vec::with_capacity(sources.len());
1281 for (chunk_idx, source) in sources.iter_mut().enumerate() {
1282 if let Some((key, record)) = source.next_record() {
1283 heap.push(KeyedHeapEntry { key, record, chunk_idx });
1284 }
1285 }
1286
1287 let mut heap_size = heap.len();
1288 if heap_size == 0 {
1289 info!("Merge complete: 0 records merged");
1290 return Ok(0);
1291 }
1292
1293 let lt = |a: &KeyedHeapEntry, b: &KeyedHeapEntry| -> bool {
1297 a.key.cmp(&b.key).then_with(|| a.chunk_idx.cmp(&b.chunk_idx)) == Ordering::Greater
1298 };
1299
1300 heap_make(&mut heap, <);
1302
1303 let mut writer = crate::bam_io::create_raw_bam_writer(
1305 output,
1306 &output_header,
1307 self.threads,
1308 self.output_compression,
1309 )?;
1310
1311 let mut records_merged = 0u64;
1312 let mut output_buffer: Vec<Vec<u8>> = Vec::with_capacity(OUTPUT_BUFFER_SIZE);
1313
1314 while heap_size > 0 {
1316 let chunk_idx = heap[0].chunk_idx;
1318 let record_bytes = std::mem::take(&mut heap[0].record);
1319
1320 output_buffer.push(record_bytes);
1322 records_merged += 1;
1323
1324 if output_buffer.len() >= OUTPUT_BUFFER_SIZE {
1326 for rec in output_buffer.drain(..) {
1327 writer.write_raw_record(&rec)?;
1328 }
1329 }
1330
1331 if let Some((key, next_record)) = sources[chunk_idx].next_record() {
1333 heap[0].key = key;
1335 heap[0].record = next_record;
1336 heap_sift_down(&mut heap, 0, heap_size, <);
1337 } else {
1338 heap_size -= 1;
1340 if heap_size > 0 {
1341 heap.swap(0, heap_size);
1343 heap_sift_down(&mut heap, 0, heap_size, <);
1344 }
1345 }
1346 }
1347
1348 for rec in output_buffer {
1350 writer.write_raw_record(&rec)?;
1351 }
1352
1353 writer.finish()?;
1354
1355 info!("Merge complete: {records_merged} records merged");
1356 Ok(records_merged)
1357 }
1358
1359 fn merge_chunks_generic<K: RawSortKey + Default + 'static>(
1365 &self,
1366 chunk_files: &[PathBuf],
1367 memory_keyed: Vec<(K, Vec<u8>)>,
1368 header: &Header,
1369 output: &Path,
1370 ) -> Result<u64> {
1371 enum GenericKeyedChunkSource<K: RawSortKey + Default + 'static> {
1373 Disk(GenericKeyedChunkReader<K>),
1375 Memory { records: Vec<(K, Vec<u8>)>, idx: usize },
1377 }
1378
1379 impl<K: RawSortKey + Default + 'static> GenericKeyedChunkSource<K> {
1380 fn next_record(&mut self) -> Option<(K, Vec<u8>)> {
1381 match self {
1382 GenericKeyedChunkSource::Disk(reader) => reader.next_record(),
1383 GenericKeyedChunkSource::Memory { records, idx } => {
1384 if *idx < records.len() {
1385 let entry = std::mem::take(&mut records[*idx]);
1387 *idx += 1;
1388 Some(entry)
1389 } else {
1390 None
1391 }
1392 }
1393 }
1394 }
1395 }
1396
1397 struct GenericKeyedHeapEntry<K> {
1399 key: K,
1400 record: Vec<u8>,
1401 chunk_idx: usize,
1402 }
1403
1404 const OUTPUT_BUFFER_SIZE: usize = 2048;
1406
1407 let num_disk = chunk_files.len();
1408 let has_memory = !memory_keyed.is_empty();
1409 info!("Merging from {} files and {} in-memory blocks...", num_disk, i32::from(has_memory));
1410
1411 let output_header = self.create_output_header(header);
1413
1414 let mut sources: Vec<GenericKeyedChunkSource<K>> = Vec::with_capacity(num_disk + 1);
1416
1417 for path in chunk_files {
1419 sources.push(GenericKeyedChunkSource::Disk(GenericKeyedChunkReader::<K>::open(path)?));
1420 }
1421
1422 if has_memory {
1424 sources.push(GenericKeyedChunkSource::Memory { records: memory_keyed, idx: 0 });
1425 }
1426
1427 let mut heap: Vec<GenericKeyedHeapEntry<K>> = Vec::with_capacity(sources.len());
1429 for (chunk_idx, source) in sources.iter_mut().enumerate() {
1430 if let Some((key, record)) = source.next_record() {
1431 heap.push(GenericKeyedHeapEntry { key, record, chunk_idx });
1432 }
1433 }
1434
1435 let mut heap_size = heap.len();
1436 if heap_size == 0 {
1437 info!("Merge complete: 0 records merged");
1438 return Ok(0);
1439 }
1440
1441 let lt = |a: &GenericKeyedHeapEntry<K>, b: &GenericKeyedHeapEntry<K>| -> bool {
1444 a.key.cmp(&b.key).then_with(|| a.chunk_idx.cmp(&b.chunk_idx)) == Ordering::Greater
1445 };
1446
1447 heap_make(&mut heap, <);
1449
1450 let mut writer = crate::bam_io::create_raw_bam_writer(
1452 output,
1453 &output_header,
1454 self.threads,
1455 self.output_compression,
1456 )?;
1457
1458 let mut records_merged = 0u64;
1459 let mut output_buffer: Vec<Vec<u8>> = Vec::with_capacity(OUTPUT_BUFFER_SIZE);
1460
1461 while heap_size > 0 {
1463 let chunk_idx = heap[0].chunk_idx;
1465 let record_bytes = std::mem::take(&mut heap[0].record);
1466
1467 output_buffer.push(record_bytes);
1469 records_merged += 1;
1470
1471 if output_buffer.len() >= OUTPUT_BUFFER_SIZE {
1473 for rec in output_buffer.drain(..) {
1474 writer.write_raw_record(&rec)?;
1475 }
1476 }
1477
1478 if let Some((key, next_record)) = sources[chunk_idx].next_record() {
1480 heap[0].key = key;
1482 heap[0].record = next_record;
1483 heap_sift_down(&mut heap, 0, heap_size, <);
1484 } else {
1485 heap_size -= 1;
1487 if heap_size > 0 {
1488 heap.swap(0, heap_size);
1490 heap_sift_down(&mut heap, 0, heap_size, <);
1491 }
1492 }
1493 }
1494
1495 for rec in output_buffer {
1497 writer.write_raw_record(&rec)?;
1498 }
1499
1500 writer.finish()?;
1501
1502 info!("Merge complete: {records_merged} records merged");
1503 Ok(records_merged)
1504 }
1505
1506 fn merge_chunks_with_index<K: RawSortKey + Default + 'static>(
1511 &self,
1512 chunk_files: &[PathBuf],
1513 memory_keyed: Vec<(K, Vec<u8>)>,
1514 header: &Header,
1515 output: &Path,
1516 ) -> Result<noodles::bam::bai::Index> {
1517 use crate::bam_io::create_indexing_bam_writer;
1518
1519 enum KeyedSource<K: RawSortKey + Default + 'static> {
1521 Disk(GenericKeyedChunkReader<K>),
1522 Memory { records: Vec<(K, Vec<u8>)>, idx: usize },
1523 }
1524
1525 impl<K: RawSortKey + Default + 'static> KeyedSource<K> {
1526 fn next_record(&mut self) -> Option<(K, Vec<u8>)> {
1527 match self {
1528 KeyedSource::Disk(reader) => reader.next_record(),
1529 KeyedSource::Memory { records, idx } => {
1530 if *idx < records.len() {
1531 let entry = std::mem::take(&mut records[*idx]);
1532 *idx += 1;
1533 Some(entry)
1534 } else {
1535 None
1536 }
1537 }
1538 }
1539 }
1540 }
1541
1542 struct HeapEntry<K> {
1544 key: K,
1545 record: Vec<u8>,
1546 chunk_idx: usize,
1547 }
1548
1549 let num_disk = chunk_files.len();
1550 let has_memory = !memory_keyed.is_empty();
1551 info!("Merging from {} files and {} in-memory blocks...", num_disk, i32::from(has_memory));
1552
1553 let output_header = self.create_output_header(header);
1554
1555 let mut sources: Vec<KeyedSource<K>> = Vec::with_capacity(num_disk + 1);
1557 for path in chunk_files {
1558 sources.push(KeyedSource::Disk(GenericKeyedChunkReader::<K>::open(path)?));
1559 }
1560 if has_memory {
1561 sources.push(KeyedSource::Memory { records: memory_keyed, idx: 0 });
1562 }
1563
1564 let mut heap: Vec<HeapEntry<K>> = Vec::with_capacity(sources.len());
1566 for (chunk_idx, source) in sources.iter_mut().enumerate() {
1567 if let Some((key, record)) = source.next_record() {
1568 heap.push(HeapEntry { key, record, chunk_idx });
1569 }
1570 }
1571
1572 let mut heap_size = heap.len();
1573 if heap_size == 0 {
1574 let writer = create_indexing_bam_writer(
1576 output,
1577 &output_header,
1578 self.output_compression,
1579 self.threads,
1580 )?;
1581 let index = writer.finish()?;
1582 info!("Merge complete: 0 records merged");
1583 return Ok(index);
1584 }
1585
1586 let lt = |a: &HeapEntry<K>, b: &HeapEntry<K>| -> bool {
1588 a.key.cmp(&b.key).then_with(|| a.chunk_idx.cmp(&b.chunk_idx)) == Ordering::Greater
1589 };
1590 heap_make(&mut heap, <);
1591
1592 let mut writer = create_indexing_bam_writer(
1594 output,
1595 &output_header,
1596 self.output_compression,
1597 self.threads,
1598 )?;
1599 let mut records_merged = 0u64;
1600
1601 while heap_size > 0 {
1604 let chunk_idx = heap[0].chunk_idx;
1605 let record_bytes = std::mem::take(&mut heap[0].record);
1606
1607 writer.write_raw_record(&record_bytes)?;
1608 records_merged += 1;
1609
1610 if let Some((key, next_record)) = sources[chunk_idx].next_record() {
1611 heap[0].key = key;
1612 heap[0].record = next_record;
1613 heap_sift_down(&mut heap, 0, heap_size, <);
1614 } else {
1615 heap_size -= 1;
1616 if heap_size > 0 {
1617 heap.swap(0, heap_size);
1618 heap_sift_down(&mut heap, 0, heap_size, <);
1619 }
1620 }
1621 }
1622
1623 let index = writer.finish()?;
1624 info!("Merge complete: {records_merged} records merged");
1625 Ok(index)
1626 }
1627
1628 fn create_output_header(&self, header: &Header) -> Header {
1630 let mut builder = Header::builder();
1631
1632 for (name, seq) in header.reference_sequences() {
1634 builder = builder.add_reference_sequence(name.as_slice(), seq.clone());
1635 }
1636
1637 for (id, rg) in header.read_groups() {
1639 builder = builder.add_read_group(id.as_slice(), rg.clone());
1640 }
1641
1642 for (id, pg) in header.programs().as_ref() {
1644 builder = builder.add_program(id.as_slice(), pg.clone());
1645 }
1646
1647 for comment in header.comments() {
1649 builder = builder.add_comment(comment.clone());
1650 }
1651
1652 let hd = match self.sort_order {
1654 SortOrder::Coordinate => {
1655 Map::<noodles::sam::header::record::value::map::Header>::builder()
1656 .insert(header_tag::SORT_ORDER, BString::from("coordinate"))
1657 .build()
1658 .expect("valid header")
1659 }
1660 SortOrder::Queryname => {
1661 Map::<noodles::sam::header::record::value::map::Header>::builder()
1662 .insert(header_tag::SORT_ORDER, BString::from("queryname"))
1663 .build()
1664 .expect("valid header")
1665 }
1666 SortOrder::TemplateCoordinate => {
1667 Map::<noodles::sam::header::record::value::map::Header>::builder()
1668 .insert(header_tag::SORT_ORDER, BString::from("unsorted"))
1669 .insert(header_tag::GROUP_ORDER, BString::from("query"))
1670 .insert(header_tag::SUBSORT_ORDER, BString::from("template-coordinate"))
1671 .build()
1672 .expect("valid header")
1673 }
1674 };
1675
1676 builder = builder.set_header(hd);
1677 builder.build()
1678 }
1679
1680 fn create_temp_dir(&self) -> Result<TempDir> {
1682 super::create_temp_dir(self.temp_dir.as_deref())
1683 }
1684}
1685
/// Extracts a template-coordinate sort key directly from raw BAM record
/// bytes, without deserializing the record.
///
/// The key is built from unclipped 5' positions; for a mapped pair the two
/// ends are canonicalized (lower end first) so both mates of a template
/// derive the same coordinates.
#[must_use]
pub fn extract_template_key_inline(bam_bytes: &[u8], lib_lookup: &LibraryLookup) -> TemplateKey {
    use crate::sort::bam_fields;
    use bam_fields::{
        find_mc_tag_in_record, find_mi_tag_in_record, flags, get_cigar_ops, mate_unclipped_5prime,
        unclipped_5prime,
    };

    // Optional molecular identifier (MI aux tag), if present.
    let mi = find_mi_tag_in_record(bam_bytes);

    // Library ordinal resolved via the record's read group.
    let library = lib_lookup.get_ordinal(bam_bytes);

    // Fixed-layout fields read straight from the raw record bytes.
    let tid = bam_fields::ref_id(bam_bytes);
    let pos = bam_fields::pos(bam_bytes);
    let l_read_name = bam_fields::l_read_name(bam_bytes) as usize;
    let flag = bam_fields::flags(bam_bytes);
    let mate_tid = bam_fields::mate_ref_id(bam_bytes);
    let mate_pos = bam_fields::mate_pos(bam_bytes);

    let is_unmapped = (flag & flags::UNMAPPED) != 0;
    let mate_unmapped = (flag & flags::MATE_UNMAPPED) != 0;
    let is_reverse = (flag & flags::REVERSE) != 0;
    let mate_reverse = (flag & flags::MATE_REVERSE) != 0;
    let is_paired = (flag & flags::PAIRED) != 0;

    // Read name starts at byte offset 32; l_read_name includes the trailing
    // NUL, which is excluded from the hashed slice. Out-of-bounds lengths
    // fall back to an empty name rather than panicking.
    let name_len = l_read_name.saturating_sub(1);
    let name = if name_len > 0 && 32 + name_len <= bam_bytes.len() {
        &bam_bytes[32..32 + name_len]
    } else {
        &[]
    };
    let name_hash = lib_lookup.hash_name(name);

    if is_unmapped {
        if is_paired && !mate_unmapped {
            // Unmapped read with a mapped mate: key off the mate's position
            // so the pair still sorts together. The MC (mate CIGAR) tag gives
            // the mate's unclipped 5' position; fall back to the raw mate pos
            // when the tag is absent.
            let mate_unclipped = find_mc_tag_in_record(bam_bytes)
                .map_or(mate_pos, |mc| mate_unclipped_5prime(mate_pos, mate_reverse, mc));

            // i32::MAX sentinels fill the "upper" coordinate slots; this
            // record is flagged as the upper end of its template.
            return TemplateKey::new(
                mate_tid,
                mate_unclipped,
                mate_reverse,
                i32::MAX,
                i32::MAX,
                false,
                library,
                mi,
                name_hash,
                true, );
        }

        // Fully unmapped (or unpaired and unmapped): dedicated unmapped key,
        // with the READ2 flag (0x80) distinguishing the two mates.
        let is_read2 = (flag & 0x80) != 0; return TemplateKey::unmapped(name_hash, is_read2);
    }

    // Mapped read: compute its own unclipped 5' coordinate from the CIGAR.
    let cigar_ops = get_cigar_ops(bam_bytes);
    let this_pos = unclipped_5prime(pos, is_reverse, &cigar_ops);

    // Mate's unclipped 5' coordinate via the MC tag when the mate is mapped;
    // otherwise the raw mate position is used as-is.
    let mate_unclipped = if is_paired && !mate_unmapped {
        find_mc_tag_in_record(bam_bytes)
            .map_or(mate_pos, |mc| mate_unclipped_5prime(mate_pos, mate_reverse, mc))
    } else {
        mate_pos
    };

    // Canonicalize the pair so (tid1, pos1) is the lower end: both mates of
    // a template then produce identical template coordinates. When both ends
    // share (tid, pos), the reverse strand is treated as the upper end so the
    // ordering stays total.
    let (tid1, tid2, pos1, pos2, neg1, neg2, is_upper) = if is_paired && !mate_unmapped {
        let is_upper = (tid, this_pos) > (mate_tid, mate_unclipped)
            || ((tid, this_pos) == (mate_tid, mate_unclipped) && is_reverse);

        if is_upper {
            (mate_tid, tid, mate_unclipped, this_pos, mate_reverse, is_reverse, true)
        } else {
            (tid, mate_tid, this_pos, mate_unclipped, is_reverse, mate_reverse, false)
        }
    } else {
        // Fragment or mate-unmapped: no second coordinate; i32::MAX sentinels
        // fill the upper slots.
        (tid, i32::MAX, this_pos, i32::MAX, is_reverse, false, false)
    };

    TemplateKey::new(tid1, pos1, neg1, tid2, pos2, neg2, library, mi, name_hash, is_upper)
}
1786
1787pub use super::SortStats as RawSortStats;
1789
#[cfg(test)]
mod tests {
    use super::*;
    use noodles::sam::header::record::value::map::ReadGroup;

    /// Builds a read group carrying the given LB (library) value.
    fn rg_with_library(lib: &str) -> Map<ReadGroup> {
        Map::<ReadGroup>::builder()
            .insert(rg_tag::LIBRARY, String::from(lib))
            .build()
            .expect("valid")
    }

    /// Builds the sorter's output header for `order` from an empty input.
    fn output_header_for(order: SortOrder) -> Header {
        RawExternalSorter::new(order).create_output_header(&Header::builder().build())
    }

    #[test]
    fn test_library_lookup_empty_header() {
        let lookup = LibraryLookup::from_header(&Header::builder().build());
        assert!(lookup.rg_to_ordinal.is_empty());
    }

    #[test]
    fn test_library_lookup_single_rg() {
        let header = Header::builder()
            .add_read_group(BString::from("rg1"), rg_with_library("LibA"))
            .build();

        let lookup = LibraryLookup::from_header(&header);
        assert_eq!(lookup.rg_to_ordinal.len(), 1);
        // The sole library gets ordinal 1; 0 is reserved for "no library".
        assert_eq!(*lookup.rg_to_ordinal.get(b"rg1".as_slice()).unwrap(), 1);
    }

    #[test]
    fn test_library_lookup_multiple_libraries() {
        let header = Header::builder()
            .add_read_group(BString::from("rg1"), rg_with_library("LibC"))
            .add_read_group(BString::from("rg2"), rg_with_library("LibA"))
            .add_read_group(BString::from("rg3"), rg_with_library("LibB"))
            .build();

        let lookup = LibraryLookup::from_header(&header);
        assert_eq!(lookup.rg_to_ordinal.len(), 3);

        // Ordinals follow lexicographic library order (LibA < LibB < LibC),
        // not read-group insertion order.
        assert_eq!(*lookup.rg_to_ordinal.get(b"rg2".as_slice()).unwrap(), 1);
        assert_eq!(*lookup.rg_to_ordinal.get(b"rg3".as_slice()).unwrap(), 2);
        assert_eq!(*lookup.rg_to_ordinal.get(b"rg1".as_slice()).unwrap(), 3);
    }

    #[test]
    fn test_library_lookup_unknown_rg_returns_zero() {
        let header = Header::builder()
            .add_read_group(BString::from("rg1"), rg_with_library("LibA"))
            .build();
        let lookup = LibraryLookup::from_header(&header);

        // Minimal raw record ("rea" read name, no RG aux tag): the lookup
        // falls back to ordinal 0.
        let mut bam = vec![0u8; 36];
        bam[8] = 4; // l_read_name: three chars plus NUL
        bam[32..36].copy_from_slice(b"rea\0");
        assert_eq!(lookup.get_ordinal(&bam), 0);
    }

    #[test]
    fn test_raw_sorter_defaults() {
        let sorter = RawExternalSorter::new(SortOrder::Coordinate);
        assert_eq!(sorter.memory_limit, 512 * 1024 * 1024);
        assert_eq!(sorter.threads, 1);
        assert_eq!(sorter.output_compression, 6);
        assert_eq!(sorter.temp_compression, 1);
        assert_eq!(sorter.max_temp_files, DEFAULT_MAX_TEMP_FILES);
        assert!(sorter.temp_dir.is_none());
        assert!(sorter.pg_info.is_none());
        assert!(!sorter.write_index);
    }

    #[test]
    fn test_raw_sorter_builder_chain() {
        let sorter = RawExternalSorter::new(SortOrder::Queryname)
            .memory_limit(1024)
            .temp_dir(PathBuf::from("/tmp/test"))
            .threads(8)
            .output_compression(9)
            .temp_compression(3)
            .write_index(true)
            .pg_info("1.0".to_string(), "fgumi sort".to_string())
            .max_temp_files(128);

        // Every setter in the chain must be reflected on the built sorter.
        assert_eq!(sorter.memory_limit, 1024);
        assert_eq!(sorter.temp_dir, Some(PathBuf::from("/tmp/test")));
        assert_eq!(sorter.threads, 8);
        assert_eq!(sorter.output_compression, 9);
        assert_eq!(sorter.temp_compression, 3);
        assert_eq!(sorter.max_temp_files, 128);
        assert_eq!(sorter.pg_info, Some(("1.0".to_string(), "fgumi sort".to_string())));
        assert!(sorter.write_index);
    }

    #[test]
    fn test_raw_sorter_memory_limit() {
        let limit = 256 * 1024 * 1024;
        let sorter = RawExternalSorter::new(SortOrder::Coordinate).memory_limit(limit);
        assert_eq!(sorter.memory_limit, limit);
    }

    #[test]
    fn test_raw_sorter_temp_compression() {
        let sorter = RawExternalSorter::new(SortOrder::Coordinate).temp_compression(0);
        assert_eq!(sorter.temp_compression, 0);
    }

    #[test]
    fn test_raw_sorter_max_temp_files() {
        let sorter = RawExternalSorter::new(SortOrder::Coordinate).max_temp_files(0);
        assert_eq!(sorter.max_temp_files, 0);
    }

    #[test]
    fn test_create_output_header_coordinate() {
        let out = output_header_for(SortOrder::Coordinate);
        let hd = out.header().expect("header should have HD record");
        let so = hd.other_fields().get(b"SO").expect("should have SO tag");
        assert_eq!(<_ as AsRef<[u8]>>::as_ref(so), b"coordinate");
    }

    #[test]
    fn test_create_output_header_queryname() {
        let out = output_header_for(SortOrder::Queryname);
        let hd = out.header().expect("header should have HD record");
        let so = hd.other_fields().get(b"SO").expect("should have SO tag");
        assert_eq!(<_ as AsRef<[u8]>>::as_ref(so), b"queryname");
    }

    #[test]
    fn test_create_output_header_template_coordinate() {
        let out = output_header_for(SortOrder::TemplateCoordinate);
        let hd = out.header().expect("header should have HD record");
        let fields = hd.other_fields();

        // Template-coordinate is encoded as SO:unsorted + GO:query + SS.
        let so = fields.get(b"SO").expect("should have SO tag");
        assert_eq!(<_ as AsRef<[u8]>>::as_ref(so), b"unsorted");

        let go = fields.get(b"GO").expect("should have GO tag");
        assert_eq!(<_ as AsRef<[u8]>>::as_ref(go), b"query");

        let ss = fields.get(b"SS").expect("should have SS tag");
        assert_eq!(<_ as AsRef<[u8]>>::as_ref(ss), b"template-coordinate");
    }

    /// Builds a minimal raw BAM record ("rea" read name) followed by the
    /// supplied aux-tag bytes.
    fn build_bam_with_aux(aux_data: &[u8]) -> Vec<u8> {
        let l_read_name: u8 = 4;
        let mut bam = vec![0u8; 32];
        bam[8] = l_read_name;
        bam.extend_from_slice(b"rea\0");
        bam.extend_from_slice(aux_data);
        bam
    }

    #[rstest::rstest]
    #[case::present(b"RGZgroup1\0".as_slice(), Some(b"group1".as_slice()))]
    #[case::absent(b"".as_slice(), None)]
    fn test_find_rg_tag(#[case] aux_data: &[u8], #[case] expected: Option<&[u8]>) {
        let bam = build_bam_with_aux(aux_data);
        assert_eq!(fgumi_raw_bam::tags::find_string_tag_in_record(&bam, b"RG"), expected);
    }

    #[test]
    fn test_find_rg_tag_after_other_tags() {
        // An unrelated integer tag (XY:i:42) precedes the RG tag; the scan
        // must skip it correctly.
        let mut aux = Vec::new();
        aux.extend_from_slice(b"XYi");
        aux.extend_from_slice(&42i32.to_le_bytes());
        aux.extend_from_slice(b"RGZmygroup\0");

        let bam = build_bam_with_aux(&aux);
        let found = fgumi_raw_bam::tags::find_string_tag_in_record(&bam, b"RG");
        assert_eq!(found, Some(b"mygroup".as_slice()));
    }
}