use std::io;
use noodles::sam::alignment::RecordBuf;
use crate::sam::SamTag;
use crate::sort::bam_fields;
use crate::template::{Template, TemplateBatch};
use crate::unified_pipeline::{BatchWeight, DecodedRecord, Grouper, MemoryEstimate};
impl BatchWeight for RecordBuf {
    /// Every parsed record counts as exactly one unit of batch weight.
    fn batch_weight(&self) -> usize {
        1
    }
}
impl BatchWeight for Vec<u8> {
    /// A raw byte buffer counts as one unit regardless of its length.
    fn batch_weight(&self) -> usize {
        1
    }
}
impl BatchWeight for TemplateBatch {
    /// A batch's weight is the number of templates it carries.
    fn batch_weight(&self) -> usize {
        self.len()
    }
}
/// Stateless grouper that passes each parsed record through as its own group.
#[derive(Default)]
pub struct SingleRecordGrouper;

impl SingleRecordGrouper {
    /// Creates the grouper; it holds no state.
    #[must_use]
    pub fn new() -> Self {
        SingleRecordGrouper
    }
}
impl Grouper for SingleRecordGrouper {
    type Group = RecordBuf;

    /// Unwraps each decoded record into its parsed form, one group per record.
    ///
    /// # Errors
    /// Returns `InvalidData` if any record carries raw bytes instead of a
    /// parsed `RecordBuf`.
    fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
        let mut groups = Vec::with_capacity(records.len());
        for decoded in records {
            let Some(record) = decoded.into_record() else {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "SingleRecordGrouper requires parsed records, got raw bytes",
                ));
            };
            groups.push(record);
        }
        Ok(groups)
    }

    /// Nothing is ever buffered, so there is never a final group.
    fn finish(&mut self) -> io::Result<Option<Self::Group>> {
        Ok(None)
    }

    fn has_pending(&self) -> bool {
        false
    }
}
/// Stateless grouper that passes each raw-byte record through as its own group.
#[derive(Default)]
pub struct SingleRawRecordGrouper;

impl SingleRawRecordGrouper {
    /// Creates the grouper; it holds no state.
    #[must_use]
    pub fn new() -> Self {
        SingleRawRecordGrouper
    }
}
impl Grouper for SingleRawRecordGrouper {
    type Group = Vec<u8>;

    /// Unwraps each decoded record into its raw byte buffer, one group per record.
    ///
    /// # Errors
    /// Returns `InvalidData` if any record was already parsed rather than raw.
    fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
        let mut groups = Vec::with_capacity(records.len());
        for decoded in records {
            let Some(bytes) = decoded.into_raw_bytes() else {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "SingleRawRecordGrouper requires raw bytes, got parsed record",
                ));
            };
            groups.push(bytes);
        }
        Ok(groups)
    }

    /// Nothing is ever buffered, so there is never a final group.
    fn finish(&mut self) -> io::Result<Option<Self::Group>> {
        Ok(None)
    }

    fn has_pending(&self) -> bool {
        false
    }
}
use std::collections::VecDeque;
/// Groups name-adjacent records into `Template`s and emits them in batches.
///
/// A record whose name hash differs from the one currently accumulating
/// closes the open template, so input is expected to arrive with records of
/// the same read name adjacent to one another.
pub struct TemplateGrouper {
    // Number of templates per emitted TemplateBatch (clamped to >= 1 in new()).
    batch_size: usize,
    // Read name of the template currently accumulating, if any.
    current_name: Option<Vec<u8>>,
    // Hash of the current name; used for cheap name-change detection.
    current_name_hash: Option<u64>,
    // Parsed records for the current template (mutually exclusive with raw).
    current_records: Vec<RecordBuf>,
    // Raw BAM-encoded records for the current template.
    current_raw_records: Vec<Vec<u8>>,
    // Completed templates awaiting batch emission.
    pending_templates: VecDeque<Template>,
}
impl TemplateGrouper {
#[must_use]
pub fn new(batch_size: usize) -> Self {
Self {
batch_size: batch_size.max(1),
current_name: None,
current_name_hash: None,
current_records: Vec::new(),
current_raw_records: Vec::new(),
pending_templates: VecDeque::new(),
}
}
fn flush_current_template(&mut self) -> io::Result<()> {
debug_assert!(
self.current_raw_records.is_empty() || self.current_records.is_empty(),
"mixed raw/parsed records in same template group"
);
if !self.current_raw_records.is_empty() {
let template =
Template::from_raw_records(std::mem::take(&mut self.current_raw_records))
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
self.pending_templates.push_back(template);
self.current_name = None;
self.current_name_hash = None;
} else if !self.current_records.is_empty() {
let template = Template::from_records(std::mem::take(&mut self.current_records))
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
self.pending_templates.push_back(template);
self.current_name = None;
self.current_name_hash = None;
}
Ok(())
}
fn collect_batches(&mut self) -> Vec<TemplateBatch> {
let mut batches = Vec::new();
while self.pending_templates.len() >= self.batch_size {
let mut batch = Vec::with_capacity(self.batch_size);
for _ in 0..self.batch_size {
if let Some(template) = self.pending_templates.pop_front() {
batch.push(template);
}
}
batches.push(batch);
}
batches
}
}
impl Grouper for TemplateGrouper {
    type Group = TemplateBatch;

    /// Appends decoded records, starting a new template whenever a record's
    /// name hash differs from the one currently accumulating, and returns
    /// every full batch completed by this call.
    fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
        use crate::unified_pipeline::DecodedRecordData;
        for decoded in records {
            let name_hash = decoded.key.name_hash;
            match decoded.data {
                DecodedRecordData::Raw(raw) => {
                    match (self.current_name_hash, name_hash) {
                        // Same name as the open template: keep accumulating.
                        (Some(current_hash), new_hash) if current_hash == new_hash => {
                            self.current_raw_records.push(raw);
                        }
                        // New name (or no open template): flush, then start fresh.
                        _ => {
                            self.flush_current_template()?;
                            self.current_name = Some(bam_fields::read_name(&raw).to_vec());
                            self.current_name_hash = Some(name_hash);
                            self.current_raw_records.push(raw);
                        }
                    }
                }
                DecodedRecordData::Parsed(record) => {
                    // Copy the name out before `record` is moved into the buffer.
                    let name = record.name().map(|n| Vec::from(<_ as AsRef<[u8]>>::as_ref(n)));
                    match (self.current_name_hash, name_hash) {
                        (Some(current_hash), new_hash) if current_hash == new_hash => {
                            self.current_records.push(record);
                        }
                        _ => {
                            self.flush_current_template()?;
                            self.current_name = name;
                            self.current_name_hash = Some(name_hash);
                            self.current_records.push(record);
                        }
                    }
                }
            }
        }
        Ok(self.collect_batches())
    }

    /// Flushes the in-progress template and emits everything still queued as
    /// one final (possibly undersized) batch.
    fn finish(&mut self) -> io::Result<Option<Self::Group>> {
        self.flush_current_template()?;
        if self.pending_templates.is_empty() {
            return Ok(None);
        }
        let mut batch = Vec::with_capacity(self.pending_templates.len());
        while let Some(template) = self.pending_templates.pop_front() {
            batch.push(template);
        }
        Ok(Some(batch))
    }

    fn has_pending(&self) -> bool {
        self.current_name.is_some()
            || !self.pending_templates.is_empty()
            || !self.current_raw_records.is_empty()
            || !self.current_records.is_empty()
    }
}
use noodles::sam::alignment::record::data::field::Tag;
use crate::unified_pipeline::GroupKey;
impl MemoryEstimate for ProcessedPositionGroup {
    /// Approximates heap bytes held by this group: template contents, the
    /// templates vector's own allocation, and the family-size map entries.
    fn estimate_heap_size(&self) -> usize {
        let templates_size: usize =
            self.templates.iter().map(MemoryEstimate::estimate_heap_size).sum();
        let templates_vec_overhead = self.templates.capacity() * std::mem::size_of::<Template>();
        // ~24 bytes per map entry: 8-byte usize key + 8-byte u64 value plus
        // hash-table overhead. A rough approximation, not exact accounting.
        let family_sizes_size = self.family_sizes.len() * 24;
        templates_size + templates_vec_overhead + family_sizes_size
    }
}
/// Counters describing how templates fared during filtering.
#[derive(Default, Clone, Debug)]
pub struct FilterMetrics {
    // Total templates examined.
    pub total_templates: u64,
    // Templates that passed all filters.
    pub accepted_templates: u64,
    // Templates discarded as non-PF.
    pub discarded_non_pf: u64,
    // Templates discarded for poor alignment.
    pub discarded_poor_alignment: u64,
    // Templates discarded because the UMI contained Ns.
    pub discarded_ns_in_umi: u64,
    // Templates discarded because the UMI was too short.
    pub discarded_umi_too_short: u64,
}
impl FilterMetrics {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn merge(&mut self, other: &FilterMetrics) {
self.total_templates += other.total_templates;
self.accepted_templates += other.accepted_templates;
self.discarded_non_pf += other.discarded_non_pf;
self.discarded_poor_alignment += other.discarded_poor_alignment;
self.discarded_ns_in_umi += other.discarded_ns_in_umi;
self.discarded_umi_too_short += other.discarded_umi_too_short;
}
}
/// Output of processing one position group: the surviving templates plus
/// bookkeeping gathered along the way.
#[derive(Debug)]
pub struct ProcessedPositionGroup {
    // Templates retained after filtering.
    pub templates: Vec<Template>,
    // NOTE(review): presumably a histogram of family size -> count; confirm
    // against the producer of this struct.
    pub family_sizes: ahash::AHashMap<usize, u64>,
    // Filtering counters accumulated for this group.
    pub filter_metrics: FilterMetrics,
    // Number of input records that fed into this group before filtering.
    pub input_record_count: u64,
}
// Flattened tuple form of a GroupKey's positional fields, as returned by
// GroupKey::position_key(); used for cheap position-change comparisons.
type PositionKeyTuple = (i32, i32, u8, i32, i32, u8, u16, u64);
/// A run of decoded records sharing one alignment position, as emitted by
/// `RecordPositionGrouper`.
#[derive(Debug)]
pub struct RawPositionGroup {
    // Key of the first record in the group; carries the group's position.
    pub group_key: GroupKey,
    // All records observed at that position, in input order.
    pub records: Vec<DecodedRecord>,
}
impl BatchWeight for RawPositionGroup {
    /// Half the record count, rounded up — records typically arrive as read
    /// pairs, so this approximates the number of templates in the group.
    fn batch_weight(&self) -> usize {
        let record_count = self.records.len();
        record_count.div_ceil(2)
    }
}
impl MemoryEstimate for RawPositionGroup {
    /// Approximates heap bytes held: each record's own heap usage plus the
    /// vector's allocated backing storage.
    fn estimate_heap_size(&self) -> usize {
        let mut total = self.records.capacity() * std::mem::size_of::<DecodedRecord>();
        for record in &self.records {
            total += record.estimate_heap_size();
        }
        total
    }
}
/// Streams position-sorted records into groups that share an alignment
/// position, validating MC tags on the way through.
pub struct RecordPositionGrouper {
    // Position tuple of the group currently accumulating, if any.
    current_position_key: Option<PositionKeyTuple>,
    // GroupKey of the first record in the current group.
    current_group_key: Option<GroupKey>,
    // Records buffered for the current position group.
    current_records: Vec<DecodedRecord>,
    // Set once one qualifying record has been checked for an MC tag;
    // later records skip the check.
    mc_validated: bool,
    // When false, records whose key has ref_id1 == GroupKey::UNKNOWN_REF
    // are dropped instead of grouped.
    include_secondary_supplementary: bool,
}
impl RecordPositionGrouper {
    /// Creates a grouper that drops records keyed to `GroupKey::UNKNOWN_REF`.
    #[must_use]
    pub fn new() -> Self {
        Self {
            current_position_key: None,
            current_group_key: None,
            current_records: Vec::new(),
            mc_validated: false,
            include_secondary_supplementary: false,
        }
    }

    /// Like `new`, but records keyed to `GroupKey::UNKNOWN_REF` are grouped
    /// rather than dropped.
    #[must_use]
    pub fn with_secondary_supplementary() -> Self {
        Self { include_secondary_supplementary: true, ..Self::new() }
    }

    /// One-time check that the input carries MC (mate cigar) tags.
    ///
    /// Only a record that is paired, primary (not secondary/supplementary),
    /// and mapped with a mapped mate is eligible; once one such record has an
    /// MC tag, the check is skipped for the rest of the stream.
    ///
    /// # Errors
    /// `InvalidData` if the first eligible record lacks an MC tag.
    fn validate_mc_tag(&mut self, decoded: &DecodedRecord) -> io::Result<()> {
        use crate::sort::bam_fields;
        use crate::unified_pipeline::DecodedRecordData;
        if self.mc_validated {
            return Ok(());
        }
        match &decoded.data {
            DecodedRecordData::Raw(raw) => {
                // Raw path: read the flag word straight out of the BAM bytes.
                let flg = bam_fields::flags(raw);
                let is_paired = (flg & bam_fields::flags::PAIRED) != 0;
                let is_secondary = (flg & bam_fields::flags::SECONDARY) != 0;
                let is_supplementary = (flg & bam_fields::flags::SUPPLEMENTARY) != 0;
                let is_unmapped = (flg & bam_fields::flags::UNMAPPED) != 0;
                let is_mate_unmapped = (flg & bam_fields::flags::MATE_UNMAPPED) != 0;
                if is_paired
                    && !is_secondary
                    && !is_supplementary
                    && !is_unmapped
                    && !is_mate_unmapped
                {
                    if bam_fields::find_mc_tag_in_record(raw).is_none() {
                        return Err(io::Error::new(
                            io::ErrorKind::InvalidData,
                            "RecordPositionGrouper requires MC tags on paired-end reads. \
                            Run `fgumi zipper` to add MC tags before `fgumi group`.",
                        ));
                    }
                    self.mc_validated = true;
                }
            }
            DecodedRecordData::Parsed(record) => {
                // Parsed path: same eligibility test via the flags API.
                let flags = record.flags();
                if flags.is_segmented()
                    && !flags.is_secondary()
                    && !flags.is_supplementary()
                    && !flags.is_unmapped()
                    && !flags.is_mate_unmapped()
                {
                    let mc_tag = Tag::from(SamTag::MC);
                    if record.data().get(&mc_tag).is_none() {
                        return Err(io::Error::new(
                            io::ErrorKind::InvalidData,
                            "RecordPositionGrouper requires MC tags on paired-end reads. \
                            Run `fgumi zipper` to add MC tags before `fgumi group`.",
                        ));
                    }
                    self.mc_validated = true;
                }
            }
        }
        Ok(())
    }

    /// Routes one record into the current group, emitting the previous group
    /// when the alignment position changes.
    ///
    /// Returns `Ok(Some(group))` only when a position change completes the
    /// buffered group; the last group must be collected via `finish`.
    fn process_record(&mut self, decoded: DecodedRecord) -> io::Result<Option<RawPositionGroup>> {
        // Records with an unknown reference are skipped unless explicitly included.
        if decoded.key.ref_id1 == GroupKey::UNKNOWN_REF && !self.include_secondary_supplementary {
            return Ok(None);
        }
        self.validate_mc_tag(&decoded)?;
        let record_pos_key = decoded.key.position_key();
        match self.current_position_key {
            // Same position: extend the current group.
            Some(current_pos) if current_pos == record_pos_key => {
                self.current_records.push(decoded);
                Ok(None)
            }
            // Different position but same read name as the last buffered
            // record (e.g. an unmapped mate): keep the pair in one group.
            Some(_)
                if self
                    .current_records
                    .last()
                    .is_some_and(|last| last.key.name_hash == decoded.key.name_hash) =>
            {
                self.current_records.push(decoded);
                Ok(None)
            }
            // Position changed: emit the finished group and start a new one.
            Some(_) => {
                let finished_records = std::mem::take(&mut self.current_records);
                let finished_key = self
                    .current_group_key
                    .take()
                    .expect("current_group_key set when current_position_key is set");
                let group = RawPositionGroup { group_key: finished_key, records: finished_records };
                self.current_position_key = Some(record_pos_key);
                self.current_group_key = Some(decoded.key);
                self.current_records.push(decoded);
                Ok(Some(group))
            }
            // First record seen: open the first group.
            None => {
                self.current_position_key = Some(record_pos_key);
                self.current_group_key = Some(decoded.key);
                self.current_records.push(decoded);
                Ok(None)
            }
        }
    }
}
impl Default for RecordPositionGrouper {
fn default() -> Self {
Self::new()
}
}
impl Grouper for RecordPositionGrouper {
    type Group = RawPositionGroup;

    /// Feeds each record through the position state machine, returning every
    /// group completed by a position change within this batch.
    fn add_records(&mut self, records: Vec<DecodedRecord>) -> io::Result<Vec<Self::Group>> {
        records.into_iter().try_fold(Vec::new(), |mut finished, decoded| {
            if let Some(group) = self.process_record(decoded)? {
                finished.push(group);
            }
            Ok(finished)
        })
    }

    /// Emits the final buffered group, if any records remain.
    fn finish(&mut self) -> io::Result<Option<Self::Group>> {
        if self.current_records.is_empty() {
            return Ok(None);
        }
        debug_assert!(
            self.current_group_key.is_some(),
            "RecordPositionGrouper has {} buffered records but no group key",
            self.current_records.len()
        );
        match self.current_group_key.take() {
            Some(group_key) => {
                self.current_position_key = None;
                let records = std::mem::take(&mut self.current_records);
                Ok(Some(RawPositionGroup { group_key, records }))
            }
            // Unreachable in practice (key is set whenever records are buffered);
            // mirrors the original's release-mode behavior.
            None => Ok(None),
        }
    }

    fn has_pending(&self) -> bool {
        !self.current_records.is_empty()
    }
}
/// Partitions `records` into runs of equal `name_hash`, extracts each record
/// with `extract`, and builds one `Template` per run via `build`.
///
/// Records with the same name are assumed to be adjacent in the input.
///
/// # Errors
/// Propagates extraction failures, and wraps template-build failures as
/// `InvalidData`.
fn group_by_name_and_build<T>(
    records: Vec<DecodedRecord>,
    extract: impl Fn(DecodedRecord) -> io::Result<T>,
    build: impl Fn(Vec<T>) -> anyhow::Result<Template>,
) -> io::Result<Vec<Template>> {
    let mut templates = Vec::new();
    let mut active_hash: Option<u64> = None;
    let mut active_items: Vec<T> = Vec::new();
    for decoded in records {
        let hash = decoded.key.name_hash;
        let item = extract(decoded)?;
        if active_hash != Some(hash) {
            // Name changed: close out the previous run, if one was open.
            if active_hash.is_some() {
                let template = build(std::mem::take(&mut active_items))
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                templates.push(template);
            }
            active_hash = Some(hash);
        }
        active_items.push(item);
    }
    // Close out the trailing run.
    if !active_items.is_empty() {
        let template =
            build(active_items).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        templates.push(template);
    }
    Ok(templates)
}
/// Builds templates from name-adjacent decoded records, handling either the
/// raw-byte or the parsed representation.
///
/// The first record decides the mode; a mixed batch yields `InvalidData`.
///
/// # Errors
/// Fails when representations are mixed or template construction fails.
pub fn build_templates_from_records(records: Vec<DecodedRecord>) -> io::Result<Vec<Template>> {
    use crate::unified_pipeline::DecodedRecordData;
    let Some(first) = records.first() else {
        return Ok(Vec::new());
    };
    if matches!(first.data, DecodedRecordData::Raw(_)) {
        group_by_name_and_build(
            records,
            |d| match d.data {
                DecodedRecordData::Raw(v) => Ok(v),
                DecodedRecordData::Parsed(_) => Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "expected raw-byte record but found parsed record",
                )),
            },
            Template::from_raw_records,
        )
    } else {
        group_by_name_and_build(
            records,
            |d| match d.data {
                DecodedRecordData::Parsed(r) => Ok(r),
                DecodedRecordData::Raw(_) => Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "expected parsed record but found raw-byte record",
                )),
            },
            Template::from_records,
        )
    }
}
use crate::fastq_parse::{FastqRecord, parse_fastq_records, strip_read_suffix};
/// One sequencing template assembled from parallel FASTQ streams: one record
/// per stream, all sharing `name` once read suffixes are stripped.
#[derive(Debug)]
pub struct FastqTemplate {
    // One record per input stream, in stream order.
    pub records: Vec<FastqRecord>,
    // Shared read name with any read suffix (e.g. "/1") stripped.
    pub name: Vec<u8>,
}
impl MemoryEstimate for FastqTemplate {
    /// Approximates heap bytes held: the name buffer, the records vector's
    /// backing storage, and each record's own heap usage.
    fn estimate_heap_size(&self) -> usize {
        let mut total = self.name.capacity();
        total += self.records.capacity() * std::mem::size_of::<FastqRecord>();
        for record in &self.records {
            total += record.estimate_heap_size();
        }
        total
    }
}
/// Synchronizes multiple parallel FASTQ streams, emitting one `FastqTemplate`
/// per read once every stream has produced its record for that read.
pub struct FastqGrouper {
    // Number of parallel input streams.
    num_inputs: usize,
    // Unparsed trailing bytes (a partial record) per stream.
    leftovers: Vec<Vec<u8>>,
    // Parsed records queued per stream, awaiting a complete template.
    pending_records: Vec<VecDeque<FastqRecord>>,
}
impl FastqGrouper {
    /// Creates a grouper that joins `num_inputs` parallel FASTQ streams into
    /// per-read templates.
    #[must_use]
    pub fn new(num_inputs: usize) -> Self {
        log::debug!("FastqGrouper::new: num_inputs={num_inputs}");
        Self {
            num_inputs,
            leftovers: vec![Vec::new(); num_inputs],
            pending_records: (0..num_inputs).map(|_| VecDeque::new()).collect(),
        }
    }

    /// Returns `InvalidInput` when `stream_idx` does not address a configured
    /// input stream.
    fn check_stream_idx(&self, stream_idx: usize) -> io::Result<()> {
        if stream_idx >= self.num_inputs {
            // saturating_sub keeps the message well-defined even if the
            // grouper was (mis)configured with zero inputs; `num_inputs - 1`
            // would panic in debug builds in that case.
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!(
                    "Stream index {} out of range (max {})",
                    stream_idx,
                    self.num_inputs.saturating_sub(1)
                ),
            ));
        }
        Ok(())
    }

    /// Appends raw FASTQ bytes to `stream_idx`, parsing as many complete
    /// records as possible and stashing any trailing partial record for the
    /// next chunk.
    ///
    /// # Errors
    /// Fails when `stream_idx` is out of range or the bytes fail to parse.
    pub fn add_bytes_for_stream(&mut self, stream_idx: usize, data: &[u8]) -> io::Result<()> {
        log::trace!(
            "FastqGrouper::add_bytes_for_stream: stream_idx={}, num_inputs={}, data_len={}",
            stream_idx,
            self.num_inputs,
            data.len()
        );
        self.check_stream_idx(stream_idx)?;
        // Prepend any partial record left over from the previous chunk.
        let combined = if self.leftovers[stream_idx].is_empty() {
            data.to_vec()
        } else {
            let mut combined = std::mem::take(&mut self.leftovers[stream_idx]);
            combined.extend_from_slice(data);
            combined
        };
        let (records, leftover) = parse_fastq_records(&combined)?;
        self.leftovers[stream_idx] = leftover;
        self.pending_records[stream_idx].extend(records);
        Ok(())
    }

    /// Queues already-parsed records for `stream_idx`.
    ///
    /// # Errors
    /// Fails when `stream_idx` is out of range.
    pub fn add_records_for_stream(
        &mut self,
        stream_idx: usize,
        records: Vec<FastqRecord>,
    ) -> io::Result<()> {
        log::trace!(
            "FastqGrouper::add_records_for_stream: stream_idx={}, num_inputs={}, records_len={}",
            stream_idx,
            self.num_inputs,
            records.len()
        );
        self.check_stream_idx(stream_idx)?;
        self.pending_records[stream_idx].extend(records);
        Ok(())
    }

    /// Reports whether any stream still holds unparsed trailing bytes.
    #[must_use]
    pub fn has_leftover_bytes(&self) -> bool {
        self.leftovers.iter().any(|l| !l.is_empty())
    }

    /// Pops one record from every stream while all streams have one queued,
    /// verifying the suffix-stripped read names agree across streams.
    ///
    /// # Errors
    /// Fails with `InvalidData` when the streams disagree on the next read name.
    pub fn drain_complete_templates(&mut self) -> io::Result<Vec<FastqTemplate>> {
        let mut templates = Vec::new();
        // Guard the degenerate zero-stream configuration: `all` over an empty
        // iterator is vacuously true, which would otherwise loop forever and
        // panic on `names[0]` below.
        if self.num_inputs == 0 {
            return Ok(templates);
        }
        while self.pending_records.iter().all(|q| !q.is_empty()) {
            // Check that all streams' next records carry the same base name.
            let base_name = {
                let names: Vec<&[u8]> = self
                    .pending_records
                    .iter()
                    .map(|q| {
                        q.front()
                            .expect("pending queue must be non-empty inside all-non-empty loop")
                            .name()
                    })
                    .collect();
                let base_name = strip_read_suffix(names[0]).to_vec();
                for (i, &name) in names.iter().enumerate().skip(1) {
                    let other_base = strip_read_suffix(name);
                    if base_name != other_base {
                        return Err(io::Error::new(
                            io::ErrorKind::InvalidData,
                            format!(
                                "FASTQ files out of sync: stream 0 has '{}', stream {} has '{}'",
                                String::from_utf8_lossy(&base_name),
                                i,
                                String::from_utf8_lossy(other_base),
                            ),
                        ));
                    }
                }
                base_name
            };
            // Pop one record per stream to form the template.
            let records: Vec<_> = self
                .pending_records
                .iter_mut()
                .map(|q| {
                    q.pop_front()
                        .expect("pending queue must be non-empty inside all-non-empty loop")
                })
                .collect();
            templates.push(FastqTemplate { name: base_name, records });
        }
        Ok(templates)
    }

    /// Reports whether any bytes or records are still buffered anywhere.
    #[must_use]
    pub fn has_pending(&self) -> bool {
        self.leftovers.iter().any(|l| !l.is_empty())
            || self.pending_records.iter().any(|q| !q.is_empty())
    }

    /// Drains at most one final template and verifies all inputs ended cleanly.
    ///
    /// # Errors
    /// Fails when more than one template remains, when a stream ends inside a
    /// record, or when streams end with unmatched records.
    pub fn finish(&mut self) -> io::Result<Option<FastqTemplate>> {
        let templates = self.drain_complete_templates()?;
        if templates.len() == 1 {
            return Ok(templates.into_iter().next());
        } else if templates.len() > 1 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Multiple templates remaining at finish",
            ));
        }
        if self.leftovers.iter().any(|l| !l.is_empty()) {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "Incomplete FASTQ record at EOF",
            ));
        }
        if self.pending_records.iter().any(|q| !q.is_empty()) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Unmatched FASTQ records at EOF - files out of sync",
            ));
        }
        Ok(None)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // --- Single-record groupers ---

    #[test]
    fn test_single_record_grouper_empty() {
        let mut grouper = SingleRecordGrouper::new();
        assert!(!grouper.has_pending());
        let result = grouper.finish().expect("finish should succeed");
        assert!(result.is_none());
    }

    #[test]
    fn test_single_raw_record_grouper_empty() {
        let mut grouper = SingleRawRecordGrouper::new();
        assert!(!grouper.has_pending());
        let result = grouper.finish().expect("finish should succeed");
        assert!(result.is_none());
    }

    #[test]
    fn test_single_raw_record_grouper_emits_each_record() {
        use crate::unified_pipeline::{DecodedRecord, GroupKey};
        let mut grouper = SingleRawRecordGrouper::new();
        let raw1 = vec![1u8; 36];
        let raw2 = vec![2u8; 36];
        let records = vec![
            DecodedRecord::from_raw_bytes(raw1.clone(), GroupKey::default()),
            DecodedRecord::from_raw_bytes(raw2.clone(), GroupKey::default()),
        ];
        let groups = grouper.add_records(records).expect("add_records should succeed");
        assert_eq!(groups.len(), 2);
        assert_eq!(groups[0], raw1);
        assert_eq!(groups[1], raw2);
    }

    #[test]
    fn test_single_raw_record_grouper_rejects_parsed() {
        use crate::sam::builder::RecordBuilder;
        use crate::unified_pipeline::{DecodedRecord, GroupKey};
        let mut grouper = SingleRawRecordGrouper::new();
        let rec = RecordBuilder::new().sequence("ACGT").build();
        let records = vec![DecodedRecord::new(rec, GroupKey::default())];
        let result = grouper.add_records(records);
        assert!(result.is_err());
    }

    // --- FastqGrouper ---

    #[test]
    fn test_fastq_grouper_paired() {
        let mut grouper = FastqGrouper::new(2);
        grouper
            .add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n")
            .expect("add_bytes_for_stream failed");
        grouper
            .add_bytes_for_stream(1, b"@read1/2\nTGCA\n+\nJJJJ\n")
            .expect("add_bytes_for_stream failed");
        let templates =
            grouper.drain_complete_templates().expect("drain_complete_templates failed");
        assert_eq!(templates.len(), 1);
        assert_eq!(templates[0].name, b"read1");
        assert_eq!(templates[0].records.len(), 2);
        assert_eq!(templates[0].records[0].sequence(), b"ACGT");
        assert_eq!(templates[0].records[1].sequence(), b"TGCA");
    }

    #[test]
    fn test_fastq_grouper_multiple_templates() {
        let mut grouper = FastqGrouper::new(2);
        grouper
            .add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n@read2/1\nAAAA\n+\nIIII\n")
            .expect("add_bytes_for_stream should succeed for R1");
        grouper
            .add_bytes_for_stream(1, b"@read1/2\nTGCA\n+\nJJJJ\n@read2/2\nTTTT\n+\nJJJJ\n")
            .expect("add_bytes_for_stream should succeed for R2");
        let templates =
            grouper.drain_complete_templates().expect("drain_complete_templates failed");
        assert_eq!(templates.len(), 2);
        assert_eq!(templates[0].name, b"read1");
        assert_eq!(templates[1].name, b"read2");
    }

    #[test]
    fn test_fastq_grouper_partial_then_complete() {
        let mut grouper = FastqGrouper::new(2);
        grouper
            .add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n")
            .expect("add_bytes_for_stream failed");
        // Only stream 0 has a record; no template can be completed yet.
        let templates =
            grouper.drain_complete_templates().expect("drain_complete_templates failed");
        assert!(templates.is_empty());
        assert!(grouper.has_pending());
        grouper
            .add_bytes_for_stream(1, b"@read1/2\nTGCA\n+\nJJJJ\n")
            .expect("add_bytes_for_stream failed");
        let templates =
            grouper.drain_complete_templates().expect("drain_complete_templates failed");
        assert_eq!(templates.len(), 1);
    }

    #[test]
    fn test_fastq_grouper_finish_empty() {
        let mut grouper = FastqGrouper::new(2);
        let result = grouper.finish().expect("finish should succeed");
        assert!(result.is_none());
    }

    #[test]
    fn test_fastq_grouper_out_of_sync() {
        let mut grouper = FastqGrouper::new(2);
        grouper
            .add_bytes_for_stream(0, b"@read1/1\nACGT\n+\nIIII\n")
            .expect("add_bytes_for_stream failed");
        grouper
            .add_bytes_for_stream(1, b"@read2/2\nTGCA\n+\nJJJJ\n")
            .expect("add_bytes_for_stream failed");
        let result = grouper.drain_complete_templates();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("out of sync"));
    }

    use crate::sam::builder::RecordBuilder;

    // Helper: builds a parsed DecodedRecord with the given key and flags.
    fn make_decoded(
        key: GroupKey,
        paired: bool,
        first_segment: bool,
        mc: Option<&str>,
    ) -> DecodedRecord {
        let mut builder = RecordBuilder::new()
            .name("read1")
            .sequence("ACGT")
            .qualities(&[30, 30, 30, 30])
            .cigar("4M")
            .reference_sequence_id(0)
            .alignment_start(100)
            .paired(paired);
        if first_segment {
            builder = builder.first_segment(true);
        }
        if let Some(mc_val) = mc {
            builder = builder.tag("MC", mc_val);
        }
        DecodedRecord::new(builder.build(), key)
    }

    // Helper: builds a secondary-alignment record keyed only by name hash.
    fn make_secondary_decoded(name_hash: u64) -> DecodedRecord {
        let key = GroupKey { name_hash, ..GroupKey::default() };
        let record = RecordBuilder::new()
            .name("read1")
            .sequence("ACGT")
            .qualities(&[30, 30, 30, 30])
            .secondary(true)
            .build();
        DecodedRecord::new(record, key)
    }

    // --- RecordPositionGrouper ---

    #[test]
    fn test_record_position_grouper_empty() {
        let mut grouper = RecordPositionGrouper::new();
        assert!(!grouper.has_pending());
        let result = grouper.finish().expect("finish should succeed");
        assert!(result.is_none());
    }

    #[test]
    fn test_record_position_grouper_single_unpaired_record() {
        let mut grouper = RecordPositionGrouper::new();
        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
        let decoded = make_decoded(key, false, false, None);
        let groups = grouper.add_records(vec![decoded]).expect("add_records should succeed");
        assert!(groups.is_empty());
        assert!(grouper.has_pending());
        let final_group =
            grouper.finish().expect("finish should succeed").expect("should emit final group");
        assert_eq!(final_group.records.len(), 1);
        assert_eq!(final_group.group_key.ref_id1, 0);
        assert_eq!(final_group.group_key.pos1, 100);
    }

    #[test]
    fn test_record_position_grouper_same_position_multiple_records() {
        let mut grouper = RecordPositionGrouper::new();
        let key1 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 11111);
        let key2 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 22222);
        let key3 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 33333);
        let records = vec![
            make_decoded(key1, true, true, Some("4M")),
            make_decoded(key2, true, true, Some("4M")),
            make_decoded(key3, true, true, Some("4M")),
        ];
        let groups = grouper.add_records(records).expect("add_records should succeed");
        assert!(groups.is_empty());
        let final_group =
            grouper.finish().expect("finish should succeed").expect("should emit final group");
        assert_eq!(final_group.records.len(), 3);
    }

    #[test]
    fn test_record_position_grouper_different_positions() {
        let mut grouper = RecordPositionGrouper::new();
        let key_pos1 = GroupKey::single(0, 100, 0, 0, 0, 11111);
        let key_pos2 = GroupKey::single(0, 200, 0, 0, 0, 22222);
        let key_pos3 = GroupKey::single(0, 300, 0, 0, 0, 33333);
        let records = vec![
            make_decoded(key_pos1, false, false, None),
            make_decoded(key_pos2, false, false, None),
            make_decoded(key_pos3, false, false, None),
        ];
        let groups = grouper.add_records(records).expect("add_records should succeed");
        assert_eq!(groups.len(), 2);
        assert_eq!(groups[0].group_key.pos1, 100);
        assert_eq!(groups[0].records.len(), 1);
        assert_eq!(groups[1].group_key.pos1, 200);
        assert_eq!(groups[1].records.len(), 1);
        let final_group =
            grouper.finish().expect("finish should succeed").expect("should emit final group");
        assert_eq!(final_group.group_key.pos1, 300);
    }

    #[test]
    fn test_record_position_grouper_skips_secondary() {
        let mut grouper = RecordPositionGrouper::new();
        let primary_key = GroupKey::single(0, 100, 0, 0, 0, 11111);
        let records = vec![
            make_decoded(primary_key, false, false, None),
            make_secondary_decoded(11111),
        ];
        let groups = grouper.add_records(records).expect("add_records should succeed");
        assert!(groups.is_empty());
        let final_group =
            grouper.finish().expect("finish should succeed").expect("should emit final group");
        // Only the primary record survives; the secondary one is dropped.
        assert_eq!(final_group.records.len(), 1);
    }

    #[test]
    fn test_record_position_grouper_paired_same_position_key() {
        let mut grouper = RecordPositionGrouper::new();
        let r1_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
        let r2_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
        assert_eq!(r1_key.position_key(), r2_key.position_key());
        let records = vec![
            make_decoded(r1_key, true, true, Some("4M")),
            make_decoded(r2_key, true, false, Some("4M")),
        ];
        let groups = grouper.add_records(records).expect("add_records should succeed");
        assert!(groups.is_empty());
        let final_group =
            grouper.finish().expect("finish should succeed").expect("should emit final group");
        assert_eq!(final_group.records.len(), 2);
    }

    #[test]
    fn test_record_position_grouper_groups_records_by_position() {
        let mut grouper = RecordPositionGrouper::new();
        let key1 = GroupKey::single(0, 100, 0, 0, 0, 11111);
        let key2 = GroupKey::single(0, 100, 0, 0, 0, 22222);
        let key3 = GroupKey::single(0, 100, 0, 0, 0, 33333);
        let key4 = GroupKey::single(0, 200, 0, 0, 0, 44444);
        let key5 = GroupKey::single(0, 200, 0, 0, 0, 55555);
        let key6 = GroupKey::single(0, 300, 0, 0, 0, 66666);
        let records = vec![
            make_decoded(key1, false, false, None),
            make_decoded(key2, false, false, None),
            make_decoded(key3, false, false, None),
            make_decoded(key4, false, false, None),
            make_decoded(key5, false, false, None),
            make_decoded(key6, false, false, None),
        ];
        let groups = grouper.add_records(records).expect("add_records should succeed");
        assert_eq!(groups.len(), 2);
        assert_eq!(groups[0].records.len(), 3);
        assert_eq!(groups[1].records.len(), 2);
        let final_group =
            grouper.finish().expect("finish should succeed").expect("should emit final group");
        assert_eq!(final_group.records.len(), 1);
    }

    #[test]
    fn test_record_position_grouper_coalesces_unmapped_mate_by_name_hash() {
        let mut grouper = RecordPositionGrouper::new();
        let name_hash = 12345_u64;
        let r1_key = GroupKey::single(5, 100, 0, 0, 0, name_hash);
        let r1 = {
            let record = RecordBuilder::new()
                .name("read1")
                .sequence("ACGT")
                .qualities(&[30, 30, 30, 30])
                .cigar("4M")
                .reference_sequence_id(5)
                .alignment_start(100)
                .paired(true)
                .first_segment(true)
                .mate_unmapped(true)
                .build();
            DecodedRecord::new(record, r1_key)
        };
        let r2_key = GroupKey::single(-1, 0, 0, 0, 0, name_hash);
        let r2 = {
            let record = RecordBuilder::new()
                .name("read1")
                .sequence("TGCA")
                .qualities(&[30, 30, 30, 30])
                .paired(true)
                .first_segment(false)
                .unmapped(true)
                .build();
            DecodedRecord::new(record, r2_key)
        };
        // Positions differ, but the shared name hash keeps the pair together.
        assert_ne!(r1_key.position_key(), r2_key.position_key());
        let groups = grouper.add_records(vec![r1, r2]).expect("add_records should succeed");
        assert!(groups.is_empty());
        let final_group =
            grouper.finish().expect("finish should succeed").expect("should emit final group");
        assert_eq!(final_group.records.len(), 2, "R1 and R2 should be in the same group");
        assert_eq!(final_group.group_key.ref_id1, 5, "Group key should use R1's position");
    }

    #[test]
    fn test_record_position_grouper_does_not_coalesce_different_name_hash() {
        let mut grouper = RecordPositionGrouper::new();
        let r1_key = GroupKey::single(0, 100, 0, 0, 0, 11111);
        let r2_key = GroupKey::single(0, 200, 0, 0, 0, 22222);
        let records = vec![
            make_decoded(r1_key, false, false, None),
            make_decoded(r2_key, false, false, None),
        ];
        let groups = grouper.add_records(records).expect("add_records should succeed");
        assert_eq!(groups.len(), 1);
        assert_eq!(groups[0].records.len(), 1);
    }

    #[test]
    fn test_record_position_grouper_mc_validation_skips_unmapped_mate() {
        let mut grouper = RecordPositionGrouper::new();
        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
        let record = RecordBuilder::new()
            .name("read1")
            .sequence("ACGT")
            .qualities(&[30, 30, 30, 30])
            .cigar("4M")
            .reference_sequence_id(0)
            .alignment_start(100)
            .paired(true)
            .first_segment(true)
            .mate_unmapped(true)
            .build();
        let decoded = DecodedRecord::new(record, key);
        let result = grouper.add_records(vec![decoded]);
        assert!(result.is_ok());
        // Records with an unmapped mate are not eligible for MC validation.
        assert!(!grouper.mc_validated);
    }

    #[test]
    fn test_record_position_grouper_mc_validation_fails_without_mc() {
        let mut grouper = RecordPositionGrouper::new();
        let key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
        let decoded = make_decoded(key, true, true, None);
        let result = grouper.add_records(vec![decoded]);
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(err_msg.contains("MC tags"), "Error should mention MC tags: {err_msg}");
    }

    #[test]
    fn test_record_position_grouper_mc_validation_passes_with_mc() {
        let mut grouper = RecordPositionGrouper::new();
        let key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
        let decoded = make_decoded(key, true, true, Some("4M"));
        let result = grouper.add_records(vec![decoded]);
        assert!(result.is_ok());
    }

    #[test]
    fn test_record_position_grouper_mc_validation_skips_unpaired() {
        let mut grouper = RecordPositionGrouper::new();
        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
        let decoded = make_decoded(key, false, false, None);
        let result = grouper.add_records(vec![decoded]);
        assert!(result.is_ok());
        // Unpaired records are not eligible for MC validation.
        assert!(!grouper.mc_validated);
    }

    #[test]
    fn test_record_position_grouper_default_impl() {
        let grouper = RecordPositionGrouper::default();
        assert!(!grouper.has_pending());
    }

    // --- build_templates_from_records ---

    #[test]
    fn test_build_templates_empty() {
        let result = build_templates_from_records(vec![]).expect("build templates from records");
        assert!(result.is_empty());
    }

    #[test]
    fn test_build_templates_single_record() {
        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
        let record = RecordBuilder::new()
            .name("read1")
            .sequence("ACGT")
            .qualities(&[30, 30, 30, 30])
            .cigar("4M")
            .reference_sequence_id(0)
            .alignment_start(100)
            .mapping_quality(60)
            .paired(true)
            .first_segment(true)
            .build();
        let decoded = DecodedRecord::new(record, key);
        let templates =
            build_templates_from_records(vec![decoded]).expect("build templates from records");
        assert_eq!(templates.len(), 1);
    }

    #[test]
    fn test_build_templates_paired_same_name_hash() {
        let r1_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
        let r2_key = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
        let r1 = DecodedRecord::new(
            RecordBuilder::new()
                .name("read1")
                .sequence("ACGT")
                .qualities(&[30, 30, 30, 30])
                .cigar("4M")
                .reference_sequence_id(0)
                .alignment_start(100)
                .paired(true)
                .first_segment(true)
                .build(),
            r1_key,
        );
        let r2 = DecodedRecord::new(
            RecordBuilder::new()
                .name("read1")
                .sequence("TGCA")
                .qualities(&[30, 30, 30, 30])
                .cigar("4M")
                .reference_sequence_id(0)
                .alignment_start(200)
                .paired(true)
                .reverse_complement(true)
                .build(),
            r2_key,
        );
        let templates =
            build_templates_from_records(vec![r1, r2]).expect("build templates from records");
        assert_eq!(templates.len(), 1);
    }

    #[test]
    fn test_build_templates_multiple_qnames() {
        let key1 = GroupKey::single(0, 100, 0, 0, 0, 11111);
        let key2 = GroupKey::single(0, 100, 0, 0, 0, 22222);
        let key3 = GroupKey::single(0, 100, 0, 0, 0, 33333);
        let records: Vec<DecodedRecord> = vec![
            DecodedRecord::new(
                RecordBuilder::new()
                    .name("readA")
                    .sequence("ACGT")
                    .qualities(&[30, 30, 30, 30])
                    .cigar("4M")
                    .reference_sequence_id(0)
                    .alignment_start(100)
                    .build(),
                key1,
            ),
            DecodedRecord::new(
                RecordBuilder::new()
                    .name("readB")
                    .sequence("ACGT")
                    .qualities(&[30, 30, 30, 30])
                    .cigar("4M")
                    .reference_sequence_id(0)
                    .alignment_start(100)
                    .build(),
                key2,
            ),
            DecodedRecord::new(
                RecordBuilder::new()
                    .name("readC")
                    .sequence("ACGT")
                    .qualities(&[30, 30, 30, 30])
                    .cigar("4M")
                    .reference_sequence_id(0)
                    .alignment_start(100)
                    .build(),
                key3,
            ),
        ];
        let templates =
            build_templates_from_records(records).expect("build templates from records");
        assert_eq!(templates.len(), 3);
    }

    #[test]
    fn test_build_templates_from_raw_bytes() {
        use crate::sort::bam_fields;
        use crate::unified_pipeline::{DecodedRecord, GroupKey};
        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
        let raw = bam_fields::make_bam_bytes(
            0,
            100,
            bam_fields::flags::PAIRED | bam_fields::flags::FIRST_SEGMENT,
            b"read1",
            &[bam_fields::encode_op(0, 4)],
            4,
            0,
            200,
            &[],
        );
        let decoded = DecodedRecord::from_raw_bytes(raw, key);
        let templates =
            build_templates_from_records(vec![decoded]).expect("build templates from records");
        assert_eq!(templates.len(), 1);
    }

    #[test]
    fn test_build_templates_from_raw_bytes_paired() {
        use crate::sort::bam_fields;
        use crate::unified_pipeline::{DecodedRecord, GroupKey};
        let key1 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
        let key2 = GroupKey::paired(0, 100, 0, 0, 200, 1, 0, 0, 12345);
        let r1 = bam_fields::make_bam_bytes(
            0,
            100,
            bam_fields::flags::PAIRED | bam_fields::flags::FIRST_SEGMENT,
            b"read1",
            &[bam_fields::encode_op(0, 4)],
            4,
            0,
            200,
            &[],
        );
        let r2 = bam_fields::make_bam_bytes(
            0,
            200,
            bam_fields::flags::PAIRED
                | bam_fields::flags::LAST_SEGMENT
                | bam_fields::flags::REVERSE,
            b"read1",
            &[bam_fields::encode_op(0, 4)],
            4,
            0,
            100,
            &[],
        );
        let records =
            vec![DecodedRecord::from_raw_bytes(r1, key1), DecodedRecord::from_raw_bytes(r2, key2)];
        let templates =
            build_templates_from_records(records).expect("build templates from records");
        assert_eq!(templates.len(), 1);
    }

    // --- RawPositionGroup weight and memory estimates ---

    #[test]
    fn test_raw_position_group_batch_weight() {
        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
        let records = vec![
            make_decoded(GroupKey::single(0, 100, 0, 0, 0, 11111), false, false, None),
            make_decoded(GroupKey::single(0, 100, 0, 0, 0, 22222), false, false, None),
            make_decoded(GroupKey::single(0, 100, 0, 0, 0, 33333), false, false, None),
            make_decoded(GroupKey::single(0, 100, 0, 0, 0, 44444), false, false, None),
        ];
        let group = RawPositionGroup { group_key: key, records };
        assert_eq!(group.batch_weight(), 2);
    }

    #[test]
    fn test_raw_position_group_batch_weight_single() {
        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
        let records =
            vec![make_decoded(GroupKey::single(0, 100, 0, 0, 0, 11111), false, false, None)];
        let group = RawPositionGroup { group_key: key, records };
        assert_eq!(group.batch_weight(), 1);
    }

    #[test]
    fn test_raw_position_group_memory_estimate() {
        let key = GroupKey::single(0, 100, 0, 0, 0, 12345);
        let records =
            vec![make_decoded(GroupKey::single(0, 100, 0, 0, 0, 11111), false, false, None)];
        let group = RawPositionGroup { group_key: key, records };
        assert!(group.estimate_heap_size() > 0);
    }
}