// fgumi_lib/sort/raw.rs
1//! Raw-bytes sorting implementation for BAM files.
2//!
3//! This module implements high-performance BAM sorting using lazy record parsing.
4//! Instead of fully decoding each BAM record into `RecordBuf`, it uses noodles'
5//! lazy `Record` type that stores raw bytes and only parses fields on-demand.
6//!
7//! # Performance Benefits
8//!
9//! - **3-4x lower memory usage**: Raw bytes are ~200-400 bytes vs ~800-1200 bytes decoded
10//! - **No re-encoding overhead**: Records are written back as raw bytes
11//! - **Lazy field access**: Only sort-key fields are parsed
12//!
13//! # Algorithm
14//!
15//! 1. Read BAM records as lazy `Record` objects (raw bytes)
16//! 2. Extract only the fields needed for sort keys (tid, pos, flags, name, MI)
17//! 3. Sort by keys while keeping raw records
18//! 4. Write raw record bytes to output
19
20use crate::bam_io::create_raw_bam_reader;
21use crate::sort::inline_buffer::{RecordBuffer, TemplateKey, TemplateRecordBuffer};
22use crate::sort::keys::SortOrder;
23use crate::sort::radix::{heap_make, heap_sift_down};
24use crate::sort::read_ahead::RawReadAheadReader;
25use anyhow::Result;
26use bstr::BString;
27use crossbeam_channel::{Receiver, bounded};
28use log::info;
29use noodles::sam::Header;
30use noodles::sam::header::record::value::Map;
31use noodles::sam::header::record::value::map::header::tag as header_tag;
32use noodles::sam::header::record::value::map::read_group::tag as rg_tag;
33use noodles_bgzf::io::{
34    MultithreadedWriter, Reader as BgzfReader, Writer as BgzfWriter, multithreaded_writer,
35    writer::CompressionLevel,
36};
37use std::cmp::Ordering;
38use std::collections::HashMap;
39use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
40use std::num::NonZero;
41use std::path::{Path, PathBuf};
42use std::thread::{self, JoinHandle};
43use tempfile::TempDir;
44
45// ============================================================================
46// Library Lookup for Template-Coordinate Sort
47// ============================================================================
48
/// Maps read group ID -> library ordinal for O(1) comparison.
///
/// Pre-computes ordinals by sorting library names alphabetically.
/// Empty/unknown library sorts first (ordinal 0).
///
/// Built once per sort via [`LibraryLookup::from_header`]; both lookups are
/// immutable afterwards, so the struct is cheap to share by reference.
pub struct LibraryLookup {
    /// RG ID -> library ordinal. Keyed by the raw bytes of the read group ID
    /// so record aux-data RG values can be looked up without allocation.
    rg_to_ordinal: HashMap<Vec<u8>, u32>,
    /// Deterministic hasher for read name hashing, constructed once for reuse.
    /// Uses fixed seeds so hashes are stable across runs and processes.
    hasher: ahash::RandomState,
}
59
60impl LibraryLookup {
61    /// Build lookup from BAM header.
62    #[must_use]
63    #[allow(clippy::cast_possible_truncation)]
64    pub fn from_header(header: &Header) -> Self {
65        // Collect all unique library names from read groups
66        let mut libraries: Vec<String> = header
67            .read_groups()
68            .iter()
69            .filter_map(|(_, rg)| {
70                rg.other_fields().get(&rg_tag::LIBRARY).map(std::string::ToString::to_string)
71            })
72            .collect();
73
74        // Sort alphabetically and deduplicate
75        libraries.sort();
76        libraries.dedup();
77
78        // Build library name -> ordinal mapping
79        // Empty string gets ordinal 0, then libraries in sorted order
80        let mut lib_to_ordinal: HashMap<String, u32> = HashMap::new();
81        lib_to_ordinal.insert(String::new(), 0);
82        for (i, lib) in libraries.iter().enumerate() {
83            lib_to_ordinal.insert(lib.clone(), (i + 1) as u32);
84        }
85
86        // Build RG ID -> ordinal mapping
87        let rg_to_ordinal: HashMap<Vec<u8>, u32> = header
88            .read_groups()
89            .iter()
90            .map(|(id, rg)| {
91                let lib = rg
92                    .other_fields()
93                    .get(&rg_tag::LIBRARY)
94                    .map(std::string::ToString::to_string)
95                    .unwrap_or_default();
96                let ordinal = *lib_to_ordinal.get(&lib).unwrap_or(&0);
97                (id.to_vec(), ordinal)
98            })
99            .collect();
100
101        let hasher = ahash::RandomState::with_seeds(
102            0x517c_c1b7_2722_0a95,
103            0x1234_5678_90ab_cdef,
104            0xfedc_ba98_7654_3210,
105            0x0123_4567_89ab_cdef,
106        );
107
108        Self { rg_to_ordinal, hasher }
109    }
110
111    /// Hash a read name deterministically.
112    #[inline]
113    #[must_use]
114    pub fn hash_name(&self, name: &[u8]) -> u64 {
115        self.hasher.hash_one(name)
116    }
117
118    /// Get library ordinal for a record (from RG tag in aux data).
119    #[inline]
120    #[must_use]
121    pub fn get_ordinal(&self, bam: &[u8]) -> u32 {
122        fgumi_raw_bam::tags::find_string_tag_in_record(bam, b"RG")
123            .and_then(|rg| self.rg_to_ordinal.get(rg))
124            .copied()
125            .unwrap_or(0)
126    }
127}
128
/// Number of records to prefetch per chunk during merge.
/// Larger buffer reduces I/O latency impact during merge.
/// Used as the bound of the crossbeam channel in `GenericKeyedChunkReader`.
const MERGE_PREFETCH_SIZE: usize = 1024;

/// Maximum number of temp files before consolidation (like samtools).
/// When this limit is reached, oldest files are merged to reduce file count.
/// See `RawExternalSorter::maybe_consolidate_temp_files`.
const DEFAULT_MAX_TEMP_FILES: usize = 64;
136
137// ============================================================================
138// Generic Keyed Temp File I/O (works with any RawSortKey)
139// ============================================================================
140//
141// Stores pre-computed sort keys alongside each record for O(1) merge comparisons.
142// Format: [key: serialized][len: 4 bytes][record: len bytes] per record
143
144use crate::sort::keys::RawSortKey;
145use std::marker::PhantomData;
146
/// Wrapper for temp chunk writers supporting both raw and compressed output.
///
/// The variant is chosen once at creation time by
/// `GenericKeyedChunkWriter::create`, based on the compression level and
/// thread count; all variants write through a 256 KiB `BufWriter`.
enum ChunkWriterInner {
    /// Uncompressed raw output (fastest).
    Raw(BufWriter<std::fs::File>),
    /// Single-threaded BGZF-compressed output.
    SingleThreaded(BgzfWriter<BufWriter<std::fs::File>>),
    /// Multi-threaded BGZF-compressed output (faster for large chunks).
    MultiThreaded(MultithreadedWriter<BufWriter<std::fs::File>>),
}
156
157impl Write for ChunkWriterInner {
158    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
159        match self {
160            ChunkWriterInner::Raw(w) => w.write(buf),
161            ChunkWriterInner::SingleThreaded(w) => w.write(buf),
162            ChunkWriterInner::MultiThreaded(w) => w.write(buf),
163        }
164    }
165
166    fn flush(&mut self) -> std::io::Result<()> {
167        match self {
168            ChunkWriterInner::Raw(w) => w.flush(),
169            ChunkWriterInner::SingleThreaded(w) => w.flush(),
170            ChunkWriterInner::MultiThreaded(w) => w.flush(),
171        }
172    }
173}
174
175impl ChunkWriterInner {
176    fn finish(self) -> Result<()> {
177        match self {
178            ChunkWriterInner::Raw(mut w) => {
179                w.flush()?;
180                Ok(())
181            }
182            ChunkWriterInner::SingleThreaded(w) => {
183                w.finish()?;
184                Ok(())
185            }
186            ChunkWriterInner::MultiThreaded(mut w) => {
187                w.finish()?;
188                Ok(())
189            }
190        }
191    }
192}
193
/// Generic writer for keyed temp chunks with pre-computed sort keys.
///
/// Works with any type implementing `RawSortKey`.
/// Supports optional BGZF compression for reduced disk usage.
///
/// On-disk layout per record: `[key bytes][len: u32 LE][record bytes]`.
pub struct GenericKeyedChunkWriter<K: RawSortKey> {
    /// Backend sink (raw file or BGZF, chosen at creation time).
    writer: ChunkWriterInner,
    /// Ties the key type to the writer without storing a `K`.
    _marker: PhantomData<K>,
}
202
impl<K: RawSortKey> GenericKeyedChunkWriter<K> {
    /// Create a new keyed chunk writer with optional compression.
    ///
    /// - `compression_level` 0 = uncompressed (fastest, uses most disk).
    /// - `compression_level` > 0 = BGZF compression at specified level.
    /// - `threads` > 1 enables multi-threaded compression.
    ///
    /// NOTE(review): an invalid compression level is silently ignored (builder
    /// default) on the multi-threaded path but falls back to level 6 on the
    /// single-threaded path — confirm the asymmetry is intentional.
    ///
    /// # Errors
    ///
    /// Returns an error if the output file cannot be created.
    ///
    /// # Panics
    ///
    /// Contains an `expect` on `NonZero::new(threads)` that cannot fire in
    /// practice: that branch is only reached when `threads > 1`.
    pub fn create(path: &Path, compression_level: u32, threads: usize) -> Result<Self> {
        let file = std::fs::File::create(path)?;
        // 256 KiB buffer between the (possibly compressing) writer and disk.
        let buf = BufWriter::with_capacity(256 * 1024, file);

        let writer = if compression_level == 0 {
            ChunkWriterInner::Raw(buf)
        } else if threads > 1 {
            // Use multi-threaded BGZF for faster compression
            let worker_count = NonZero::new(threads).expect("threads > 1");
            let mut builder =
                multithreaded_writer::Builder::default().set_worker_count(worker_count);
            #[allow(clippy::cast_possible_truncation)]
            if let Some(level) = CompressionLevel::new(compression_level as u8) {
                builder = builder.set_compression_level(level);
            }
            ChunkWriterInner::MultiThreaded(builder.build_from_writer(buf))
        } else {
            // Single-threaded BGZF with specified compression level
            // (falls back to level 6 when the requested level is invalid)
            #[allow(clippy::cast_possible_truncation)]
            let level = CompressionLevel::new(compression_level as u8)
                .unwrap_or_else(|| CompressionLevel::new(6).unwrap());
            let writer = noodles_bgzf::io::writer::Builder::default()
                .set_compression_level(level)
                .build_from_writer(buf);
            ChunkWriterInner::SingleThreaded(writer)
        };

        Ok(Self { writer, _marker: PhantomData })
    }

    /// Write a keyed record.
    ///
    /// Emits `[key bytes][len: u32 LE][record bytes]`, matching what
    /// `GenericKeyedChunkReader` reads back.
    ///
    /// # Errors
    ///
    /// Returns an error if writing to the underlying writer fails.
    #[inline]
    #[allow(clippy::cast_possible_truncation)]
    pub fn write_record(&mut self, key: &K, record: &[u8]) -> Result<()> {
        key.write_to(&mut self.writer)?;
        self.writer.write_all(&(record.len() as u32).to_le_bytes())?;
        self.writer.write_all(record)?;
        Ok(())
    }

    /// Finish writing and flush.
    ///
    /// # Errors
    ///
    /// Returns an error if flushing the writer fails.
    pub fn finish(self) -> Result<()> {
        self.writer.finish()
    }
}
270
/// Generic reader for keyed temp chunks with background prefetching.
///
/// Works with any type implementing `RawSortKey`.
/// Auto-detects BGZF compression via magic bytes.
pub struct GenericKeyedChunkReader<K: RawSortKey + 'static> {
    /// Receives prefetched `(key, record)` pairs from the reader thread;
    /// a `None` message marks end-of-stream (EOF or read error).
    receiver: Receiver<Option<(K, Vec<u8>)>>,
    /// Background reader thread. Never joined; the thread exits on its own
    /// at EOF/error or when this receiver is dropped.
    _handle: JoinHandle<()>,
}
279
280impl<K: RawSortKey + 'static> GenericKeyedChunkReader<K> {
281    /// Open a keyed chunk file for reading with background prefetching.
282    /// Auto-detects BGZF/gzip compression via magic bytes (0x1f 0x8b).
283    ///
284    /// # Errors
285    ///
286    /// Returns an error if the file cannot be opened.
287    pub fn open(path: &Path) -> Result<Self> {
288        let (tx, rx) = bounded(MERGE_PREFETCH_SIZE);
289        let path = path.to_path_buf();
290
291        let handle = thread::spawn(move || {
292            let file = match std::fs::File::open(&path) {
293                Ok(f) => f,
294                Err(e) => {
295                    log::error!("Failed to open keyed chunk {}: {}", path.display(), e);
296                    let _ = tx.send(None);
297                    return;
298                }
299            };
300            let mut buf_reader = BufReader::with_capacity(256 * 1024, file);
301
302            // Check for gzip/BGZF magic bytes: 0x1f 0x8b
303            let mut magic = [0u8; 2];
304            let is_compressed = if buf_reader.read_exact(&mut magic).is_ok() {
305                magic == [0x1f, 0x8b]
306            } else {
307                false
308            };
309
310            // Seek back to start
311            if buf_reader.seek(SeekFrom::Start(0)).is_err() {
312                log::error!("Failed to seek in keyed chunk {}", path.display());
313                let _ = tx.send(None);
314                return;
315            }
316
317            // Read using appropriate decoder
318            if is_compressed {
319                let bgzf_reader = BgzfReader::new(buf_reader);
320                Self::read_records(bgzf_reader, tx);
321            } else {
322                Self::read_records(buf_reader, tx);
323            }
324        });
325
326        Ok(Self { receiver: rx, _handle: handle })
327    }
328
329    /// Read records from a reader and send them through the channel.
330    #[allow(clippy::needless_pass_by_value)]
331    fn read_records<R: Read>(mut reader: R, tx: crossbeam_channel::Sender<Option<(K, Vec<u8>)>>) {
332        loop {
333            // Read key using the trait method
334            let key = match K::read_from(&mut reader) {
335                Ok(k) => k,
336                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
337                    // Normal EOF
338                    let _ = tx.send(None);
339                    break;
340                }
341                Err(e) => {
342                    log::error!("Error reading keyed chunk key: {e}");
343                    let _ = tx.send(None);
344                    break;
345                }
346            };
347
348            // Read record length
349            let mut len_buf = [0u8; 4];
350            if reader.read_exact(&mut len_buf).is_err() {
351                log::error!("Error reading keyed chunk length");
352                let _ = tx.send(None);
353                break;
354            }
355            let len = u32::from_le_bytes(len_buf) as usize;
356
357            // Read record data
358            let mut record = vec![0u8; len];
359            if reader.read_exact(&mut record).is_err() {
360                log::error!("Error reading keyed chunk record");
361                let _ = tx.send(None);
362                break;
363            }
364
365            if tx.send(Some((key, record))).is_err() {
366                break; // Receiver dropped
367            }
368        }
369    }
370
371    /// Read the next keyed record from the prefetch buffer.
372    pub fn next_record(&mut self) -> Option<(K, Vec<u8>)> {
373        match self.receiver.recv() {
374            Ok(Some(entry)) => Some(entry),
375            Ok(None) | Err(_) => None,
376        }
377    }
378}
379
/// Raw-bytes external sorter for BAM files.
///
/// This sorter uses lazy record parsing to minimize memory usage and avoid
/// re-encoding overhead. It's significantly faster than the RecordBuf-based
/// sorter for large files.
///
/// Configured via the builder-style setters (`memory_limit`, `threads`, ...)
/// and driven by [`RawExternalSorter::sort`].
pub struct RawExternalSorter {
    /// Sort order to use.
    sort_order: SortOrder,
    /// Maximum memory to use for in-memory sorting (bytes).
    memory_limit: usize,
    /// Temporary directory for spill files. `None` = implementation default.
    temp_dir: Option<PathBuf>,
    /// Number of threads for parallel operations.
    threads: usize,
    /// Compression level for output.
    output_compression: u32,
    /// Compression level for temporary chunk files (0 = uncompressed).
    temp_compression: u32,
    /// Whether to write BAM index alongside output (coordinate sort only).
    write_index: bool,
    /// Program record info (version, `command_line`) for @PG header.
    pg_info: Option<(String, String)>,
    /// Maximum temp files before consolidation (0 = unlimited).
    max_temp_files: usize,
}
405
406impl RawExternalSorter {
407    /// Create a new raw external sorter with the given sort order.
408    #[must_use]
409    pub fn new(sort_order: SortOrder) -> Self {
410        Self {
411            sort_order,
412            memory_limit: 512 * 1024 * 1024, // 512 MB default
413            temp_dir: None,
414            threads: 1,
415            output_compression: 6,
416            temp_compression: 1, // Default: fast compression
417            write_index: false,
418            pg_info: None,
419            max_temp_files: DEFAULT_MAX_TEMP_FILES,
420        }
421    }
422
423    /// Set the memory limit for in-memory sorting.
424    #[must_use]
425    pub fn memory_limit(mut self, limit: usize) -> Self {
426        self.memory_limit = limit;
427        self
428    }
429
430    /// Set the temporary directory for spill files.
431    #[must_use]
432    pub fn temp_dir(mut self, path: PathBuf) -> Self {
433        self.temp_dir = Some(path);
434        self
435    }
436
437    /// Set the number of threads.
438    #[must_use]
439    pub fn threads(mut self, threads: usize) -> Self {
440        self.threads = threads;
441        self
442    }
443
444    /// Set the output compression level.
445    #[must_use]
446    pub fn output_compression(mut self, level: u32) -> Self {
447        self.output_compression = level;
448        self
449    }
450
451    /// Set compression level for temporary chunk files.
452    ///
453    /// Level 0 disables compression (fastest, uses most disk space).
454    /// Level 1 (default) provides fast compression with reasonable space savings.
455    /// Higher levels provide better compression but are slower.
456    #[must_use]
457    pub fn temp_compression(mut self, level: u32) -> Self {
458        self.temp_compression = level;
459        self
460    }
461
462    /// Enable writing BAM index alongside output.
463    ///
464    /// Only valid for coordinate sort. When enabled, writes `<output>.bai`
465    /// alongside the output BAM file. Uses single-threaded compression
466    /// for accurate virtual position tracking.
467    #[must_use]
468    pub fn write_index(mut self, enabled: bool) -> Self {
469        self.write_index = enabled;
470        self
471    }
472
473    /// Set program record info for @PG header entry.
474    #[must_use]
475    pub fn pg_info(mut self, version: String, command_line: String) -> Self {
476        self.pg_info = Some((version, command_line));
477        self
478    }
479
480    /// Set maximum temp files before consolidation.
481    ///
482    /// When the number of temp files exceeds this limit, the oldest files
483    /// are merged together to reduce the count. Set to 0 for unlimited.
484    /// Default is 64 (matching samtools).
485    #[must_use]
486    pub fn max_temp_files(mut self, max: usize) -> Self {
487        self.max_temp_files = max;
488        self
489    }
490
    /// Consolidate temp files if we've exceeded the limit.
    ///
    /// Merges the oldest half of temp files into a single new file to reduce
    /// the total count while maintaining sort order.
    ///
    /// `chunk_files` is assumed oldest-first; the merged file is re-inserted
    /// at the front so records written earliest keep winning key ties,
    /// preserving overall sort stability. `consolidation_count` numbers the
    /// merged output files to keep their names unique.
    fn maybe_consolidate_temp_files<K: RawSortKey + Default + 'static>(
        &self,
        chunk_files: &mut Vec<PathBuf>,
        temp_path: &Path,
        consolidation_count: &mut usize,
    ) -> Result<()> {
        // One pending record per source reader, tagged with its reader index
        // so ties between equal keys resolve by file age (stability).
        struct HeapEntry<K> {
            key: K,
            record: Vec<u8>,
            reader_idx: usize,
        }

        // No cap configured, or still under it: nothing to do.
        if self.max_temp_files == 0 || chunk_files.len() < self.max_temp_files {
            return Ok(());
        }

        // Need at least 2 files to consolidate meaningfully
        if self.max_temp_files < 2 {
            return Ok(());
        }

        // Merge oldest half of files into one (at least 2)
        let merge_count = (self.max_temp_files / 2).max(2).min(chunk_files.len());
        let files_to_merge: Vec<PathBuf> = chunk_files.drain(..merge_count).collect();

        info!(
            "Consolidating {} temp files into 1 (total was {})...",
            merge_count,
            merge_count + chunk_files.len()
        );

        // Create merged output file
        let merged_path = temp_path.join(format!("merged_{:04}.keyed", *consolidation_count));
        *consolidation_count += 1;

        // Open readers for files to merge
        let mut readers: Vec<GenericKeyedChunkReader<K>> = files_to_merge
            .iter()
            .map(|p| GenericKeyedChunkReader::<K>::open(p))
            .collect::<Result<Vec<_>>>()?;

        // Create writer for merged output
        let mut writer = GenericKeyedChunkWriter::<K>::create(
            &merged_path,
            self.temp_compression,
            self.threads,
        )?;

        // Initialize heap with first record from each reader
        let mut heap: Vec<HeapEntry<K>> = Vec::with_capacity(readers.len());
        for (reader_idx, reader) in readers.iter_mut().enumerate() {
            if let Some((key, record)) = reader.next_record() {
                heap.push(HeapEntry { key, record, reader_idx });
            }
        }

        let mut heap_size = heap.len();
        if heap_size == 0 {
            // All source files were empty; still emit the (empty) merged file
            // so the bookkeeping below stays uniform.
            writer.finish()?;
            // Insert at beginning to preserve stable order
            chunk_files.insert(0, merged_path);
            // Clean up old files
            for path in &files_to_merge {
                let _ = std::fs::remove_file(path);
            }
            return Ok(());
        }

        // Use chunk_idx as tie-breaker for stable merge
        // NOTE(review): despite the name, `lt` returns true when `a` orders
        // AFTER `b`. This assumes `heap_make`/`heap_sift_down` interpret the
        // predicate such that the smallest entry surfaces at index 0 —
        // confirm against the helpers in `sort::radix`.
        let lt = |a: &HeapEntry<K>, b: &HeapEntry<K>| -> bool {
            a.key.cmp(&b.key).then_with(|| a.reader_idx.cmp(&b.reader_idx)) == Ordering::Greater
        };

        heap_make(&mut heap, &lt);

        // Merge loop: emit the root entry, then refill from the same reader,
        // or shrink the heap when that reader is exhausted.
        while heap_size > 0 {
            let reader_idx = heap[0].reader_idx;
            // `mem::take` moves key/record out without cloning (needs K: Default).
            let key = std::mem::take(&mut heap[0].key);
            let record = std::mem::take(&mut heap[0].record);

            writer.write_record(&key, &record)?;

            if let Some((next_key, next_record)) = readers[reader_idx].next_record() {
                heap[0].key = next_key;
                heap[0].record = next_record;
                heap_sift_down(&mut heap, 0, heap_size, &lt);
            } else {
                heap_size -= 1;
                if heap_size > 0 {
                    heap.swap(0, heap_size);
                    heap_sift_down(&mut heap, 0, heap_size, &lt);
                }
            }
        }

        writer.finish()?;

        // Insert merged file at the beginning to preserve stable order for equal keys.
        // The merged file contains the oldest records, so it should be processed first.
        chunk_files.insert(0, merged_path);

        // Clean up old files
        for path in &files_to_merge {
            let _ = std::fs::remove_file(path);
        }

        info!("Consolidation complete, {} temp files remain", chunk_files.len());

        Ok(())
    }
606
607    /// Sort a BAM file using raw-bytes approach.
608    ///
609    /// # Errors
610    ///
611    /// Returns an error if reading, sorting, or writing the BAM file fails.
612    pub fn sort(&self, input: &Path, output: &Path) -> Result<RawSortStats> {
613        info!("Starting raw-bytes sort with order: {:?}", self.sort_order);
614        info!("Memory limit: {} MB", self.memory_limit / (1024 * 1024));
615        info!("Threads: {}", self.threads);
616
617        // Open input BAM with lazy record reading
618        let (reader, header) = create_raw_bam_reader(input, self.threads)?;
619
620        // Add @PG record if pg_info was provided
621        let header = if let Some((ref version, ref command_line)) = self.pg_info {
622            crate::header::add_pg_record(header, version, command_line)?
623        } else {
624            header
625        };
626
627        // Create temp directory
628        let temp_dir = self.create_temp_dir()?;
629        let temp_path = temp_dir.path();
630
631        // Sort based on order
632        match self.sort_order {
633            SortOrder::Coordinate => self.sort_coordinate(reader, &header, output, temp_path),
634            SortOrder::Queryname => self.sort_queryname(reader, &header, output, temp_path),
635            SortOrder::TemplateCoordinate => {
636                self.sort_template_coordinate(reader, &header, output, temp_path)
637            }
638        }
639    }
640
641    /// Sort by coordinate order using optimized radix sort for large arrays.
642    fn sort_coordinate(
643        &self,
644        reader: crate::bam_io::RawBamReaderAuto,
645        header: &Header,
646        output: &Path,
647        temp_path: &Path,
648    ) -> Result<RawSortStats> {
649        if self.write_index {
650            self.sort_coordinate_with_index(reader, header, output, temp_path)
651        } else {
652            self.sort_coordinate_optimized(reader, header, output, temp_path)
653        }
654    }
655
    /// Optimized coordinate sort using inline buffer for reduced memory overhead.
    ///
    /// Uses `RecordBuffer` which stores records in a single contiguous allocation
    /// with pre-computed sort keys, eliminating per-record heap allocations.
    ///
    /// Two-phase external sort:
    /// 1. Read records into the buffer; whenever the memory limit is hit, sort
    ///    the buffer and spill it to a keyed temp chunk on disk.
    /// 2. If nothing spilled, write the sorted buffer straight to the output;
    ///    otherwise merge all disk chunks plus the in-memory remainder.
    #[allow(clippy::cast_possible_truncation)]
    fn sort_coordinate_optimized(
        &self,
        reader: crate::bam_io::RawBamReaderAuto,
        header: &Header,
        output: &Path,
        temp_path: &Path,
    ) -> Result<RawSortStats> {
        use crate::sort::keys::RawCoordinateKey;

        let mut stats = RawSortStats::default();

        // Get number of references (unmapped reads map to nref)
        let nref = header.reference_sequences().len() as u32;

        // Estimate capacity: ~200 bytes per record average
        // NOTE(review): heuristic average — oversized records just trigger
        // earlier spills, so an inaccurate estimate is not a correctness issue.
        let estimated_records = self.memory_limit / 200;

        let mut chunk_files: Vec<PathBuf> = Vec::new();
        let mut buffer = RecordBuffer::with_capacity(estimated_records, self.memory_limit, nref);
        let mut consolidation_count = 0usize;

        // Background read-ahead decouples decompression from sorting.
        let read_ahead = RawReadAheadReader::new(reader);

        info!("Phase 1: Reading and sorting chunks (inline buffer, keyed output)...");

        for record in read_ahead {
            stats.total_records += 1;

            // Push directly to buffer - key extracted inline from raw bytes
            buffer.push_coordinate(record.as_ref());

            // Check memory usage
            if buffer.memory_usage() >= self.memory_limit {
                let chunk_path = temp_path.join(format!("chunk_{:04}.keyed", chunk_files.len()));

                // Sort in place using parallel sort for large buffers
                if self.threads > 1 {
                    buffer.par_sort();
                } else {
                    buffer.sort();
                }

                // Write keyed temp file (stores sort key with each record for O(1) merge)
                let mut writer = GenericKeyedChunkWriter::<RawCoordinateKey>::create(
                    &chunk_path,
                    self.temp_compression,
                    self.threads,
                )?;
                for r in buffer.refs() {
                    let key = RawCoordinateKey { sort_key: r.sort_key };
                    let record_bytes = buffer.get_record(r);
                    writer.write_record(&key, record_bytes)?;
                }
                writer.finish()?;

                stats.chunks_written += 1;
                chunk_files.push(chunk_path);

                // Consolidate if we have too many temp files
                self.maybe_consolidate_temp_files::<RawCoordinateKey>(
                    &mut chunk_files,
                    temp_path,
                    &mut consolidation_count,
                )?;

                buffer.clear();
            }
        }

        info!("Read {} records total", stats.total_records);

        if chunk_files.is_empty() {
            // All records fit in memory - no merge needed
            info!("All records fit in memory, performing in-memory sort");

            if self.threads > 1 {
                buffer.par_sort();
            } else {
                buffer.sort();
            }

            let output_header = self.create_output_header(header);
            let mut writer = crate::bam_io::create_raw_bam_writer(
                output,
                &output_header,
                self.threads,
                self.output_compression,
            )?;

            // Records are written back as raw bytes - no re-encoding.
            for record_bytes in buffer.iter_sorted() {
                writer.write_raw_record(record_bytes)?;
            }
            writer.finish()?;
        } else {
            // Sort remaining records and prepare for merge
            if self.threads > 1 {
                buffer.par_sort();
            } else {
                buffer.sort();
            }

            // Extract keyed pairs from the in-memory buffer so the remainder
            // can participate in the merge alongside the disk chunks.
            let memory_keyed: Vec<(RawCoordinateKey, Vec<u8>)> = if buffer.is_empty() {
                Vec::new()
            } else {
                buffer
                    .refs()
                    .iter()
                    .map(|r| {
                        let key = RawCoordinateKey { sort_key: r.sort_key };
                        let record_bytes = buffer.get_record(r).to_vec();
                        (key, record_bytes)
                    })
                    .collect()
            };

            info!(
                "Phase 2: Merging {} chunks (keyed O(1) comparisons)...",
                chunk_files.len() + usize::from(!memory_keyed.is_empty())
            );

            // Merge disk chunks + in-memory chunks using O(1) key comparisons
            self.merge_chunks_generic::<RawCoordinateKey>(
                &chunk_files,
                memory_keyed,
                header,
                output,
            )?;
        }

        stats.output_records = stats.total_records;
        info!("Sort complete: {} records processed", stats.total_records);

        Ok(stats)
    }
796
797    /// Coordinate sort with BAM index generation.
798    ///
799    /// Similar to `sort_coordinate_optimized` but uses `IndexingBamWriter` to
800    /// build the BAI index incrementally during write. Uses single-threaded
801    /// compression for accurate virtual position tracking.
802    #[allow(clippy::cast_possible_truncation)]
803    fn sort_coordinate_with_index(
804        &self,
805        reader: crate::bam_io::RawBamReaderAuto,
806        header: &Header,
807        output: &Path,
808        temp_path: &Path,
809    ) -> Result<RawSortStats> {
810        use crate::bam_io::{create_indexing_bam_writer, write_bai_index};
811        use crate::sort::keys::RawCoordinateKey;
812
813        info!("Indexing enabled: will write BAM index alongside output");
814
815        let mut stats = RawSortStats::default();
816        let nref = header.reference_sequences().len() as u32;
817        let estimated_records = self.memory_limit / 200;
818
819        let mut chunk_files: Vec<PathBuf> = Vec::new();
820        let mut buffer = RecordBuffer::with_capacity(estimated_records, self.memory_limit, nref);
821        let mut consolidation_count = 0usize;
822        let read_ahead = RawReadAheadReader::new(reader);
823
824        info!("Phase 1: Reading and sorting chunks (inline buffer, keyed output)...");
825
826        for record in read_ahead {
827            stats.total_records += 1;
828            buffer.push_coordinate(record.as_ref());
829
830            if buffer.memory_usage() >= self.memory_limit {
831                let chunk_path = temp_path.join(format!("chunk_{:04}.keyed", chunk_files.len()));
832
833                if self.threads > 1 {
834                    buffer.par_sort();
835                } else {
836                    buffer.sort();
837                }
838
839                let mut writer = GenericKeyedChunkWriter::<RawCoordinateKey>::create(
840                    &chunk_path,
841                    self.temp_compression,
842                    self.threads,
843                )?;
844                for r in buffer.refs() {
845                    let key = RawCoordinateKey { sort_key: r.sort_key };
846                    let record_bytes = buffer.get_record(r);
847                    writer.write_record(&key, record_bytes)?;
848                }
849                writer.finish()?;
850
851                stats.chunks_written += 1;
852                chunk_files.push(chunk_path);
853
854                // Consolidate if we have too many temp files
855                self.maybe_consolidate_temp_files::<RawCoordinateKey>(
856                    &mut chunk_files,
857                    temp_path,
858                    &mut consolidation_count,
859                )?;
860
861                buffer.clear();
862            }
863        }
864
865        info!("Read {} records total", stats.total_records);
866
867        let output_header = self.create_output_header(header);
868
869        if chunk_files.is_empty() {
870            // All records fit in memory - no merge needed
871            info!("All records fit in memory, performing in-memory sort");
872
873            if self.threads > 1 {
874                buffer.par_sort();
875            } else {
876                buffer.sort();
877            }
878
879            // Use IndexingBamWriter for single-pass index generation
880            let mut writer = create_indexing_bam_writer(
881                output,
882                &output_header,
883                self.output_compression,
884                self.threads,
885            )?;
886
887            for record_bytes in buffer.iter_sorted() {
888                writer.write_raw_record(record_bytes)?;
889            }
890
891            let index = writer.finish()?;
892
893            // Write the index file
894            let index_path = output.with_extension("bam.bai");
895            write_bai_index(&index_path, &index)?;
896            info!("Wrote BAM index: {}", index_path.display());
897        } else {
898            // Sort remaining records and prepare for merge
899            if self.threads > 1 {
900                buffer.par_sort();
901            } else {
902                buffer.sort();
903            }
904
905            let memory_keyed: Vec<(RawCoordinateKey, Vec<u8>)> = if buffer.is_empty() {
906                Vec::new()
907            } else {
908                buffer
909                    .refs()
910                    .iter()
911                    .map(|r| {
912                        let key = RawCoordinateKey { sort_key: r.sort_key };
913                        let record_bytes = buffer.get_record(r).to_vec();
914                        (key, record_bytes)
915                    })
916                    .collect()
917            };
918
919            info!(
920                "Phase 2: Merging {} chunks with index generation...",
921                chunk_files.len() + usize::from(!memory_keyed.is_empty())
922            );
923
924            // Merge with index generation
925            let index = self.merge_chunks_with_index::<RawCoordinateKey>(
926                &chunk_files,
927                memory_keyed,
928                header,
929                output,
930            )?;
931
932            // Write the index file
933            let index_path = output.with_extension("bam.bai");
934            write_bai_index(&index_path, &index)?;
935            info!("Wrote BAM index: {}", index_path.display());
936        }
937
938        stats.output_records = stats.total_records;
939        info!("Sort complete: {} records processed", stats.total_records);
940
941        Ok(stats)
942    }
943
944    /// Sort by queryname order using keyed temp files for O(1) merge comparisons.
945    ///
946    /// Uses `RawQuerynameKey` which stores the full read name for natural ordering,
947    /// enabling efficient merge without re-parsing BAM records.
948    fn sort_queryname(
949        &self,
950        reader: crate::bam_io::RawBamReaderAuto,
951        header: &Header,
952        output: &Path,
953        temp_path: &Path,
954    ) -> Result<RawSortStats> {
955        use crate::sort::keys::{RawQuerynameKey, SortContext};
956
957        let mut stats = RawSortStats::default();
958        let ctx = SortContext::from_header(header);
959
960        // Estimate capacity: ~250 bytes per record + ~50 bytes for name + flags
961        let estimated_records = self.memory_limit / 300;
962
963        let mut chunk_files: Vec<PathBuf> = Vec::new();
964        let mut entries: Vec<(RawQuerynameKey, Vec<u8>)> = Vec::with_capacity(estimated_records);
965        let mut memory_used = 0usize;
966        let mut consolidation_count = 0usize;
967
968        let read_ahead = RawReadAheadReader::new(reader);
969
970        info!("Phase 1: Reading and sorting chunks (keyed output)...");
971
972        for record in read_ahead {
973            stats.total_records += 1;
974
975            // Extract key from raw bytes
976            let bam_bytes = record.as_ref();
977            let key = RawQuerynameKey::extract(bam_bytes, &ctx);
978
979            // Estimate memory: key (name + 2 bytes flags) + record bytes
980            let record_size = bam_bytes.len() + key.name.len() + 2;
981            memory_used += record_size;
982
983            entries.push((key, bam_bytes.to_vec()));
984
985            // Check if we need to spill to disk
986            if memory_used >= self.memory_limit {
987                let chunk_path = temp_path.join(format!("chunk_{:04}.keyed", chunk_files.len()));
988
989                // Sort the entries
990                if self.threads > 1 {
991                    use rayon::prelude::*;
992                    entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
993                } else {
994                    entries.sort_unstable_by(|a, b| a.0.cmp(&b.0));
995                }
996
997                // Write keyed temp file
998                let mut writer = GenericKeyedChunkWriter::<RawQuerynameKey>::create(
999                    &chunk_path,
1000                    self.temp_compression,
1001                    self.threads,
1002                )?;
1003                for (key, record) in entries.drain(..) {
1004                    writer.write_record(&key, &record)?;
1005                }
1006                writer.finish()?;
1007
1008                stats.chunks_written += 1;
1009                chunk_files.push(chunk_path);
1010
1011                // Consolidate if we have too many temp files
1012                self.maybe_consolidate_temp_files::<RawQuerynameKey>(
1013                    &mut chunk_files,
1014                    temp_path,
1015                    &mut consolidation_count,
1016                )?;
1017
1018                memory_used = 0;
1019            }
1020        }
1021
1022        info!("Read {} records total", stats.total_records);
1023
1024        if chunk_files.is_empty() {
1025            // All records fit in memory
1026            info!("All records fit in memory, performing in-memory sort");
1027
1028            if self.threads > 1 {
1029                use rayon::prelude::*;
1030                entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
1031            } else {
1032                entries.sort_unstable_by(|a, b| a.0.cmp(&b.0));
1033            }
1034
1035            let output_header = self.create_output_header(header);
1036            let mut writer = crate::bam_io::create_raw_bam_writer(
1037                output,
1038                &output_header,
1039                self.threads,
1040                self.output_compression,
1041            )?;
1042
1043            for (_key, record) in entries {
1044                writer.write_raw_record(&record)?;
1045            }
1046            writer.finish()?;
1047        } else {
1048            // Sort remaining records
1049            if self.threads > 1 {
1050                use rayon::prelude::*;
1051                entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
1052            } else {
1053                entries.sort_unstable_by(|a, b| a.0.cmp(&b.0));
1054            }
1055
1056            info!(
1057                "Phase 2: Merging {} chunks (keyed comparisons)...",
1058                chunk_files.len() + usize::from(!entries.is_empty())
1059            );
1060
1061            // Merge disk chunks + in-memory records using keyed comparisons
1062            self.merge_chunks_generic::<RawQuerynameKey>(&chunk_files, entries, header, output)?;
1063        }
1064
1065        stats.output_records = stats.total_records;
1066        info!("Sort complete: {} records processed", stats.total_records);
1067
1068        Ok(stats)
1069    }
1070
1071    /// Sort by template-coordinate order using inline buffer for reduced memory.
1072    ///
1073    /// Uses `TemplateRecordBuffer` which stores records in a single contiguous allocation
1074    /// with packed sort keys, eliminating per-record heap allocations for names.
1075    ///
1076    /// Writes keyed temp chunks that preserve pre-computed sort keys, enabling O(1)
1077    /// comparisons during merge (instead of expensive CIGAR/aux parsing).
1078    fn sort_template_coordinate(
1079        &self,
1080        reader: crate::bam_io::RawBamReaderAuto,
1081        header: &Header,
1082        output: &Path,
1083        temp_path: &Path,
1084    ) -> Result<RawSortStats> {
1085        let mut stats = RawSortStats::default();
1086
1087        // Build library lookup ONCE before sorting for O(1) ordinal lookups
1088        let lib_lookup = LibraryLookup::from_header(header);
1089
1090        // Estimate capacity accounting for inline headers and cached-key refs
1091        // Memory layout per record:
1092        //   - data: 40 bytes (inline header) + ~250 bytes (BAM record) = 290 bytes
1093        //   - refs: 48 bytes (TemplateKey + u64 offset + u32 len + u32 pad)
1094        //   - Total: ~338 bytes per record
1095        // The larger refs trade memory for cache locality during sort
1096        let bytes_per_record = 338;
1097        let estimated_records = self.memory_limit / bytes_per_record;
1098        // Allocate ~86% for data, ~14% for refs (48/338 ≈ 14%)
1099        let estimated_data_bytes = self.memory_limit * 86 / 100;
1100
1101        let mut chunk_files: Vec<PathBuf> = Vec::new();
1102        let mut buffer =
1103            TemplateRecordBuffer::with_capacity(estimated_records, estimated_data_bytes);
1104        let mut consolidation_count = 0usize;
1105
1106        let read_ahead = RawReadAheadReader::new(reader);
1107
1108        info!("Phase 1: Reading and sorting chunks (inline buffer)...");
1109
1110        for record in read_ahead {
1111            stats.total_records += 1;
1112
1113            // Extract template key and push to buffer
1114            let bam_bytes = record.as_ref();
1115            let key = extract_template_key_inline(bam_bytes, &lib_lookup);
1116            buffer.push(bam_bytes, key);
1117
1118            // Check memory usage
1119            if buffer.memory_usage() >= self.memory_limit {
1120                // Use .keyed extension to distinguish from BAM chunks
1121                let chunk_path = temp_path.join(format!("chunk_{:04}.keyed", chunk_files.len()));
1122
1123                // Sort in place using parallel sort for large buffers
1124                if self.threads > 1 {
1125                    buffer.par_sort();
1126                } else {
1127                    buffer.sort();
1128                }
1129
1130                // Write keyed chunk preserving sort keys for O(1) merge comparisons
1131                let mut keyed_writer = GenericKeyedChunkWriter::<TemplateKey>::create(
1132                    &chunk_path,
1133                    self.temp_compression,
1134                    self.threads,
1135                )?;
1136                for (key, record) in buffer.iter_sorted_keyed() {
1137                    keyed_writer.write_record(&key, record)?;
1138                }
1139                keyed_writer.finish()?;
1140
1141                stats.chunks_written += 1;
1142                chunk_files.push(chunk_path);
1143
1144                // Consolidate if we have too many temp files
1145                self.maybe_consolidate_temp_files::<TemplateKey>(
1146                    &mut chunk_files,
1147                    temp_path,
1148                    &mut consolidation_count,
1149                )?;
1150
1151                buffer.clear();
1152            }
1153        }
1154
1155        info!("Read {} records total", stats.total_records);
1156
1157        if chunk_files.is_empty() {
1158            // All records fit in memory
1159            info!("All records fit in memory, performing in-memory sort");
1160
1161            if self.threads > 1 {
1162                buffer.par_sort();
1163            } else {
1164                buffer.sort();
1165            }
1166
1167            let output_header = self.create_output_header(header);
1168            let mut writer = crate::bam_io::create_raw_bam_writer(
1169                output,
1170                &output_header,
1171                self.threads,
1172                self.output_compression,
1173            )?;
1174
1175            for record_bytes in buffer.iter_sorted() {
1176                writer.write_raw_record(record_bytes)?;
1177            }
1178            writer.finish()?;
1179        } else {
1180            // Sort remaining records and prepare for merge
1181            if self.threads > 1 {
1182                buffer.par_sort();
1183            } else {
1184                buffer.sort();
1185            }
1186
1187            // Collect keyed in-memory records for merge
1188            let memory_keyed: Vec<(TemplateKey, Vec<u8>)> = if buffer.is_empty() {
1189                Vec::new()
1190            } else {
1191                buffer.iter_sorted_keyed().map(|(k, r)| (k, r.to_vec())).collect()
1192            };
1193
1194            info!(
1195                "Phase 2: Merging {} chunks...",
1196                chunk_files.len() + usize::from(!memory_keyed.is_empty())
1197            );
1198
1199            // Merge using O(1) key comparisons
1200            self.merge_chunks_keyed(&chunk_files, memory_keyed, header, output)?;
1201        }
1202
1203        stats.output_records = stats.total_records;
1204        info!("Sort complete: {} records processed", stats.total_records);
1205
1206        Ok(stats)
1207    }
1208
    /// Merge keyed chunks using O(1) key comparisons.
    ///
    /// This is the fast path for template-coordinate sorting. Instead of parsing
    /// CIGAR/aux data on every comparison (`O(CIGAR_len` + `aux_scan`)), we compare
    /// pre-computed 32-byte keys (O(1) - just 4 u64 comparisons).
    ///
    /// `chunk_files` are sorted keyed temp files produced in phase 1;
    /// `memory_keyed` holds sorted records that never spilled to disk.
    /// Writes the merged stream to `output` and returns the number of
    /// records merged.
    fn merge_chunks_keyed(
        &self,
        chunk_files: &[PathBuf],
        memory_keyed: Vec<(TemplateKey, Vec<u8>)>,
        header: &Header,
        output: &Path,
    ) -> Result<u64> {
        /// Source for keyed chunks during merge.
        enum KeyedChunkSource {
            /// Disk-based chunk with prefetching reader.
            Disk(GenericKeyedChunkReader<TemplateKey>),
            /// In-memory sorted records.
            Memory { records: Vec<(TemplateKey, Vec<u8>)>, idx: usize },
        }

        impl KeyedChunkSource {
            /// Yield the next (key, record) pair, or `None` when this source is drained.
            fn next_record(&mut self) -> Option<(TemplateKey, Vec<u8>)> {
                match self {
                    KeyedChunkSource::Disk(reader) => reader.next_record(),
                    KeyedChunkSource::Memory { records, idx } => {
                        if *idx < records.len() {
                            // Use replace with zeroed key and empty vec to avoid Default requirement
                            let dummy = (TemplateKey::zeroed(), Vec::new());
                            let entry = std::mem::replace(&mut records[*idx], dummy);
                            *idx += 1;
                            Some(entry)
                        } else {
                            None
                        }
                    }
                }
            }
        }

        /// Heap entry for keyed merge - stores pre-computed key for O(1) comparison.
        struct KeyedHeapEntry {
            key: TemplateKey,
            record: Vec<u8>,
            chunk_idx: usize,
        }

        // Output buffer to reduce write syscalls - stores raw bytes directly
        const OUTPUT_BUFFER_SIZE: usize = 2048;

        let num_disk = chunk_files.len();
        let has_memory = !memory_keyed.is_empty();
        info!("Merging from {} files and {} in-memory blocks...", num_disk, i32::from(has_memory));

        // Create output header with sort order tags
        let output_header = self.create_output_header(header);

        // Create unified chunk sources
        let mut sources: Vec<KeyedChunkSource> = Vec::with_capacity(num_disk + 1);

        // Add disk-based chunks with keyed readers
        for path in chunk_files {
            sources
                .push(KeyedChunkSource::Disk(GenericKeyedChunkReader::<TemplateKey>::open(path)?));
        }

        // Add in-memory keyed chunk
        if has_memory {
            sources.push(KeyedChunkSource::Memory { records: memory_keyed, idx: 0 });
        }

        // Initialize heap with first record from each chunk
        let mut heap: Vec<KeyedHeapEntry> = Vec::with_capacity(sources.len());
        for (chunk_idx, source) in sources.iter_mut().enumerate() {
            if let Some((key, record)) = source.next_record() {
                heap.push(KeyedHeapEntry { key, record, chunk_idx });
            }
        }

        // `heap_size` tracks the live prefix of `heap`; exhausted sources are
        // swapped past this boundary rather than removed from the Vec.
        let mut heap_size = heap.len();
        if heap_size == 0 {
            info!("Merge complete: 0 records merged");
            return Ok(0);
        }

        // O(1) comparison using pre-computed keys - THIS IS THE KEY OPTIMIZATION
        // No CIGAR parsing, no aux scanning, just 4 u64 comparisons
        // Use chunk_idx as tie-breaker for stable merge (preserves input order for equal keys)
        // The inverted comparison (`Greater`) combined with `heap_make`/
        // `heap_sift_down` keeps the smallest key at heap[0] (see merge loop).
        let lt = |a: &KeyedHeapEntry, b: &KeyedHeapEntry| -> bool {
            a.key.cmp(&b.key).then_with(|| a.chunk_idx.cmp(&b.chunk_idx)) == Ordering::Greater
        };

        // Build initial heap
        heap_make(&mut heap, &lt);

        // Create raw output writer (bypasses noodles Record encoding for speed)
        let mut writer = crate::bam_io::create_raw_bam_writer(
            output,
            &output_header,
            self.threads,
            self.output_compression,
        )?;

        let mut records_merged = 0u64;
        let mut output_buffer: Vec<Vec<u8>> = Vec::with_capacity(OUTPUT_BUFFER_SIZE);

        // Merge loop using fixed-array heap with O(1) comparisons
        while heap_size > 0 {
            // Get the minimum record (at heap[0])
            let chunk_idx = heap[0].chunk_idx;
            // `take` leaves an empty Vec in the slot; it is refilled or swapped
            // out below before the heap is compared again.
            let record_bytes = std::mem::take(&mut heap[0].record);

            // Buffer the raw bytes directly (no Record conversion)
            output_buffer.push(record_bytes);
            records_merged += 1;

            // Flush buffer when full
            if output_buffer.len() >= OUTPUT_BUFFER_SIZE {
                for rec in output_buffer.drain(..) {
                    writer.write_raw_record(&rec)?;
                }
            }

            // Get next record from the same chunk
            if let Some((key, next_record)) = sources[chunk_idx].next_record() {
                // Replace top with new record and sift down
                heap[0].key = key;
                heap[0].record = next_record;
                heap_sift_down(&mut heap, 0, heap_size, &lt);
            } else {
                // Chunk exhausted - remove from heap
                heap_size -= 1;
                if heap_size > 0 {
                    // Move last element to top and sift down
                    heap.swap(0, heap_size);
                    heap_sift_down(&mut heap, 0, heap_size, &lt);
                }
            }
        }

        // Flush remaining buffered records
        for rec in output_buffer {
            writer.write_raw_record(&rec)?;
        }

        writer.finish()?;

        info!("Merge complete: {records_merged} records merged");
        Ok(records_merged)
    }
1358
    /// Generic merge for keyed chunks using `O(1)` key comparisons.
    ///
    /// This is the unified merge function that works with any `RawSortKey` type.
    /// It provides `O(1)` comparisons during merge for fixed-size keys (coordinate, template)
    /// and `O(name_len)` for variable-size keys (queryname).
    ///
    /// `chunk_files` are sorted keyed temp files produced in phase 1;
    /// `memory_keyed` holds sorted records that never spilled to disk.
    /// Writes the merged stream to `output` and returns the number of
    /// records merged.
    fn merge_chunks_generic<K: RawSortKey + Default + 'static>(
        &self,
        chunk_files: &[PathBuf],
        memory_keyed: Vec<(K, Vec<u8>)>,
        header: &Header,
        output: &Path,
    ) -> Result<u64> {
        /// Source for keyed chunks during merge.
        enum GenericKeyedChunkSource<K: RawSortKey + Default + 'static> {
            /// Disk-based chunk with prefetching reader.
            Disk(GenericKeyedChunkReader<K>),
            /// In-memory sorted records.
            Memory { records: Vec<(K, Vec<u8>)>, idx: usize },
        }

        impl<K: RawSortKey + Default + 'static> GenericKeyedChunkSource<K> {
            /// Yield the next (key, record) pair, or `None` when this source is drained.
            fn next_record(&mut self) -> Option<(K, Vec<u8>)> {
                match self {
                    GenericKeyedChunkSource::Disk(reader) => reader.next_record(),
                    GenericKeyedChunkSource::Memory { records, idx } => {
                        if *idx < records.len() {
                            // Take ownership of the record
                            let entry = std::mem::take(&mut records[*idx]);
                            *idx += 1;
                            Some(entry)
                        } else {
                            None
                        }
                    }
                }
            }
        }

        /// Heap entry for keyed merge - stores pre-computed key for O(1) comparison.
        struct GenericKeyedHeapEntry<K> {
            key: K,
            record: Vec<u8>,
            chunk_idx: usize,
        }

        // Output buffer to reduce write syscalls
        const OUTPUT_BUFFER_SIZE: usize = 2048;

        let num_disk = chunk_files.len();
        let has_memory = !memory_keyed.is_empty();
        info!("Merging from {} files and {} in-memory blocks...", num_disk, i32::from(has_memory));

        // Create output header with sort order tags
        let output_header = self.create_output_header(header);

        // Create unified chunk sources
        let mut sources: Vec<GenericKeyedChunkSource<K>> = Vec::with_capacity(num_disk + 1);

        // Add disk-based chunks with keyed readers
        for path in chunk_files {
            sources.push(GenericKeyedChunkSource::Disk(GenericKeyedChunkReader::<K>::open(path)?));
        }

        // Add in-memory keyed chunk
        if has_memory {
            sources.push(GenericKeyedChunkSource::Memory { records: memory_keyed, idx: 0 });
        }

        // Initialize heap with first record from each chunk
        let mut heap: Vec<GenericKeyedHeapEntry<K>> = Vec::with_capacity(sources.len());
        for (chunk_idx, source) in sources.iter_mut().enumerate() {
            if let Some((key, record)) = source.next_record() {
                heap.push(GenericKeyedHeapEntry { key, record, chunk_idx });
            }
        }

        // `heap_size` tracks the live prefix of `heap`; exhausted sources are
        // swapped past this boundary rather than removed from the Vec.
        let mut heap_size = heap.len();
        if heap_size == 0 {
            info!("Merge complete: 0 records merged");
            return Ok(0);
        }

        // O(1) comparison using pre-computed keys
        // Use chunk_idx as tie-breaker for stable merge (preserves input order for equal keys)
        // The inverted comparison (`Greater`) combined with `heap_make`/
        // `heap_sift_down` keeps the smallest key at heap[0] (see merge loop).
        let lt = |a: &GenericKeyedHeapEntry<K>, b: &GenericKeyedHeapEntry<K>| -> bool {
            a.key.cmp(&b.key).then_with(|| a.chunk_idx.cmp(&b.chunk_idx)) == Ordering::Greater
        };

        // Build initial heap
        heap_make(&mut heap, &lt);

        // Create raw output writer (bypasses noodles Record encoding for speed)
        let mut writer = crate::bam_io::create_raw_bam_writer(
            output,
            &output_header,
            self.threads,
            self.output_compression,
        )?;

        let mut records_merged = 0u64;
        let mut output_buffer: Vec<Vec<u8>> = Vec::with_capacity(OUTPUT_BUFFER_SIZE);

        // Merge loop using fixed-array heap with key comparisons
        while heap_size > 0 {
            // Get the minimum record (at heap[0])
            let chunk_idx = heap[0].chunk_idx;
            // `take` leaves an empty Vec in the slot; it is refilled or swapped
            // out below before the heap is compared again.
            let record_bytes = std::mem::take(&mut heap[0].record);

            // Buffer the raw bytes directly (no Record conversion)
            output_buffer.push(record_bytes);
            records_merged += 1;

            // Flush buffer when full
            if output_buffer.len() >= OUTPUT_BUFFER_SIZE {
                for rec in output_buffer.drain(..) {
                    writer.write_raw_record(&rec)?;
                }
            }

            // Get next record from the same chunk
            if let Some((key, next_record)) = sources[chunk_idx].next_record() {
                // Replace top with new record and sift down
                heap[0].key = key;
                heap[0].record = next_record;
                heap_sift_down(&mut heap, 0, heap_size, &lt);
            } else {
                // Chunk exhausted - remove from heap
                heap_size -= 1;
                if heap_size > 0 {
                    // Move last element to top and sift down
                    heap.swap(0, heap_size);
                    heap_sift_down(&mut heap, 0, heap_size, &lt);
                }
            }
        }

        // Flush remaining buffered records
        for rec in output_buffer {
            writer.write_raw_record(&rec)?;
        }

        writer.finish()?;

        info!("Merge complete: {records_merged} records merged");
        Ok(records_merged)
    }
1505
1506    /// Merge keyed chunks with BAM index generation.
1507    ///
1508    /// Similar to `merge_chunks_generic` but uses `IndexingBamWriter` to build
1509    /// the BAI index incrementally during the merge. Returns the generated index.
1510    fn merge_chunks_with_index<K: RawSortKey + Default + 'static>(
1511        &self,
1512        chunk_files: &[PathBuf],
1513        memory_keyed: Vec<(K, Vec<u8>)>,
1514        header: &Header,
1515        output: &Path,
1516    ) -> Result<noodles::bam::bai::Index> {
1517        use crate::bam_io::create_indexing_bam_writer;
1518
1519        // Source for keyed chunks during merge
1520        enum KeyedSource<K: RawSortKey + Default + 'static> {
1521            Disk(GenericKeyedChunkReader<K>),
1522            Memory { records: Vec<(K, Vec<u8>)>, idx: usize },
1523        }
1524
1525        impl<K: RawSortKey + Default + 'static> KeyedSource<K> {
1526            fn next_record(&mut self) -> Option<(K, Vec<u8>)> {
1527                match self {
1528                    KeyedSource::Disk(reader) => reader.next_record(),
1529                    KeyedSource::Memory { records, idx } => {
1530                        if *idx < records.len() {
1531                            let entry = std::mem::take(&mut records[*idx]);
1532                            *idx += 1;
1533                            Some(entry)
1534                        } else {
1535                            None
1536                        }
1537                    }
1538                }
1539            }
1540        }
1541
1542        // Heap entry for merge
1543        struct HeapEntry<K> {
1544            key: K,
1545            record: Vec<u8>,
1546            chunk_idx: usize,
1547        }
1548
1549        let num_disk = chunk_files.len();
1550        let has_memory = !memory_keyed.is_empty();
1551        info!("Merging from {} files and {} in-memory blocks...", num_disk, i32::from(has_memory));
1552
1553        let output_header = self.create_output_header(header);
1554
1555        // Create chunk sources
1556        let mut sources: Vec<KeyedSource<K>> = Vec::with_capacity(num_disk + 1);
1557        for path in chunk_files {
1558            sources.push(KeyedSource::Disk(GenericKeyedChunkReader::<K>::open(path)?));
1559        }
1560        if has_memory {
1561            sources.push(KeyedSource::Memory { records: memory_keyed, idx: 0 });
1562        }
1563
1564        // Initialize heap
1565        let mut heap: Vec<HeapEntry<K>> = Vec::with_capacity(sources.len());
1566        for (chunk_idx, source) in sources.iter_mut().enumerate() {
1567            if let Some((key, record)) = source.next_record() {
1568                heap.push(HeapEntry { key, record, chunk_idx });
1569            }
1570        }
1571
1572        let mut heap_size = heap.len();
1573        if heap_size == 0 {
1574            // Empty input - create empty indexed BAM
1575            let writer = create_indexing_bam_writer(
1576                output,
1577                &output_header,
1578                self.output_compression,
1579                self.threads,
1580            )?;
1581            let index = writer.finish()?;
1582            info!("Merge complete: 0 records merged");
1583            return Ok(index);
1584        }
1585
1586        // Use chunk_idx as tie-breaker for stable merge (preserves input order for equal keys)
1587        let lt = |a: &HeapEntry<K>, b: &HeapEntry<K>| -> bool {
1588            a.key.cmp(&b.key).then_with(|| a.chunk_idx.cmp(&b.chunk_idx)) == Ordering::Greater
1589        };
1590        heap_make(&mut heap, &lt);
1591
1592        // Use IndexingBamWriter (single-threaded for accurate virtual positions)
1593        let mut writer = create_indexing_bam_writer(
1594            output,
1595            &output_header,
1596            self.output_compression,
1597            self.threads,
1598        )?;
1599        let mut records_merged = 0u64;
1600
1601        // No buffering needed - IndexingBamWriter needs accurate virtual positions
1602        // for each record, so we write directly
1603        while heap_size > 0 {
1604            let chunk_idx = heap[0].chunk_idx;
1605            let record_bytes = std::mem::take(&mut heap[0].record);
1606
1607            writer.write_raw_record(&record_bytes)?;
1608            records_merged += 1;
1609
1610            if let Some((key, next_record)) = sources[chunk_idx].next_record() {
1611                heap[0].key = key;
1612                heap[0].record = next_record;
1613                heap_sift_down(&mut heap, 0, heap_size, &lt);
1614            } else {
1615                heap_size -= 1;
1616                if heap_size > 0 {
1617                    heap.swap(0, heap_size);
1618                    heap_sift_down(&mut heap, 0, heap_size, &lt);
1619                }
1620            }
1621        }
1622
1623        let index = writer.finish()?;
1624        info!("Merge complete: {records_merged} records merged");
1625        Ok(index)
1626    }
1627
1628    /// Create output header with appropriate sort order tags.
1629    fn create_output_header(&self, header: &Header) -> Header {
1630        let mut builder = Header::builder();
1631
1632        // Copy reference sequences
1633        for (name, seq) in header.reference_sequences() {
1634            builder = builder.add_reference_sequence(name.as_slice(), seq.clone());
1635        }
1636
1637        // Copy read groups
1638        for (id, rg) in header.read_groups() {
1639            builder = builder.add_read_group(id.as_slice(), rg.clone());
1640        }
1641
1642        // Copy programs
1643        for (id, pg) in header.programs().as_ref() {
1644            builder = builder.add_program(id.as_slice(), pg.clone());
1645        }
1646
1647        // Copy comments
1648        for comment in header.comments() {
1649            builder = builder.add_comment(comment.clone());
1650        }
1651
1652        // Set header record with sort order using insert API
1653        let hd = match self.sort_order {
1654            SortOrder::Coordinate => {
1655                Map::<noodles::sam::header::record::value::map::Header>::builder()
1656                    .insert(header_tag::SORT_ORDER, BString::from("coordinate"))
1657                    .build()
1658                    .expect("valid header")
1659            }
1660            SortOrder::Queryname => {
1661                Map::<noodles::sam::header::record::value::map::Header>::builder()
1662                    .insert(header_tag::SORT_ORDER, BString::from("queryname"))
1663                    .build()
1664                    .expect("valid header")
1665            }
1666            SortOrder::TemplateCoordinate => {
1667                Map::<noodles::sam::header::record::value::map::Header>::builder()
1668                    .insert(header_tag::SORT_ORDER, BString::from("unsorted"))
1669                    .insert(header_tag::GROUP_ORDER, BString::from("query"))
1670                    .insert(header_tag::SUBSORT_ORDER, BString::from("template-coordinate"))
1671                    .build()
1672                    .expect("valid header")
1673            }
1674        };
1675
1676        builder = builder.set_header(hd);
1677        builder.build()
1678    }
1679
    /// Create temporary directory for spill files.
    ///
    /// Delegates to the module-level `super::create_temp_dir`, passing the
    /// user-configured parent directory when `self.temp_dir` is set
    /// (`None` otherwise, letting the helper pick its default location).
    fn create_temp_dir(&self) -> Result<TempDir> {
        super::create_temp_dir(self.temp_dir.as_deref())
    }
1685
/// Extract a packed `TemplateKey` directly from BAM record bytes.
///
/// This function computes the template-coordinate sort key inline, avoiding
/// heap allocations for the read name by using a hash instead.
///
/// The key orders records by the canonically-lower end of the template
/// (lower unclipped 5' position first), then library ordinal, MI tag,
/// hashed read name, and finally an upper/lower flag so mates of the same
/// template sort adjacently.
#[must_use]
pub fn extract_template_key_inline(bam_bytes: &[u8], lib_lookup: &LibraryLookup) -> TemplateKey {
    use crate::sort::bam_fields;
    use bam_fields::{
        find_mc_tag_in_record, find_mi_tag_in_record, flags, get_cigar_ops, mate_unclipped_5prime,
        unclipped_5prime,
    };

    // Extract MI tag using fast raw byte scan (bypasses noodles overhead)
    let mi = find_mi_tag_in_record(bam_bytes);

    // Get library ordinal
    let library = lib_lookup.get_ordinal(bam_bytes);

    // Extract fixed-offset fields from the raw BAM record layout.
    let tid = bam_fields::ref_id(bam_bytes);
    let pos = bam_fields::pos(bam_bytes);
    let l_read_name = bam_fields::l_read_name(bam_bytes) as usize;
    let flag = bam_fields::flags(bam_bytes);
    let mate_tid = bam_fields::mate_ref_id(bam_bytes);
    let mate_pos = bam_fields::mate_pos(bam_bytes);

    // Decode the individual flag bits needed for key construction.
    let is_unmapped = (flag & flags::UNMAPPED) != 0;
    let mate_unmapped = (flag & flags::MATE_UNMAPPED) != 0;
    let is_reverse = (flag & flags::REVERSE) != 0;
    let mate_reverse = (flag & flags::MATE_REVERSE) != 0;
    let is_paired = (flag & flags::PAIRED) != 0;

    // Hash read name (exclude null terminator). The name starts at byte 32,
    // immediately after the fixed-size BAM record header (matches the test
    // fixtures in this module — NOTE(review): confirm against bam_fields).
    let name_len = l_read_name.saturating_sub(1);
    let name = if name_len > 0 && 32 + name_len <= bam_bytes.len() {
        &bam_bytes[32..32 + name_len]
    } else {
        &[]
    };
    let name_hash = lib_lookup.hash_name(name);

    // Handle unmapped reads
    if is_unmapped {
        if is_paired && !mate_unmapped {
            // Unmapped read with mapped mate - use mate's position as primary
            // key. The MC (mate CIGAR) tag, when present, lets us compute the
            // mate's unclipped 5' position; otherwise fall back to mate_pos.
            let mate_unclipped = find_mc_tag_in_record(bam_bytes)
                .map_or(mate_pos, |mc| mate_unclipped_5prime(mate_pos, mate_reverse, mc));

            return TemplateKey::new(
                mate_tid,
                mate_unclipped,
                mate_reverse,
                i32::MAX, // no second coordinate: sentinel tid2
                i32::MAX, // sentinel pos2
                false,
                library,
                mi,
                name_hash,
                true, // Unmapped read is always "upper" relative to mapped mate
            );
        }

        // Completely unmapped - sort to end
        let is_read2 = (flag & 0x80) != 0; // is_last_segment flag (SAM 0x80)
        return TemplateKey::unmapped(name_hash, is_read2);
    }

    // Calculate unclipped 5' position for this read
    let cigar_ops = get_cigar_ops(bam_bytes);
    let this_pos = unclipped_5prime(pos, is_reverse, &cigar_ops);

    // Calculate mate's unclipped 5' position (from MC tag when available;
    // mate_pos alone if the tag is absent or the mate is unmapped/unpaired).
    let mate_unclipped = if is_paired && !mate_unmapped {
        find_mc_tag_in_record(bam_bytes)
            .map_or(mate_pos, |mc| mate_unclipped_5prime(mate_pos, mate_reverse, mc))
    } else {
        mate_pos
    };

    // Determine canonical ordering: both mates of a template must produce the
    // same (tid1, pos1, tid2, pos2) so they group together under the sort.
    let (tid1, tid2, pos1, pos2, neg1, neg2, is_upper) = if is_paired && !mate_unmapped {
        // Samtools logic: is_upper if pos > mate_pos, or (pos == mate_pos && this read is reverse)
        let is_upper = (tid, this_pos) > (mate_tid, mate_unclipped)
            || ((tid, this_pos) == (mate_tid, mate_unclipped) && is_reverse);

        if is_upper {
            // Swap: mate's position comes first
            (mate_tid, tid, mate_unclipped, this_pos, mate_reverse, is_reverse, true)
        } else {
            // No swap: this read's position comes first
            (tid, mate_tid, this_pos, mate_unclipped, is_reverse, mate_reverse, false)
        }
    } else {
        // Unpaired or mate unmapped - use MAX for tid2/pos2
        (tid, i32::MAX, this_pos, i32::MAX, is_reverse, false, false)
    };

    TemplateKey::new(tid1, pos1, neg1, tid2, pos2, neg2, library, mi, name_hash, is_upper)
}
1786
1787// Use shared SortStats from the parent module.
1788pub use super::SortStats as RawSortStats;
1789
#[cfg(test)]
mod tests {
    //! Unit tests for `LibraryLookup`, the `RawExternalSorter` builder,
    //! output-header sort-order tagging, and raw aux-tag scanning.

    use super::*;
    use noodles::sam::header::record::value::map::ReadGroup;

    // ========================================================================
    // LibraryLookup tests
    // ========================================================================

    #[test]
    fn test_library_lookup_empty_header() {
        let header = Header::builder().build();
        let lookup = LibraryLookup::from_header(&header);
        assert!(lookup.rg_to_ordinal.is_empty());
    }

    #[test]
    fn test_library_lookup_single_rg() {
        let rg = Map::<ReadGroup>::builder()
            .insert(rg_tag::LIBRARY, String::from("LibA"))
            .build()
            .expect("valid");
        let header = Header::builder().add_read_group(BString::from("rg1"), rg).build();

        let lookup = LibraryLookup::from_header(&header);
        assert_eq!(lookup.rg_to_ordinal.len(), 1);
        // LibA is the only library, so it gets ordinal 1 (0 is reserved for empty/unknown)
        assert_eq!(*lookup.rg_to_ordinal.get(b"rg1".as_slice()).unwrap(), 1);
    }

    #[test]
    fn test_library_lookup_multiple_libraries() {
        // Read groups are registered with deliberately unsorted library names
        // to verify ordinals are assigned by library name order, not RG order.
        let rg_a = Map::<ReadGroup>::builder()
            .insert(rg_tag::LIBRARY, String::from("LibC"))
            .build()
            .expect("valid");
        let rg_b = Map::<ReadGroup>::builder()
            .insert(rg_tag::LIBRARY, String::from("LibA"))
            .build()
            .expect("valid");
        let rg_c = Map::<ReadGroup>::builder()
            .insert(rg_tag::LIBRARY, String::from("LibB"))
            .build()
            .expect("valid");

        let header = Header::builder()
            .add_read_group(BString::from("rg1"), rg_a)
            .add_read_group(BString::from("rg2"), rg_b)
            .add_read_group(BString::from("rg3"), rg_c)
            .build();

        let lookup = LibraryLookup::from_header(&header);
        assert_eq!(lookup.rg_to_ordinal.len(), 3);

        // Libraries sorted alphabetically: LibA=1, LibB=2, LibC=3
        assert_eq!(*lookup.rg_to_ordinal.get(b"rg2".as_slice()).unwrap(), 1); // LibA
        assert_eq!(*lookup.rg_to_ordinal.get(b"rg3".as_slice()).unwrap(), 2); // LibB
        assert_eq!(*lookup.rg_to_ordinal.get(b"rg1".as_slice()).unwrap(), 3); // LibC
    }

    #[test]
    fn test_library_lookup_unknown_rg_returns_zero() {
        let rg = Map::<ReadGroup>::builder()
            .insert(rg_tag::LIBRARY, String::from("LibA"))
            .build()
            .expect("valid");
        let header = Header::builder().add_read_group(BString::from("rg1"), rg).build();

        let lookup = LibraryLookup::from_header(&header);
        // A BAM record with no RG tag or unknown RG should return ordinal 0
        // We can test get_ordinal with a minimal BAM record that has no aux data
        let mut bam = vec![0u8; 36];
        bam[8] = 4; // l_read_name = 4
        bam[32..36].copy_from_slice(b"rea\0");
        assert_eq!(lookup.get_ordinal(&bam), 0);
    }

    // ========================================================================
    // RawExternalSorter builder tests
    // ========================================================================

    #[test]
    fn test_raw_sorter_defaults() {
        let sorter = RawExternalSorter::new(SortOrder::Coordinate);
        assert_eq!(sorter.memory_limit, 512 * 1024 * 1024);
        assert!(sorter.temp_dir.is_none());
        assert_eq!(sorter.threads, 1);
        assert_eq!(sorter.output_compression, 6);
        assert_eq!(sorter.temp_compression, 1);
        assert!(!sorter.write_index);
        assert!(sorter.pg_info.is_none());
        assert_eq!(sorter.max_temp_files, DEFAULT_MAX_TEMP_FILES);
    }

    #[test]
    fn test_raw_sorter_builder_chain() {
        // Exercise every builder setter in one chain and confirm each value sticks.
        let sorter = RawExternalSorter::new(SortOrder::Queryname)
            .memory_limit(1024)
            .temp_dir(PathBuf::from("/tmp/test"))
            .threads(8)
            .output_compression(9)
            .temp_compression(3)
            .write_index(true)
            .pg_info("1.0".to_string(), "fgumi sort".to_string())
            .max_temp_files(128);

        assert_eq!(sorter.memory_limit, 1024);
        assert_eq!(sorter.temp_dir, Some(PathBuf::from("/tmp/test")));
        assert_eq!(sorter.threads, 8);
        assert_eq!(sorter.output_compression, 9);
        assert_eq!(sorter.temp_compression, 3);
        assert!(sorter.write_index);
        assert_eq!(sorter.pg_info, Some(("1.0".to_string(), "fgumi sort".to_string())));
        assert_eq!(sorter.max_temp_files, 128);
    }

    #[test]
    fn test_raw_sorter_memory_limit() {
        let sorter = RawExternalSorter::new(SortOrder::Coordinate).memory_limit(256 * 1024 * 1024);
        assert_eq!(sorter.memory_limit, 256 * 1024 * 1024);
    }

    #[test]
    fn test_raw_sorter_temp_compression() {
        // Compression level 0 (uncompressed temp files) must be accepted.
        let sorter = RawExternalSorter::new(SortOrder::Coordinate).temp_compression(0);
        assert_eq!(sorter.temp_compression, 0);
    }

    #[test]
    fn test_raw_sorter_max_temp_files() {
        // Zero is stored as-is; interpretation is up to the spill logic.
        let sorter = RawExternalSorter::new(SortOrder::Coordinate).max_temp_files(0);
        assert_eq!(sorter.max_temp_files, 0);
    }

    // ========================================================================
    // create_output_header tests
    // ========================================================================

    #[test]
    fn test_create_output_header_coordinate() {
        let sorter = RawExternalSorter::new(SortOrder::Coordinate);
        let header = Header::builder().build();
        let output_header = sorter.create_output_header(&header);

        let hd = output_header.header().expect("header should have HD record");
        let so = hd.other_fields().get(b"SO").expect("should have SO tag");
        assert_eq!(<_ as AsRef<[u8]>>::as_ref(so), b"coordinate");
    }

    #[test]
    fn test_create_output_header_queryname() {
        let sorter = RawExternalSorter::new(SortOrder::Queryname);
        let header = Header::builder().build();
        let output_header = sorter.create_output_header(&header);

        let hd = output_header.header().expect("header should have HD record");
        let so = hd.other_fields().get(b"SO").expect("should have SO tag");
        assert_eq!(<_ as AsRef<[u8]>>::as_ref(so), b"queryname");
    }

    #[test]
    fn test_create_output_header_template_coordinate() {
        // Template-coordinate has no dedicated SO value: it is spelled
        // SO:unsorted + GO:query + SS:template-coordinate.
        let sorter = RawExternalSorter::new(SortOrder::TemplateCoordinate);
        let header = Header::builder().build();
        let output_header = sorter.create_output_header(&header);

        let hd = output_header.header().expect("header should have HD record");
        let fields = hd.other_fields();

        let so = fields.get(b"SO").expect("should have SO tag");
        assert_eq!(<_ as AsRef<[u8]>>::as_ref(so), b"unsorted");

        let go = fields.get(b"GO").expect("should have GO tag");
        assert_eq!(<_ as AsRef<[u8]>>::as_ref(go), b"query");

        let ss = fields.get(b"SS").expect("should have SS tag");
        assert_eq!(<_ as AsRef<[u8]>>::as_ref(ss), b"template-coordinate");
    }

    // ========================================================================
    // find_string_tag_in_record tests (via fgumi_raw_bam)
    // ========================================================================

    /// Helper to build minimal BAM bytes with aux data appended.
    /// Fixed 32-byte header + read name "rea\0" (`l_read_name=4`) + no cigar + no seq + aux.
    fn build_bam_with_aux(aux_data: &[u8]) -> Vec<u8> {
        let l_read_name: u8 = 4; // "rea" + null
        let mut bam = vec![0u8; 32];
        bam[8] = l_read_name;
        // n_cigar_op = 0 (bytes 12-13 already zero)
        // l_seq = 0 (bytes 16-19 already zero)
        // read name
        bam.extend_from_slice(b"rea\0");
        // no cigar, no seq, no qual
        // append aux data
        bam.extend_from_slice(aux_data);
        bam
    }

    #[rstest::rstest]
    #[case::present(b"RGZgroup1\0".as_slice(), Some(b"group1".as_slice()))]
    #[case::absent(b"".as_slice(), None)]
    fn test_find_rg_tag(#[case] aux_data: &[u8], #[case] expected: Option<&[u8]>) {
        let bam = build_bam_with_aux(aux_data);
        assert_eq!(fgumi_raw_bam::tags::find_string_tag_in_record(&bam, b"RG"), expected);
    }

    #[test]
    fn test_find_rg_tag_after_other_tags() {
        // XY:i:42 followed by RG:Z:mygroup — aux built dynamically for the integer tag
        let mut aux = Vec::new();
        aux.extend_from_slice(b"XYi");
        aux.extend_from_slice(&42i32.to_le_bytes());
        aux.extend_from_slice(b"RGZmygroup\0");
        let bam = build_bam_with_aux(&aux);
        assert_eq!(
            fgumi_raw_bam::tags::find_string_tag_in_record(&bam, b"RG"),
            Some(b"mygroup".as_slice())
        );
    }
}