use std::io;
use noodles::sam::alignment::RecordBuf;
use crate::sort::bam_fields;
use crate::template::{Template, TemplateBatch};
use crate::unified_pipeline::{BatchWeight, DecodedRecord, Grouper, MemoryEstimate};
use fgumi_raw_bam::{RawRecord, raw_record_to_record_buf};
impl BatchWeight for RecordBuf {
fn batch_weight(&self) -> usize {
1
}
}
impl BatchWeight for Vec<u8> {
fn batch_weight(&self) -> usize {
1
}
}
impl BatchWeight for RawRecord {
fn batch_weight(&self) -> usize {
1
}
}
impl MemoryEstimate for RawRecord {
fn estimate_heap_size(&self) -> usize {
self.capacity()
}
}
impl BatchWeight for TemplateBatch {
fn batch_weight(&self) -> usize {
self.len()
}
}
pub struct SingleRecordGrouper {
header: noodles::sam::Header,
}
impl SingleRecordGrouper {
#[must_use]
pub fn with_header(header: noodles::sam::Header) -> Self {
Self { header }
}
}
impl Grouper for SingleRecordGrouper {
type Group = RecordBuf;
fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
records
.into_iter()
.map(|d| {
raw_record_to_record_buf(&d.into_raw_bytes(), &self.header)
.map_err(io::Error::other)
})
.collect()
}
fn finish(&mut self) -> io::Result<Option<Self::Group>> {
Ok(None)
}
fn has_pending(&self) -> bool {
false
}
}
#[derive(Default)]
pub struct SingleRawRecordGrouper;
impl SingleRawRecordGrouper {
#[must_use]
pub fn new() -> Self {
Self
}
}
impl Grouper for SingleRawRecordGrouper {
type Group = RawRecord;
fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
Ok(records.into_iter().map(DecodedRecord::into_raw_bytes).collect())
}
fn finish(&mut self) -> io::Result<Option<Self::Group>> {
Ok(None)
}
fn has_pending(&self) -> bool {
false
}
}
use std::collections::VecDeque;
pub struct TemplateGrouper {
batch_size: usize,
current_name: Option<Vec<u8>>,
current_name_hash: Option<u64>,
current_raw_records: Vec<RawRecord>,
pending_templates: VecDeque<Template>,
}
impl TemplateGrouper {
#[must_use]
pub fn new(batch_size: usize) -> Self {
Self {
batch_size: batch_size.max(1),
current_name: None,
current_name_hash: None,
current_raw_records: Vec::new(),
pending_templates: VecDeque::new(),
}
}
fn flush_current_template(&mut self) -> io::Result<()> {
if !self.current_raw_records.is_empty() {
let template = Template::from_records(std::mem::take(&mut self.current_raw_records))
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
self.pending_templates.push_back(template);
self.current_name = None;
self.current_name_hash = None;
}
Ok(())
}
fn collect_batches(&mut self) -> Vec<TemplateBatch> {
let mut batches = Vec::new();
while self.pending_templates.len() >= self.batch_size {
let mut batch = Vec::with_capacity(self.batch_size);
for _ in 0..self.batch_size {
if let Some(template) = self.pending_templates.pop_front() {
batch.push(template);
}
}
batches.push(batch);
}
batches
}
}
impl Grouper for TemplateGrouper {
type Group = TemplateBatch;
fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
for decoded in records {
let name_hash = decoded.key.name_hash;
let raw = decoded.data;
let read_name = bam_fields::read_name(&raw);
let same_template = match (self.current_name_hash, self.current_name.as_deref()) {
(Some(h), Some(name)) => h == name_hash && name == read_name,
_ => false,
};
if same_template {
self.current_raw_records.push(raw);
} else {
self.flush_current_template()?;
self.current_name = Some(read_name.to_vec());
self.current_name_hash = Some(name_hash);
self.current_raw_records.push(raw);
}
}
Ok(self.collect_batches())
}
fn finish(&mut self) -> io::Result<Option<Self::Group>> {
self.flush_current_template()?;
if self.pending_templates.is_empty() {
return Ok(None);
}
let mut batch = Vec::with_capacity(self.pending_templates.len());
while let Some(template) = self.pending_templates.pop_front() {
batch.push(template);
}
Ok(Some(batch))
}
fn has_pending(&self) -> bool {
self.current_name.is_some()
|| !self.pending_templates.is_empty()
|| !self.current_raw_records.is_empty()
}
}
use crate::unified_pipeline::GroupKey;
impl MemoryEstimate for ProcessedPositionGroup {
fn estimate_heap_size(&self) -> usize {
let templates_size: usize =
self.templates.iter().map(MemoryEstimate::estimate_heap_size).sum();
let templates_vec_overhead = self.templates.capacity() * std::mem::size_of::<Template>();
let family_sizes_size = self.family_sizes.len() * 24;
templates_size + templates_vec_overhead + family_sizes_size
}
}
#[derive(Default, Clone, Debug)]
pub struct FilterMetrics {
pub total_templates: u64,
pub accepted_templates: u64,
pub discarded_non_pf: u64,
pub discarded_poor_alignment: u64,
pub discarded_ns_in_umi: u64,
pub discarded_umi_too_short: u64,
}
impl FilterMetrics {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn merge(&mut self, other: &FilterMetrics) {
self.total_templates += other.total_templates;
self.accepted_templates += other.accepted_templates;
self.discarded_non_pf += other.discarded_non_pf;
self.discarded_poor_alignment += other.discarded_poor_alignment;
self.discarded_ns_in_umi += other.discarded_ns_in_umi;
self.discarded_umi_too_short += other.discarded_umi_too_short;
}
}
#[derive(Debug)]
pub struct ProcessedPositionGroup {
pub templates: Vec<Template>,
pub family_sizes: ahash::AHashMap<usize, u64>,
pub filter_metrics: FilterMetrics,
pub input_record_count: u64,
pub distinct_mi_count: u64,
}
type PositionKeyTuple = (i32, i32, u8, i32, i32, u8, u16, u64);
#[derive(Debug)]
pub struct RawPositionGroup {
pub group_key: GroupKey,
pub records: Vec<DecodedRecord>,
}
impl BatchWeight for RawPositionGroup {
fn batch_weight(&self) -> usize {
self.records.len().div_ceil(2)
}
}
impl MemoryEstimate for RawPositionGroup {
fn estimate_heap_size(&self) -> usize {
let records_size: usize = self.records.iter().map(MemoryEstimate::estimate_heap_size).sum();
let records_vec_overhead = self.records.capacity() * std::mem::size_of::<DecodedRecord>();
records_size + records_vec_overhead
}
}
pub struct RecordPositionGrouper {
current_position_key: Option<PositionKeyTuple>,
current_group_key: Option<GroupKey>,
current_records: Vec<DecodedRecord>,
mc_validated: bool,
include_secondary_supplementary: bool,
}
impl RecordPositionGrouper {
#[must_use]
pub fn new() -> Self {
Self {
current_position_key: None,
current_group_key: None,
current_records: Vec::new(),
mc_validated: false,
include_secondary_supplementary: false,
}
}
#[must_use]
pub fn with_secondary_supplementary() -> Self {
Self { include_secondary_supplementary: true, ..Self::new() }
}
fn validate_mc_tag(&mut self, decoded: &DecodedRecord) -> io::Result<()> {
use fgumi_raw_bam::RawRecordView;
let raw = &decoded.data;
let flg = RawRecordView::new(raw).flags();
let is_paired = (flg & bam_fields::flags::PAIRED) != 0;
let is_secondary = (flg & bam_fields::flags::SECONDARY) != 0;
let is_supplementary = (flg & bam_fields::flags::SUPPLEMENTARY) != 0;
let is_unmapped = (flg & bam_fields::flags::UNMAPPED) != 0;
let is_mate_unmapped = (flg & bam_fields::flags::MATE_UNMAPPED) != 0;
if is_paired && !is_secondary && !is_supplementary && !is_unmapped && !is_mate_unmapped {
if bam_fields::find_mc_tag_in_record(raw).is_none() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"RecordPositionGrouper requires MC tags on paired-end reads. \
Run `fgumi zipper` to add MC tags before `fgumi group`.",
));
}
self.mc_validated = true;
}
Ok(())
}
pub fn add_record(&mut self, decoded: DecodedRecord) -> io::Result<Option<RawPositionGroup>> {
self.process_record(decoded)
}
fn process_record(&mut self, decoded: DecodedRecord) -> io::Result<Option<RawPositionGroup>> {
if decoded.key.ref_id1 == GroupKey::UNKNOWN_REF && !self.include_secondary_supplementary {
return Ok(None);
}
self.validate_mc_tag(&decoded)?;
let record_pos_key = decoded.key.position_key();
match self.current_position_key {
Some(current_pos) if current_pos == record_pos_key => {
self.current_records.push(decoded);
Ok(None)
}
Some(_)
if self.current_records.last().is_some_and(|last| {
last.key.name_hash == decoded.key.name_hash
&& bam_fields::read_name(&last.data) == bam_fields::read_name(&decoded.data)
}) =>
{
self.current_records.push(decoded);
Ok(None)
}
Some(_) => {
let finished_records = std::mem::take(&mut self.current_records);
let finished_key = self
.current_group_key
.take()
.expect("current_group_key set when current_position_key is set");
let group = RawPositionGroup { group_key: finished_key, records: finished_records };
self.current_position_key = Some(record_pos_key);
self.current_group_key = Some(decoded.key);
self.current_records.push(decoded);
Ok(Some(group))
}
None => {
self.current_position_key = Some(record_pos_key);
self.current_group_key = Some(decoded.key);
self.current_records.push(decoded);
Ok(None)
}
}
}
}
impl Default for RecordPositionGrouper {
fn default() -> Self {
Self::new()
}
}
impl Grouper for RecordPositionGrouper {
type Group = RawPositionGroup;
fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
let mut completed_groups = Vec::new();
for decoded in records {
if let Some(group) = self.process_record(decoded)? {
completed_groups.push(group);
}
}
Ok(completed_groups)
}
fn finish(&mut self) -> io::Result<Option<Self::Group>> {
if !self.current_records.is_empty() {
debug_assert!(
self.current_group_key.is_some(),
"RecordPositionGrouper has {} buffered records but no group key",
self.current_records.len()
);
if let Some(key) = self.current_group_key.take() {
let records = std::mem::take(&mut self.current_records);
self.current_position_key = None;
return Ok(Some(RawPositionGroup { group_key: key, records }));
}
}
Ok(None)
}
fn has_pending(&self) -> bool {
!self.current_records.is_empty()
}
}
fn group_by_name_and_build<T>(
records: Vec<DecodedRecord>,
extract: impl Fn(DecodedRecord) -> io::Result<T>,
build: impl Fn(Vec<T>) -> anyhow::Result<Template>,
) -> io::Result<Vec<Template>> {
let mut templates = Vec::new();
let mut current_name_hash: Option<u64> = None;
let mut current_name: Option<Vec<u8>> = None;
let mut current_items: Vec<T> = Vec::new();
for decoded in records {
let name_hash = decoded.key.name_hash;
let read_name = bam_fields::read_name(&decoded.data).to_vec();
let item = extract(decoded)?;
let same = current_name_hash == Some(name_hash)
&& current_name.as_deref() == Some(read_name.as_slice());
if same {
current_items.push(item);
} else if current_name_hash.is_some() {
let template = build(std::mem::take(&mut current_items))
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
templates.push(template);
current_name_hash = Some(name_hash);
current_name = Some(read_name);
current_items.push(item);
} else {
current_name_hash = Some(name_hash);
current_name = Some(read_name);
current_items.push(item);
}
}
if !current_items.is_empty() {
let template =
build(current_items).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
templates.push(template);
}
Ok(templates)
}
pub fn build_templates_from_records(records: Vec<DecodedRecord>) -> io::Result<Vec<Template>> {
group_by_name_and_build(records, |d| Ok(d.data), Template::from_records)
}
use crate::fastq_parse::{FastqRecord, parse_fastq_records, strip_read_suffix};
#[derive(Debug)]
pub struct FastqTemplate {
pub records: Vec<FastqRecord>,
pub name: Vec<u8>,
}
impl MemoryEstimate for FastqTemplate {
fn estimate_heap_size(&self) -> usize {
let records_heap: usize = self.records.iter().map(MemoryEstimate::estimate_heap_size).sum();
let records_vec_overhead = self.records.capacity() * std::mem::size_of::<FastqRecord>();
self.name.capacity() + records_heap + records_vec_overhead
}
}
pub struct FastqGrouper {
num_inputs: usize,
leftovers: Vec<Vec<u8>>,
pending_records: Vec<VecDeque<FastqRecord>>,
}
impl FastqGrouper {
#[must_use]
pub fn new(num_inputs: usize) -> Self {
log::debug!("FastqGrouper::new: num_inputs={num_inputs}");
Self {
num_inputs,
leftovers: vec![Vec::new(); num_inputs],
pending_records: (0..num_inputs).map(|_| VecDeque::new()).collect(),
}
}
pub fn add_bytes_for_stream(&mut self, stream_idx: usize, data: &[u8]) -> io::Result<()> {
log::trace!(
"FastqGrouper::add_bytes_for_stream: stream_idx={}, num_inputs={}, data_len={}",
stream_idx,
self.num_inputs,
data.len()
);
if stream_idx >= self.num_inputs {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("Stream index {} out of range (max {})", stream_idx, self.num_inputs - 1),
));
}
let combined = if self.leftovers[stream_idx].is_empty() {
data.to_vec()
} else {
let mut combined = std::mem::take(&mut self.leftovers[stream_idx]);
combined.extend_from_slice(data);
combined
};
let (records, leftover) = parse_fastq_records(&combined)?;
self.leftovers[stream_idx] = leftover;
self.pending_records[stream_idx].extend(records);
Ok(())
}
pub fn add_records_for_stream(
&mut self,
stream_idx: usize,
records: Vec<FastqRecord>,
) -> io::Result<()> {
log::trace!(
"FastqGrouper::add_records_for_stream: stream_idx={}, num_inputs={}, records_len={}",
stream_idx,
self.num_inputs,
records.len()
);
if stream_idx >= self.num_inputs {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("Stream index {} out of range (max {})", stream_idx, self.num_inputs - 1),
));
}
self.pending_records[stream_idx].extend(records);
Ok(())
}
#[must_use]
pub fn has_leftover_bytes(&self) -> bool {
self.leftovers.iter().any(|l| !l.is_empty())
}
pub fn drain_complete_templates(&mut self) -> io::Result<Vec<FastqTemplate>> {
let mut templates = Vec::new();
while self.pending_records.iter().all(|q| !q.is_empty()) {
let base_name = {
let names: Vec<&[u8]> = self
.pending_records
.iter()
.map(|q| {
q.front()
.expect("pending queue must be non-empty inside all-non-empty loop")
.name()
})
.collect();
let base_name = strip_read_suffix(names[0]).to_vec();
for (i, &name) in names.iter().enumerate().skip(1) {
let other_base = strip_read_suffix(name);
if base_name != other_base {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"FASTQ files out of sync: stream 0 has '{}', stream {} has '{}'",
String::from_utf8_lossy(&base_name),
i,
String::from_utf8_lossy(other_base),
),
));
}
}
base_name };
let records: Vec<_> = self
.pending_records
.iter_mut()
.map(|q| {
q.pop_front()
.expect("pending queue must be non-empty inside all-non-empty loop")
})
.collect();
templates.push(FastqTemplate { name: base_name, records });
}
Ok(templates)
}
#[must_use]
pub fn has_pending(&self) -> bool {
self.leftovers.iter().any(|l| !l.is_empty())
|| self.pending_records.iter().any(|q| !q.is_empty())
}
pub fn finish(&mut self) -> io::Result<Option<FastqTemplate>> {
let templates = self.drain_complete_templates()?;
if templates.len() == 1 {
return Ok(templates.into_iter().next());
} else if templates.len() > 1 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Multiple templates remaining at finish",
));
}
if self.leftovers.iter().any(|l| !l.is_empty()) {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Incomplete FASTQ record at EOF",
));
}
if self.pending_records.iter().any(|q| !q.is_empty()) {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Unmatched FASTQ records at EOF - files out of sync",
));
}
Ok(None)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_single_record_grouper_empty() {
let mut grouper = SingleRecordGrouper::with_header(noodles::sam::Header::default());
assert!(!grouper.has_pending());
let result = grouper.finish().expect("finish should succeed");
assert!(result.is_none());
}
#[test]
fn test_single_raw_record_grouper_empty() {
let mut grouper = SingleRawRecordGrouper::new();
assert!(!grouper.has_pending());
let result = grouper.finish().expect("finish should succeed");
assert!(result.is_none());
}
#[test]
fn test_single_raw_record_grouper_emits_each_record() {
use crate::unified_pipeline::{DecodedRecord, GroupKey};
let mut grouper = SingleRawRecordGrouper::new();
let raw1 = RawRecord::from(vec![1u8; 36]);
let raw2 = RawRecord::from(vec![2u8; 36]);
let records = vec![
DecodedRecord::from_raw_bytes(raw1.clone(), GroupKey::default()),
DecodedRecord::from_raw_bytes(raw2.clone(), GroupKey::default()),
];
let groups = grouper.add_records(records).expect("add_records should succeed");
assert_eq!(groups.len(), 2);
assert_eq!(groups[0], raw1);
assert_eq!(groups[1], raw2);
}
#[test]
fn test_fastq_grouper_paired() {
let mut grouper = FastqGrouper::new(2);
grouper
.add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n")
.expect("add_bytes_for_stream failed");
grouper
.add_bytes_for_stream(1, b"@read1/2\nTGCA\n+\nJJJJ\n")
.expect("add_bytes_for_stream failed");
let templates =
grouper.drain_complete_templates().expect("drain_complete_templates failed");
assert_eq!(templates.len(), 1);
assert_eq!(templates[0].name, b"read1");
assert_eq!(templates[0].records.len(), 2);
assert_eq!(templates[0].records[0].sequence(), b"ACGT");
assert_eq!(templates[0].records[1].sequence(), b"TGCA");
}
#[test]
fn test_fastq_grouper_multiple_templates() {
let mut grouper = FastqGrouper::new(2);
grouper
.add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n@read2/1\nAAAA\n+\nIIII\n")
.expect("add_bytes_for_stream should succeed for R1");
grouper
.add_bytes_for_stream(1, b"@read1/2\nTGCA\n+\nJJJJ\n@read2/2\nTTTT\n+\nJJJJ\n")
.expect("add_bytes_for_stream should succeed for R2");
let templates =
grouper.drain_complete_templates().expect("drain_complete_templates failed");
assert_eq!(templates.len(), 2);
assert_eq!(templates[0].name, b"read1");
assert_eq!(templates[1].name, b"read2");
}
#[test]
fn test_fastq_grouper_partial_then_complete() {
let mut grouper = FastqGrouper::new(2);
grouper
.add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n")
.expect("add_bytes_for_stream failed");
let templates =
grouper.drain_complete_templates().expect("drain_complete_templates failed");
assert!(templates.is_empty());
assert!(grouper.has_pending());
grouper
.add_bytes_for_stream(1, b"@read1/2\nTGCA\n+\nJJJJ\n")
.expect("add_bytes_for_stream failed");
let templates =
grouper.drain_complete_templates().expect("drain_complete_templates failed");
assert_eq!(templates.len(), 1);
}
#[test]
fn test_fastq_grouper_finish_empty() {
let mut grouper = FastqGrouper::new(2);
let result = grouper.finish().expect("finish should succeed");
assert!(result.is_none());
}
#[test]
fn test_fastq_grouper_out_of_sync() {
let mut grouper = FastqGrouper::new(2);
grouper
.add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n")
.expect("add_bytes_for_stream failed");
grouper
.add_bytes_for_stream(1, b"@read2/2\nTGCA\n+\nJJJJ\n")
.expect("add_bytes_for_stream failed");
let result = grouper.drain_complete_templates();
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("out of sync"));
}
use fgumi_raw_bam::{SamBuilder as RawSamBuilder, flags as raw_flags};
fn make_decoded(
key: GroupKey,
paired: bool,
first_segment: bool,
mc: Option<&str>,
) -> DecodedRecord {
use fgumi_raw_bam::testutil::encode_op;
let mut flags: u16 = 0;
if paired {
flags |= raw_flags::PAIRED;
}
if first_segment {
flags |= raw_flags::FIRST_SEGMENT;
}
let mut b = RawSamBuilder::new();
b.read_name(b"read1")
.sequence(b"ACGT")
.qualities(&[30; 4])
.flags(flags)
.ref_id(0)
.pos(99) .cigar_ops(&[encode_op(0, 4)]);
if let Some(mc_val) = mc {
b.add_string_tag(b"MC", mc_val.as_bytes());
}
DecodedRecord::from_raw_bytes(b.build(), key)
}
fn make_secondary_decoded(name_hash: u64) -> DecodedRecord {
let key = GroupKey { name_hash, ..GroupKey::default() };
let mut b = RawSamBuilder::new();
b.read_name(b"read1").sequence(b"ACGT").qualities(&[30; 4]).flags(raw_flags::SECONDARY);
DecodedRecord::from_raw_bytes(b.build(), key)
}
#[test]
fn test_record_position_grouper_empty() {
let mut grouper = RecordPositionGrouper::new();
assert!(!grouper.has_pending());
let result = grouper.finish().expect("finish should succeed");
assert!(result.is_none());
}
#[test]
fn test_record_position_grouper_single_unpaired_record() {
let mut grouper = RecordPositionGrouper::new();
let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
let decoded = make_decoded(key, false, false, None);
let groups = grouper.add_records(vec![decoded]).expect("add_records should succeed");
assert!(groups.is_empty()); assert!(grouper.has_pending());
let final_group =
grouper.finish().expect("finish should succeed").expect("should emit final group");
assert_eq!(final_group.records.len(), 1);
assert_eq!(final_group.group_key.ref_id1, 0);
assert_eq!(final_group.group_key.pos1, 100);
}
#[test]
fn test_record_position_grouper_same_position_multiple_records() {
let mut grouper = RecordPositionGrouper::new();
let key1 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 11111);
let key2 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 22222);
let key3 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 33333);
let records = vec![
make_decoded(key1, true, true, Some("4M")),
make_decoded(key2, true, true, Some("4M")),
make_decoded(key3, true, true, Some("4M")),
];
let groups = grouper.add_records(records).expect("add_records should succeed");
assert!(groups.is_empty());
let final_group =
grouper.finish().expect("finish should succeed").expect("should emit final group");
assert_eq!(final_group.records.len(), 3);
}
#[test]
fn test_record_position_grouper_different_positions() {
let mut grouper = RecordPositionGrouper::new();
let key_pos1 = GroupKey::single(0, 100, 0, 0, 0, 11111);
let key_pos2 = GroupKey::single(0, 200, 0, 0, 0, 22222);
let key_pos3 = GroupKey::single(0, 300, 0, 0, 0, 33333);
let records = vec![
make_decoded(key_pos1, false, false, None),
make_decoded(key_pos2, false, false, None),
make_decoded(key_pos3, false, false, None),
];
let groups = grouper.add_records(records).expect("add_records should succeed");
assert_eq!(groups.len(), 2);
assert_eq!(groups[0].group_key.pos1, 100);
assert_eq!(groups[0].records.len(), 1);
assert_eq!(groups[1].group_key.pos1, 200);
assert_eq!(groups[1].records.len(), 1);
let final_group =
grouper.finish().expect("finish should succeed").expect("should emit final group");
assert_eq!(final_group.group_key.pos1, 300);
}
#[test]
fn test_record_position_grouper_skips_secondary() {
let mut grouper = RecordPositionGrouper::new();
let primary_key = GroupKey::single(0, 100, 0, 0, 0, 11111);
let records = vec![
make_decoded(primary_key, false, false, None),
make_secondary_decoded(11111), ];
let groups = grouper.add_records(records).expect("add_records should succeed");
assert!(groups.is_empty());
let final_group =
grouper.finish().expect("finish should succeed").expect("should emit final group");
assert_eq!(final_group.records.len(), 1); }
#[test]
fn test_record_position_grouper_paired_same_position_key() {
let mut grouper = RecordPositionGrouper::new();
let r1_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
let r2_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
assert_eq!(r1_key.position_key(), r2_key.position_key());
let records = vec![
make_decoded(r1_key, true, true, Some("4M")),
make_decoded(r2_key, true, false, Some("4M")),
];
let groups = grouper.add_records(records).expect("add_records should succeed");
assert!(groups.is_empty());
let final_group =
grouper.finish().expect("finish should succeed").expect("should emit final group");
assert_eq!(final_group.records.len(), 2);
}
#[test]
fn test_record_position_grouper_groups_records_by_position() {
let mut grouper = RecordPositionGrouper::new();
let key1 = GroupKey::single(0, 100, 0, 0, 0, 11111);
let key2 = GroupKey::single(0, 100, 0, 0, 0, 22222);
let key3 = GroupKey::single(0, 100, 0, 0, 0, 33333);
let key4 = GroupKey::single(0, 200, 0, 0, 0, 44444);
let key5 = GroupKey::single(0, 200, 0, 0, 0, 55555);
let key6 = GroupKey::single(0, 300, 0, 0, 0, 66666);
let records = vec![
make_decoded(key1, false, false, None),
make_decoded(key2, false, false, None),
make_decoded(key3, false, false, None),
make_decoded(key4, false, false, None),
make_decoded(key5, false, false, None),
make_decoded(key6, false, false, None),
];
let groups = grouper.add_records(records).expect("add_records should succeed");
assert_eq!(groups.len(), 2);
assert_eq!(groups[0].records.len(), 3);
assert_eq!(groups[1].records.len(), 2);
let final_group =
grouper.finish().expect("finish should succeed").expect("should emit final group");
assert_eq!(final_group.records.len(), 1);
}
#[test]
fn test_record_position_grouper_coalesces_unmapped_mate_by_name_hash() {
let mut grouper = RecordPositionGrouper::new();
let name_hash = 12345_u64;
let r1_key = GroupKey::single(5, 100, 0, 0, 0, name_hash);
let r1 = {
use fgumi_raw_bam::testutil::encode_op;
let mut b = RawSamBuilder::new();
b.read_name(b"read1")
.sequence(b"ACGT")
.qualities(&[30; 4])
.flags(raw_flags::PAIRED | raw_flags::FIRST_SEGMENT | raw_flags::MATE_UNMAPPED)
.ref_id(5)
.pos(99)
.cigar_ops(&[encode_op(0, 4)]);
DecodedRecord::from_raw_bytes(b.build(), r1_key)
};
let r2_key = GroupKey::single(-1, 0, 0, 0, 0, name_hash);
let r2 = {
let mut b = RawSamBuilder::new();
b.read_name(b"read1")
.sequence(b"TGCA")
.qualities(&[30; 4])
.flags(raw_flags::PAIRED | raw_flags::LAST_SEGMENT | raw_flags::UNMAPPED);
DecodedRecord::from_raw_bytes(b.build(), r2_key)
};
assert_ne!(r1_key.position_key(), r2_key.position_key());
let groups = grouper.add_records(vec![r1, r2]).expect("add_records should succeed");
assert!(groups.is_empty());
let final_group =
grouper.finish().expect("finish should succeed").expect("should emit final group");
assert_eq!(final_group.records.len(), 2, "R1 and R2 should be in the same group");
assert_eq!(final_group.group_key.ref_id1, 5, "Group key should use R1's position");
}
#[test]
fn test_record_position_grouper_does_not_coalesce_different_name_hash() {
let mut grouper = RecordPositionGrouper::new();
let r1_key = GroupKey::single(0, 100, 0, 0, 0, 11111);
let r2_key = GroupKey::single(0, 200, 0, 0, 0, 22222);
let records = vec![
make_decoded(r1_key, false, false, None),
make_decoded(r2_key, false, false, None),
];
let groups = grouper.add_records(records).expect("add_records should succeed");
assert_eq!(groups.len(), 1);
assert_eq!(groups[0].records.len(), 1);
}
#[test]
fn test_record_position_grouper_mc_validation_skips_unmapped_mate() {
use fgumi_raw_bam::testutil::encode_op;
let mut grouper = RecordPositionGrouper::new();
let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
let mut b = RawSamBuilder::new();
b.read_name(b"read1")
.sequence(b"ACGT")
.qualities(&[30; 4])
.flags(raw_flags::PAIRED | raw_flags::FIRST_SEGMENT | raw_flags::MATE_UNMAPPED)
.ref_id(0)
.pos(99)
.cigar_ops(&[encode_op(0, 4)]);
let decoded = DecodedRecord::from_raw_bytes(b.build(), key);
let result = grouper.add_records(vec![decoded]);
assert!(result.is_ok());
assert!(!grouper.mc_validated); }
#[test]
fn test_record_position_grouper_mc_validation_fails_without_mc() {
let mut grouper = RecordPositionGrouper::new();
let key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
let decoded = make_decoded(key, true, true, None);
let result = grouper.add_records(vec![decoded]);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("MC tags"), "Error should mention MC tags: {err_msg}");
}
#[test]
fn test_record_position_grouper_mc_validation_passes_with_mc() {
let mut grouper = RecordPositionGrouper::new();
let key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
let decoded = make_decoded(key, true, true, Some("4M"));
let result = grouper.add_records(vec![decoded]);
assert!(result.is_ok());
}
#[test]
fn test_record_position_grouper_mc_validation_catches_later_missing_mc() {
let mut grouper = RecordPositionGrouper::new();
let key_ok = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 11111);
let ok_record = make_decoded(key_ok, true, true, Some("4M"));
grouper.add_records(vec![ok_record]).expect("first record with MC should validate");
assert!(grouper.mc_validated);
let key_bad = GroupKey::paired(0, 300, 0, 0, 400, 1, 0, 0, 22222);
let bad_record = make_decoded(key_bad, true, true, None);
let result = grouper.add_records(vec![bad_record]);
assert!(result.is_err(), "later paired primary missing MC must fail");
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("MC tags"), "Error should mention MC tags: {err_msg}");
}
#[test]
fn test_record_position_grouper_mc_validation_skips_unpaired() {
let mut grouper = RecordPositionGrouper::new();
let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
let decoded = make_decoded(key, false, false, None);
let result = grouper.add_records(vec![decoded]);
assert!(result.is_ok());
assert!(!grouper.mc_validated); }
#[test]
fn test_record_position_grouper_default_impl() {
let grouper = RecordPositionGrouper::default();
assert!(!grouper.has_pending());
}
#[test]
fn test_build_templates_empty() {
let result = build_templates_from_records(vec![]).expect("build templates from records");
assert!(result.is_empty());
}
#[test]
fn test_build_templates_single_record() {
use fgumi_raw_bam::testutil::encode_op;
let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
let mut b = RawSamBuilder::new();
b.read_name(b"read1")
.sequence(b"ACGT")
.qualities(&[30; 4])
.flags(raw_flags::PAIRED | raw_flags::FIRST_SEGMENT)
.ref_id(0)
.pos(99)
.mapq(60)
.cigar_ops(&[encode_op(0, 4)]);
let decoded = DecodedRecord::from_raw_bytes(b.build(), key);
let templates =
build_templates_from_records(vec![decoded]).expect("build templates from records");
assert_eq!(templates.len(), 1);
}
#[test]
fn test_build_templates_paired_same_name_hash() {
use fgumi_raw_bam::testutil::encode_op;
let r1_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
let r2_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
let r1 = {
let mut b = RawSamBuilder::new();
b.read_name(b"read1")
.sequence(b"ACGT")
.qualities(&[30; 4])
.flags(raw_flags::PAIRED | raw_flags::FIRST_SEGMENT)
.ref_id(0)
.pos(99)
.cigar_ops(&[encode_op(0, 4)]);
DecodedRecord::from_raw_bytes(b.build(), r1_key)
};
let r2 = {
let mut b = RawSamBuilder::new();
b.read_name(b"read1")
.sequence(b"TGCA")
.qualities(&[30; 4])
.flags(raw_flags::PAIRED | raw_flags::LAST_SEGMENT | raw_flags::REVERSE)
.ref_id(0)
.pos(199)
.cigar_ops(&[encode_op(0, 4)]);
DecodedRecord::from_raw_bytes(b.build(), r2_key)
};
let templates =
build_templates_from_records(vec![r1, r2]).expect("build templates from records");
assert_eq!(templates.len(), 1);
}
#[test]
fn test_build_templates_multiple_qnames() {
use fgumi_raw_bam::testutil::encode_op;
let key1 = GroupKey::single(0, 100, 0, 0, 0, 11111);
let key2 = GroupKey::single(0, 100, 0, 0, 0, 22222);
let key3 = GroupKey::single(0, 100, 0, 0, 0, 33333);
let make_rec = |name: &[u8]| {
let mut b = RawSamBuilder::new();
b.read_name(name)
.sequence(b"ACGT")
.qualities(&[30; 4])
.flags(0)
.ref_id(0)
.pos(99)
.cigar_ops(&[encode_op(0, 4)]);
b.build()
};
let records: Vec<DecodedRecord> = vec![
DecodedRecord::from_raw_bytes(make_rec(b"readA"), key1),
DecodedRecord::from_raw_bytes(make_rec(b"readB"), key2),
DecodedRecord::from_raw_bytes(make_rec(b"readC"), key3),
];
let templates =
build_templates_from_records(records).expect("build templates from records");
assert_eq!(templates.len(), 3);
}
#[test]
fn test_build_templates_from_raw_bytes() {
use crate::sort::bam_fields;
use crate::unified_pipeline::{DecodedRecord, GroupKey};
let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
let raw = bam_fields::make_bam_bytes(
0, 100, bam_fields::flags::PAIRED | bam_fields::flags::FIRST_SEGMENT, b"read1", &[bam_fields::encode_op(0, 4)], 4, 0, 200, &[], );
let decoded = DecodedRecord::from_raw_bytes(raw, key);
let templates =
build_templates_from_records(vec![decoded]).expect("build templates from records");
assert_eq!(templates.len(), 1);
}
#[test]
fn test_build_templates_from_raw_bytes_paired() {
use crate::sort::bam_fields;
use crate::unified_pipeline::{DecodedRecord, GroupKey};
let key1 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
let key2 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
let r1 = bam_fields::make_bam_bytes(
0,
100,
bam_fields::flags::PAIRED | bam_fields::flags::FIRST_SEGMENT,
b"read1",
&[bam_fields::encode_op(0, 4)],
4,
0,
200,
&[],
);
let r2 = bam_fields::make_bam_bytes(
0,
200,
bam_fields::flags::PAIRED
| bam_fields::flags::LAST_SEGMENT
| bam_fields::flags::REVERSE,
b"read1",
&[bam_fields::encode_op(0, 4)],
4,
0,
100,
&[],
);
let records =
vec![DecodedRecord::from_raw_bytes(r1, key1), DecodedRecord::from_raw_bytes(r2, key2)];
let templates =
build_templates_from_records(records).expect("build templates from records");
assert_eq!(templates.len(), 1);
}
#[test]
fn test_raw_position_group_batch_weight() {
let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
let records = vec![
make_decoded(GroupKey::single(0, 100, 0, 0, 0, 11111), false, false, None),
make_decoded(GroupKey::single(0, 100, 0, 0, 0, 22222), false, false, None),
make_decoded(GroupKey::single(0, 100, 0, 0, 0, 33333), false, false, None),
make_decoded(GroupKey::single(0, 100, 0, 0, 0, 44444), false, false, None),
];
let group = RawPositionGroup { group_key: key, records };
assert_eq!(group.batch_weight(), 2);
}
#[test]
fn test_raw_position_group_batch_weight_single() {
let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
let records =
vec![make_decoded(GroupKey::single(0, 100, 0, 0, 0, 11111), false, false, None)];
let group = RawPositionGroup { group_key: key, records };
assert_eq!(group.batch_weight(), 1);
}
#[test]
fn test_raw_position_group_memory_estimate() {
let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
let records =
vec![make_decoded(GroupKey::single(0, 100, 0, 0, 0, 11111), false, false, None)];
let group = RawPositionGroup { group_key: key, records };
assert!(group.estimate_heap_size() > 0);
}
}