Skip to main content

fgumi_lib/
mi_group.rs

1//! Molecular Identifier (MI) group utilities for streaming BAM processing.
2//!
3//! This module provides an iterator for grouping consecutive BAM records by their MI tag.
4//! This is useful for streaming consensus calling where input is already sorted by MI groups
5//! (e.g., output from `group`).
6
7use anyhow::{Result, bail};
8use noodles::sam::alignment::record::data::field::Tag;
9use noodles::sam::alignment::record_buf::RecordBuf;
10use std::io;
11
12use crate::unified_pipeline::{BatchWeight, DecodedRecord, MemoryEstimate};
13
14/// An iterator that groups consecutive BAM records by their MI (Molecular Identifier) tag.
15///
16/// This iterator assumes records with the same MI value are consecutive in the input stream,
17/// which is the output format from `group`. Each call to `next()` returns all records
18/// sharing the same MI value as a single group.
19///
20/// # Example
21///
22/// ```ignore
23/// use fgumi_lib::mi_group::MiGroupIterator;
24///
25/// let record_iter = reader.record_bufs(&header).map(|r| r.map_err(Into::into));
26/// let mi_groups = MiGroupIterator::new(record_iter, "MI");
27///
28/// for result in mi_groups {
29///     let (mi_value, records) = result?;
30///     // Process all records with this MI value
31/// }
32/// ```
33pub struct MiGroupIterator<I>
34where
35    I: Iterator<Item = Result<RecordBuf>>,
36{
37    record_iter: I,
38    tag: Tag,
39    current_mi: Option<String>,
40    current_group: Vec<RecordBuf>,
41    pending_error: Option<anyhow::Error>,
42    done: bool,
43}
44
45impl<I> MiGroupIterator<I>
46where
47    I: Iterator<Item = Result<RecordBuf>>,
48{
49    /// Creates a new `MiGroupIterator`.
50    ///
51    /// # Arguments
52    ///
53    /// * `record_iter` - Iterator over records to group by MI tag
54    /// * `tag_name` - The two-character tag name (e.g., "MI")
55    ///
56    /// # Panics
57    ///
58    /// Panics if `tag_name` is not exactly 2 characters.
59    pub fn new(record_iter: I, tag_name: &str) -> Self {
60        assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
61        let tag_bytes = tag_name.as_bytes();
62        let tag = Tag::from([tag_bytes[0], tag_bytes[1]]);
63
64        MiGroupIterator {
65            record_iter,
66            tag,
67            current_mi: None,
68            current_group: Vec::new(),
69            pending_error: None,
70            done: false,
71        }
72    }
73
74    /// Extracts the MI tag value from a record.
75    fn get_mi(&self, record: &RecordBuf) -> Result<Option<String>> {
76        if let Some(tag_value) = record.data().get(&self.tag) {
77            match tag_value {
78                noodles::sam::alignment::record_buf::data::field::Value::String(s) => {
79                    Ok(Some(s.to_string()))
80                }
81                _ => {
82                    bail!("MI tag must be a string value");
83                }
84            }
85        } else {
86            Ok(None)
87        }
88    }
89}
90
91impl<I> Iterator for MiGroupIterator<I>
92where
93    I: Iterator<Item = Result<RecordBuf>>,
94{
95    /// Each item is a Result containing (`MI_value`, `Vec<RecordBuf>`) or an error.
96    type Item = Result<(String, Vec<RecordBuf>)>;
97
98    /// Returns the next MI group from the record iterator.
99    ///
100    /// Reads records until the MI tag changes, then returns the completed group.
101    /// On EOF, returns any remaining group, then None.
102    ///
103    /// Records without an MI tag are skipped.
104    fn next(&mut self) -> Option<Self::Item> {
105        if self.done {
106            return None;
107        }
108
109        // Check for pending error (return after we've flushed any pending group)
110        if let Some(e) = self.pending_error.take() {
111            self.done = true;
112            return Some(Err(e));
113        }
114
115        loop {
116            match self.record_iter.next() {
117                None => {
118                    // EOF - return any remaining group
119                    self.done = true;
120                    if self.current_group.is_empty() {
121                        return None;
122                    }
123                    let mi = self.current_mi.take().unwrap_or_default();
124                    let group = std::mem::take(&mut self.current_group);
125                    return Some(Ok((mi, group)));
126                }
127                Some(Err(e)) => {
128                    // If we have a pending group, return it first and save the error
129                    if !self.current_group.is_empty() {
130                        self.pending_error = Some(e);
131                        let mi = self.current_mi.take().unwrap_or_default();
132                        let group = std::mem::take(&mut self.current_group);
133                        return Some(Ok((mi, group)));
134                    }
135                    self.done = true;
136                    return Some(Err(e));
137                }
138                Some(Ok(record)) => {
139                    // Extract MI tag
140                    let mi = match self.get_mi(&record) {
141                        Ok(Some(mi)) => mi,
142                        Ok(None) => {
143                            // Skip records without MI tag
144                            continue;
145                        }
146                        Err(e) => {
147                            // If we have a pending group, return it first and save the error
148                            if !self.current_group.is_empty() {
149                                self.pending_error = Some(e);
150                                let mi = self.current_mi.take().unwrap_or_default();
151                                let group = std::mem::take(&mut self.current_group);
152                                return Some(Ok((mi, group)));
153                            }
154                            self.done = true;
155                            return Some(Err(e));
156                        }
157                    };
158
159                    if self.current_group.is_empty() {
160                        // First record or first record after returning a group
161                        self.current_mi = Some(mi);
162                        self.current_group.push(record);
163                    } else if self.current_mi.as_ref() == Some(&mi) {
164                        // Same MI, add to current group
165                        self.current_group.push(record);
166                    } else {
167                        // Different MI - return current group and start new one
168                        let old_mi = self.current_mi.take().unwrap_or_default();
169                        let group = std::mem::take(&mut self.current_group);
170                        self.current_mi = Some(mi);
171                        self.current_group.push(record);
172                        return Some(Ok((old_mi, group)));
173                    }
174                }
175            }
176        }
177    }
178}
179
180/// An iterator that groups consecutive BAM records by a transformed key.
181///
182/// Similar to `MiGroupIterator` but applies a key transformation function to the MI tag
183/// before grouping. This is useful for duplex consensus calling where reads with
184/// "1/A" and "1/B" should be grouped together under key "1".
185///
186/// # Example
187///
188/// ```ignore
189/// use fgumi_lib::mi_group::MiGroupIteratorWithTransform;
190/// use fgumi_lib::umi::extract_mi_base;
191///
192/// let record_iter = reader.record_bufs(&header).map(|r| r.map_err(Into::into));
193/// let mi_groups = MiGroupIteratorWithTransform::new(
194///     record_iter,
195///     "MI",
196///     |mi| extract_mi_base(mi).to_string(),
197/// );
198///
199/// for result in mi_groups {
200///     let (base_mi, records) = result?;
201///     // records contains all reads with MI tags like "base_mi/A" and "base_mi/B"
202/// }
203/// ```
204pub struct MiGroupIteratorWithTransform<I, F>
205where
206    I: Iterator<Item = Result<RecordBuf>>,
207    F: Fn(&str) -> String,
208{
209    record_iter: I,
210    tag: Tag,
211    key_transform: F,
212    current_key: Option<String>,
213    current_group: Vec<RecordBuf>,
214    pending_error: Option<anyhow::Error>,
215    done: bool,
216}
217
218impl<I, F> MiGroupIteratorWithTransform<I, F>
219where
220    I: Iterator<Item = Result<RecordBuf>>,
221    F: Fn(&str) -> String,
222{
223    /// Creates a new `MiGroupIteratorWithTransform`.
224    ///
225    /// # Arguments
226    ///
227    /// * `record_iter` - Iterator over records to group
228    /// * `tag_name` - The two-character tag name (e.g., "MI")
229    /// * `key_transform` - Function to transform the tag value into a grouping key
230    ///
231    /// # Panics
232    ///
233    /// Panics if `tag_name` is not exactly 2 characters.
234    pub fn new(record_iter: I, tag_name: &str, key_transform: F) -> Self {
235        assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
236        let tag_bytes = tag_name.as_bytes();
237        let tag = Tag::from([tag_bytes[0], tag_bytes[1]]);
238
239        MiGroupIteratorWithTransform {
240            record_iter,
241            tag,
242            key_transform,
243            current_key: None,
244            current_group: Vec::new(),
245            pending_error: None,
246            done: false,
247        }
248    }
249
250    /// Extracts the tag value from a record and transforms it using the key function.
251    fn get_key(&self, record: &RecordBuf) -> Result<Option<String>> {
252        if let Some(tag_value) = record.data().get(&self.tag) {
253            match tag_value {
254                noodles::sam::alignment::record_buf::data::field::Value::String(s) => {
255                    let raw = s.to_string();
256                    Ok(Some((self.key_transform)(&raw)))
257                }
258                _ => {
259                    bail!("Tag must be a string value");
260                }
261            }
262        } else {
263            Ok(None)
264        }
265    }
266}
267
268impl<I, F> Iterator for MiGroupIteratorWithTransform<I, F>
269where
270    I: Iterator<Item = Result<RecordBuf>>,
271    F: Fn(&str) -> String,
272{
273    /// Each item is a Result containing (key, `Vec<RecordBuf>`) or an error.
274    type Item = Result<(String, Vec<RecordBuf>)>;
275
276    /// Returns the next group from the record iterator.
277    ///
278    /// Reads records until the transformed key changes, then returns the completed group.
279    /// On EOF, returns any remaining group, then None.
280    ///
281    /// Records without the tag are skipped.
282    fn next(&mut self) -> Option<Self::Item> {
283        if self.done {
284            return None;
285        }
286
287        // Check for pending error (return after we've flushed any pending group)
288        if let Some(e) = self.pending_error.take() {
289            self.done = true;
290            return Some(Err(e));
291        }
292
293        loop {
294            match self.record_iter.next() {
295                None => {
296                    // EOF - return any remaining group
297                    self.done = true;
298                    if self.current_group.is_empty() {
299                        return None;
300                    }
301                    let key = self.current_key.take().unwrap_or_default();
302                    let group = std::mem::take(&mut self.current_group);
303                    return Some(Ok((key, group)));
304                }
305                Some(Err(e)) => {
306                    // If we have a pending group, return it first and save the error
307                    if !self.current_group.is_empty() {
308                        self.pending_error = Some(e);
309                        let key = self.current_key.take().unwrap_or_default();
310                        let group = std::mem::take(&mut self.current_group);
311                        return Some(Ok((key, group)));
312                    }
313                    self.done = true;
314                    return Some(Err(e));
315                }
316                Some(Ok(record)) => {
317                    // Extract and transform tag
318                    let key = match self.get_key(&record) {
319                        Ok(Some(key)) => key,
320                        Ok(None) => {
321                            // Skip records without tag
322                            continue;
323                        }
324                        Err(e) => {
325                            // If we have a pending group, return it first and save the error
326                            if !self.current_group.is_empty() {
327                                self.pending_error = Some(e);
328                                let key = self.current_key.take().unwrap_or_default();
329                                let group = std::mem::take(&mut self.current_group);
330                                return Some(Ok((key, group)));
331                            }
332                            self.done = true;
333                            return Some(Err(e));
334                        }
335                    };
336
337                    if self.current_group.is_empty() {
338                        // First record or first record after returning a group
339                        self.current_key = Some(key);
340                        self.current_group.push(record);
341                    } else if self.current_key.as_ref() == Some(&key) {
342                        // Same key, add to current group
343                        self.current_group.push(record);
344                    } else {
345                        // Different key - return current group and start new one
346                        let old_key = self.current_key.take().unwrap_or_default();
347                        let group = std::mem::take(&mut self.current_group);
348                        self.current_key = Some(key);
349                        self.current_group.push(record);
350                        return Some(Ok((old_key, group)));
351                    }
352                }
353            }
354        }
355    }
356}
357
358// ============================================================================
359// Parallel processing support for MI groups
360// ============================================================================
361
362/// A single MI group: the MI tag value and all records with that MI.
363///
364/// This represents records that share the same Molecular Identifier (MI) tag value,
365/// used by consensus calling commands to group reads from the same source molecule.
366#[derive(Debug, Clone)]
367pub struct MiGroup {
368    /// The MI tag value (e.g., "0", "1/A", "1/B")
369    pub mi: String,
370    /// All records sharing this MI value
371    pub records: Vec<RecordBuf>,
372}
373
374impl MiGroup {
375    /// Creates a new MI group.
376    #[must_use]
377    pub fn new(mi: String, records: Vec<RecordBuf>) -> Self {
378        Self { mi, records }
379    }
380}
381
382impl BatchWeight for MiGroup {
383    fn batch_weight(&self) -> usize {
384        self.records.len()
385    }
386}
387
388impl MemoryEstimate for MiGroup {
389    fn estimate_heap_size(&self) -> usize {
390        // mi: String (heap allocated)
391        let mi_size = self.mi.capacity();
392
393        // records: Vec<RecordBuf>
394        let records_size: usize = self.records.iter().map(MemoryEstimate::estimate_heap_size).sum();
395        let records_vec_overhead = self.records.capacity() * std::mem::size_of::<RecordBuf>();
396
397        mi_size + records_size + records_vec_overhead
398    }
399}
400
401impl MemoryEstimate for MiGroupBatch {
402    fn estimate_heap_size(&self) -> usize {
403        let groups_size: usize = self.groups.iter().map(MemoryEstimate::estimate_heap_size).sum();
404        let groups_vec_overhead = self.groups.capacity() * std::mem::size_of::<MiGroup>();
405        groups_size + groups_vec_overhead
406    }
407}
408
409/// A batch of MI groups for parallel processing.
410///
411/// This structure holds a collection of MI groups that will be processed together
412/// in parallel.
413///
414/// # Performance
415///
416/// The batch is designed to be reused across iterations to minimize allocations.
417#[derive(Default)]
418pub struct MiGroupBatch {
419    /// The MI groups in this batch
420    pub groups: Vec<MiGroup>,
421}
422
423impl MiGroupBatch {
424    /// Creates a new empty MI group batch.
425    #[must_use]
426    pub fn new() -> Self {
427        Self { groups: Vec::new() }
428    }
429
430    /// Creates a new MI group batch with pre-allocated capacity.
431    #[must_use]
432    pub fn with_capacity(capacity: usize) -> Self {
433        Self { groups: Vec::with_capacity(capacity) }
434    }
435
436    /// Returns the number of groups in the batch.
437    #[must_use]
438    pub fn len(&self) -> usize {
439        self.groups.len()
440    }
441
442    /// Returns true if the batch is empty.
443    #[must_use]
444    pub fn is_empty(&self) -> bool {
445        self.groups.is_empty()
446    }
447
448    /// Clears all groups from the batch.
449    pub fn clear(&mut self) {
450        self.groups.clear();
451    }
452}
453
454impl BatchWeight for MiGroupBatch {
455    fn batch_weight(&self) -> usize {
456        self.groups.iter().map(|g| g.records.len()).sum()
457    }
458}
459
460// ============================================================================
461// MiGrouper for 7-step unified pipeline
462// ============================================================================
463
464use crate::unified_pipeline::Grouper;
465use std::collections::VecDeque;
466
467/// Type alias for record filter function used in [`MiGrouper`].
468type RecordFilterFn = Box<dyn Fn(&RecordBuf) -> bool + Send + Sync>;
469
470/// Type alias for MI tag transformation function used in [`MiGrouper`].
471type MiTransformFn = Box<dyn Fn(&str) -> String + Send + Sync>;
472
473/// A Grouper that reads pre-grouped BAM records by MI tag for the 7-step pipeline.
474///
475/// Input BAM is expected to be sorted/grouped by MI tag (output from group command).
476/// This grouper reads consecutive records with the same MI tag and yields them as groups.
477///
478/// Unlike `OwnedMiGroupReader` which implements the `Reader` trait for the parallel
479/// framework, this implements the `Grouper` trait for the 7-step unified pipeline.
480/// The key difference is that `Grouper::add_bytes` receives raw decompressed bytes
481/// and must deserialize BAM records inline.
482///
483/// # Example
484///
485/// ```ignore
486/// use fgumi_lib::mi_group::MiGrouper;
487/// use fgumi_lib::unified_pipeline::Grouper;
488///
489/// let grouper = MiGrouper::new("MI", 100);
490/// // Use with run_bam_pipeline_with_grouper...
491/// ```
492///
493/// For duplex consensus calling with record filtering and MI transformation:
494///
495/// ```ignore
496/// let grouper = MiGrouper::with_filter_and_transform(
497///     "MI",
498///     100,
499///     |r| !r.flags().is_secondary(), // filter function
500///     |mi| extract_mi_base(mi).to_string(),   // transform function
501/// );
502/// ```
503pub struct MiGrouper {
504    /// The MI tag to group by (e.g., "MI")
505    tag: Tag,
506    /// Number of MI groups per batch
507    batch_size: usize,
508    /// Current MI value being accumulated
509    current_mi: Option<String>,
510    /// Records in current MI group
511    current_records: Vec<RecordBuf>,
512    /// Completed groups waiting to be batched
513    pending_groups: VecDeque<MiGroup>,
514    /// Whether `finish()` has been called
515    finished: bool,
516    /// Optional record filter (returns true to keep record)
517    record_filter: Option<RecordFilterFn>,
518    /// Optional MI tag transformation function
519    mi_transform: Option<MiTransformFn>,
520}
521
522impl MiGrouper {
523    /// Create a new `MiGrouper`.
524    ///
525    /// # Arguments
526    /// * `tag_name` - The MI tag name (e.g., "MI")
527    /// * `batch_size` - Number of MI groups per batch (100 is typical)
528    ///
529    /// # Panics
530    ///
531    /// Panics if `tag_name` is not exactly 2 characters.
532    #[must_use]
533    pub fn new(tag_name: &str, batch_size: usize) -> Self {
534        assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
535        let tag_bytes = tag_name.as_bytes();
536        let tag = Tag::from([tag_bytes[0], tag_bytes[1]]);
537
538        Self {
539            tag,
540            batch_size: batch_size.max(1),
541            current_mi: None,
542            current_records: Vec::new(),
543            pending_groups: VecDeque::new(),
544            finished: false,
545            record_filter: None,
546            mi_transform: None,
547        }
548    }
549
550    /// Create a `MiGrouper` with record filtering and MI tag transformation.
551    ///
552    /// This is useful for duplex consensus calling where we need to:
553    /// - Filter out secondary/supplementary reads
554    /// - Transform MI tags by stripping /A and /B suffixes
555    ///
556    /// # Arguments
557    /// * `tag_name` - The MI tag name (e.g., "MI")
558    /// * `batch_size` - Number of MI groups per batch (100 is typical)
559    /// * `record_filter` - Function that returns true to keep a record
560    /// * `mi_transform` - Function to transform MI tag value (e.g., strip /A /B suffix)
561    ///
562    /// # Panics
563    ///
564    /// Panics if `tag_name` is not exactly 2 characters.
565    pub fn with_filter_and_transform<F, T>(
566        tag_name: &str,
567        batch_size: usize,
568        record_filter: F,
569        mi_transform: T,
570    ) -> Self
571    where
572        F: Fn(&RecordBuf) -> bool + Send + Sync + 'static,
573        T: Fn(&str) -> String + Send + Sync + 'static,
574    {
575        assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
576        let tag_bytes = tag_name.as_bytes();
577        let tag = Tag::from([tag_bytes[0], tag_bytes[1]]);
578
579        Self {
580            tag,
581            batch_size: batch_size.max(1),
582            current_mi: None,
583            current_records: Vec::new(),
584            pending_groups: VecDeque::new(),
585            finished: false,
586            record_filter: Some(Box::new(record_filter)),
587            mi_transform: Some(Box::new(mi_transform)),
588        }
589    }
590
591    /// Get the MI tag value from a record, optionally applying transformation.
592    fn get_mi_tag(&self, record: &RecordBuf) -> Option<String> {
593        record.data().get(&self.tag).and_then(|v| match v {
594            noodles::sam::alignment::record_buf::data::field::Value::String(s) => {
595                let raw = s.to_string();
596                // Apply transform if configured
597                if let Some(ref transform) = self.mi_transform {
598                    Some(transform(&raw))
599                } else {
600                    Some(raw)
601                }
602            }
603            _ => None,
604        })
605    }
606
607    /// Check if a record passes the filter (or return true if no filter).
608    fn should_keep(&self, record: &RecordBuf) -> bool {
609        match &self.record_filter {
610            Some(filter) => filter(record),
611            None => true,
612        }
613    }
614
615    /// Flush current MI group to pending.
616    fn flush_current_group(&mut self) {
617        if let Some(mi) = self.current_mi.take() {
618            if !self.current_records.is_empty() {
619                let records = std::mem::take(&mut self.current_records);
620                self.pending_groups.push_back(MiGroup::new(mi, records));
621            }
622        }
623    }
624
625    /// Try to form complete batches from pending groups.
626    fn drain_batches(&mut self) -> Vec<MiGroupBatch> {
627        let mut batches = Vec::new();
628        while self.pending_groups.len() >= self.batch_size {
629            let groups: Vec<MiGroup> = self.pending_groups.drain(..self.batch_size).collect();
630            batches.push(MiGroupBatch { groups });
631        }
632        batches
633    }
634}
635
636impl Grouper for MiGrouper {
637    type Group = MiGroupBatch;
638
639    fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
640        for decoded in records {
641            let record = decoded.into_record().ok_or_else(|| {
642                io::Error::new(io::ErrorKind::InvalidData, "MiGrouper requires parsed records")
643            })?;
644            // Apply record filter if configured - skip records that don't pass
645            if !self.should_keep(&record) {
646                continue;
647            }
648
649            // Get MI tag from record (with optional transformation)
650            let mi = self.get_mi_tag(&record).unwrap_or_default();
651
652            // Check if this starts a new MI group
653            match &self.current_mi {
654                Some(current) if current == &mi => {
655                    // Same MI, add to current group
656                    self.current_records.push(record);
657                }
658                Some(_) => {
659                    // Different MI, flush current and start new
660                    self.flush_current_group();
661                    self.current_mi = Some(mi);
662                    self.current_records.push(record);
663                }
664                None => {
665                    // First record
666                    self.current_mi = Some(mi);
667                    self.current_records.push(record);
668                }
669            }
670        }
671
672        // Return complete batches
673        Ok(self.drain_batches())
674    }
675
676    fn finish(&mut self) -> io::Result<Option<Self::Group>> {
677        if self.finished {
678            return Ok(None);
679        }
680        self.finished = true;
681
682        // Flush any remaining current group
683        self.flush_current_group();
684
685        // Return any remaining groups as final batch
686        if self.pending_groups.is_empty() {
687            Ok(None)
688        } else {
689            let groups: Vec<MiGroup> = self.pending_groups.drain(..).collect();
690            Ok(Some(MiGroupBatch { groups }))
691        }
692    }
693
694    fn has_pending(&self) -> bool {
695        !self.pending_groups.is_empty() || self.current_mi.is_some()
696    }
697}
698
699// ============================================================================
700// Raw-byte MI grouping for consensus callers
701// ============================================================================
702
/// One MI group whose records are kept as undecoded BAM record bytes.
#[derive(Clone, Debug)]
pub struct RawMiGroup {
    /// The MI tag value, for example `"0"`, `"1/A"` or `"1/B"`.
    pub mi: String,
    /// Raw BAM record bytes, one entry per record sharing `mi`.
    pub records: Vec<Vec<u8>>,
}

impl RawMiGroup {
    /// Bundles the raw `records` under the MI value `mi`.
    #[must_use]
    pub fn new(mi: String, records: Vec<Vec<u8>>) -> Self {
        RawMiGroup { records, mi }
    }
}
719
720impl BatchWeight for RawMiGroup {
721    fn batch_weight(&self) -> usize {
722        self.records.len()
723    }
724}
725
726impl MemoryEstimate for RawMiGroup {
727    fn estimate_heap_size(&self) -> usize {
728        let mi_size = self.mi.capacity();
729        let records_size: usize = self.records.iter().map(std::vec::Vec::capacity).sum();
730        let records_vec_overhead = self.records.capacity() * std::mem::size_of::<Vec<u8>>();
731        mi_size + records_size + records_vec_overhead
732    }
733}
734
735/// A batch of raw MI groups for parallel processing.
736#[derive(Default)]
737pub struct RawMiGroupBatch {
738    /// The raw MI groups in this batch
739    pub groups: Vec<RawMiGroup>,
740}
741
742impl RawMiGroupBatch {
743    /// Creates a new empty raw MI group batch.
744    #[must_use]
745    pub fn new() -> Self {
746        Self { groups: Vec::new() }
747    }
748}
749
750impl BatchWeight for RawMiGroupBatch {
751    fn batch_weight(&self) -> usize {
752        self.groups.iter().map(|g| g.records.len()).sum()
753    }
754}
755
756impl MemoryEstimate for RawMiGroupBatch {
757    fn estimate_heap_size(&self) -> usize {
758        let groups_size: usize = self.groups.iter().map(MemoryEstimate::estimate_heap_size).sum();
759        let groups_vec_overhead = self.groups.capacity() * std::mem::size_of::<RawMiGroup>();
760        groups_size + groups_vec_overhead
761    }
762}
763
/// Maps the raw bytes of an MI tag value to the grouping key used by [`RawMiGrouper`].
type RawMiTransformFn = Box<dyn Send + Sync + Fn(&[u8]) -> String>;

/// Decides from a raw BAM record's bytes whether [`RawMiGrouper`] keeps it (`true` = keep).
type RawRecordFilterFn = Box<dyn Send + Sync + Fn(&[u8]) -> bool>;
769
770/// A Grouper that groups raw-byte BAM records by MI tag.
771///
772/// This is the raw-byte equivalent of `MiGrouper`. Instead of parsing records
773/// into `RecordBuf`, it extracts MI tags directly from raw BAM bytes using
774/// `bam_fields::find_string_tag_in_record()`.
775pub struct RawMiGrouper {
776    /// The MI tag bytes to search for
777    tag: [u8; 2],
778    /// Number of MI groups per batch
779    batch_size: usize,
780    /// Current MI value being accumulated
781    current_mi: Option<String>,
782    /// Records in current MI group (raw bytes)
783    current_records: Vec<Vec<u8>>,
784    /// Completed groups waiting to be batched
785    pending_groups: VecDeque<RawMiGroup>,
786    /// Whether `finish()` has been called
787    finished: bool,
788    /// Optional MI tag transformation function
789    mi_transform: Option<RawMiTransformFn>,
790    /// Optional record filter
791    record_filter: Option<RawRecordFilterFn>,
792}
793
794impl RawMiGrouper {
795    /// Create a new `RawMiGrouper`.
796    ///
797    /// # Panics
798    ///
799    /// Panics if `tag_name` is not exactly 2 characters.
800    #[must_use]
801    pub fn new(tag_name: &str, batch_size: usize) -> Self {
802        assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
803        let tag_bytes = tag_name.as_bytes();
804
805        Self {
806            tag: [tag_bytes[0], tag_bytes[1]],
807            batch_size: batch_size.max(1),
808            current_mi: None,
809            current_records: Vec::new(),
810            pending_groups: VecDeque::new(),
811            finished: false,
812            mi_transform: None,
813            record_filter: None,
814        }
815    }
816
817    /// Create a `RawMiGrouper` with record filtering and MI tag transformation.
818    ///
819    /// # Panics
820    ///
821    /// Panics if `tag_name` is not exactly 2 characters.
822    pub fn with_filter_and_transform<F, T>(
823        tag_name: &str,
824        batch_size: usize,
825        record_filter: F,
826        mi_transform: T,
827    ) -> Self
828    where
829        F: Fn(&[u8]) -> bool + Send + Sync + 'static,
830        T: Fn(&[u8]) -> String + Send + Sync + 'static,
831    {
832        assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
833        let tag_bytes = tag_name.as_bytes();
834
835        Self {
836            tag: [tag_bytes[0], tag_bytes[1]],
837            batch_size: batch_size.max(1),
838            current_mi: None,
839            current_records: Vec::new(),
840            pending_groups: VecDeque::new(),
841            finished: false,
842            mi_transform: Some(Box::new(mi_transform)),
843            record_filter: Some(Box::new(record_filter)),
844        }
845    }
846
847    /// Get MI tag from raw BAM bytes, optionally applying transformation.
848    fn get_mi_tag(&self, bam: &[u8]) -> Option<String> {
849        use crate::sort::bam_fields;
850        let value = bam_fields::find_string_tag_in_record(bam, &self.tag)?;
851        if let Some(ref transform) = self.mi_transform {
852            Some(transform(value))
853        } else {
854            Some(String::from_utf8_lossy(value).into_owned())
855        }
856    }
857
858    /// Check if a raw record passes the filter.
859    fn should_keep(&self, bam: &[u8]) -> bool {
860        match &self.record_filter {
861            Some(filter) => filter(bam),
862            None => true,
863        }
864    }
865
866    /// Flush current MI group to pending.
867    fn flush_current_group(&mut self) {
868        if let Some(mi) = self.current_mi.take() {
869            if !self.current_records.is_empty() {
870                let records = std::mem::take(&mut self.current_records);
871                self.pending_groups.push_back(RawMiGroup::new(mi, records));
872            }
873        }
874    }
875
876    /// Try to form complete batches from pending groups.
877    fn drain_batches(&mut self) -> Vec<RawMiGroupBatch> {
878        let mut batches = Vec::new();
879        while self.pending_groups.len() >= self.batch_size {
880            let groups: Vec<RawMiGroup> = self.pending_groups.drain(..self.batch_size).collect();
881            batches.push(RawMiGroupBatch { groups });
882        }
883        batches
884    }
885}
886
887impl Grouper for RawMiGrouper {
888    type Group = RawMiGroupBatch;
889
890    fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
891        for decoded in records {
892            let raw = decoded.into_raw_bytes().ok_or_else(|| {
893                io::Error::new(io::ErrorKind::InvalidData, "RawMiGrouper requires raw byte records")
894            })?;
895
896            // Apply record filter if configured
897            if !self.should_keep(&raw) {
898                continue;
899            }
900
901            // Get MI tag from raw bytes (use empty string if absent, matching MiGrouper behavior)
902            let mi = self.get_mi_tag(&raw).unwrap_or_default();
903
904            // Check if this starts a new MI group
905            match &self.current_mi {
906                Some(current) if current == &mi => {
907                    self.current_records.push(raw);
908                }
909                Some(_) => {
910                    self.flush_current_group();
911                    self.current_mi = Some(mi);
912                    self.current_records.push(raw);
913                }
914                None => {
915                    self.current_mi = Some(mi);
916                    self.current_records.push(raw);
917                }
918            }
919        }
920
921        Ok(self.drain_batches())
922    }
923
924    fn finish(&mut self) -> io::Result<Option<Self::Group>> {
925        if self.finished {
926            return Ok(None);
927        }
928        self.finished = true;
929
930        self.flush_current_group();
931
932        if self.pending_groups.is_empty() {
933            Ok(None)
934        } else {
935            let groups: Vec<RawMiGroup> = self.pending_groups.drain(..).collect();
936            Ok(Some(RawMiGroupBatch { groups }))
937        }
938    }
939
940    fn has_pending(&self) -> bool {
941        !self.pending_groups.is_empty() || self.current_mi.is_some()
942    }
943}
944
945/// An iterator that groups consecutive raw BAM records by their MI tag.
946///
947/// This is the raw-byte equivalent of `MiGroupIterator` for the single-threaded path.
948#[allow(clippy::type_complexity)]
949pub struct RawMiGroupIterator<I>
950where
951    I: Iterator<Item = Result<Vec<u8>>>,
952{
953    record_iter: I,
954    tag: [u8; 2],
955    /// Optional cell barcode tag for composite grouping (MI + cell barcode)
956    cell_tag: Option<[u8; 2]>,
957    current_mi: Option<String>,
958    current_group: Vec<Vec<u8>>,
959    done: bool,
960    /// Pending error to return after flushing a group
961    pending_error: Option<anyhow::Error>,
962    /// Optional MI tag transformation function
963    mi_transform: Option<Box<dyn Fn(&[u8]) -> String>>,
964}
965
966impl<I> RawMiGroupIterator<I>
967where
968    I: Iterator<Item = Result<Vec<u8>>>,
969{
970    /// Creates a new `RawMiGroupIterator`.
971    ///
972    /// # Panics
973    ///
974    /// Panics if `tag_name` is not exactly 2 characters.
975    pub fn new(record_iter: I, tag_name: &str) -> Self {
976        assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
977        let tag_bytes = tag_name.as_bytes();
978        Self {
979            record_iter,
980            tag: [tag_bytes[0], tag_bytes[1]],
981            cell_tag: None,
982            current_mi: None,
983            current_group: Vec::new(),
984            done: false,
985            pending_error: None,
986            mi_transform: None,
987        }
988    }
989
990    /// Creates a new `RawMiGroupIterator` with MI tag transformation.
991    ///
992    /// # Panics
993    ///
994    /// Panics if `tag_name` is not exactly 2 characters.
995    pub fn with_transform<F>(record_iter: I, tag_name: &str, mi_transform: F) -> Self
996    where
997        F: Fn(&[u8]) -> String + 'static,
998    {
999        assert!(tag_name.len() == 2, "Tag name must be exactly 2 characters");
1000        let tag_bytes = tag_name.as_bytes();
1001        Self {
1002            record_iter,
1003            tag: [tag_bytes[0], tag_bytes[1]],
1004            cell_tag: None,
1005            current_mi: None,
1006            current_group: Vec::new(),
1007            done: false,
1008            pending_error: None,
1009            mi_transform: Some(Box::new(mi_transform)),
1010        }
1011    }
1012
1013    /// Sets an optional cell barcode tag for composite grouping.
1014    ///
1015    /// When set, the group key becomes `"MI_VALUE\tCELL_VALUE"` so that reads from
1016    /// different cells with the same MI tag are placed in separate groups.
1017    #[must_use]
1018    pub fn with_cell_tag(mut self, cell_tag: Option<[u8; 2]>) -> Self {
1019        self.cell_tag = cell_tag;
1020        self
1021    }
1022
1023    /// Extracts the group key from raw BAM bytes.
1024    ///
1025    /// When `cell_tag` is set, returns a composite key of `"MI\tCELL"`.
1026    /// Otherwise returns just the MI tag value.
1027    fn get_mi(&self, bam: &[u8]) -> Option<String> {
1028        use crate::sort::bam_fields;
1029        let value = bam_fields::find_string_tag_in_record(bam, &self.tag)?;
1030        let mut key = if let Some(ref transform) = self.mi_transform {
1031            transform(value)
1032        } else {
1033            String::from_utf8_lossy(value).into_owned()
1034        };
1035        if let Some(ct) = &self.cell_tag {
1036            key.push('\t');
1037            if let Some(cell_value) = bam_fields::find_string_tag_in_record(bam, ct) {
1038                key.push_str(&String::from_utf8_lossy(cell_value));
1039            }
1040        }
1041        Some(key)
1042    }
1043}
1044
1045impl<I> Iterator for RawMiGroupIterator<I>
1046where
1047    I: Iterator<Item = Result<Vec<u8>>>,
1048{
1049    type Item = Result<(String, Vec<Vec<u8>>)>;
1050
1051    fn next(&mut self) -> Option<Self::Item> {
1052        if self.done {
1053            return None;
1054        }
1055
1056        // Check for pending error (return after we've flushed any pending group)
1057        if let Some(e) = self.pending_error.take() {
1058            self.done = true;
1059            return Some(Err(e));
1060        }
1061
1062        loop {
1063            match self.record_iter.next() {
1064                None => {
1065                    self.done = true;
1066                    if self.current_group.is_empty() {
1067                        return None;
1068                    }
1069                    let mi = self.current_mi.take().unwrap_or_default();
1070                    let group = std::mem::take(&mut self.current_group);
1071                    return Some(Ok((mi, group)));
1072                }
1073                Some(Err(e)) => {
1074                    if !self.current_group.is_empty() {
1075                        self.pending_error = Some(e);
1076                        let mi = self.current_mi.take().unwrap_or_default();
1077                        let group = std::mem::take(&mut self.current_group);
1078                        return Some(Ok((mi, group)));
1079                    }
1080                    self.done = true;
1081                    return Some(Err(e));
1082                }
1083                Some(Ok(raw)) => {
1084                    let Some(mi) = self.get_mi(&raw) else {
1085                        continue;
1086                    };
1087
1088                    if self.current_group.is_empty() {
1089                        self.current_mi = Some(mi);
1090                        self.current_group.push(raw);
1091                    } else if self.current_mi.as_ref() == Some(&mi) {
1092                        self.current_group.push(raw);
1093                    } else {
1094                        let old_mi = self.current_mi.take().unwrap_or_default();
1095                        let group = std::mem::take(&mut self.current_group);
1096                        self.current_mi = Some(mi);
1097                        self.current_group.push(raw);
1098                        return Some(Ok((old_mi, group)));
1099                    }
1100                }
1101            }
1102        }
1103    }
1104}
1105
1106#[cfg(test)]
1107#[allow(clippy::similar_names)]
1108mod tests {
1109    use super::*;
1110    use crate::sam::builder::RecordBuilder;
1111    use crate::umi::extract_mi_base;
1112
1113    fn create_record_with_mi(mi: &str) -> RecordBuf {
1114        RecordBuilder::new()
1115            .sequence("ACGT") // Minimal sequence
1116            .tag("MI", mi)
1117            .build()
1118    }
1119
1120    fn create_record_without_mi() -> RecordBuf {
1121        RecordBuilder::new()
1122            .sequence("ACGT") // Minimal sequence
1123            .build()
1124    }
1125
1126    #[test]
1127    fn test_empty_iterator() {
1128        let records: Vec<Result<RecordBuf>> = vec![];
1129        let mut iter = MiGroupIterator::new(records.into_iter(), "MI");
1130        assert!(iter.next().is_none());
1131    }
1132
1133    #[test]
1134    fn test_single_group() {
1135        let records: Vec<Result<RecordBuf>> = vec![
1136            Ok(create_record_with_mi("0")),
1137            Ok(create_record_with_mi("0")),
1138            Ok(create_record_with_mi("0")),
1139        ];
1140        let mut iter = MiGroupIterator::new(records.into_iter(), "MI");
1141
1142        let result = iter.next().unwrap().unwrap();
1143        assert_eq!(result.0, "0");
1144        assert_eq!(result.1.len(), 3);
1145
1146        assert!(iter.next().is_none());
1147    }
1148
1149    #[test]
1150    fn test_multiple_groups() {
1151        let records: Vec<Result<RecordBuf>> = vec![
1152            Ok(create_record_with_mi("0")),
1153            Ok(create_record_with_mi("0")),
1154            Ok(create_record_with_mi("1")),
1155            Ok(create_record_with_mi("1")),
1156            Ok(create_record_with_mi("1")),
1157            Ok(create_record_with_mi("2")),
1158        ];
1159        let mut iter = MiGroupIterator::new(records.into_iter(), "MI");
1160
1161        let result = iter.next().unwrap().unwrap();
1162        assert_eq!(result.0, "0");
1163        assert_eq!(result.1.len(), 2);
1164
1165        let result = iter.next().unwrap().unwrap();
1166        assert_eq!(result.0, "1");
1167        assert_eq!(result.1.len(), 3);
1168
1169        let result = iter.next().unwrap().unwrap();
1170        assert_eq!(result.0, "2");
1171        assert_eq!(result.1.len(), 1);
1172
1173        assert!(iter.next().is_none());
1174    }
1175
1176    #[test]
1177    fn test_skips_records_without_mi_tag() {
1178        let records: Vec<Result<RecordBuf>> = vec![
1179            Ok(create_record_with_mi("0")),
1180            Ok(create_record_without_mi()),
1181            Ok(create_record_with_mi("0")),
1182            Ok(create_record_without_mi()),
1183            Ok(create_record_with_mi("1")),
1184        ];
1185        let mut iter = MiGroupIterator::new(records.into_iter(), "MI");
1186
1187        let result = iter.next().unwrap().unwrap();
1188        assert_eq!(result.0, "0");
1189        assert_eq!(result.1.len(), 2); // Skipped record without MI
1190
1191        let result = iter.next().unwrap().unwrap();
1192        assert_eq!(result.0, "1");
1193        assert_eq!(result.1.len(), 1);
1194
1195        assert!(iter.next().is_none());
1196    }
1197
1198    #[test]
1199    fn test_error_propagation() {
1200        let records: Vec<Result<RecordBuf>> = vec![
1201            Ok(create_record_with_mi("0")),
1202            Err(anyhow::anyhow!("test error")),
1203            Ok(create_record_with_mi("1")),
1204        ];
1205        let mut iter = MiGroupIterator::new(records.into_iter(), "MI");
1206
1207        // First group before error
1208        let result = iter.next().unwrap().unwrap();
1209        assert_eq!(result.0, "0");
1210        assert_eq!(result.1.len(), 1);
1211
1212        // Error
1213        let result = iter.next().unwrap();
1214        assert!(result.is_err());
1215
1216        // Iterator should be done after error
1217        assert!(iter.next().is_none());
1218    }
1219
1220    #[test]
1221    fn test_custom_tag() {
1222        let record1 = RecordBuilder::new().sequence("ACGT").tag("RX", "ACGT").build();
1223
1224        let record2 = RecordBuilder::new().sequence("ACGT").tag("RX", "ACGT").build();
1225
1226        let records: Vec<Result<RecordBuf>> = vec![Ok(record1), Ok(record2)];
1227        let mut iter = MiGroupIterator::new(records.into_iter(), "RX");
1228
1229        let result = iter.next().unwrap().unwrap();
1230        assert_eq!(result.0, "ACGT");
1231        assert_eq!(result.1.len(), 2);
1232
1233        assert!(iter.next().is_none());
1234    }
1235
1236    #[test]
1237    #[should_panic(expected = "Tag name must be exactly 2 characters")]
1238    fn test_invalid_tag_length() {
1239        let records: Vec<Result<RecordBuf>> = vec![];
1240        let _ = MiGroupIterator::new(records.into_iter(), "M");
1241    }
1242
1243    // Tests for MiGroupIteratorWithTransform
1244    #[test]
1245    fn test_transform_groups_by_base_mi() {
1246        // Simulate duplex reads: 1/A, 1/A, 1/B, 1/B, 2/A, 2/B
1247        let records: Vec<Result<RecordBuf>> = vec![
1248            Ok(create_record_with_mi("1/A")),
1249            Ok(create_record_with_mi("1/A")),
1250            Ok(create_record_with_mi("1/B")),
1251            Ok(create_record_with_mi("1/B")),
1252            Ok(create_record_with_mi("2/A")),
1253            Ok(create_record_with_mi("2/B")),
1254        ];
1255        let mut iter = MiGroupIteratorWithTransform::new(records.into_iter(), "MI", |mi| {
1256            extract_mi_base(mi).to_string()
1257        });
1258
1259        // First group: base MI "1" with 4 reads (2 /A + 2 /B)
1260        let result = iter.next().unwrap().unwrap();
1261        assert_eq!(result.0, "1");
1262        assert_eq!(result.1.len(), 4);
1263
1264        // Second group: base MI "2" with 2 reads (1 /A + 1 /B)
1265        let result = iter.next().unwrap().unwrap();
1266        assert_eq!(result.0, "2");
1267        assert_eq!(result.1.len(), 2);
1268
1269        assert!(iter.next().is_none());
1270    }
1271
1272    #[test]
1273    fn test_transform_empty_iterator() {
1274        let records: Vec<Result<RecordBuf>> = vec![];
1275        let mut iter = MiGroupIteratorWithTransform::new(records.into_iter(), "MI", |mi| {
1276            extract_mi_base(mi).to_string()
1277        });
1278        assert!(iter.next().is_none());
1279    }
1280
1281    #[test]
1282    fn test_transform_single_group() {
1283        let records: Vec<Result<RecordBuf>> = vec![
1284            Ok(create_record_with_mi("0/A")),
1285            Ok(create_record_with_mi("0/B")),
1286            Ok(create_record_with_mi("0/A")),
1287        ];
1288        let mut iter = MiGroupIteratorWithTransform::new(records.into_iter(), "MI", |mi| {
1289            extract_mi_base(mi).to_string()
1290        });
1291
1292        let result = iter.next().unwrap().unwrap();
1293        assert_eq!(result.0, "0");
1294        assert_eq!(result.1.len(), 3);
1295
1296        assert!(iter.next().is_none());
1297    }
1298
1299    #[test]
1300    fn test_transform_error_propagation() {
1301        let records: Vec<Result<RecordBuf>> = vec![
1302            Ok(create_record_with_mi("0/A")),
1303            Err(anyhow::anyhow!("test error")),
1304            Ok(create_record_with_mi("1/B")),
1305        ];
1306        let mut iter = MiGroupIteratorWithTransform::new(records.into_iter(), "MI", |mi| {
1307            extract_mi_base(mi).to_string()
1308        });
1309
1310        // First group before error
1311        let result = iter.next().unwrap().unwrap();
1312        assert_eq!(result.0, "0");
1313        assert_eq!(result.1.len(), 1);
1314
1315        // Error
1316        let result = iter.next().unwrap();
1317        assert!(result.is_err());
1318
1319        // Iterator should be done after error
1320        assert!(iter.next().is_none());
1321    }
1322
1323    #[test]
1324    fn test_transform_custom_function() {
1325        // Test with a custom transformation function (uppercase)
1326        let records: Vec<Result<RecordBuf>> = vec![
1327            Ok(create_record_with_mi("abc")),
1328            Ok(create_record_with_mi("ABC")),
1329            Ok(create_record_with_mi("Abc")),
1330        ];
1331        let mut iter =
1332            MiGroupIteratorWithTransform::new(records.into_iter(), "MI", str::to_uppercase);
1333
1334        // All should be grouped together since they uppercase to "ABC"
1335        let result = iter.next().unwrap().unwrap();
1336        assert_eq!(result.0, "ABC");
1337        assert_eq!(result.1.len(), 3);
1338
1339        assert!(iter.next().is_none());
1340    }
1341
1342    // ========================================================================
1343    // Helpers for raw BAM byte construction
1344    // ========================================================================
1345
1346    // Build a minimal raw BAM record with the given string tag.
1347    //
1348    // The BAM binary format (after the 4-byte block_size prefix, which we omit):
1349    //   bytes 0..4   : refID  (i32 LE)
1350    //   bytes 4..8   : pos    (i32 LE)
1351    //   byte  8      : l_read_name (u8, includes NUL)
1352    //   byte  9      : mapq
1353    //   bytes 10..12 : bin    (u16 LE)
1354    //   bytes 12..14 : n_cigar_op (u16 LE)
1355    //   bytes 14..16 : flag   (u16 LE)
1356    //   bytes 16..20 : l_seq  (u32 LE)
1357    //   bytes 20..24 : next_refID (i32 LE)
1358    //   bytes 24..28 : next_pos   (i32 LE)
1359    //   bytes 28..32 : tlen       (i32 LE)
1360    //   then: read_name (l_read_name bytes, NUL-terminated)
1361    //   then: cigar    (n_cigar_op * 4 bytes)
1362    //   then: seq      (ceil(l_seq/2) bytes, 4-bit encoded)
1363    //   then: qual     (l_seq bytes)
1364    //   then: aux data
1365    #[allow(clippy::cast_possible_truncation)]
1366    fn make_raw_bam_with_tag(tag_name: &str, tag_value: &str) -> Vec<u8> {
1367        let name = b"read";
1368        let l_read_name: u8 = (name.len() + 1) as u8; // +1 for NUL
1369        let seq_len: u32 = 4; // 4 bases (ACGT)
1370        let seq_bytes = seq_len.div_ceil(2) as usize;
1371
1372        // Aux data: tag[0] tag[1] 'Z' value NUL
1373        let tag_bytes = tag_name.as_bytes();
1374        let aux: Vec<u8> =
1375            [&[tag_bytes[0], tag_bytes[1], b'Z'], tag_value.as_bytes(), &[0u8]].concat();
1376
1377        let total = 32 + l_read_name as usize + seq_bytes + seq_len as usize + aux.len();
1378        let mut buf = vec![0u8; total];
1379
1380        // refID = -1 (unmapped)
1381        buf[0..4].copy_from_slice(&(-1i32).to_le_bytes());
1382        // pos = -1 (unmapped)
1383        buf[4..8].copy_from_slice(&(-1i32).to_le_bytes());
1384        // l_read_name
1385        buf[8] = l_read_name;
1386        // n_cigar_op = 0
1387        buf[12..14].copy_from_slice(&0u16.to_le_bytes());
1388        // l_seq
1389        buf[16..20].copy_from_slice(&seq_len.to_le_bytes());
1390        // next_refID = -1
1391        buf[20..24].copy_from_slice(&(-1i32).to_le_bytes());
1392        // next_pos = -1
1393        buf[24..28].copy_from_slice(&(-1i32).to_le_bytes());
1394
1395        // read_name + NUL
1396        let name_start = 32;
1397        buf[name_start..name_start + name.len()].copy_from_slice(name);
1398        buf[name_start + name.len()] = 0;
1399
1400        // seq and qual are left as zeros
1401
1402        // aux data
1403        let aux_start = 32 + l_read_name as usize + seq_bytes + seq_len as usize;
1404        buf[aux_start..aux_start + aux.len()].copy_from_slice(&aux);
1405
1406        buf
1407    }
1408
1409    /// Build a minimal raw BAM record with no aux tags.
1410    #[allow(clippy::cast_possible_truncation)]
1411    fn make_raw_bam_without_tag() -> Vec<u8> {
1412        let name = b"read";
1413        let l_read_name: u8 = (name.len() + 1) as u8;
1414        let seq_len: u32 = 4;
1415        let seq_bytes = seq_len.div_ceil(2) as usize;
1416
1417        let total = 32 + l_read_name as usize + seq_bytes + seq_len as usize;
1418        let mut buf = vec![0u8; total];
1419
1420        buf[0..4].copy_from_slice(&(-1i32).to_le_bytes());
1421        buf[4..8].copy_from_slice(&(-1i32).to_le_bytes());
1422        buf[8] = l_read_name;
1423        buf[12..14].copy_from_slice(&0u16.to_le_bytes());
1424        buf[16..20].copy_from_slice(&seq_len.to_le_bytes());
1425        buf[20..24].copy_from_slice(&(-1i32).to_le_bytes());
1426        buf[24..28].copy_from_slice(&(-1i32).to_le_bytes());
1427
1428        let name_start = 32;
1429        buf[name_start..name_start + name.len()].copy_from_slice(name);
1430        buf[name_start + name.len()] = 0;
1431
1432        buf
1433    }
1434
1435    // ========================================================================
1436    // RawMiGroupIterator tests
1437    // ========================================================================
1438
1439    #[test]
1440    fn test_raw_empty_iterator() {
1441        let records: Vec<Result<Vec<u8>>> = vec![];
1442        let mut iter = RawMiGroupIterator::new(records.into_iter(), "MI");
1443        assert!(iter.next().is_none());
1444    }
1445
1446    #[test]
1447    fn test_raw_single_group() {
1448        let records: Vec<Result<Vec<u8>>> = vec![
1449            Ok(make_raw_bam_with_tag("MI", "0")),
1450            Ok(make_raw_bam_with_tag("MI", "0")),
1451            Ok(make_raw_bam_with_tag("MI", "0")),
1452        ];
1453        let mut iter = RawMiGroupIterator::new(records.into_iter(), "MI");
1454
1455        let result = iter.next().unwrap().unwrap();
1456        assert_eq!(result.0, "0");
1457        assert_eq!(result.1.len(), 3);
1458
1459        assert!(iter.next().is_none());
1460    }
1461
1462    #[test]
1463    fn test_raw_multiple_groups() {
1464        let records: Vec<Result<Vec<u8>>> = vec![
1465            Ok(make_raw_bam_with_tag("MI", "0")),
1466            Ok(make_raw_bam_with_tag("MI", "0")),
1467            Ok(make_raw_bam_with_tag("MI", "1")),
1468            Ok(make_raw_bam_with_tag("MI", "1")),
1469            Ok(make_raw_bam_with_tag("MI", "1")),
1470            Ok(make_raw_bam_with_tag("MI", "2")),
1471        ];
1472        let mut iter = RawMiGroupIterator::new(records.into_iter(), "MI");
1473
1474        let result = iter.next().unwrap().unwrap();
1475        assert_eq!(result.0, "0");
1476        assert_eq!(result.1.len(), 2);
1477
1478        let result = iter.next().unwrap().unwrap();
1479        assert_eq!(result.0, "1");
1480        assert_eq!(result.1.len(), 3);
1481
1482        let result = iter.next().unwrap().unwrap();
1483        assert_eq!(result.0, "2");
1484        assert_eq!(result.1.len(), 1);
1485
1486        assert!(iter.next().is_none());
1487    }
1488
1489    #[test]
1490    fn test_raw_skips_records_without_mi_tag() {
1491        let records: Vec<Result<Vec<u8>>> = vec![
1492            Ok(make_raw_bam_with_tag("MI", "0")),
1493            Ok(make_raw_bam_without_tag()),
1494            Ok(make_raw_bam_with_tag("MI", "0")),
1495            Ok(make_raw_bam_without_tag()),
1496            Ok(make_raw_bam_with_tag("MI", "1")),
1497        ];
1498        let mut iter = RawMiGroupIterator::new(records.into_iter(), "MI");
1499
1500        let result = iter.next().unwrap().unwrap();
1501        assert_eq!(result.0, "0");
1502        assert_eq!(result.1.len(), 2); // Skipped records without MI
1503
1504        let result = iter.next().unwrap().unwrap();
1505        assert_eq!(result.0, "1");
1506        assert_eq!(result.1.len(), 1);
1507
1508        assert!(iter.next().is_none());
1509    }
1510
1511    #[test]
1512    fn test_raw_error_propagation() {
1513        let records: Vec<Result<Vec<u8>>> = vec![
1514            Ok(make_raw_bam_with_tag("MI", "0")),
1515            Err(anyhow::anyhow!("test error")),
1516            Ok(make_raw_bam_with_tag("MI", "1")),
1517        ];
1518        let mut iter = RawMiGroupIterator::new(records.into_iter(), "MI");
1519
1520        // First group before error
1521        let result = iter.next().unwrap().unwrap();
1522        assert_eq!(result.0, "0");
1523        assert_eq!(result.1.len(), 1);
1524
1525        // Error should be returned after flushing the pending group
1526        let err = iter.next().unwrap();
1527        assert!(err.is_err());
1528
1529        assert!(iter.next().is_none());
1530    }
1531
1532    #[test]
1533    fn test_raw_error_with_no_pending_group() {
1534        let records: Vec<Result<Vec<u8>>> =
1535            vec![Err(anyhow::anyhow!("immediate error")), Ok(make_raw_bam_with_tag("MI", "0"))];
1536        let mut iter = RawMiGroupIterator::new(records.into_iter(), "MI");
1537
1538        // Error should be returned directly
1539        let result = iter.next().unwrap();
1540        assert!(result.is_err());
1541
1542        // Iterator should be done after error
1543        assert!(iter.next().is_none());
1544    }
1545
1546    #[test]
1547    fn test_raw_custom_tag() {
1548        let records: Vec<Result<Vec<u8>>> =
1549            vec![Ok(make_raw_bam_with_tag("RX", "ACGT")), Ok(make_raw_bam_with_tag("RX", "ACGT"))];
1550        let mut iter = RawMiGroupIterator::new(records.into_iter(), "RX");
1551
1552        let result = iter.next().unwrap().unwrap();
1553        assert_eq!(result.0, "ACGT");
1554        assert_eq!(result.1.len(), 2);
1555
1556        assert!(iter.next().is_none());
1557    }
1558
1559    #[test]
1560    #[should_panic(expected = "Tag name must be exactly 2 characters")]
1561    fn test_raw_invalid_tag_length() {
1562        let records: Vec<Result<Vec<u8>>> = vec![];
1563        let _ = RawMiGroupIterator::new(records.into_iter(), "M");
1564    }
1565
1566    #[test]
1567    fn test_raw_with_transform() {
1568        // Simulate duplex reads: 1/A, 1/B should group under "1"
1569        let records: Vec<Result<Vec<u8>>> = vec![
1570            Ok(make_raw_bam_with_tag("MI", "1/A")),
1571            Ok(make_raw_bam_with_tag("MI", "1/A")),
1572            Ok(make_raw_bam_with_tag("MI", "1/B")),
1573            Ok(make_raw_bam_with_tag("MI", "1/B")),
1574            Ok(make_raw_bam_with_tag("MI", "2/A")),
1575            Ok(make_raw_bam_with_tag("MI", "2/B")),
1576        ];
1577        let mut iter = RawMiGroupIterator::with_transform(records.into_iter(), "MI", |raw| {
1578            let s = String::from_utf8_lossy(raw);
1579            extract_mi_base(&s).to_string()
1580        });
1581
1582        // First group: base MI "1" with 4 reads
1583        let result = iter.next().unwrap().unwrap();
1584        assert_eq!(result.0, "1");
1585        assert_eq!(result.1.len(), 4);
1586
1587        // Second group: base MI "2" with 2 reads
1588        let result = iter.next().unwrap().unwrap();
1589        assert_eq!(result.0, "2");
1590        assert_eq!(result.1.len(), 2);
1591
1592        assert!(iter.next().is_none());
1593    }
1594
1595    #[test]
1596    fn test_raw_with_transform_empty() {
1597        let records: Vec<Result<Vec<u8>>> = vec![];
1598        let mut iter = RawMiGroupIterator::with_transform(records.into_iter(), "MI", |raw| {
1599            String::from_utf8_lossy(raw).to_uppercase()
1600        });
1601        assert!(iter.next().is_none());
1602    }
1603
1604    #[test]
1605    #[should_panic(expected = "Tag name must be exactly 2 characters")]
1606    fn test_raw_with_transform_invalid_tag_length() {
1607        let records: Vec<Result<Vec<u8>>> = vec![];
1608        let _ = RawMiGroupIterator::with_transform(records.into_iter(), "ABC", |raw| {
1609            String::from_utf8_lossy(raw).into_owned()
1610        });
1611    }
1612
1613    #[test]
1614    fn test_raw_get_mi_without_transform() {
1615        let iter = RawMiGroupIterator::new(std::iter::empty::<Result<Vec<u8>>>(), "MI");
1616        let bam = make_raw_bam_with_tag("MI", "42");
1617        assert_eq!(iter.get_mi(&bam), Some("42".to_string()));
1618    }
1619
1620    #[test]
1621    fn test_raw_get_mi_with_transform() {
1622        let iter = RawMiGroupIterator::with_transform(
1623            std::iter::empty::<Result<Vec<u8>>>(),
1624            "MI",
1625            |raw| {
1626                let s = String::from_utf8_lossy(raw);
1627                s.to_uppercase()
1628            },
1629        );
1630        let bam = make_raw_bam_with_tag("MI", "abc");
1631        assert_eq!(iter.get_mi(&bam), Some("ABC".to_string()));
1632    }
1633
1634    #[test]
1635    fn test_raw_get_mi_missing_tag() {
1636        let iter = RawMiGroupIterator::new(std::iter::empty::<Result<Vec<u8>>>(), "MI");
1637        let bam = make_raw_bam_without_tag();
1638        assert_eq!(iter.get_mi(&bam), None);
1639    }
1640
1641    #[test]
1642    fn test_raw_get_mi_wrong_tag() {
1643        let iter = RawMiGroupIterator::new(std::iter::empty::<Result<Vec<u8>>>(), "MI");
1644        let bam = make_raw_bam_with_tag("RX", "ACGT");
1645        assert_eq!(iter.get_mi(&bam), None);
1646    }
1647
1648    /// Build a minimal raw BAM record with two string tags.
1649    #[allow(clippy::cast_possible_truncation)]
1650    fn make_raw_bam_with_two_tags(tag1: &str, val1: &str, tag2: &str, val2: &str) -> Vec<u8> {
1651        let name = b"read";
1652        let l_read_name: u8 = (name.len() + 1) as u8;
1653        let seq_len: u32 = 4;
1654        let seq_bytes = seq_len.div_ceil(2) as usize;
1655
1656        let t1 = tag1.as_bytes();
1657        let t2 = tag2.as_bytes();
1658        let aux: Vec<u8> = [
1659            &[t1[0], t1[1], b'Z'],
1660            val1.as_bytes(),
1661            &[0u8],
1662            &[t2[0], t2[1], b'Z'],
1663            val2.as_bytes(),
1664            &[0u8],
1665        ]
1666        .concat();
1667
1668        let total = 32 + l_read_name as usize + seq_bytes + seq_len as usize + aux.len();
1669        let mut buf = vec![0u8; total];
1670
1671        buf[0..4].copy_from_slice(&(-1i32).to_le_bytes());
1672        buf[4..8].copy_from_slice(&(-1i32).to_le_bytes());
1673        buf[8] = l_read_name;
1674        buf[12..14].copy_from_slice(&0u16.to_le_bytes());
1675        buf[16..20].copy_from_slice(&seq_len.to_le_bytes());
1676        buf[20..24].copy_from_slice(&(-1i32).to_le_bytes());
1677        buf[24..28].copy_from_slice(&(-1i32).to_le_bytes());
1678
1679        let name_start = 32;
1680        buf[name_start..name_start + name.len()].copy_from_slice(name);
1681        buf[name_start + name.len()] = 0;
1682
1683        let aux_start = 32 + l_read_name as usize + seq_bytes + seq_len as usize;
1684        buf[aux_start..aux_start + aux.len()].copy_from_slice(&aux);
1685
1686        buf
1687    }
1688
1689    #[test]
1690    fn test_raw_cell_tag_composite_grouping() {
1691        // Same MI but different cell barcodes should form separate groups
1692        let records: Vec<Result<Vec<u8>>> = vec![
1693            Ok(make_raw_bam_with_two_tags("MI", "1", "CB", "ACGT")),
1694            Ok(make_raw_bam_with_two_tags("MI", "1", "CB", "ACGT")),
1695            Ok(make_raw_bam_with_two_tags("MI", "1", "CB", "TGCA")),
1696            Ok(make_raw_bam_with_two_tags("MI", "1", "CB", "TGCA")),
1697        ];
1698        let mut iter =
1699            RawMiGroupIterator::new(records.into_iter(), "MI").with_cell_tag(Some([b'C', b'B']));
1700
1701        // First group: MI=1, CB=ACGT
1702        let result = iter.next().unwrap().unwrap();
1703        assert_eq!(result.0, "1\tACGT");
1704        assert_eq!(result.1.len(), 2);
1705
1706        // Second group: MI=1, CB=TGCA
1707        let result = iter.next().unwrap().unwrap();
1708        assert_eq!(result.0, "1\tTGCA");
1709        assert_eq!(result.1.len(), 2);
1710
1711        assert!(iter.next().is_none());
1712    }
1713
1714    #[test]
1715    fn test_raw_cell_tag_none_groups_by_mi_only() {
1716        // Without cell_tag, same MI records group together regardless of other tags
1717        let records: Vec<Result<Vec<u8>>> = vec![
1718            Ok(make_raw_bam_with_two_tags("MI", "1", "CB", "ACGT")),
1719            Ok(make_raw_bam_with_two_tags("MI", "1", "CB", "TGCA")),
1720        ];
1721        let mut iter = RawMiGroupIterator::new(records.into_iter(), "MI").with_cell_tag(None);
1722
1723        let result = iter.next().unwrap().unwrap();
1724        assert_eq!(result.0, "1");
1725        assert_eq!(result.1.len(), 2); // Both records in same group
1726
1727        assert!(iter.next().is_none());
1728    }
1729
1730    #[test]
1731    fn test_raw_cell_tag_missing_cell_value() {
1732        // Records with MI but no cell tag get grouped under "MI\t" (empty cell)
1733        let records: Vec<Result<Vec<u8>>> = vec![
1734            Ok(make_raw_bam_with_tag("MI", "1")),
1735            Ok(make_raw_bam_with_tag("MI", "1")),
1736            Ok(make_raw_bam_with_two_tags("MI", "1", "CB", "ACGT")),
1737        ];
1738        let mut iter =
1739            RawMiGroupIterator::new(records.into_iter(), "MI").with_cell_tag(Some([b'C', b'B']));
1740
1741        // First group: MI=1, no CB (key = "1\t")
1742        let result = iter.next().unwrap().unwrap();
1743        assert_eq!(result.0, "1\t");
1744        assert_eq!(result.1.len(), 2);
1745
1746        // Second group: MI=1, CB=ACGT (key = "1\tACGT")
1747        let result = iter.next().unwrap().unwrap();
1748        assert_eq!(result.0, "1\tACGT");
1749        assert_eq!(result.1.len(), 1);
1750
1751        assert!(iter.next().is_none());
1752    }
1753
1754    #[test]
1755    fn test_raw_cell_tag_with_transform() {
1756        // Cell tag should work with MI transform (duplex-style /A /B stripping)
1757        let records: Vec<Result<Vec<u8>>> = vec![
1758            Ok(make_raw_bam_with_two_tags("MI", "1/A", "CB", "ACGT")),
1759            Ok(make_raw_bam_with_two_tags("MI", "1/B", "CB", "ACGT")),
1760            Ok(make_raw_bam_with_two_tags("MI", "1/A", "CB", "TGCA")),
1761        ];
1762        let mut iter = RawMiGroupIterator::with_transform(records.into_iter(), "MI", |raw| {
1763            let s = String::from_utf8_lossy(raw);
1764            extract_mi_base(&s).to_string()
1765        })
1766        .with_cell_tag(Some([b'C', b'B']));
1767
1768        // All three have base MI "1", but different cells
1769        // First group: base MI=1, CB=ACGT (2 records: 1/A and 1/B)
1770        let result = iter.next().unwrap().unwrap();
1771        assert_eq!(result.0, "1\tACGT");
1772        assert_eq!(result.1.len(), 2);
1773
1774        // Second group: base MI=1, CB=TGCA
1775        let result = iter.next().unwrap().unwrap();
1776        assert_eq!(result.0, "1\tTGCA");
1777        assert_eq!(result.1.len(), 1);
1778
1779        assert!(iter.next().is_none());
1780    }
1781
1782    // ========================================================================
1783    // RawMiGrouper tests (Grouper trait impl)
1784    // ========================================================================
1785
1786    /// Helper: create a `DecodedRecord` from raw bytes with a dummy group key.
1787    fn make_raw_decoded_record(tag_name: &str, tag_value: &str) -> DecodedRecord {
1788        let raw = make_raw_bam_with_tag(tag_name, tag_value);
1789        let key = crate::unified_pipeline::GroupKey::single(0, 0, 0, 0, 0, 0);
1790        DecodedRecord::from_raw_bytes(raw, key)
1791    }
1792
1793    /// Helper: create a `DecodedRecord` from raw bytes with no tags.
1794    fn make_raw_decoded_record_no_tag() -> DecodedRecord {
1795        let raw = make_raw_bam_without_tag();
1796        let key = crate::unified_pipeline::GroupKey::single(0, 0, 0, 0, 0, 0);
1797        DecodedRecord::from_raw_bytes(raw, key)
1798    }
1799
1800    /// Helper: create a parsed `DecodedRecord` (for testing error paths).
1801    fn make_parsed_decoded_record(mi: &str) -> DecodedRecord {
1802        let record = create_record_with_mi(mi);
1803        let key = crate::unified_pipeline::GroupKey::single(0, 0, 0, 0, 0, 0);
1804        DecodedRecord::new(record, key)
1805    }
1806
1807    #[test]
1808    fn test_raw_grouper_single_mi_group() {
1809        let mut grouper = RawMiGrouper::new("MI", 10);
1810
1811        let records = vec![
1812            make_raw_decoded_record("MI", "0"),
1813            make_raw_decoded_record("MI", "0"),
1814            make_raw_decoded_record("MI", "0"),
1815        ];
1816
1817        let batches = grouper.add_records(records).unwrap();
1818        // Only 1 MI group so far, batch_size=10, no complete batch yet
1819        assert!(batches.is_empty());
1820        assert!(grouper.has_pending());
1821
1822        let final_batch = grouper.finish().unwrap().unwrap();
1823        assert_eq!(final_batch.groups.len(), 1);
1824        assert_eq!(final_batch.groups[0].mi, "0");
1825        assert_eq!(final_batch.groups[0].records.len(), 3);
1826    }
1827
1828    #[test]
1829    fn test_raw_grouper_multiple_mi_groups() {
1830        let mut grouper = RawMiGrouper::new("MI", 10);
1831
1832        let records = vec![
1833            make_raw_decoded_record("MI", "0"),
1834            make_raw_decoded_record("MI", "0"),
1835            make_raw_decoded_record("MI", "1"),
1836            make_raw_decoded_record("MI", "1"),
1837            make_raw_decoded_record("MI", "1"),
1838            make_raw_decoded_record("MI", "2"),
1839        ];
1840
1841        let batches = grouper.add_records(records).unwrap();
1842        assert!(batches.is_empty()); // 3 groups < batch_size 10
1843
1844        let final_batch = grouper.finish().unwrap().unwrap();
1845        assert_eq!(final_batch.groups.len(), 3);
1846        assert_eq!(final_batch.groups[0].mi, "0");
1847        assert_eq!(final_batch.groups[0].records.len(), 2);
1848        assert_eq!(final_batch.groups[1].mi, "1");
1849        assert_eq!(final_batch.groups[1].records.len(), 3);
1850        assert_eq!(final_batch.groups[2].mi, "2");
1851        assert_eq!(final_batch.groups[2].records.len(), 1);
1852    }
1853
1854    #[test]
1855    fn test_raw_grouper_batch_size_triggers() {
1856        let mut grouper = RawMiGrouper::new("MI", 2);
1857
1858        let records = vec![
1859            make_raw_decoded_record("MI", "0"),
1860            make_raw_decoded_record("MI", "1"),
1861            make_raw_decoded_record("MI", "2"),
1862            make_raw_decoded_record("MI", "3"),
1863            make_raw_decoded_record("MI", "4"),
1864        ];
1865
1866        let batches = grouper.add_records(records).unwrap();
1867        // 5 MI values => 4 completed groups (0,1,2,3) while "4" is still current
1868        // batch_size=2 => 2 batches of 2 groups each
1869        assert_eq!(batches.len(), 2);
1870        assert_eq!(batches[0].groups.len(), 2);
1871        assert_eq!(batches[0].groups[0].mi, "0");
1872        assert_eq!(batches[0].groups[1].mi, "1");
1873        assert_eq!(batches[1].groups.len(), 2);
1874        assert_eq!(batches[1].groups[0].mi, "2");
1875        assert_eq!(batches[1].groups[1].mi, "3");
1876
1877        // "4" is still pending
1878        assert!(grouper.has_pending());
1879
1880        let final_batch = grouper.finish().unwrap().unwrap();
1881        assert_eq!(final_batch.groups.len(), 1);
1882        assert_eq!(final_batch.groups[0].mi, "4");
1883    }
1884
1885    #[test]
1886    fn test_raw_grouper_groups_records_without_mi_under_empty_key() {
1887        let mut grouper = RawMiGrouper::new("MI", 10);
1888
1889        let records = vec![
1890            make_raw_decoded_record("MI", "0"),
1891            make_raw_decoded_record_no_tag(),
1892            make_raw_decoded_record("MI", "0"),
1893        ];
1894
1895        let batches = grouper.add_records(records).unwrap();
1896        assert!(batches.is_empty());
1897
1898        // Records without MI are now grouped under "" (matching MiGrouper behavior)
1899        let final_batch = grouper.finish().unwrap().unwrap();
1900        assert_eq!(final_batch.groups.len(), 3);
1901        assert_eq!(final_batch.groups[0].mi, "0");
1902        assert_eq!(final_batch.groups[0].records.len(), 1);
1903        assert_eq!(final_batch.groups[1].mi, "");
1904        assert_eq!(final_batch.groups[1].records.len(), 1);
1905        assert_eq!(final_batch.groups[2].mi, "0");
1906        assert_eq!(final_batch.groups[2].records.len(), 1);
1907    }
1908
1909    #[test]
1910    fn test_raw_grouper_rejects_parsed_records() {
1911        let mut grouper = RawMiGrouper::new("MI", 10);
1912
1913        let records = vec![make_parsed_decoded_record("0")];
1914        let result = grouper.add_records(records);
1915        assert!(result.is_err());
1916    }
1917
1918    #[test]
1919    fn test_raw_grouper_finish_empty() {
1920        let mut grouper = RawMiGrouper::new("MI", 10);
1921        assert!(!grouper.has_pending());
1922
1923        let final_batch = grouper.finish().unwrap();
1924        assert!(final_batch.is_none());
1925    }
1926
1927    #[test]
1928    fn test_raw_grouper_finish_idempotent() {
1929        let mut grouper = RawMiGrouper::new("MI", 10);
1930
1931        let records = vec![make_raw_decoded_record("MI", "0")];
1932        grouper.add_records(records).unwrap();
1933
1934        let batch1 = grouper.finish().unwrap();
1935        assert!(batch1.is_some());
1936
1937        let batch2 = grouper.finish().unwrap();
1938        assert!(batch2.is_none());
1939    }
1940
1941    #[test]
1942    fn test_raw_grouper_has_pending_states() {
1943        let mut grouper = RawMiGrouper::new("MI", 10);
1944
1945        // Initially nothing pending
1946        assert!(!grouper.has_pending());
1947
1948        // After adding records, current_mi is set
1949        let records = vec![make_raw_decoded_record("MI", "0")];
1950        grouper.add_records(records).unwrap();
1951        assert!(grouper.has_pending());
1952
1953        // After adding a different MI, first group moves to pending_groups
1954        let records = vec![make_raw_decoded_record("MI", "1")];
1955        grouper.add_records(records).unwrap();
1956        assert!(grouper.has_pending());
1957    }
1958
1959    #[test]
1960    fn test_raw_grouper_with_filter_and_transform() {
1961        // Build records with flag field set: use byte 14..16 for flag.
1962        // We create a filter that checks the flag field for secondary alignment (0x100).
1963        let mut grouper = RawMiGrouper::with_filter_and_transform(
1964            "MI",
1965            10,
1966            |bam: &[u8]| {
1967                // Check flag field
1968                let flag = fgumi_raw_bam::flags(bam);
1969                flag & fgumi_raw_bam::flags::SECONDARY == 0 // keep if NOT secondary
1970            },
1971            |raw: &[u8]| {
1972                let s = String::from_utf8_lossy(raw);
1973                extract_mi_base(&s).to_string()
1974            },
1975        );
1976
1977        // Create records: 1/A (primary), 1/A (secondary flag), 1/B (primary)
1978        let rec_primary = make_raw_bam_with_tag("MI", "1/A");
1979        // rec_primary flag is 0 (primary) - default
1980
1981        let mut rec_secondary = make_raw_bam_with_tag("MI", "1/A");
1982        // Set secondary flag (0x100) at bytes 14..16
1983        rec_secondary[14..16].copy_from_slice(&0x100u16.to_le_bytes());
1984
1985        let rec_b = make_raw_bam_with_tag("MI", "1/B");
1986
1987        let key = crate::unified_pipeline::GroupKey::single(0, 0, 0, 0, 0, 0);
1988        let records = vec![
1989            DecodedRecord::from_raw_bytes(rec_primary, key),
1990            DecodedRecord::from_raw_bytes(rec_secondary, key),
1991            DecodedRecord::from_raw_bytes(rec_b, key),
1992        ];
1993
1994        let batches = grouper.add_records(records).unwrap();
1995        assert!(batches.is_empty());
1996
1997        let final_batch = grouper.finish().unwrap().unwrap();
1998        // Secondary was filtered out, and 1/A and 1/B transform to "1"
1999        assert_eq!(final_batch.groups.len(), 1);
2000        assert_eq!(final_batch.groups[0].mi, "1");
2001        assert_eq!(final_batch.groups[0].records.len(), 2); // primary 1/A + 1/B
2002    }
2003
2004    // ========================================================================
2005    // MiGrouper tests (Grouper trait impl)
2006    // ========================================================================
2007
2008    // Helper: create a parsed `DecodedRecord` for `MiGrouper` tests.
2009    fn make_decoded_record_for_mi_grouper(mi: &str) -> DecodedRecord {
2010        let record = create_record_with_mi(mi);
2011        let key = crate::unified_pipeline::GroupKey::single(0, 0, 0, 0, 0, 0);
2012        DecodedRecord::new(record, key)
2013    }
2014
2015    #[test]
2016    fn test_mi_grouper_single_group() {
2017        let mut grouper = MiGrouper::new("MI", 10);
2018
2019        let records = vec![
2020            make_decoded_record_for_mi_grouper("0"),
2021            make_decoded_record_for_mi_grouper("0"),
2022            make_decoded_record_for_mi_grouper("0"),
2023        ];
2024
2025        let batches = grouper.add_records(records).unwrap();
2026        assert!(batches.is_empty());
2027        assert!(grouper.has_pending());
2028
2029        let final_batch = grouper.finish().unwrap().unwrap();
2030        assert_eq!(final_batch.groups.len(), 1);
2031        assert_eq!(final_batch.groups[0].mi, "0");
2032        assert_eq!(final_batch.groups[0].records.len(), 3);
2033    }
2034
2035    #[test]
2036    fn test_mi_grouper_multiple_groups() {
2037        let mut grouper = MiGrouper::new("MI", 10);
2038
2039        let records = vec![
2040            make_decoded_record_for_mi_grouper("0"),
2041            make_decoded_record_for_mi_grouper("0"),
2042            make_decoded_record_for_mi_grouper("1"),
2043            make_decoded_record_for_mi_grouper("1"),
2044            make_decoded_record_for_mi_grouper("1"),
2045            make_decoded_record_for_mi_grouper("2"),
2046        ];
2047
2048        let batches = grouper.add_records(records).unwrap();
2049        assert!(batches.is_empty());
2050
2051        let final_batch = grouper.finish().unwrap().unwrap();
2052        assert_eq!(final_batch.groups.len(), 3);
2053        assert_eq!(final_batch.groups[0].mi, "0");
2054        assert_eq!(final_batch.groups[0].records.len(), 2);
2055        assert_eq!(final_batch.groups[1].mi, "1");
2056        assert_eq!(final_batch.groups[1].records.len(), 3);
2057        assert_eq!(final_batch.groups[2].mi, "2");
2058        assert_eq!(final_batch.groups[2].records.len(), 1);
2059    }
2060
2061    #[test]
2062    fn test_mi_grouper_batch_size_triggers() {
2063        let mut grouper = MiGrouper::new("MI", 2);
2064
2065        let records = vec![
2066            make_decoded_record_for_mi_grouper("0"),
2067            make_decoded_record_for_mi_grouper("1"),
2068            make_decoded_record_for_mi_grouper("2"),
2069            make_decoded_record_for_mi_grouper("3"),
2070            make_decoded_record_for_mi_grouper("4"),
2071        ];
2072
2073        let batches = grouper.add_records(records).unwrap();
2074        assert_eq!(batches.len(), 2);
2075        assert_eq!(batches[0].groups.len(), 2);
2076        assert_eq!(batches[0].groups[0].mi, "0");
2077        assert_eq!(batches[0].groups[1].mi, "1");
2078        assert_eq!(batches[1].groups.len(), 2);
2079        assert_eq!(batches[1].groups[0].mi, "2");
2080        assert_eq!(batches[1].groups[1].mi, "3");
2081
2082        assert!(grouper.has_pending());
2083
2084        let final_batch = grouper.finish().unwrap().unwrap();
2085        assert_eq!(final_batch.groups.len(), 1);
2086        assert_eq!(final_batch.groups[0].mi, "4");
2087    }
2088
2089    #[test]
2090    fn test_mi_grouper_rejects_raw_records() {
2091        let mut grouper = MiGrouper::new("MI", 10);
2092
2093        let records = vec![make_raw_decoded_record("MI", "0")];
2094        let result = grouper.add_records(records);
2095        assert!(result.is_err());
2096    }
2097
2098    #[test]
2099    fn test_mi_grouper_finish_empty() {
2100        let mut grouper = MiGrouper::new("MI", 10);
2101        assert!(!grouper.has_pending());
2102
2103        let final_batch = grouper.finish().unwrap();
2104        assert!(final_batch.is_none());
2105    }
2106
2107    #[test]
2108    fn test_mi_grouper_finish_idempotent() {
2109        let mut grouper = MiGrouper::new("MI", 10);
2110
2111        let records = vec![make_decoded_record_for_mi_grouper("0")];
2112        grouper.add_records(records).unwrap();
2113
2114        let batch1 = grouper.finish().unwrap();
2115        assert!(batch1.is_some());
2116
2117        let batch2 = grouper.finish().unwrap();
2118        assert!(batch2.is_none());
2119    }
2120
2121    #[test]
2122    fn test_mi_grouper_with_filter_and_transform() {
2123        let mut grouper = MiGrouper::with_filter_and_transform(
2124            "MI",
2125            10,
2126            |_r| true, // keep all records
2127            |mi| extract_mi_base(mi).to_string(),
2128        );
2129
2130        let records = vec![
2131            make_decoded_record_for_mi_grouper("1/A"),
2132            make_decoded_record_for_mi_grouper("1/B"),
2133            make_decoded_record_for_mi_grouper("2/A"),
2134        ];
2135
2136        let batches = grouper.add_records(records).unwrap();
2137        assert!(batches.is_empty());
2138
2139        let final_batch = grouper.finish().unwrap().unwrap();
2140        assert_eq!(final_batch.groups.len(), 2);
2141        assert_eq!(final_batch.groups[0].mi, "1");
2142        assert_eq!(final_batch.groups[0].records.len(), 2);
2143        assert_eq!(final_batch.groups[1].mi, "2");
2144        assert_eq!(final_batch.groups[1].records.len(), 1);
2145    }
2146
2147    #[test]
2148    fn test_mi_grouper_has_pending_states() {
2149        let mut grouper = MiGrouper::new("MI", 10);
2150
2151        assert!(!grouper.has_pending());
2152
2153        let records = vec![make_decoded_record_for_mi_grouper("0")];
2154        grouper.add_records(records).unwrap();
2155        assert!(grouper.has_pending());
2156
2157        let records = vec![make_decoded_record_for_mi_grouper("1")];
2158        grouper.add_records(records).unwrap();
2159        assert!(grouper.has_pending());
2160    }
2161
2162    #[test]
2163    fn test_mi_grouper_record_without_mi_gets_empty_string() {
2164        let mut grouper = MiGrouper::new("MI", 10);
2165
2166        let record = create_record_without_mi();
2167        let key = crate::unified_pipeline::GroupKey::single(0, 0, 0, 0, 0, 0);
2168        let records = vec![DecodedRecord::new(record, key)];
2169
2170        let batches = grouper.add_records(records).unwrap();
2171        assert!(batches.is_empty());
2172
2173        // Record without MI tag gets default empty string as MI value
2174        let final_batch = grouper.finish().unwrap().unwrap();
2175        assert_eq!(final_batch.groups.len(), 1);
2176        assert_eq!(final_batch.groups[0].mi, "");
2177        assert_eq!(final_batch.groups[0].records.len(), 1);
2178    }
2179
2180    // ========================================================================
2181    // Batch type tests
2182    // ========================================================================
2183
2184    #[test]
2185    fn test_mi_group_batch_new() {
2186        let batch = MiGroupBatch::new();
2187        assert!(batch.is_empty());
2188        assert_eq!(batch.len(), 0);
2189    }
2190
2191    #[test]
2192    fn test_mi_group_batch_with_capacity() {
2193        let batch = MiGroupBatch::with_capacity(16);
2194        assert!(batch.is_empty());
2195        assert_eq!(batch.len(), 0);
2196    }
2197
2198    #[test]
2199    fn test_mi_group_batch_len_and_is_empty() {
2200        let mut batch = MiGroupBatch::new();
2201        assert!(batch.is_empty());
2202
2203        batch.groups.push(MiGroup::new("0".to_string(), vec![create_record_with_mi("0")]));
2204        assert!(!batch.is_empty());
2205        assert_eq!(batch.len(), 1);
2206
2207        batch.groups.push(MiGroup::new(
2208            "1".to_string(),
2209            vec![create_record_with_mi("1"), create_record_with_mi("1")],
2210        ));
2211        assert_eq!(batch.len(), 2);
2212    }
2213
2214    #[test]
2215    fn test_mi_group_batch_clear() {
2216        let mut batch = MiGroupBatch::new();
2217        batch.groups.push(MiGroup::new("0".to_string(), vec![create_record_with_mi("0")]));
2218        assert!(!batch.is_empty());
2219
2220        batch.clear();
2221        assert!(batch.is_empty());
2222        assert_eq!(batch.len(), 0);
2223    }
2224
2225    #[test]
2226    fn test_mi_group_batch_weight() {
2227        let mut batch = MiGroupBatch::new();
2228        assert_eq!(batch.batch_weight(), 0);
2229
2230        batch.groups.push(MiGroup::new(
2231            "0".to_string(),
2232            vec![create_record_with_mi("0"), create_record_with_mi("0")],
2233        ));
2234        assert_eq!(batch.batch_weight(), 2);
2235
2236        batch.groups.push(MiGroup::new("1".to_string(), vec![create_record_with_mi("1")]));
2237        assert_eq!(batch.batch_weight(), 3);
2238    }
2239
2240    #[test]
2241    fn test_mi_group_batch_default() {
2242        let batch = MiGroupBatch::default();
2243        assert!(batch.is_empty());
2244    }
2245
2246    #[test]
2247    fn test_mi_group_batch_memory_estimate() {
2248        let batch = MiGroupBatch::new();
2249        // Empty batch should have minimal heap size (just vec overhead)
2250        let _ = batch.estimate_heap_size(); // Should not panic
2251
2252        let mut batch = MiGroupBatch::new();
2253        batch.groups.push(MiGroup::new("0".to_string(), vec![create_record_with_mi("0")]));
2254        let size = batch.estimate_heap_size();
2255        assert!(size > 0);
2256    }
2257
2258    #[test]
2259    fn test_mi_group_new() {
2260        let group = MiGroup::new("42".to_string(), vec![create_record_with_mi("42")]);
2261        assert_eq!(group.mi, "42");
2262        assert_eq!(group.records.len(), 1);
2263    }
2264
2265    #[test]
2266    fn test_mi_group_batch_weight_single() {
2267        let group = MiGroup::new(
2268            "0".to_string(),
2269            vec![
2270                create_record_with_mi("0"),
2271                create_record_with_mi("0"),
2272                create_record_with_mi("0"),
2273            ],
2274        );
2275        assert_eq!(group.batch_weight(), 3);
2276    }
2277
2278    #[test]
2279    fn test_mi_group_memory_estimate() {
2280        let group = MiGroup::new("0".to_string(), vec![create_record_with_mi("0")]);
2281        let size = group.estimate_heap_size();
2282        assert!(size > 0);
2283    }
2284
2285    // ========================================================================
2286    // Raw batch type tests
2287    // ========================================================================
2288
2289    #[test]
2290    fn test_raw_mi_group_new() {
2291        let raw = make_raw_bam_with_tag("MI", "42");
2292        let group = RawMiGroup::new("42".to_string(), vec![raw]);
2293        assert_eq!(group.mi, "42");
2294        assert_eq!(group.records.len(), 1);
2295    }
2296
2297    #[test]
2298    fn test_raw_mi_group_batch_weight() {
2299        let group = RawMiGroup::new(
2300            "0".to_string(),
2301            vec![make_raw_bam_with_tag("MI", "0"), make_raw_bam_with_tag("MI", "0")],
2302        );
2303        assert_eq!(group.batch_weight(), 2);
2304    }
2305
2306    #[test]
2307    fn test_raw_mi_group_memory_estimate() {
2308        let group = RawMiGroup::new("0".to_string(), vec![make_raw_bam_with_tag("MI", "0")]);
2309        let size = group.estimate_heap_size();
2310        assert!(size > 0);
2311    }
2312
2313    #[test]
2314    fn test_raw_mi_group_batch_new() {
2315        let batch = RawMiGroupBatch::new();
2316        assert!(batch.groups.is_empty());
2317    }
2318
2319    #[test]
2320    fn test_raw_mi_group_batch_default() {
2321        let batch = RawMiGroupBatch::default();
2322        assert!(batch.groups.is_empty());
2323    }
2324
2325    #[test]
2326    fn test_raw_mi_group_batch_weight_method() {
2327        let mut batch = RawMiGroupBatch::new();
2328        assert_eq!(batch.batch_weight(), 0);
2329
2330        batch.groups.push(RawMiGroup::new(
2331            "0".to_string(),
2332            vec![make_raw_bam_with_tag("MI", "0"), make_raw_bam_with_tag("MI", "0")],
2333        ));
2334        assert_eq!(batch.batch_weight(), 2);
2335
2336        batch.groups.push(RawMiGroup::new("1".to_string(), vec![make_raw_bam_with_tag("MI", "1")]));
2337        assert_eq!(batch.batch_weight(), 3);
2338    }
2339
2340    #[test]
2341    fn test_raw_mi_group_batch_memory_estimate() {
2342        let batch = RawMiGroupBatch::new();
2343        let _ = batch.estimate_heap_size(); // Should not panic
2344
2345        let mut batch = RawMiGroupBatch::new();
2346        batch.groups.push(RawMiGroup::new("0".to_string(), vec![make_raw_bam_with_tag("MI", "0")]));
2347        let size = batch.estimate_heap_size();
2348        assert!(size > 0);
2349    }
2350}