Skip to main content

fgumi_lib/
grouper.rs

1//! Grouper implementations for the 9-step pipeline.
2//!
3//! Each grouper is responsible for grouping decoded BAM records according
4//! to command-specific rules and emitting complete groups for processing.
5//!
6//! Note: BAM record parsing is now handled by the Decode step in the pipeline.
7//! Groupers receive pre-decoded `RecordBuf` vectors.
8
9use std::io;
10
11use noodles::sam::alignment::RecordBuf;
12
13use crate::sort::bam_fields;
14use crate::template::{Template, TemplateBatch};
15use crate::unified_pipeline::{BatchWeight, DecodedRecord, Grouper, MemoryEstimate};
16
17// ============================================================================
18// BatchWeight Implementations
19// ============================================================================
20
21/// Single records have weight 1 (for batch_size-like behavior).
22impl BatchWeight for RecordBuf {
23    fn batch_weight(&self) -> usize {
24        1
25    }
26}
27
28/// Raw byte records have weight 1 (for batch_size-like behavior).
29impl BatchWeight for Vec<u8> {
30    fn batch_weight(&self) -> usize {
31        1
32    }
33}
34
35/// Template batches have weight equal to the number of templates.
36impl BatchWeight for TemplateBatch {
37    fn batch_weight(&self) -> usize {
38        self.len()
39    }
40}
41
42// ============================================================================
43// SingleRecordGrouper
44// ============================================================================
45
46/// A grouper that emits each record as its own "group".
47///
48/// Used by commands that process records independently:
49/// - filter: Apply filters to each record
50/// - clip: Clip each record
51/// - correct: Correct UMIs in each record
52#[derive(Default)]
53pub struct SingleRecordGrouper;
54
55impl SingleRecordGrouper {
56    /// Create a new single-record grouper.
57    #[must_use]
58    pub fn new() -> Self {
59        Self
60    }
61}
62
63impl Grouper for SingleRecordGrouper {
64    type Group = RecordBuf;
65
66    fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
67        // Each record is its own group - extract and return them directly
68        records
69            .into_iter()
70            .map(|d| {
71                d.into_record().ok_or_else(|| {
72                    io::Error::new(
73                        io::ErrorKind::InvalidData,
74                        "SingleRecordGrouper requires parsed records, got raw bytes",
75                    )
76                })
77            })
78            .collect()
79    }
80
81    fn finish(&mut self) -> io::Result<Option<Self::Group>> {
82        // No state to flush - records are passed through immediately
83        Ok(None)
84    }
85
86    fn has_pending(&self) -> bool {
87        // Never has pending state
88        false
89    }
90}
91
92// ============================================================================
93// SingleRawRecordGrouper
94// ============================================================================
95
96/// A grouper that emits each raw-byte record as its own "group".
97///
98/// Used by commands that process records independently using raw bytes
99/// (e.g., filter with raw-byte pipeline).
100#[derive(Default)]
101pub struct SingleRawRecordGrouper;
102
103impl SingleRawRecordGrouper {
104    /// Create a new single raw record grouper.
105    #[must_use]
106    pub fn new() -> Self {
107        Self
108    }
109}
110
111impl Grouper for SingleRawRecordGrouper {
112    type Group = Vec<u8>;
113
114    fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
115        records
116            .into_iter()
117            .map(|d| {
118                d.into_raw_bytes().ok_or_else(|| {
119                    io::Error::new(
120                        io::ErrorKind::InvalidData,
121                        "SingleRawRecordGrouper requires raw bytes, got parsed record",
122                    )
123                })
124            })
125            .collect()
126    }
127
128    fn finish(&mut self) -> io::Result<Option<Self::Group>> {
129        Ok(None)
130    }
131
132    fn has_pending(&self) -> bool {
133        false
134    }
135}
136
137// ============================================================================
138// TemplateGrouper
139// ============================================================================
140
141use std::collections::VecDeque;
142
143/// A Grouper that batches BAM records into Templates by QNAME.
144///
145/// Input BAM must be query-name sorted or grouped by QNAME.
146/// Records with the same QNAME are grouped into a [`Template`], then
147/// templates are batched for efficient parallel processing.
148///
149/// # Example
150///
151/// ```ignore
152/// use fgumi_lib::grouper::TemplateGrouper;
153/// use fgumi_lib::unified_pipeline::Grouper;
154///
155/// let grouper = TemplateGrouper::new(1000); // 1000 templates per batch
156/// // Use with run_bam_pipeline_with_grouper...
157/// ```
158pub struct TemplateGrouper {
159    /// Number of templates per batch.
160    batch_size: usize,
161    /// Current template name being accumulated.
162    current_name: Option<Vec<u8>>,
163    /// Hash of current template name (for fast comparison).
164    current_name_hash: Option<u64>,
165    /// Records for current template (non-raw mode).
166    current_records: Vec<RecordBuf>,
167    /// Raw byte records for current template (raw-byte mode).
168    current_raw_records: Vec<Vec<u8>>,
169    /// Completed templates waiting to be batched.
170    pending_templates: VecDeque<Template>,
171}
172
173impl TemplateGrouper {
174    /// Create a new [`TemplateGrouper`].
175    ///
176    /// # Arguments
177    /// * `batch_size` - Number of templates per batch (e.g., 1000)
178    #[must_use]
179    pub fn new(batch_size: usize) -> Self {
180        Self {
181            batch_size: batch_size.max(1),
182            current_name: None,
183            current_name_hash: None,
184            current_records: Vec::new(),
185            current_raw_records: Vec::new(),
186            pending_templates: VecDeque::new(),
187        }
188    }
189
190    /// Flush current template to pending queue if non-empty.
191    fn flush_current_template(&mut self) -> io::Result<()> {
192        debug_assert!(
193            self.current_raw_records.is_empty() || self.current_records.is_empty(),
194            "mixed raw/parsed records in same template group"
195        );
196        if !self.current_raw_records.is_empty() {
197            let template =
198                Template::from_raw_records(std::mem::take(&mut self.current_raw_records))
199                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
200            self.pending_templates.push_back(template);
201            self.current_name = None;
202            self.current_name_hash = None;
203        } else if !self.current_records.is_empty() {
204            let template = Template::from_records(std::mem::take(&mut self.current_records))
205                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
206            self.pending_templates.push_back(template);
207            self.current_name = None;
208            self.current_name_hash = None;
209        }
210        Ok(())
211    }
212
213    /// Collect completed batches from pending templates.
214    fn collect_batches(&mut self) -> Vec<TemplateBatch> {
215        let mut batches = Vec::new();
216
217        while self.pending_templates.len() >= self.batch_size {
218            let mut batch = Vec::with_capacity(self.batch_size);
219            for _ in 0..self.batch_size {
220                if let Some(template) = self.pending_templates.pop_front() {
221                    batch.push(template);
222                }
223            }
224            batches.push(batch);
225        }
226
227        batches
228    }
229}
230
231impl Grouper for TemplateGrouper {
232    type Group = TemplateBatch;
233
234    fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
235        use crate::unified_pipeline::DecodedRecordData;
236
237        // Group records by QNAME (using pre-computed name_hash for fast comparison)
238        for decoded in records {
239            let name_hash = decoded.key.name_hash;
240
241            match decoded.data {
242                DecodedRecordData::Raw(raw) => {
243                    // Raw-byte mode: only extract name when starting a new template
244                    match (self.current_name_hash, name_hash) {
245                        (Some(current_hash), new_hash) if current_hash == new_hash => {
246                            self.current_raw_records.push(raw);
247                        }
248                        _ => {
249                            self.flush_current_template()?;
250                            self.current_name = Some(bam_fields::read_name(&raw).to_vec());
251                            self.current_name_hash = Some(name_hash);
252                            self.current_raw_records.push(raw);
253                        }
254                    }
255                }
256                DecodedRecordData::Parsed(record) => {
257                    let name = record.name().map(|n| Vec::from(<_ as AsRef<[u8]>>::as_ref(n)));
258                    match (self.current_name_hash, name_hash) {
259                        (Some(current_hash), new_hash) if current_hash == new_hash => {
260                            self.current_records.push(record);
261                        }
262                        _ => {
263                            self.flush_current_template()?;
264                            self.current_name = name;
265                            self.current_name_hash = Some(name_hash);
266                            self.current_records.push(record);
267                        }
268                    }
269                }
270            }
271        }
272
273        // Return any completed batches
274        Ok(self.collect_batches())
275    }
276
277    fn finish(&mut self) -> io::Result<Option<Self::Group>> {
278        // Flush any remaining template
279        self.flush_current_template()?;
280
281        // Return remaining templates as a final batch (may be < batch_size)
282        if self.pending_templates.is_empty() {
283            return Ok(None);
284        }
285
286        let mut batch = Vec::with_capacity(self.pending_templates.len());
287        while let Some(template) = self.pending_templates.pop_front() {
288            batch.push(template);
289        }
290        Ok(Some(batch))
291    }
292
293    fn has_pending(&self) -> bool {
294        self.current_name.is_some()
295            || !self.pending_templates.is_empty()
296            || !self.current_raw_records.is_empty()
297            || !self.current_records.is_empty()
298    }
299}
300
301use noodles::sam::alignment::record::data::field::Tag;
302
303use crate::unified_pipeline::GroupKey;
304
305impl MemoryEstimate for ProcessedPositionGroup {
306    fn estimate_heap_size(&self) -> usize {
307        // templates: Vec<Template>
308        let templates_size: usize =
309            self.templates.iter().map(MemoryEstimate::estimate_heap_size).sum();
310        let templates_vec_overhead = self.templates.capacity() * std::mem::size_of::<Template>();
311
312        // family_sizes: AHashMap<usize, u64>
313        // Each entry is ~24 bytes (key + value + overhead)
314        let family_sizes_size = self.family_sizes.len() * 24;
315
316        // filter_metrics: FilterMetrics is mostly inline (u64 fields)
317        // Just a small struct overhead
318
319        templates_size + templates_vec_overhead + family_sizes_size
320    }
321}
322
323/// Metrics tracking what was filtered during processing.
324#[derive(Default, Clone, Debug)]
325pub struct FilterMetrics {
326    /// Total templates seen before filtering.
327    pub total_templates: u64,
328    /// Templates accepted after filtering.
329    pub accepted_templates: u64,
330    /// Templates discarded because they weren't passing filter.
331    pub discarded_non_pf: u64,
332    /// Templates discarded due to poor alignment.
333    pub discarded_poor_alignment: u64,
334    /// Templates discarded due to Ns in UMI.
335    pub discarded_ns_in_umi: u64,
336    /// Templates discarded due to UMI being too short.
337    pub discarded_umi_too_short: u64,
338}
339
340impl FilterMetrics {
341    /// Create new empty metrics.
342    #[must_use]
343    pub fn new() -> Self {
344        Self::default()
345    }
346
347    /// Merge another `FilterMetrics` into this one.
348    pub fn merge(&mut self, other: &FilterMetrics) {
349        self.total_templates += other.total_templates;
350        self.accepted_templates += other.accepted_templates;
351        self.discarded_non_pf += other.discarded_non_pf;
352        self.discarded_poor_alignment += other.discarded_poor_alignment;
353        self.discarded_ns_in_umi += other.discarded_ns_in_umi;
354        self.discarded_umi_too_short += other.discarded_umi_too_short;
355    }
356}
357
358/// Result of processing a position group through UMI assignment.
359#[derive(Debug)]
360pub struct ProcessedPositionGroup {
361    /// Templates with MI tags assigned, sorted by MI then name.
362    /// `Template.mi` contains local IDs (0, 1, 2, ...) - the serialize step adds a global offset.
363    pub templates: Vec<Template>,
364    /// Family size counts for this position group.
365    pub family_sizes: ahash::AHashMap<usize, u64>,
366    /// Filter metrics for this position group (thread-local, merged later).
367    pub filter_metrics: FilterMetrics,
368    /// Total input records processed (for progress tracking).
369    pub input_record_count: u64,
370}
371
372// ============================================================================
373// RawPositionGroup + RecordPositionGrouper
374// ============================================================================
375
376/// Position key tuple returned by [`GroupKey::position_key`].
377type PositionKeyTuple = (i32, i32, u8, i32, i32, u8, u16, u64);
378
379/// A position group containing decoded records at the same genomic position.
380///
381/// Contains raw [`DecodedRecord`]s rather than built [`Template`]s. Template
382/// construction is deferred to the parallel Process step via
383/// [`build_templates_from_records`].
384///
385/// Used by [`RecordPositionGrouper`] which requires MC tags so that each record's
386/// pre-computed [`GroupKey`] contains the complete paired position.
387#[derive(Debug)]
388pub struct RawPositionGroup {
389    /// The position key for this group (from the first record's `GroupKey`).
390    pub group_key: GroupKey,
391    /// Raw decoded records at this position (not yet grouped into Templates).
392    pub records: Vec<DecodedRecord>,
393}
394
395impl BatchWeight for RawPositionGroup {
396    fn batch_weight(&self) -> usize {
397        // Estimate ~2 records per template for paired-end data
398        self.records.len().div_ceil(2)
399    }
400}
401
402impl MemoryEstimate for RawPositionGroup {
403    fn estimate_heap_size(&self) -> usize {
404        let records_size: usize = self.records.iter().map(MemoryEstimate::estimate_heap_size).sum();
405        let records_vec_overhead = self.records.capacity() * std::mem::size_of::<DecodedRecord>();
406        records_size + records_vec_overhead
407    }
408}
409
410/// A lightweight position grouper that compares per-record [`GroupKey::position_key`]
411/// values to detect group boundaries.
412///
413/// Does **not** build Templates or combine keys — it accumulates raw
414/// [`DecodedRecord`]s and emits [`RawPositionGroup`]s. Template construction
415/// is deferred to the parallel Process step via [`build_templates_from_records`].
416///
417/// **Requirement:** Paired-end reads must have MC tags so that [`compute_group_key`]
418/// produces complete [`GroupKey::paired`] values. Without MC tags, R1 and R2 would
419/// get different `position_key()` values and be incorrectly split.
420///
421/// By default, secondary/supplementary reads are skipped (they have UNKNOWN
422/// position keys). Use [`with_secondary_supplementary`](Self::with_secondary_supplementary)
423/// to include them — they are coalesced by `name_hash` into the group of their
424/// adjacent primary read (requires template-coordinate sorted input).
425///
426/// [`compute_group_key`]: crate::read_info::compute_group_key
427pub struct RecordPositionGrouper {
428    /// Current position key being accumulated (tuple for fast comparison).
429    current_position_key: Option<PositionKeyTuple>,
430    /// Full `GroupKey` for emitting with the group.
431    current_group_key: Option<GroupKey>,
432    /// Records at the current position.
433    current_records: Vec<DecodedRecord>,
434    /// Whether MC tags have been validated on paired records.
435    mc_validated: bool,
436    /// Whether to include secondary and supplementary reads in groups.
437    /// When true, secondary/supplementary reads (which have UNKNOWN position keys)
438    /// are kept and coalesced by `name_hash` into the group of their primary read.
439    /// When false (default), they are skipped.
440    include_secondary_supplementary: bool,
441}
442
443impl RecordPositionGrouper {
444    /// Create a new record-level position grouper.
445    ///
446    /// Secondary/supplementary reads are skipped by default.
447    #[must_use]
448    pub fn new() -> Self {
449        Self {
450            current_position_key: None,
451            current_group_key: None,
452            current_records: Vec::new(),
453            mc_validated: false,
454            include_secondary_supplementary: false,
455        }
456    }
457
458    /// Create a record-level position grouper that includes secondary/supplementary reads.
459    ///
460    /// Secondary/supplementary reads have UNKNOWN position keys and are coalesced
461    /// by `name_hash` into the group of their adjacent primary read.
462    #[must_use]
463    pub fn with_secondary_supplementary() -> Self {
464        Self { include_secondary_supplementary: true, ..Self::new() }
465    }
466
467    /// Validate that a paired primary record has an MC tag.
468    ///
469    /// Skips validation for records that are unmapped or whose mates are unmapped,
470    /// since unmapped reads have no CIGAR to report in an MC tag.
471    fn validate_mc_tag(&mut self, decoded: &DecodedRecord) -> io::Result<()> {
472        use crate::sort::bam_fields;
473        use crate::unified_pipeline::DecodedRecordData;
474
475        if self.mc_validated {
476            return Ok(());
477        }
478
479        match &decoded.data {
480            DecodedRecordData::Raw(raw) => {
481                let flg = bam_fields::flags(raw);
482                let is_paired = (flg & bam_fields::flags::PAIRED) != 0;
483                let is_secondary = (flg & bam_fields::flags::SECONDARY) != 0;
484                let is_supplementary = (flg & bam_fields::flags::SUPPLEMENTARY) != 0;
485                let is_unmapped = (flg & bam_fields::flags::UNMAPPED) != 0;
486                let is_mate_unmapped = (flg & bam_fields::flags::MATE_UNMAPPED) != 0;
487
488                if is_paired
489                    && !is_secondary
490                    && !is_supplementary
491                    && !is_unmapped
492                    && !is_mate_unmapped
493                {
494                    if bam_fields::find_mc_tag_in_record(raw).is_none() {
495                        return Err(io::Error::new(
496                            io::ErrorKind::InvalidData,
497                            "RecordPositionGrouper requires MC tags on paired-end reads. \
498                             Run `fgumi zipper` to add MC tags before `fgumi group`.",
499                        ));
500                    }
501                    self.mc_validated = true;
502                }
503            }
504            DecodedRecordData::Parsed(record) => {
505                let flags = record.flags();
506                if flags.is_segmented()
507                    && !flags.is_secondary()
508                    && !flags.is_supplementary()
509                    && !flags.is_unmapped()
510                    && !flags.is_mate_unmapped()
511                {
512                    let mc_tag = Tag::from([b'M', b'C']);
513                    if record.data().get(&mc_tag).is_none() {
514                        return Err(io::Error::new(
515                            io::ErrorKind::InvalidData,
516                            "RecordPositionGrouper requires MC tags on paired-end reads. \
517                             Run `fgumi zipper` to add MC tags before `fgumi group`.",
518                        ));
519                    }
520                    self.mc_validated = true;
521                }
522            }
523        }
524        Ok(())
525    }
526
527    /// Process a single decoded record, potentially emitting a completed group.
528    fn process_record(&mut self, decoded: DecodedRecord) -> io::Result<Option<RawPositionGroup>> {
529        // Skip secondary and supplementary reads (they have UNKNOWN ref_id1)
530        // unless configured to include them (for dedup, which needs them in templates).
531        if decoded.key.ref_id1 == GroupKey::UNKNOWN_REF && !self.include_secondary_supplementary {
532            return Ok(None);
533        }
534
535        // Validate MC tag on first paired primary record
536        self.validate_mc_tag(&decoded)?;
537
538        let record_pos_key = decoded.key.position_key();
539
540        match self.current_position_key {
541            Some(current_pos) if current_pos == record_pos_key => {
542                // Same position — accumulate
543                self.current_records.push(decoded);
544                Ok(None)
545            }
546            Some(_)
547                if self
548                    .current_records
549                    .last()
550                    .is_some_and(|last| last.key.name_hash == decoded.key.name_hash) =>
551            {
552                // Different position but same template (name_hash match with previous
553                // record). This happens for paired reads with unmapped mates in
554                // template-coordinate sorted input: R1 is mapped at some position while
555                // R2 is unmapped (position -1:0), but they're adjacent by QNAME.
556                // Keep them in the same group so they form a complete template.
557                self.current_records.push(decoded);
558                Ok(None)
559            }
560            Some(_) => {
561                // Different position — emit current group, start new one
562                let finished_records = std::mem::take(&mut self.current_records);
563                let finished_key = self
564                    .current_group_key
565                    .take()
566                    .expect("current_group_key set when current_position_key is set");
567
568                let group = RawPositionGroup { group_key: finished_key, records: finished_records };
569
570                // Start new position group
571                self.current_position_key = Some(record_pos_key);
572                self.current_group_key = Some(decoded.key);
573                self.current_records.push(decoded);
574
575                Ok(Some(group))
576            }
577            None => {
578                // First record
579                self.current_position_key = Some(record_pos_key);
580                self.current_group_key = Some(decoded.key);
581                self.current_records.push(decoded);
582                Ok(None)
583            }
584        }
585    }
586}
587
588impl Default for RecordPositionGrouper {
589    fn default() -> Self {
590        Self::new()
591    }
592}
593
594impl Grouper for RecordPositionGrouper {
595    type Group = RawPositionGroup;
596
597    fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
598        let mut completed_groups = Vec::new();
599        for decoded in records {
600            if let Some(group) = self.process_record(decoded)? {
601                completed_groups.push(group);
602            }
603        }
604        Ok(completed_groups)
605    }
606
607    fn finish(&mut self) -> io::Result<Option<Self::Group>> {
608        if !self.current_records.is_empty() {
609            debug_assert!(
610                self.current_group_key.is_some(),
611                "RecordPositionGrouper has {} buffered records but no group key",
612                self.current_records.len()
613            );
614            if let Some(key) = self.current_group_key.take() {
615                let records = std::mem::take(&mut self.current_records);
616                self.current_position_key = None;
617                return Ok(Some(RawPositionGroup { group_key: key, records }));
618            }
619        }
620        Ok(None)
621    }
622
623    fn has_pending(&self) -> bool {
624        !self.current_records.is_empty()
625    }
626}
627
628/// Build [`Template`]s from raw decoded records.
629///
630/// Groups decoded records by `name_hash` and builds a [`Template`] from each group.
631///
632/// This is a generic helper: `extract` pulls the per-record payload out of each
633/// [`DecodedRecord`], and `build` converts a batch of payloads into a [`Template`].
634fn group_by_name_and_build<T>(
635    records: Vec<DecodedRecord>,
636    extract: impl Fn(DecodedRecord) -> io::Result<T>,
637    build: impl Fn(Vec<T>) -> anyhow::Result<Template>,
638) -> io::Result<Vec<Template>> {
639    let mut templates = Vec::new();
640    let mut current_name_hash: Option<u64> = None;
641    let mut current_items: Vec<T> = Vec::new();
642
643    for decoded in records {
644        let name_hash = decoded.key.name_hash;
645        let item = extract(decoded)?;
646
647        match current_name_hash {
648            Some(h) if h == name_hash => {
649                current_items.push(item);
650            }
651            Some(_) => {
652                let template = build(std::mem::take(&mut current_items))
653                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
654                templates.push(template);
655                current_name_hash = Some(name_hash);
656                current_items.push(item);
657            }
658            None => {
659                current_name_hash = Some(name_hash);
660                current_items.push(item);
661            }
662        }
663    }
664
665    if !current_items.is_empty() {
666        let template =
667            build(current_items).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
668        templates.push(template);
669    }
670
671    Ok(templates)
672}
673
674/// Groups records by `name_hash` (QNAME) and builds a [`Template`] from each group.
675/// Records must be grouped by QNAME (guaranteed by template-coordinate sort order).
676///
677/// Designed to run in the parallel Process step after [`RecordPositionGrouper`]
678/// emits [`RawPositionGroup`]s.
679///
680/// # Errors
681///
682/// Returns an error if template construction from records fails.
683pub fn build_templates_from_records(records: Vec<DecodedRecord>) -> io::Result<Vec<Template>> {
684    use crate::unified_pipeline::DecodedRecordData;
685
686    if records.is_empty() {
687        return Ok(Vec::new());
688    }
689
690    let raw_byte_mode = matches!(records[0].data, DecodedRecordData::Raw(_));
691
692    if raw_byte_mode {
693        group_by_name_and_build(
694            records,
695            |d| match d.data {
696                DecodedRecordData::Raw(v) => Ok(v),
697                DecodedRecordData::Parsed(_) => Err(io::Error::new(
698                    io::ErrorKind::InvalidData,
699                    "expected raw-byte record but found parsed record",
700                )),
701            },
702            Template::from_raw_records,
703        )
704    } else {
705        group_by_name_and_build(
706            records,
707            |d| match d.data {
708                DecodedRecordData::Parsed(r) => Ok(r),
709                DecodedRecordData::Raw(_) => Err(io::Error::new(
710                    io::ErrorKind::InvalidData,
711                    "expected parsed record but found raw-byte record",
712                )),
713            },
714            Template::from_records,
715        )
716    }
717}
718
719// ============================================================================
720// FASTQ Record Parsing Utilities
721// ============================================================================
722
723/// A parsed FASTQ record.
724#[derive(Debug, Clone)]
725pub struct FastqRecord {
726    /// Read name (without @ prefix).
727    pub name: Vec<u8>,
728    /// Sequence bases.
729    pub sequence: Vec<u8>,
730    /// Quality scores (Phred+33 encoded).
731    pub quality: Vec<u8>,
732}
733
734/// Result of parsing a FASTQ record.
735#[derive(Debug)]
736enum FastqParseResult {
737    /// Record incomplete - need more data.
738    Incomplete,
739    /// Parse error.
740    Error(io::Error),
741}
742
743/// Parse FASTQ records from a byte buffer.
744///
745/// FASTQ format:
746/// ```text
747/// @read_name
748/// SEQUENCE
749/// +
750/// QUALITY
751/// ```
752///
753/// Returns parsed records and leftover bytes for incomplete records.
754///
755/// # Errors
756///
757/// Returns an error if a record has mismatched sequence and quality lengths.
758pub fn parse_fastq_records(data: &[u8]) -> io::Result<(Vec<FastqRecord>, Vec<u8>)> {
759    let mut records = Vec::new();
760    let mut pos = 0;
761
762    while pos < data.len() {
763        // Find the start of a record (@ character at start of line)
764        if data[pos] != b'@' {
765            // Skip until we find @ at start of line or run out of data
766            while pos < data.len() && data[pos] != b'@' {
767                pos += 1;
768            }
769            if pos >= data.len() {
770                return Ok((records, Vec::new()));
771            }
772        }
773
774        // Try to parse a complete record
775        match parse_single_fastq_record(&data[pos..]) {
776            Ok((record, consumed)) => {
777                records.push(record);
778                pos += consumed;
779            }
780            Err(FastqParseResult::Incomplete) => {
781                // Not enough data for a complete record
782                return Ok((records, data[pos..].to_vec()));
783            }
784            Err(FastqParseResult::Error(e)) => {
785                return Err(e);
786            }
787        }
788    }
789
790    Ok((records, Vec::new()))
791}
792
793/// Parse a single FASTQ record from the beginning of a buffer.
794/// Returns (record, `bytes_consumed`) or an error.
795fn parse_single_fastq_record(data: &[u8]) -> Result<(FastqRecord, usize), FastqParseResult> {
796    let mut pos = 0;
797
798    // Line 1: @name
799    if data.is_empty() || data[0] != b'@' {
800        return Err(FastqParseResult::Error(io::Error::new(
801            io::ErrorKind::InvalidData,
802            "FASTQ record must start with @",
803        )));
804    }
805    pos += 1; // Skip @
806
807    let name_end = find_newline(&data[pos..]).ok_or(FastqParseResult::Incomplete)?;
808    let name = data[pos..pos + name_end].to_vec();
809    pos += name_end + 1; // +1 for newline
810
811    // Line 2: sequence
812    if pos >= data.len() {
813        return Err(FastqParseResult::Incomplete);
814    }
815    let seq_end = find_newline(&data[pos..]).ok_or(FastqParseResult::Incomplete)?;
816    let sequence = data[pos..pos + seq_end].to_vec();
817    pos += seq_end + 1;
818
819    // Line 3: + (separator)
820    if pos >= data.len() {
821        return Err(FastqParseResult::Incomplete);
822    }
823    let plus_end = find_newline(&data[pos..]).ok_or(FastqParseResult::Incomplete)?;
824    if data[pos] != b'+' {
825        return Err(FastqParseResult::Error(io::Error::new(
826            io::ErrorKind::InvalidData,
827            "FASTQ separator line must start with +",
828        )));
829    }
830    pos += plus_end + 1;
831
832    // Line 4: quality
833    if pos >= data.len() {
834        return Err(FastqParseResult::Incomplete);
835    }
836    let qual_end = find_newline(&data[pos..]).ok_or(FastqParseResult::Incomplete)?;
837    let quality = data[pos..pos + qual_end].to_vec();
838    pos += qual_end + 1;
839
840    // Validate lengths match
841    if sequence.len() != quality.len() {
842        return Err(FastqParseResult::Error(io::Error::new(
843            io::ErrorKind::InvalidData,
844            format!("Sequence length ({}) != quality length ({})", sequence.len(), quality.len()),
845        )));
846    }
847
848    Ok((FastqRecord { name, sequence, quality }, pos))
849}
850
851/// Find the position of the next newline character.
852fn find_newline(data: &[u8]) -> Option<usize> {
853    data.iter().position(|&b| b == b'\n')
854}
855
856/// Strip /1 or /2 suffix from read name for comparison.
857#[must_use]
858pub fn strip_read_suffix(name: &[u8]) -> &[u8] {
859    if name.len() >= 2 {
860        let suffix = &name[name.len() - 2..];
861        if suffix == b"/1" || suffix == b"/2" {
862            return &name[..name.len() - 2];
863        }
864    }
865    name
866}
867
868// ============================================================================
869// FastqGrouper - Groups FASTQ records from multiple input streams
870// ============================================================================
871
872/// A template containing FASTQ records from all input files.
873/// For paired-end: R1 + R2 records with matching names.
874#[derive(Debug)]
875pub struct FastqTemplate {
876    /// Records from each input file (index matches input order).
877    pub records: Vec<FastqRecord>,
878    /// The common read name (should match across all records).
879    pub name: Vec<u8>,
880}
881
882impl MemoryEstimate for FastqRecord {
883    fn estimate_heap_size(&self) -> usize {
884        self.name.capacity() + self.sequence.capacity() + self.quality.capacity()
885    }
886}
887
888impl MemoryEstimate for FastqTemplate {
889    fn estimate_heap_size(&self) -> usize {
890        let records_heap: usize = self.records.iter().map(MemoryEstimate::estimate_heap_size).sum();
891        let records_vec_overhead = self.records.capacity() * std::mem::size_of::<FastqRecord>();
892        self.name.capacity() + records_heap + records_vec_overhead
893    }
894}
895
896/// Groups FASTQ records from multiple synchronized input streams into templates.
897///
898/// This grouper expects decompressed bytes from multiple FASTQ files,
899/// provided via `add_bytes_for_stream`. It parses records and groups them by
900/// read name into templates.
901pub struct FastqGrouper {
902    /// Number of input streams.
903    num_inputs: usize,
904    /// Leftover bytes for each input stream.
905    leftovers: Vec<Vec<u8>>,
906    /// Parsed but not yet grouped records for each stream.
907    pending_records: Vec<VecDeque<FastqRecord>>,
908}
909
910impl FastqGrouper {
911    /// Create a new FASTQ grouper for the specified number of input streams.
912    #[must_use]
913    pub fn new(num_inputs: usize) -> Self {
914        log::debug!("FastqGrouper::new: num_inputs={num_inputs}");
915        Self {
916            num_inputs,
917            leftovers: vec![Vec::new(); num_inputs],
918            pending_records: (0..num_inputs).map(|_| VecDeque::new()).collect(),
919        }
920    }
921
922    /// Add decompressed bytes from a specific input stream.
923    ///
924    /// **Note:** This method parses the bytes into records under the caller's lock.
925    /// For better parallel scaling, use `add_records_for_stream` with pre-parsed records.
926    ///
927    /// # Errors
928    ///
929    /// Returns an error if the stream index is out of range or parsing fails.
930    pub fn add_bytes_for_stream(&mut self, stream_idx: usize, data: &[u8]) -> io::Result<()> {
931        log::trace!(
932            "FastqGrouper::add_bytes_for_stream: stream_idx={}, num_inputs={}, data_len={}",
933            stream_idx,
934            self.num_inputs,
935            data.len()
936        );
937        if stream_idx >= self.num_inputs {
938            return Err(io::Error::new(
939                io::ErrorKind::InvalidInput,
940                format!("Stream index {} out of range (max {})", stream_idx, self.num_inputs - 1),
941            ));
942        }
943
944        // Combine with leftover
945        let combined = if self.leftovers[stream_idx].is_empty() {
946            data.to_vec()
947        } else {
948            let mut combined = std::mem::take(&mut self.leftovers[stream_idx]);
949            combined.extend_from_slice(data);
950            combined
951        };
952
953        // Parse FASTQ records
954        let (records, leftover) = parse_fastq_records(&combined)?;
955        self.leftovers[stream_idx] = leftover;
956
957        // Add to pending
958        self.pending_records[stream_idx].extend(records);
959
960        Ok(())
961    }
962
963    /// Add pre-parsed records directly to a specific input stream.
964    ///
965    /// This method enables the parallel Parse optimization by accepting already-parsed
966    /// records instead of raw bytes. The parsing happens in a parallel step, while
967    /// grouping (which requires sequential access) only does the grouping work.
968    ///
969    /// **This is the key method for fixing the t8 scaling bottleneck.**
970    ///
971    /// # Errors
972    ///
973    /// Returns an error if the stream index is out of range.
974    pub fn add_records_for_stream(
975        &mut self,
976        stream_idx: usize,
977        records: Vec<FastqRecord>,
978    ) -> io::Result<()> {
979        log::trace!(
980            "FastqGrouper::add_records_for_stream: stream_idx={}, num_inputs={}, records_len={}",
981            stream_idx,
982            self.num_inputs,
983            records.len()
984        );
985        if stream_idx >= self.num_inputs {
986            return Err(io::Error::new(
987                io::ErrorKind::InvalidInput,
988                format!("Stream index {} out of range (max {})", stream_idx, self.num_inputs - 1),
989            ));
990        }
991
992        // Add directly to pending - no parsing needed!
993        self.pending_records[stream_idx].extend(records);
994
995        Ok(())
996    }
997
998    /// Check if there are leftover bytes for any stream.
999    ///
1000    /// When using the parallel Parse optimization, this should always return false
1001    /// because boundary finding handles leftovers separately.
1002    #[must_use]
1003    pub fn has_leftover_bytes(&self) -> bool {
1004        self.leftovers.iter().any(|l| !l.is_empty())
1005    }
1006
1007    /// Try to emit complete templates (records from all streams with matching names).
1008    ///
1009    /// # Errors
1010    ///
1011    /// Returns an error if FASTQ files are out of sync (mismatched read names).
1012    ///
1013    /// # Panics
1014    ///
1015    /// Panics if a pending record queue is unexpectedly empty after checking non-emptiness.
1016    pub fn drain_complete_templates(&mut self) -> io::Result<Vec<FastqTemplate>> {
1017        let mut templates = Vec::new();
1018
1019        // Keep emitting while we have at least one record in each stream
1020        while self.pending_records.iter().all(|q| !q.is_empty()) {
1021            // Validate names match and get base_name (in block so names is dropped before pop)
1022            let base_name = {
1023                // Peek at the first record from each stream
1024                let names: Vec<_> =
1025                    self.pending_records.iter().map(|q| &q.front().unwrap().name).collect();
1026
1027                // Copy base_name immediately
1028                let base_name = strip_read_suffix(names[0]).to_vec();
1029
1030                // Validate all names match (strip /1, /2 suffixes for comparison)
1031                for (i, name) in names.iter().enumerate().skip(1) {
1032                    let other_base = strip_read_suffix(name);
1033                    if base_name != other_base {
1034                        return Err(io::Error::new(
1035                            io::ErrorKind::InvalidData,
1036                            format!(
1037                                "FASTQ files out of sync: stream 0 has '{}', stream {} has '{}'",
1038                                String::from_utf8_lossy(&base_name),
1039                                i,
1040                                String::from_utf8_lossy(other_base),
1041                            ),
1042                        ));
1043                    }
1044                }
1045
1046                base_name // names dropped here
1047            };
1048
1049            // Pop records from all streams
1050            let records: Vec<_> =
1051                self.pending_records.iter_mut().map(|q| q.pop_front().unwrap()).collect();
1052
1053            templates.push(FastqTemplate { name: base_name, records });
1054        }
1055
1056        Ok(templates)
1057    }
1058
1059    /// Check if there are pending records or leftover bytes.
1060    #[must_use]
1061    pub fn has_pending(&self) -> bool {
1062        self.leftovers.iter().any(|l| !l.is_empty())
1063            || self.pending_records.iter().any(|q| !q.is_empty())
1064    }
1065
1066    /// Finish processing and return any remaining template.
1067    ///
1068    /// # Errors
1069    ///
1070    /// Returns an error if there is incomplete or unmatched data at EOF.
1071    pub fn finish(&mut self) -> io::Result<Option<FastqTemplate>> {
1072        // Check for any remaining complete templates
1073        let templates = self.drain_complete_templates()?;
1074        if templates.len() == 1 {
1075            return Ok(templates.into_iter().next());
1076        } else if templates.len() > 1 {
1077            return Err(io::Error::new(
1078                io::ErrorKind::InvalidData,
1079                "Multiple templates remaining at finish",
1080            ));
1081        }
1082
1083        // Check for incomplete data
1084        if self.leftovers.iter().any(|l| !l.is_empty()) {
1085            return Err(io::Error::new(
1086                io::ErrorKind::UnexpectedEof,
1087                "Incomplete FASTQ record at EOF",
1088            ));
1089        }
1090
1091        if self.pending_records.iter().any(|q| !q.is_empty()) {
1092            return Err(io::Error::new(
1093                io::ErrorKind::InvalidData,
1094                "Unmatched FASTQ records at EOF - files out of sync",
1095            ));
1096        }
1097
1098        Ok(None)
1099    }
1100}
1101
1102// ============================================================================
1103// Tests
1104// ============================================================================
1105
1106#[cfg(test)]
1107mod tests {
1108    use super::*;
1109
1110    #[test]
1111    fn test_single_record_grouper_empty() {
1112        let mut grouper = SingleRecordGrouper::new();
1113        assert!(!grouper.has_pending());
1114
1115        let result = grouper.finish().unwrap();
1116        assert!(result.is_none());
1117    }
1118
1119    #[test]
1120    fn test_single_raw_record_grouper_empty() {
1121        let mut grouper = SingleRawRecordGrouper::new();
1122        assert!(!grouper.has_pending());
1123
1124        let result = grouper.finish().unwrap();
1125        assert!(result.is_none());
1126    }
1127
1128    #[test]
1129    fn test_single_raw_record_grouper_emits_each_record() {
1130        use crate::unified_pipeline::{DecodedRecord, GroupKey};
1131
1132        let mut grouper = SingleRawRecordGrouper::new();
1133        let raw1 = vec![1u8; 36];
1134        let raw2 = vec![2u8; 36];
1135        let records = vec![
1136            DecodedRecord::from_raw_bytes(raw1.clone(), GroupKey::default()),
1137            DecodedRecord::from_raw_bytes(raw2.clone(), GroupKey::default()),
1138        ];
1139
1140        let groups = grouper.add_records(records).unwrap();
1141        assert_eq!(groups.len(), 2);
1142        assert_eq!(groups[0], raw1);
1143        assert_eq!(groups[1], raw2);
1144    }
1145
1146    #[test]
1147    fn test_single_raw_record_grouper_rejects_parsed() {
1148        use crate::sam::builder::RecordBuilder;
1149        use crate::unified_pipeline::{DecodedRecord, GroupKey};
1150
1151        let mut grouper = SingleRawRecordGrouper::new();
1152        let rec = RecordBuilder::new().sequence("ACGT").build();
1153        let records = vec![DecodedRecord::new(rec, GroupKey::default())];
1154
1155        let result = grouper.add_records(records);
1156        assert!(result.is_err());
1157    }
1158
1159    // FASTQ parsing tests
1160    #[test]
1161    fn test_parse_single_fastq_record() {
1162        let data = b"@read1\nACGT\n+\nIIII\n";
1163        let (record, consumed) = parse_single_fastq_record(data).unwrap();
1164        assert_eq!(record.name, b"read1");
1165        assert_eq!(record.sequence, b"ACGT");
1166        assert_eq!(record.quality, b"IIII");
1167        assert_eq!(consumed, data.len());
1168    }
1169
1170    #[test]
1171    fn test_parse_fastq_records_multiple() {
1172        let data = b"@read1\nACGT\n+\nIIII\n@read2\nTGCA\n+\nJJJJ\n";
1173        let (records, leftover) = parse_fastq_records(data).unwrap();
1174        assert_eq!(records.len(), 2);
1175        assert_eq!(records[0].name, b"read1");
1176        assert_eq!(records[1].name, b"read2");
1177        assert!(leftover.is_empty());
1178    }
1179
1180    #[test]
1181    fn test_parse_fastq_incomplete_record() {
1182        let data = b"@read1\nACGT\n+\n";
1183        let (records, leftover) = parse_fastq_records(data).unwrap();
1184        assert!(records.is_empty());
1185        assert_eq!(leftover, data);
1186    }
1187
1188    #[test]
1189    fn test_parse_fastq_with_leftover() {
1190        let data = b"@read1\nACGT\n+\nIIII\n@read2\nTG";
1191        let (records, leftover) = parse_fastq_records(data).unwrap();
1192        assert_eq!(records.len(), 1);
1193        assert_eq!(records[0].name, b"read1");
1194        assert_eq!(leftover, b"@read2\nTG");
1195    }
1196
1197    #[test]
1198    fn test_strip_read_suffix() {
1199        assert_eq!(strip_read_suffix(b"read1/1"), b"read1");
1200        assert_eq!(strip_read_suffix(b"read1/2"), b"read1");
1201        assert_eq!(strip_read_suffix(b"read1"), b"read1");
1202        assert_eq!(strip_read_suffix(b"a"), b"a");
1203        assert_eq!(strip_read_suffix(b""), b"" as &[u8]);
1204    }
1205
1206    // FastqGrouper tests
1207    #[test]
1208    fn test_fastq_grouper_paired() {
1209        let mut grouper = FastqGrouper::new(2);
1210
1211        // Add R1 record
1212        grouper.add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n").unwrap();
1213        // Add R2 record
1214        grouper.add_bytes_for_stream(1, b"@read1/2\nTGCA\n+\nJJJJ\n").unwrap();
1215
1216        let templates = grouper.drain_complete_templates().unwrap();
1217        assert_eq!(templates.len(), 1);
1218        assert_eq!(templates[0].name, b"read1");
1219        assert_eq!(templates[0].records.len(), 2);
1220        assert_eq!(templates[0].records[0].sequence, b"ACGT");
1221        assert_eq!(templates[0].records[1].sequence, b"TGCA");
1222    }
1223
1224    #[test]
1225    fn test_fastq_grouper_multiple_templates() {
1226        let mut grouper = FastqGrouper::new(2);
1227
1228        // Add multiple R1 records
1229        grouper
1230            .add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n@read2/1\nAAAA\n+\nIIII\n")
1231            .unwrap();
1232        // Add multiple R2 records
1233        grouper
1234            .add_bytes_for_stream(1, b"@read1/2\nTGCA\n+\nJJJJ\n@read2/2\nTTTT\n+\nJJJJ\n")
1235            .unwrap();
1236
1237        let templates = grouper.drain_complete_templates().unwrap();
1238        assert_eq!(templates.len(), 2);
1239        assert_eq!(templates[0].name, b"read1");
1240        assert_eq!(templates[1].name, b"read2");
1241    }
1242
1243    #[test]
1244    fn test_fastq_grouper_partial_then_complete() {
1245        let mut grouper = FastqGrouper::new(2);
1246
1247        // Add R1 record only
1248        grouper.add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n").unwrap();
1249
1250        // No complete templates yet
1251        let templates = grouper.drain_complete_templates().unwrap();
1252        assert!(templates.is_empty());
1253        assert!(grouper.has_pending());
1254
1255        // Now add R2 record
1256        grouper.add_bytes_for_stream(1, b"@read1/2\nTGCA\n+\nJJJJ\n").unwrap();
1257
1258        let templates = grouper.drain_complete_templates().unwrap();
1259        assert_eq!(templates.len(), 1);
1260    }
1261
1262    #[test]
1263    fn test_fastq_grouper_finish_empty() {
1264        let mut grouper = FastqGrouper::new(2);
1265        let result = grouper.finish().unwrap();
1266        assert!(result.is_none());
1267    }
1268
1269    #[test]
1270    fn test_fastq_grouper_out_of_sync() {
1271        let mut grouper = FastqGrouper::new(2);
1272
1273        // Add mismatched records
1274        grouper.add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n").unwrap();
1275        grouper.add_bytes_for_stream(1, b"@read2/2\nTGCA\n+\nJJJJ\n").unwrap();
1276
1277        let result = grouper.drain_complete_templates();
1278        assert!(result.is_err());
1279        assert!(result.unwrap_err().to_string().contains("out of sync"));
1280    }
1281
1282    // ========================================================================
1283    // RecordPositionGrouper tests
1284    // ========================================================================
1285
1286    use crate::sam::builder::RecordBuilder;
1287
1288    /// Helper: create a `DecodedRecord` with the given `GroupKey` and flags/tags.
1289    fn make_decoded(
1290        key: GroupKey,
1291        paired: bool,
1292        first_segment: bool,
1293        mc: Option<&str>,
1294    ) -> DecodedRecord {
1295        let mut builder = RecordBuilder::new()
1296            .name("read1")
1297            .sequence("ACGT")
1298            .qualities(&[30, 30, 30, 30])
1299            .cigar("4M")
1300            .reference_sequence_id(0)
1301            .alignment_start(100)
1302            .paired(paired);
1303        if first_segment {
1304            builder = builder.first_segment(true);
1305        }
1306        if let Some(mc_val) = mc {
1307            builder = builder.tag("MC", mc_val);
1308        }
1309        DecodedRecord::new(builder.build(), key)
1310    }
1311
1312    /// Helper: create a secondary/supplementary `DecodedRecord` with UNKNOWN key.
1313    fn make_secondary_decoded(name_hash: u64) -> DecodedRecord {
1314        let key = GroupKey { name_hash, ..GroupKey::default() };
1315        let record = RecordBuilder::new()
1316            .name("read1")
1317            .sequence("ACGT")
1318            .qualities(&[30, 30, 30, 30])
1319            .secondary(true)
1320            .build();
1321        DecodedRecord::new(record, key)
1322    }
1323
1324    #[test]
1325    fn test_record_position_grouper_empty() {
1326        let mut grouper = RecordPositionGrouper::new();
1327        assert!(!grouper.has_pending());
1328        let result = grouper.finish().unwrap();
1329        assert!(result.is_none());
1330    }
1331
1332    #[test]
1333    fn test_record_position_grouper_single_unpaired_record() {
1334        let mut grouper = RecordPositionGrouper::new();
1335        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1336        let decoded = make_decoded(key, false, false, None);
1337
1338        let groups = grouper.add_records(vec![decoded]).unwrap();
1339        assert!(groups.is_empty()); // Not emitted yet
1340        assert!(grouper.has_pending());
1341
1342        let final_group = grouper.finish().unwrap().expect("should emit final group");
1343        assert_eq!(final_group.records.len(), 1);
1344        assert_eq!(final_group.group_key.ref_id1, 0);
1345        assert_eq!(final_group.group_key.pos1, 100);
1346    }
1347
1348    #[test]
1349    fn test_record_position_grouper_same_position_multiple_records() {
1350        let mut grouper = RecordPositionGrouper::new();
1351        let key1 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 11111);
1352        let key2 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 22222);
1353        let key3 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 33333);
1354
1355        let records = vec![
1356            make_decoded(key1, true, true, Some("4M")),
1357            make_decoded(key2, true, true, Some("4M")),
1358            make_decoded(key3, true, true, Some("4M")),
1359        ];
1360
1361        let groups = grouper.add_records(records).unwrap();
1362        assert!(groups.is_empty()); // All same position — not emitted yet
1363
1364        let final_group = grouper.finish().unwrap().expect("should emit final group");
1365        assert_eq!(final_group.records.len(), 3);
1366    }
1367
1368    #[test]
1369    fn test_record_position_grouper_different_positions() {
1370        let mut grouper = RecordPositionGrouper::new();
1371        let key_pos1 = GroupKey::single(0, 100, 0, 0, 0, 11111);
1372        let key_pos2 = GroupKey::single(0, 200, 0, 0, 0, 22222);
1373        let key_pos3 = GroupKey::single(0, 300, 0, 0, 0, 33333);
1374
1375        let records = vec![
1376            make_decoded(key_pos1, false, false, None),
1377            make_decoded(key_pos2, false, false, None),
1378            make_decoded(key_pos3, false, false, None),
1379        ];
1380
1381        let groups = grouper.add_records(records).unwrap();
1382        // First two positions emitted when boundary detected
1383        assert_eq!(groups.len(), 2);
1384        assert_eq!(groups[0].group_key.pos1, 100);
1385        assert_eq!(groups[0].records.len(), 1);
1386        assert_eq!(groups[1].group_key.pos1, 200);
1387        assert_eq!(groups[1].records.len(), 1);
1388
1389        // Third position still pending
1390        let final_group = grouper.finish().unwrap().expect("should emit final group");
1391        assert_eq!(final_group.group_key.pos1, 300);
1392    }
1393
1394    #[test]
1395    fn test_record_position_grouper_skips_secondary() {
1396        let mut grouper = RecordPositionGrouper::new();
1397        let primary_key = GroupKey::single(0, 100, 0, 0, 0, 11111);
1398
1399        let records = vec![
1400            make_decoded(primary_key, false, false, None),
1401            make_secondary_decoded(11111), // Should be skipped
1402        ];
1403
1404        let groups = grouper.add_records(records).unwrap();
1405        assert!(groups.is_empty());
1406
1407        let final_group = grouper.finish().unwrap().expect("should emit final group");
1408        assert_eq!(final_group.records.len(), 1); // Only primary kept
1409    }
1410
1411    #[test]
1412    fn test_record_position_grouper_paired_same_position_key() {
1413        // R1 and R2 with MC tags produce identical position_key after normalization
1414        let mut grouper = RecordPositionGrouper::new();
1415        // Both R1 and R2 normalize to the same position key
1416        let r1_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1417        let r2_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1418        // position_key() should be identical for r1 and r2
1419        assert_eq!(r1_key.position_key(), r2_key.position_key());
1420
1421        let records = vec![
1422            make_decoded(r1_key, true, true, Some("4M")),
1423            make_decoded(r2_key, true, false, Some("4M")),
1424        ];
1425
1426        let groups = grouper.add_records(records).unwrap();
1427        assert!(groups.is_empty()); // Same position — all in one group
1428
1429        let final_group = grouper.finish().unwrap().expect("should emit final group");
1430        assert_eq!(final_group.records.len(), 2);
1431    }
1432
1433    #[test]
1434    fn test_record_position_grouper_groups_records_by_position() {
1435        let mut grouper = RecordPositionGrouper::new();
1436        // Group 1: 3 records at position 100
1437        let key1 = GroupKey::single(0, 100, 0, 0, 0, 11111);
1438        let key2 = GroupKey::single(0, 100, 0, 0, 0, 22222);
1439        let key3 = GroupKey::single(0, 100, 0, 0, 0, 33333);
1440        // Group 2: 2 records at position 200
1441        let key4 = GroupKey::single(0, 200, 0, 0, 0, 44444);
1442        let key5 = GroupKey::single(0, 200, 0, 0, 0, 55555);
1443        // Group 3: 1 record at position 300
1444        let key6 = GroupKey::single(0, 300, 0, 0, 0, 66666);
1445
1446        let records = vec![
1447            make_decoded(key1, false, false, None),
1448            make_decoded(key2, false, false, None),
1449            make_decoded(key3, false, false, None),
1450            make_decoded(key4, false, false, None),
1451            make_decoded(key5, false, false, None),
1452            make_decoded(key6, false, false, None),
1453        ];
1454
1455        let groups = grouper.add_records(records).unwrap();
1456        assert_eq!(groups.len(), 2);
1457        assert_eq!(groups[0].records.len(), 3);
1458        assert_eq!(groups[1].records.len(), 2);
1459
1460        let final_group = grouper.finish().unwrap().expect("should emit final group");
1461        assert_eq!(final_group.records.len(), 1);
1462    }
1463
1464    #[test]
1465    fn test_record_position_grouper_coalesces_unmapped_mate_by_name_hash() {
1466        // In template-coordinate sort, a mapped R1 and its unmapped R2 are adjacent.
1467        // R1 gets GroupKey::single(5, 100, ...) and R2 gets GroupKey::single(-1, 0, ...).
1468        // They should stay in the same group because they share name_hash.
1469        let mut grouper = RecordPositionGrouper::new();
1470        let name_hash = 12345_u64;
1471
1472        // R1: mapped at chr5:100, mate unmapped — no MC tag
1473        let r1_key = GroupKey::single(5, 100, 0, 0, 0, name_hash);
1474        let r1 = {
1475            let record = RecordBuilder::new()
1476                .name("read1")
1477                .sequence("ACGT")
1478                .qualities(&[30, 30, 30, 30])
1479                .cigar("4M")
1480                .reference_sequence_id(5)
1481                .alignment_start(100)
1482                .paired(true)
1483                .first_segment(true)
1484                .mate_unmapped(true)
1485                .build();
1486            DecodedRecord::new(record, r1_key)
1487        };
1488
1489        // R2: unmapped, mate mapped — different position key
1490        let r2_key = GroupKey::single(-1, 0, 0, 0, 0, name_hash);
1491        let r2 = {
1492            let record = RecordBuilder::new()
1493                .name("read1")
1494                .sequence("TGCA")
1495                .qualities(&[30, 30, 30, 30])
1496                .paired(true)
1497                .first_segment(false)
1498                .unmapped(true)
1499                .build();
1500            DecodedRecord::new(record, r2_key)
1501        };
1502
1503        // Verify position keys differ (this is the bug scenario)
1504        assert_ne!(r1_key.position_key(), r2_key.position_key());
1505
1506        let groups = grouper.add_records(vec![r1, r2]).unwrap();
1507        // Both should be in the same group — no emission yet
1508        assert!(groups.is_empty());
1509
1510        let final_group = grouper.finish().unwrap().expect("should emit final group");
1511        assert_eq!(final_group.records.len(), 2, "R1 and R2 should be in the same group");
1512        assert_eq!(final_group.group_key.ref_id1, 5, "Group key should use R1's position");
1513    }
1514
1515    #[test]
1516    fn test_record_position_grouper_does_not_coalesce_different_name_hash() {
1517        // Records with different name_hashes at different positions should NOT coalesce.
1518        let mut grouper = RecordPositionGrouper::new();
1519
1520        let r1_key = GroupKey::single(0, 100, 0, 0, 0, 11111);
1521        let r2_key = GroupKey::single(0, 200, 0, 0, 0, 22222);
1522
1523        let records = vec![
1524            make_decoded(r1_key, false, false, None),
1525            make_decoded(r2_key, false, false, None),
1526        ];
1527
1528        let groups = grouper.add_records(records).unwrap();
1529        // Position boundary with different name_hash → group emitted
1530        assert_eq!(groups.len(), 1);
1531        assert_eq!(groups[0].records.len(), 1);
1532    }
1533
1534    #[test]
1535    fn test_record_position_grouper_mc_validation_skips_unmapped_mate() {
1536        // Paired records with unmapped mates have no MC tag — validation should skip them.
1537        let mut grouper = RecordPositionGrouper::new();
1538        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1539        let record = RecordBuilder::new()
1540            .name("read1")
1541            .sequence("ACGT")
1542            .qualities(&[30, 30, 30, 30])
1543            .cigar("4M")
1544            .reference_sequence_id(0)
1545            .alignment_start(100)
1546            .paired(true)
1547            .first_segment(true)
1548            .mate_unmapped(true)
1549            .build();
1550        let decoded = DecodedRecord::new(record, key);
1551
1552        // Should NOT error even though there's no MC tag
1553        let result = grouper.add_records(vec![decoded]);
1554        assert!(result.is_ok());
1555        assert!(!grouper.mc_validated); // Not validated — skipped due to unmapped mate
1556    }
1557
1558    #[test]
1559    fn test_record_position_grouper_mc_validation_fails_without_mc() {
1560        let mut grouper = RecordPositionGrouper::new();
1561        // Paired record without MC tag
1562        let key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1563        let decoded = make_decoded(key, true, true, None); // No MC tag
1564
1565        let result = grouper.add_records(vec![decoded]);
1566        assert!(result.is_err());
1567        let err_msg = result.unwrap_err().to_string();
1568        assert!(err_msg.contains("MC tags"), "Error should mention MC tags: {err_msg}");
1569    }
1570
1571    #[test]
1572    fn test_record_position_grouper_mc_validation_passes_with_mc() {
1573        let mut grouper = RecordPositionGrouper::new();
1574        let key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1575        let decoded = make_decoded(key, true, true, Some("4M"));
1576
1577        let result = grouper.add_records(vec![decoded]);
1578        assert!(result.is_ok());
1579    }
1580
1581    #[test]
1582    fn test_record_position_grouper_mc_validation_skips_unpaired() {
1583        // Unpaired records should not trigger MC validation
1584        let mut grouper = RecordPositionGrouper::new();
1585        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1586        let decoded = make_decoded(key, false, false, None); // Unpaired, no MC tag
1587
1588        let result = grouper.add_records(vec![decoded]);
1589        assert!(result.is_ok());
1590        assert!(!grouper.mc_validated); // Should not be validated for unpaired
1591    }
1592
1593    #[test]
1594    fn test_record_position_grouper_default_impl() {
1595        let grouper = RecordPositionGrouper::default();
1596        assert!(!grouper.has_pending());
1597    }
1598
1599    // ========================================================================
1600    // build_templates_from_records tests
1601    // ========================================================================
1602
1603    #[test]
1604    fn test_build_templates_empty() {
1605        let result = build_templates_from_records(vec![]).unwrap();
1606        assert!(result.is_empty());
1607    }
1608
1609    #[test]
1610    fn test_build_templates_single_record() {
1611        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1612        let record = RecordBuilder::new()
1613            .name("read1")
1614            .sequence("ACGT")
1615            .qualities(&[30, 30, 30, 30])
1616            .cigar("4M")
1617            .reference_sequence_id(0)
1618            .alignment_start(100)
1619            .mapping_quality(60)
1620            .paired(true)
1621            .first_segment(true)
1622            .build();
1623        let decoded = DecodedRecord::new(record, key);
1624
1625        let templates = build_templates_from_records(vec![decoded]).unwrap();
1626        assert_eq!(templates.len(), 1);
1627    }
1628
1629    #[test]
1630    fn test_build_templates_paired_same_name_hash() {
1631        // R1 and R2 with same name_hash should produce one template
1632        let r1_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1633        let r2_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1634
1635        let r1 = DecodedRecord::new(
1636            RecordBuilder::new()
1637                .name("read1")
1638                .sequence("ACGT")
1639                .qualities(&[30, 30, 30, 30])
1640                .cigar("4M")
1641                .reference_sequence_id(0)
1642                .alignment_start(100)
1643                .paired(true)
1644                .first_segment(true)
1645                .build(),
1646            r1_key,
1647        );
1648        let r2 = DecodedRecord::new(
1649            RecordBuilder::new()
1650                .name("read1")
1651                .sequence("TGCA")
1652                .qualities(&[30, 30, 30, 30])
1653                .cigar("4M")
1654                .reference_sequence_id(0)
1655                .alignment_start(200)
1656                .paired(true)
1657                .reverse_complement(true)
1658                .build(),
1659            r2_key,
1660        );
1661
1662        let templates = build_templates_from_records(vec![r1, r2]).unwrap();
1663        assert_eq!(templates.len(), 1);
1664    }
1665
1666    #[test]
1667    fn test_build_templates_multiple_qnames() {
1668        // Records with different name_hashes should produce separate templates
1669        let key1 = GroupKey::single(0, 100, 0, 0, 0, 11111);
1670        let key2 = GroupKey::single(0, 100, 0, 0, 0, 22222);
1671        let key3 = GroupKey::single(0, 100, 0, 0, 0, 33333);
1672
1673        let records: Vec<DecodedRecord> = vec![
1674            DecodedRecord::new(
1675                RecordBuilder::new()
1676                    .name("readA")
1677                    .sequence("ACGT")
1678                    .qualities(&[30, 30, 30, 30])
1679                    .cigar("4M")
1680                    .reference_sequence_id(0)
1681                    .alignment_start(100)
1682                    .build(),
1683                key1,
1684            ),
1685            DecodedRecord::new(
1686                RecordBuilder::new()
1687                    .name("readB")
1688                    .sequence("ACGT")
1689                    .qualities(&[30, 30, 30, 30])
1690                    .cigar("4M")
1691                    .reference_sequence_id(0)
1692                    .alignment_start(100)
1693                    .build(),
1694                key2,
1695            ),
1696            DecodedRecord::new(
1697                RecordBuilder::new()
1698                    .name("readC")
1699                    .sequence("ACGT")
1700                    .qualities(&[30, 30, 30, 30])
1701                    .cigar("4M")
1702                    .reference_sequence_id(0)
1703                    .alignment_start(100)
1704                    .build(),
1705                key3,
1706            ),
1707        ];
1708
1709        let templates = build_templates_from_records(records).unwrap();
1710        assert_eq!(templates.len(), 3);
1711    }
1712
1713    #[test]
1714    fn test_build_templates_from_raw_bytes() {
1715        use crate::sort::bam_fields;
1716        use crate::unified_pipeline::{DecodedRecord, GroupKey};
1717
1718        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1719        let raw = bam_fields::make_bam_bytes(
1720            0,                                                            // tid
1721            100,                                                          // pos
1722            bam_fields::flags::PAIRED | bam_fields::flags::FIRST_SEGMENT, // flags
1723            b"read1",                                                     // name
1724            &[bam_fields::encode_op(0, 4)],                               // 4M cigar
1725            4,                                                            // seq_len
1726            0,                                                            // mate_tid
1727            200,                                                          // mate_pos
1728            &[],                                                          // aux
1729        );
1730        let decoded = DecodedRecord::from_raw_bytes(raw, key);
1731
1732        let templates = build_templates_from_records(vec![decoded]).unwrap();
1733        assert_eq!(templates.len(), 1);
1734    }
1735
1736    #[test]
1737    fn test_build_templates_from_raw_bytes_paired() {
1738        use crate::sort::bam_fields;
1739        use crate::unified_pipeline::{DecodedRecord, GroupKey};
1740
1741        let key1 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1742        let key2 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
1743
1744        let r1 = bam_fields::make_bam_bytes(
1745            0,
1746            100,
1747            bam_fields::flags::PAIRED | bam_fields::flags::FIRST_SEGMENT,
1748            b"read1",
1749            &[bam_fields::encode_op(0, 4)],
1750            4,
1751            0,
1752            200,
1753            &[],
1754        );
1755        let r2 = bam_fields::make_bam_bytes(
1756            0,
1757            200,
1758            bam_fields::flags::PAIRED
1759                | bam_fields::flags::LAST_SEGMENT
1760                | bam_fields::flags::REVERSE,
1761            b"read1",
1762            &[bam_fields::encode_op(0, 4)],
1763            4,
1764            0,
1765            100,
1766            &[],
1767        );
1768
1769        let records =
1770            vec![DecodedRecord::from_raw_bytes(r1, key1), DecodedRecord::from_raw_bytes(r2, key2)];
1771
1772        let templates = build_templates_from_records(records).unwrap();
1773        assert_eq!(templates.len(), 1);
1774    }
1775
1776    // ========================================================================
1777    // RawPositionGroup trait impl tests
1778    // ========================================================================
1779
1780    #[test]
1781    fn test_raw_position_group_batch_weight() {
1782        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1783        let records = vec![
1784            make_decoded(GroupKey::single(0, 100, 0, 0, 0, 11111), false, false, None),
1785            make_decoded(GroupKey::single(0, 100, 0, 0, 0, 22222), false, false, None),
1786            make_decoded(GroupKey::single(0, 100, 0, 0, 0, 33333), false, false, None),
1787            make_decoded(GroupKey::single(0, 100, 0, 0, 0, 44444), false, false, None),
1788        ];
1789        let group = RawPositionGroup { group_key: key, records };
1790
1791        // div_ceil(4, 2) = 2 (paired-end estimate)
1792        assert_eq!(group.batch_weight(), 2);
1793    }
1794
1795    #[test]
1796    fn test_raw_position_group_batch_weight_single() {
1797        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1798        let records =
1799            vec![make_decoded(GroupKey::single(0, 100, 0, 0, 0, 11111), false, false, None)];
1800        let group = RawPositionGroup { group_key: key, records };
1801
1802        // div_ceil(1, 2) = 1
1803        assert_eq!(group.batch_weight(), 1);
1804    }
1805
1806    #[test]
1807    fn test_raw_position_group_memory_estimate() {
1808        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
1809        let records =
1810            vec![make_decoded(GroupKey::single(0, 100, 0, 0, 0, 11111), false, false, None)];
1811        let group = RawPositionGroup { group_key: key, records };
1812
1813        // Should return a non-zero value
1814        assert!(group.estimate_heap_size() > 0);
1815    }
1816}