use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use crate::assigner::{PairedUmiAssigner, Strategy, UmiAssigner};
use crate::bam_io::{create_bam_reader_for_pipeline, is_stdin_path};
use crate::grouper::{
FilterMetrics, RawPositionGroup, RecordPositionGrouper, build_templates_from_records,
};
use crate::logging::OperationTimer;
use crate::metrics::group::FamilySizeMetrics;
use crate::read_info::LibraryIndex;
use crate::sam::SamTag;
use crate::sam::{is_template_coordinate_sorted, unclipped_five_prime_position};
use crate::template::{MoleculeId, Template};
use crate::umi::{UmiValidation, validate_umi};
use crate::unified_pipeline::{
BatchWeight, GroupKeyConfig, Grouper, MemoryEstimate, run_bam_pipeline_from_reader,
serialize_bam_records_into,
};
use crate::validation::validate_file_exists;
use ahash::AHashMap;
use anyhow::{Context, Result, bail};
use bstr::BString;
use clap::Parser;
use fgoxide::io::DelimFile;
use log::info;
use noodles::sam::Header;
use noodles::sam::alignment::record::Flags;
use noodles::sam::alignment::record::data::field::Tag;
use noodles::sam::alignment::record_buf::data::field::value::Value as DataValue;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use crate::commands::command::Command;
use crate::commands::common::{
BamIoOptions, CompressionOptions, QueueMemoryOptions, SchedulerOptions, ThreadingOptions,
build_pipeline_config, parse_bool,
};
use crate::sort::PA_TAG;
use crate::sort::bam_fields;
const DUPLICATE_FLAG: u16 = 0x400;
#[derive(Debug, Default, Clone)]
pub struct DedupMetrics {
pub total_templates: u64,
pub duplicate_templates: u64,
pub unique_templates: u64,
pub total_reads: u64,
pub duplicate_reads: u64,
pub unique_reads: u64,
pub secondary_reads: u64,
pub supplementary_reads: u64,
pub missing_pa_tag: u64,
pub filter_metrics: FilterMetrics,
}
impl DedupMetrics {
pub fn merge(&mut self, other: &DedupMetrics) {
self.total_templates += other.total_templates;
self.duplicate_templates += other.duplicate_templates;
self.unique_templates += other.unique_templates;
self.total_reads += other.total_reads;
self.duplicate_reads += other.duplicate_reads;
self.unique_reads += other.unique_reads;
self.secondary_reads += other.secondary_reads;
self.supplementary_reads += other.supplementary_reads;
self.missing_pa_tag += other.missing_pa_tag;
self.filter_metrics.merge(&other.filter_metrics);
}
#[must_use]
pub fn duplicate_rate(&self) -> f64 {
if self.total_templates == 0 {
0.0
} else {
self.duplicate_templates as f64 / self.total_templates as f64
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DedupMetricsOutput {
total_templates: u64,
unique_templates: u64,
duplicate_templates: u64,
duplicate_rate: f64,
total_reads: u64,
unique_reads: u64,
duplicate_reads: u64,
secondary_reads: u64,
supplementary_reads: u64,
missing_pa_tag: u64,
}
impl From<&DedupMetrics> for DedupMetricsOutput {
fn from(m: &DedupMetrics) -> Self {
Self {
total_templates: m.total_templates,
unique_templates: m.unique_templates,
duplicate_templates: m.duplicate_templates,
duplicate_rate: m.duplicate_rate(),
total_reads: m.total_reads,
unique_reads: m.unique_reads,
duplicate_reads: m.duplicate_reads,
secondary_reads: m.secondary_reads,
supplementary_reads: m.supplementary_reads,
missing_pa_tag: m.missing_pa_tag,
}
}
}
#[derive(Default, Debug)]
struct CollectedDedupMetrics {
dedup_metrics: DedupMetrics,
family_sizes: AHashMap<usize, u64>,
}
pub struct ProcessedDedupGroup {
pub templates: Vec<Template>,
pub family_sizes: AHashMap<usize, u64>,
pub dedup_metrics: DedupMetrics,
pub input_record_count: u64,
}
impl BatchWeight for ProcessedDedupGroup {
fn batch_weight(&self) -> usize {
self.templates.len()
}
}
impl MemoryEstimate for ProcessedDedupGroup {
fn estimate_heap_size(&self) -> usize {
self.templates.iter().map(MemoryEstimate::estimate_heap_size).sum::<usize>()
+ self.templates.capacity() * std::mem::size_of::<Template>()
+ self.family_sizes.capacity() * std::mem::size_of::<(usize, u64)>()
+ std::mem::size_of::<DedupMetrics>()
}
}
#[derive(Clone)]
struct DedupFilterConfig {
umi_tag: [u8; 2],
min_mapq: u8,
include_non_pf: bool,
min_umi_length: Option<usize>,
no_umi: bool,
}
#[inline]
fn score_template(template: &Template) -> i64 {
if template.is_raw_byte_mode() {
return score_template_raw(template);
}
let mut score: u32 = 0;
if let Some(r1) = template.r1() {
score += sum_base_qualities(r1);
}
if let Some(r2) = template.r2() {
score += sum_base_qualities(r2);
}
i64::from(score)
}
#[inline]
fn score_template_raw(template: &Template) -> i64 {
use crate::sort::bam_fields;
let mut score: u32 = 0;
for raw in [template.raw_r1(), template.raw_r2()].into_iter().flatten() {
if raw.len() < 32 {
continue;
}
let qo = bam_fields::qual_offset(raw);
let seq_len = bam_fields::l_seq(raw) as usize;
if qo >= raw.len() {
continue;
}
let max_len = std::cmp::min(seq_len, raw.len() - qo);
score +=
raw[qo..qo + max_len].iter().map(|&q| u32::from(std::cmp::min(q, 15))).sum::<u32>();
}
i64::from(score)
}
#[inline]
fn sum_base_qualities(record: &noodles::sam::alignment::RecordBuf) -> u32 {
record.quality_scores().as_ref().iter().map(|&q| u32::from(std::cmp::min(q, 15))).sum()
}
fn filter_template_raw(
template: &Template,
config: &DedupFilterConfig,
metrics: &mut FilterMetrics,
) -> bool {
use crate::sort::bam_fields;
let raw_r1 = template.raw_r1().filter(|r| r.len() >= bam_fields::MIN_BAM_RECORD_LEN);
let raw_r2 = template.raw_r2().filter(|r| r.len() >= bam_fields::MIN_BAM_RECORD_LEN);
metrics.total_templates += 1;
if raw_r1.is_none() && raw_r2.is_none() {
metrics.discarded_poor_alignment += 1;
return false;
}
let both_unmapped = raw_r1
.is_none_or(|r| (bam_fields::flags(r) & bam_fields::flags::UNMAPPED) != 0)
&& raw_r2.is_none_or(|r| (bam_fields::flags(r) & bam_fields::flags::UNMAPPED) != 0);
if both_unmapped {
metrics.discarded_poor_alignment += 1;
return false;
}
for raw in [raw_r1, raw_r2].into_iter().flatten() {
let flg = bam_fields::flags(raw);
if !config.include_non_pf && (flg & bam_fields::flags::QC_FAIL) != 0 {
metrics.discarded_non_pf += 1;
return false;
}
if (flg & bam_fields::flags::UNMAPPED) == 0 {
let mapq = bam_fields::mapq(raw);
if mapq < config.min_mapq {
metrics.discarded_poor_alignment += 1;
return false;
}
}
}
for raw in [raw_r1, raw_r2].into_iter().flatten() {
let flg = bam_fields::flags(raw);
let aux = bam_fields::aux_data_slice(raw);
let check_mq = (flg & bam_fields::flags::MATE_UNMAPPED) == 0;
let check_umi = !config.no_umi;
let mut found_mq: Option<i64> = None;
let mut found_umi: Option<&[u8]> = None;
let mut p = 0;
while p + 3 <= aux.len() {
let t = [aux[p], aux[p + 1]];
let val_type = aux[p + 2];
if check_umi && t == config.umi_tag && val_type == b'Z' {
let start = p + 3;
if let Some(end) = aux[start..].iter().position(|&b| b == 0) {
found_umi = Some(&aux[start..start + end]);
p = start + end + 1;
} else {
break;
}
if !check_mq || found_mq.is_some() {
break;
}
continue;
}
if check_mq && t == *b"MQ" {
found_mq = match val_type {
b'C' if p + 3 < aux.len() => Some(i64::from(aux[p + 3])),
b'c' if p + 3 < aux.len() => Some(i64::from(aux[p + 3] as i8)),
b'S' if p + 5 <= aux.len() => {
Some(i64::from(u16::from_le_bytes([aux[p + 3], aux[p + 4]])))
}
b's' if p + 5 <= aux.len() => {
Some(i64::from(i16::from_le_bytes([aux[p + 3], aux[p + 4]])))
}
b'I' if p + 7 <= aux.len() => Some(i64::from(u32::from_le_bytes([
aux[p + 3],
aux[p + 4],
aux[p + 5],
aux[p + 6],
]))),
b'i' if p + 7 <= aux.len() => Some(i64::from(i32::from_le_bytes([
aux[p + 3],
aux[p + 4],
aux[p + 5],
aux[p + 6],
]))),
_ => None,
};
}
if let Some(size) = bam_fields::tag_value_size(val_type, &aux[p + 3..]) {
p += 3 + size;
} else {
break;
}
if (!check_umi || found_umi.is_some()) && (!check_mq || found_mq.is_some()) {
break;
}
}
if check_mq {
if let Some(mq) = found_mq {
#[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
if (mq as u8) < config.min_mapq {
metrics.discarded_poor_alignment += 1;
return false;
}
}
}
if config.no_umi {
continue;
}
if let Some(umi_bytes) = found_umi {
match validate_umi(umi_bytes) {
UmiValidation::ContainsN => {
metrics.discarded_ns_in_umi += 1;
return false;
}
UmiValidation::Valid(base_count) => {
if let Some(min_len) = config.min_umi_length {
if base_count < min_len {
metrics.discarded_umi_too_short += 1;
return false;
}
}
}
}
} else {
metrics.discarded_poor_alignment += 1;
return false;
}
}
metrics.accepted_templates += 1;
true
}
fn filter_template(
template: &Template,
config: &DedupFilterConfig,
metrics: &mut FilterMetrics,
) -> bool {
let r1 = template.r1();
let r2 = template.r2();
metrics.total_templates += 1;
if r1.is_none() && r2.is_none() {
metrics.discarded_poor_alignment += 1;
return false;
}
let both_unmapped =
r1.is_none_or(|r| r.flags().is_unmapped()) && r2.is_none_or(|r| r.flags().is_unmapped());
if both_unmapped {
metrics.discarded_poor_alignment += 1;
return false;
}
for record in [r1, r2].into_iter().flatten() {
let flags = record.flags();
if !config.include_non_pf && flags.is_qc_fail() {
metrics.discarded_non_pf += 1;
return false;
}
if !flags.is_unmapped() {
let mapq = record.mapping_quality().map_or(0, u8::from);
if mapq < config.min_mapq {
metrics.discarded_poor_alignment += 1;
return false;
}
}
}
for record in [r1, r2].into_iter().flatten() {
let flags = record.flags();
if !flags.is_mate_unmapped() {
if let Some(data) = record.data().get(b"MQ") {
if let Some(mq) = data.as_int() {
#[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
if (mq as u8) < config.min_mapq {
metrics.discarded_poor_alignment += 1;
return false;
}
}
}
}
if config.no_umi {
continue;
}
if let Some(DataValue::String(umi)) = record.data().get(&config.umi_tag) {
match validate_umi(umi) {
UmiValidation::ContainsN => {
metrics.discarded_ns_in_umi += 1;
return false;
}
UmiValidation::Valid(base_count) => {
if let Some(min_len) = config.min_umi_length {
if base_count < min_len {
metrics.discarded_umi_too_short += 1;
return false;
}
}
}
}
} else {
metrics.discarded_poor_alignment += 1;
return false;
}
}
metrics.accepted_templates += 1;
true
}
fn umi_for_read(umi: &str, is_r1_earlier: bool, assigner: &dyn UmiAssigner) -> Result<String> {
if assigner.split_templates_by_pair_orientation() {
if umi.bytes().all(|b| !b.is_ascii_lowercase()) {
Ok(umi.to_owned())
} else {
Ok(umi.to_uppercase())
}
} else {
let parts: Vec<&str> = umi.split('-').collect();
if parts.len() != 2 {
bail!("Paired strategy used but UMI did not contain 2 segments: {umi}");
}
let Some(paired) = assigner.as_any().downcast_ref::<PairedUmiAssigner>() else {
bail!("Expected PairedUmiAssigner")
};
let result = if is_r1_earlier {
format!(
"{}:{}-{}:{}",
paired.lower_read_umi_prefix(),
parts[0],
paired.higher_read_umi_prefix(),
parts[1]
)
} else {
format!(
"{}:{}-{}:{}",
paired.higher_read_umi_prefix(),
parts[0],
paired.lower_read_umi_prefix(),
parts[1]
)
};
Ok(result)
}
}
fn get_pair_orientation(template: &Template) -> (bool, bool) {
if template.is_raw_byte_mode() {
return get_pair_orientation_raw(template);
}
let r1_positive = template.r1().is_none_or(|r| !r.flags().is_reverse_complemented());
let r2_positive = template.r2().is_none_or(|r| !r.flags().is_reverse_complemented());
(r1_positive, r2_positive)
}
fn get_pair_orientation_raw(template: &Template) -> (bool, bool) {
let r1_positive =
template.raw_r1().is_none_or(|r| (bam_fields::flags(r) & bam_fields::flags::REVERSE) == 0);
let r2_positive =
template.raw_r2().is_none_or(|r| (bam_fields::flags(r) & bam_fields::flags::REVERSE) == 0);
(r1_positive, r2_positive)
}
fn is_r1_genomically_earlier(
r1: &noodles::sam::alignment::RecordBuf,
r2: &noodles::sam::alignment::RecordBuf,
) -> Result<bool> {
let ref1 = r1.reference_sequence_id().map_or(-1, |id| i32::try_from(id).unwrap_or(i32::MAX));
let ref2 = r2.reference_sequence_id().map_or(-1, |id| i32::try_from(id).unwrap_or(i32::MAX));
if ref1 != ref2 {
return Ok(ref1 < ref2);
}
let r1_pos = unclipped_five_prime_position(r1).unwrap_or(0);
let r2_pos = unclipped_five_prime_position(r2).unwrap_or(0);
Ok(r1_pos <= r2_pos)
}
fn is_r1_genomically_earlier_raw(r1: &[u8], r2: &[u8]) -> bool {
let ref1 = bam_fields::ref_id(r1);
let ref2 = bam_fields::ref_id(r2);
if ref1 != ref2 {
return ref1 < ref2;
}
let r1_pos = bam_fields::unclipped_5prime_from_raw_bam(r1);
let r2_pos = bam_fields::unclipped_5prime_from_raw_bam(r2);
r1_pos <= r2_pos
}
fn truncate_umis(umis: Vec<String>, min_umi_length: Option<usize>) -> Result<Vec<String>> {
match min_umi_length {
None => Ok(umis),
Some(min_len) => {
let min_length = umis.iter().map(String::len).min().unwrap_or(0);
if min_length < min_len {
bail!("UMI found shorter than expected ({min_length} < {min_len})");
}
Ok(umis.into_iter().map(|u| u[..min_len].to_string()).collect())
}
}
}
fn assign_umi_groups_for_indices(
templates: &mut [Template],
indices: &[usize],
assigner: &dyn UmiAssigner,
raw_tag: [u8; 2],
min_umi_length: Option<usize>,
no_umi: bool,
) -> Result<()> {
if indices.is_empty() {
return Ok(());
}
let mut umis = Vec::with_capacity(indices.len());
let raw_mode = templates[indices[0]].is_raw_byte_mode();
for &idx in indices {
let template = &templates[idx];
let processed_umi = if no_umi {
String::new()
} else {
let (umi_str_ref, is_r1_earlier) = if raw_mode {
let umi_bytes = if let Some(r1_raw) = template.raw_r1() {
let aux = bam_fields::aux_data_slice(r1_raw);
bam_fields::find_string_tag(aux, &raw_tag)
} else if let Some(r2_raw) = template.raw_r2() {
let aux = bam_fields::aux_data_slice(r2_raw);
bam_fields::find_string_tag(aux, &raw_tag)
} else {
None
};
let umi_bytes = umi_bytes.ok_or_else(|| anyhow::anyhow!("No UMI tag found"))?;
let umi_str = std::str::from_utf8(umi_bytes)
.map_err(|e| anyhow::anyhow!("Invalid UTF-8: {e}"))?;
let earlier = if let (Some(r1), Some(r2)) = (template.raw_r1(), template.raw_r2()) {
is_r1_genomically_earlier_raw(r1, r2)
} else {
true
};
(umi_str, earlier)
} else {
let umi_bytes = if let Some(r1) = template.r1() {
if let Some(DataValue::String(bytes)) = r1.data().get(&raw_tag) {
bytes
} else {
bail!("UMI tag is not a string");
}
} else if let Some(r2) = template.r2() {
if let Some(DataValue::String(bytes)) = r2.data().get(&raw_tag) {
bytes
} else {
bail!("UMI tag is not a string");
}
} else {
bail!("Template has no reads");
};
let umi_str = std::str::from_utf8(umi_bytes)
.map_err(|e| anyhow::anyhow!("Invalid UTF-8: {e}"))?;
let earlier = if let (Some(r1), Some(r2)) = (template.r1(), template.r2()) {
is_r1_genomically_earlier(r1, r2)?
} else {
true
};
(umi_str, earlier)
};
umi_for_read(umi_str_ref, is_r1_earlier, assigner)?
};
umis.push(processed_umi);
}
let truncated_umis = if no_umi { umis } else { truncate_umis(umis, min_umi_length)? };
let assignments = assigner.assign(&truncated_umis);
for (i, &idx) in indices.iter().enumerate() {
templates[idx].mi = assignments[i];
}
Ok(())
}
fn assign_umi_groups(
templates: &mut [Template],
assigner: &dyn UmiAssigner,
raw_tag: [u8; 2],
min_umi_length: Option<usize>,
no_umi: bool,
) -> Result<()> {
if assigner.split_templates_by_pair_orientation() {
let mut subgroups: AHashMap<(bool, bool), Vec<usize>> = AHashMap::new();
for (idx, template) in templates.iter().enumerate() {
let orientation = get_pair_orientation(template);
subgroups.entry(orientation).or_default().push(idx);
}
for indices in subgroups.values() {
assign_umi_groups_for_indices(
templates,
indices,
assigner,
raw_tag,
min_umi_length,
no_umi,
)?;
}
} else {
let all_indices: Vec<usize> = (0..templates.len()).collect();
assign_umi_groups_for_indices(
templates,
&all_indices,
assigner,
raw_tag,
min_umi_length,
no_umi,
)?;
}
Ok(())
}
fn mark_duplicates_in_family(templates: &mut [&mut Template], dedup_metrics: &mut DedupMetrics) {
match templates.len() {
0 => return,
1 => {
dedup_metrics.unique_templates += 1;
return;
}
2 => {
let s0 = score_template(templates[0]);
let s1 = score_template(templates[1]);
dedup_metrics.unique_templates += 1;
dedup_metrics.duplicate_templates += 1;
if s0 >= s1 {
mark_template_as_duplicate(templates[1], dedup_metrics);
} else {
mark_template_as_duplicate(templates[0], dedup_metrics);
}
return;
}
3 => {
let s0 = score_template(templates[0]);
let s1 = score_template(templates[1]);
let s2 = score_template(templates[2]);
dedup_metrics.unique_templates += 1;
dedup_metrics.duplicate_templates += 2;
if s0 >= s1 && s0 >= s2 {
mark_template_as_duplicate(templates[1], dedup_metrics);
mark_template_as_duplicate(templates[2], dedup_metrics);
} else if s1 >= s0 && s1 >= s2 {
mark_template_as_duplicate(templates[0], dedup_metrics);
mark_template_as_duplicate(templates[2], dedup_metrics);
} else {
mark_template_as_duplicate(templates[0], dedup_metrics);
mark_template_as_duplicate(templates[1], dedup_metrics);
}
return;
}
_ => {} }
let mut scores: Vec<(usize, i64)> =
templates.iter().enumerate().map(|(i, t)| (i, score_template(t))).collect();
scores.sort_by(|a, b| b.1.cmp(&a.1));
dedup_metrics.unique_templates += 1;
for (idx, _score) in scores.iter().skip(1) {
mark_template_as_duplicate(templates[*idx], dedup_metrics);
dedup_metrics.duplicate_templates += 1;
}
}
fn mark_template_as_duplicate(template: &mut Template, dedup_metrics: &mut DedupMetrics) {
if let Some(raw_records) = template.all_raw_records_mut() {
for raw in raw_records.iter_mut() {
let flg = bam_fields::flags(raw);
bam_fields::set_flags(raw, flg | DUPLICATE_FLAG);
dedup_metrics.duplicate_reads += 1;
}
} else {
for record in &mut template.records {
let current_flags = u16::from(record.flags());
let new_flags = Flags::from(current_flags | DUPLICATE_FLAG);
*record.flags_mut() = new_flags;
dedup_metrics.duplicate_reads += 1;
}
}
}
fn process_position_group(
group: RawPositionGroup,
filter_config: &DedupFilterConfig,
assigner: &dyn UmiAssigner,
raw_tag: [u8; 2],
min_umi_length: Option<usize>,
no_umi: bool,
) -> io::Result<ProcessedDedupGroup> {
let mut dedup_metrics = DedupMetrics::default();
let input_record_count = group.records.len() as u64;
let all_templates = build_templates_from_records(group.records)?;
let mut filter_metrics = FilterMetrics::new();
let raw_mode = all_templates.first().is_some_and(Template::is_raw_byte_mode);
let filtered_templates: Vec<Template> = all_templates
.into_iter()
.filter(|t| {
if raw_mode {
filter_template_raw(t, filter_config, &mut filter_metrics)
} else {
filter_template(t, filter_config, &mut filter_metrics)
}
})
.collect();
dedup_metrics.filter_metrics = filter_metrics;
if filtered_templates.is_empty() {
return Ok(ProcessedDedupGroup {
templates: Vec::new(),
family_sizes: AHashMap::new(),
dedup_metrics,
input_record_count,
});
}
let mut templates: Vec<Template> = filtered_templates
.into_iter()
.map(|mut t| {
if let Some(raw_records) = t.all_raw_records_mut() {
for raw in raw_records.iter_mut() {
let flg = bam_fields::flags(raw);
bam_fields::set_flags(raw, flg & !DUPLICATE_FLAG);
}
} else {
for record in &mut t.records {
let current_flags = u16::from(record.flags());
let new_flags = Flags::from(current_flags & !DUPLICATE_FLAG);
*record.flags_mut() = new_flags;
}
}
t
})
.collect();
if let Err(e) = assign_umi_groups(&mut templates, assigner, raw_tag, min_umi_length, no_umi) {
return Err(io::Error::new(io::ErrorKind::InvalidData, e));
}
templates.sort_by(|a, b| {
let a_idx = a.mi.to_vec_index();
let b_idx = b.mi.to_vec_index();
a_idx.cmp(&b_idx).then_with(|| a.name.cmp(&b.name))
});
let mut family_sizes: AHashMap<usize, u64> = AHashMap::with_capacity(50);
if !templates.is_empty() {
let mut family_boundaries: Vec<usize> = vec![0];
let mut current_mi = templates[0].mi.to_vec_index();
for (i, template) in templates.iter().enumerate().skip(1) {
let mi = template.mi.to_vec_index();
if mi != current_mi {
family_boundaries.push(i);
current_mi = mi;
}
}
family_boundaries.push(templates.len());
for window in family_boundaries.windows(2) {
let start = window[0];
let end = window[1];
let family_size = end - start;
if templates[start].mi.to_vec_index().is_none() {
continue;
}
*family_sizes.entry(family_size).or_insert(0) += 1;
let mut family_refs: Vec<&mut Template> = templates[start..end].iter_mut().collect();
mark_duplicates_in_family(&mut family_refs, &mut dedup_metrics);
}
}
let pa_tag_bytes: [u8; 2] = *PA_TAG.as_ref();
for template in &templates {
dedup_metrics.total_templates += 1;
if let Some(raw_records) = template.all_raw_records() {
for raw in raw_records {
dedup_metrics.total_reads += 1;
let flg = bam_fields::flags(raw);
let is_secondary = (flg & bam_fields::flags::SECONDARY) != 0;
let is_supplementary = (flg & bam_fields::flags::SUPPLEMENTARY) != 0;
if is_secondary {
dedup_metrics.secondary_reads += 1;
}
if is_supplementary {
dedup_metrics.supplementary_reads += 1;
}
if is_secondary || is_supplementary {
let aux = bam_fields::aux_data_slice(raw);
if bam_fields::find_tag_type(aux, &pa_tag_bytes).is_none() {
dedup_metrics.missing_pa_tag += 1;
}
}
}
} else {
for record in &template.records {
dedup_metrics.total_reads += 1;
let flags = record.flags();
let is_secondary = flags.is_secondary();
let is_supplementary = flags.is_supplementary();
if is_secondary {
dedup_metrics.secondary_reads += 1;
}
if is_supplementary {
dedup_metrics.supplementary_reads += 1;
}
if (is_secondary || is_supplementary) && record.data().get(&PA_TAG).is_none() {
dedup_metrics.missing_pa_tag += 1;
}
}
}
}
dedup_metrics.unique_reads = dedup_metrics.total_reads - dedup_metrics.duplicate_reads;
Ok(ProcessedDedupGroup { templates, family_sizes, dedup_metrics, input_record_count })
}
#[inline]
fn set_mi_tag_on_record(
record: &mut noodles::sam::alignment::RecordBuf,
mi: MoleculeId,
assign_tag: Tag,
base_mi: u64,
) {
if !mi.is_assigned() {
return;
}
let mi_string = mi.to_string_with_offset(base_mi);
record.data_mut().insert(assign_tag, DataValue::String(BString::from(mi_string)));
}
#[derive(Debug, Parser)]
#[command(
name = "dedup",
about = "\x1b[38;5;151m[DEDUP]\x1b[0m \x1b[36mMark or remove PCR duplicates using UMI information\x1b[0m",
long_about = r#"
Marks or removes PCR duplicates from a BAM file using UMI information.
Requires template-coordinate sorted input with `pa` tags on secondary/supplementary
reads (added by `fgumi zipper`).
Within each UMI family, the template with the highest sum of base qualities
is selected as the representative; all others are marked as duplicates.
# Input Requirements
- Must be processed with `fgumi zipper` (adds `pa` tag for secondary/supplementary reads)
- Must be sorted with `fgumi sort --order template-coordinate`
- UMI tags on reads (RX tag), unless `--no-umi` is specified
Note: Using `samtools sort` will NOT work correctly because it doesn't use the
`pa` tag for template-coordinate ordering of secondary/supplementary reads.
# Output Modes
- Mark only (default): Set duplicate flag (0x400) on non-representative reads
- Remove (--remove-duplicates): Exclude duplicate reads from output entirely
# Cell Barcodes
If the input data contains cell barcodes (e.g. from single-cell sequencing), reads at the same
genomic position are partitioned by cell barcode before deduplication. This ensures that reads from
different cells are never marked as duplicates of each other, even if they share a UMI sequence and
mapping position. The cell barcode is read from the standard `CB` tag. No
correction or error-handling is performed on cell barcodes; they must be corrected upstream.
"#
)]
pub struct MarkDuplicates {
#[command(flatten)]
pub io: BamIoOptions,
#[arg(short = 'm', long = "metrics")]
pub metrics: Option<PathBuf>,
#[arg(short = 'H', long = "family-size-histogram")]
pub family_size_histogram: Option<PathBuf>,
#[arg(short = 'r', long = "remove-duplicates", default_value = "false", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool)]
pub remove_duplicates: bool,
#[arg(short = 'q', long = "min-map-q")]
pub min_map_q: Option<u8>,
#[arg(short = 'n', long = "include-non-pf-reads", default_value = "false", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool)]
pub include_non_pf_reads: bool,
#[arg(short = 's', long = "strategy", value_enum, default_value = "adjacency")]
pub strategy: Strategy,
#[arg(short = 'e', long = "edits", default_value = "1")]
pub edits: u32,
#[arg(short = 'l', long = "min-umi-length")]
pub min_umi_length: Option<usize>,
#[command(flatten)]
pub threading: ThreadingOptions,
#[command(flatten)]
pub compression: CompressionOptions,
#[arg(long = "index-threshold", default_value = "100")]
pub index_threshold: usize,
#[arg(long = "no-umi", default_value = "false", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool)]
pub no_umi: bool,
#[command(flatten)]
pub scheduler_opts: SchedulerOptions,
#[command(flatten)]
pub queue_memory: QueueMemoryOptions,
}
impl Command for MarkDuplicates {
fn execute(&self, command_line: &str) -> Result<()> {
if self.min_umi_length.is_some() && matches!(self.strategy, Strategy::Paired) {
bail!("Paired strategy cannot be used with --min-umi-length");
}
if self.no_umi && matches!(self.strategy, Strategy::Paired) {
bail!("--no-umi cannot be used with --strategy paired");
}
let (effective_strategy, no_umi_edits_override) = if self.no_umi {
if !matches!(self.strategy, Strategy::Identity) {
info!("--no-umi mode: overriding strategy to identity");
}
(Strategy::Identity, true)
} else {
(self.strategy, false)
};
if !is_stdin_path(&self.io.input) {
validate_file_exists(&self.io.input, "input BAM file")?;
}
let min_mapq: u8 = self.min_map_q.unwrap_or(0);
let effective_edits =
if no_umi_edits_override || matches!(effective_strategy, Strategy::Identity) {
0
} else {
self.edits
};
let timer = OperationTimer::new("Marking duplicates");
info!("Starting dedup");
info!("Input: {}", self.io.input.display());
info!("Output: {}", self.io.output.display());
info!("Strategy: {effective_strategy:?}");
info!("Edits: {effective_edits}");
info!("Remove duplicates: {}", self.remove_duplicates);
if self.no_umi {
info!("No-UMI mode: deduplicating by position only");
}
if matches!(effective_strategy, Strategy::Adjacency | Strategy::Paired) {
info!("Index threshold: {}", self.index_threshold);
}
info!("{}", self.threading.log_message());
let (reader, header) = create_bam_reader_for_pipeline(&self.io.input)?;
if !is_template_coordinate_sorted(&header) {
bail!(
"Input BAM must be template-coordinate sorted.\n\n\
To prepare your BAM file, run:\n \
fgumi zipper -i mapped.bam -u unmapped.bam -r reference.fa -o merged.bam\n \
fgumi sort -i merged.bam -o sorted.bam --order template-coordinate"
);
}
info!("Template-coordinate sorted");
let header = crate::commands::common::add_pg_record(header, command_line)?;
let raw_tag: [u8; 2] = *SamTag::RX;
let cell_tag = Tag::from(SamTag::CB);
let assign_tag_bytes: [u8; 2] = *SamTag::MI;
let filter_config = DedupFilterConfig {
umi_tag: raw_tag,
min_mapq,
include_non_pf: self.include_non_pf_reads,
min_umi_length: self.min_umi_length,
no_umi: self.no_umi,
};
let collected_metrics: Arc<Mutex<CollectedDedupMetrics>> =
Arc::new(Mutex::new(CollectedDedupMetrics::default()));
let strategy = effective_strategy;
let index_threshold = self.index_threshold;
let min_umi_length = self.min_umi_length;
let no_umi = self.no_umi;
let remove_duplicates = self.remove_duplicates;
let collected_metrics_clone = Arc::clone(&collected_metrics);
let num_threads = self.threading.num_threads();
let mut pipeline_config = build_pipeline_config(
&self.scheduler_opts,
&self.compression,
&self.queue_memory,
num_threads,
)?;
info!("Scheduler: {:?}", self.scheduler_opts.strategy());
info!("Using pipeline with {num_threads} threads");
let library_index = LibraryIndex::from_header(&header);
pipeline_config.group_key_config = Some(GroupKeyConfig::new_raw(library_index, cell_tag));
let next_mi_base = std::sync::atomic::AtomicU64::new(0);
let _records_processed = run_bam_pipeline_from_reader(
pipeline_config,
reader,
header,
&self.io.output,
None,
move |_header: &Header| {
Box::new(RecordPositionGrouper::with_secondary_supplementary())
as Box<dyn Grouper<Group = RawPositionGroup> + Send>
},
move |group: RawPositionGroup| -> io::Result<ProcessedDedupGroup> {
let assigner = strategy.new_assigner_full(effective_edits, 1, index_threshold);
process_position_group(
group,
&filter_config,
assigner.as_ref(),
raw_tag,
min_umi_length,
no_umi,
)
},
move |processed: ProcessedDedupGroup,
header: &Header,
output: &mut Vec<u8>|
-> io::Result<u64> {
{
let mut agg = collected_metrics_clone.lock();
agg.dedup_metrics.merge(&processed.dedup_metrics);
for (size, count) in &processed.family_sizes {
*agg.family_sizes.entry(*size).or_insert(0) += count;
}
}
let input_record_count = processed.input_record_count;
let base_mi = next_mi_base.fetch_add(
processed.templates.len() as u64,
std::sync::atomic::Ordering::Relaxed,
);
let assign_tag = Tag::from(assign_tag_bytes);
output.reserve(processed.templates.len() * 2 * 400);
let raw_mode = processed.templates.first().is_some_and(Template::is_raw_byte_mode);
if raw_mode {
let mut scratch = Vec::with_capacity(512);
let mut mi_buf = String::with_capacity(16);
for template in &processed.templates {
let mi = template.mi;
let has_mi = mi.is_assigned();
if has_mi {
mi.write_with_offset(base_mi, &mut mi_buf);
}
let raw_records = template.all_raw_records();
debug_assert!(
raw_records.is_some(),
"raw_mode template missing raw_records"
);
if let Some(raw_records) = raw_records {
for raw in raw_records {
if remove_duplicates
&& (bam_fields::flags(raw) & DUPLICATE_FLAG) != 0
{
continue;
}
if has_mi {
scratch.clear();
scratch.extend_from_slice(raw);
bam_fields::update_string_tag(
&mut scratch,
&assign_tag_bytes,
mi_buf.as_bytes(),
);
let block_size = scratch.len() as u32;
output.extend_from_slice(&block_size.to_le_bytes());
output.extend_from_slice(&scratch);
} else {
let block_size = raw.len() as u32;
output.extend_from_slice(&block_size.to_le_bytes());
output.extend_from_slice(raw);
}
}
}
}
} else {
for template in processed.templates {
let mi = template.mi;
for mut record in template.records {
if remove_duplicates
&& (u16::from(record.flags()) & DUPLICATE_FLAG) != 0
{
continue;
}
set_mi_tag_on_record(&mut record, mi, assign_tag, base_mi);
serialize_bam_records_into(&[record], header, output)?;
}
}
}
Ok(input_record_count)
},
)?;
let aggregated = Arc::try_unwrap(collected_metrics)
.expect("bug: metrics Arc still shared after pipeline join")
.into_inner();
let final_metrics = aggregated.dedup_metrics;
let final_family_sizes = aggregated.family_sizes;
if let Some(metrics_path) = &self.metrics {
write_dedup_metrics(&final_metrics, metrics_path)?;
}
if let Some(histogram_path) = &self.family_size_histogram {
write_family_size_histogram(&final_family_sizes, histogram_path)?;
}
info!(
"Deduplication complete: {} templates ({} unique, {} duplicates, {:.2}% duplicate rate)",
final_metrics.total_templates,
final_metrics.unique_templates,
final_metrics.duplicate_templates,
final_metrics.duplicate_rate() * 100.0
);
if final_metrics.missing_pa_tag > 0 {
bail!(
"{} secondary/supplementary reads are missing the `pa` tag.\n\n\
The `pa` tag is required for correct UMI-aware deduplication of \
secondary and supplementary alignments. This tag is added by \
`fgumi zipper` during the merge of unmapped and mapped BAMs.\n\n\
To fix this, re-run your pipeline starting from `fgumi zipper`:\n \
fgumi zipper -i aligned.bam --unmapped unmapped.bam -r reference.fa -o merged.bam\n \
fgumi sort -i merged.bam -o sorted.bam --order template-coordinate\n \
fgumi dedup -i sorted.bam -o deduped.bam",
final_metrics.missing_pa_tag
);
}
timer.log_completion(final_metrics.total_reads);
Ok(())
}
}
fn write_dedup_metrics(metrics: &DedupMetrics, path: &PathBuf) -> Result<()> {
let output: DedupMetricsOutput = metrics.into();
DelimFile::default()
.write_tsv(path, [output])
.with_context(|| format!("Failed to write dedup metrics: {}", path.display()))?;
Ok(())
}
fn write_family_size_histogram(family_sizes: &AHashMap<usize, u64>, path: &PathBuf) -> Result<()> {
let metrics = FamilySizeMetrics::from_size_counts(family_sizes.iter().map(|(&s, &c)| (s, c)));
DelimFile::default()
.write_tsv(path, metrics)
.with_context(|| format!("Failed to write family size histogram: {}", path.display()))?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sam::builder::RecordBuilder;
fn create_test_template(name: &str, qualities: &[u8]) -> Template {
let record = RecordBuilder::new()
.name(name)
.sequence("ACGT")
.qualities(qualities)
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT)
.build();
Template::from_records(vec![record]).expect("test template construction should not fail")
}
#[test]
fn test_score_template() {
let template = create_test_template("q1", &[20, 20, 20, 20]);
let score = score_template(&template);
assert_eq!(score, 60);
}
#[test]
fn test_score_template_low_quality() {
let template = create_test_template("q1", &[10, 10, 10, 10]);
let score = score_template(&template);
assert_eq!(score, 40);
}
fn create_paired_test_template(
name: &str,
r1_qualities: &[u8],
r2_qualities: &[u8],
) -> Template {
let r1 = RecordBuilder::new()
.name(name)
.sequence("ACGT")
.qualities(r1_qualities)
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT)
.build();
let r2 = RecordBuilder::new()
.name(name)
.sequence("TGCA")
.qualities(r2_qualities)
.flags(Flags::SEGMENTED | Flags::LAST_SEGMENT)
.build();
Template::from_records(vec![r1, r2]).expect("test template construction should not fail")
}
#[test]
fn test_score_template_paired_end() {
let template = create_paired_test_template("q1", &[10, 10, 10, 10], &[10, 10, 10, 10]);
let score = score_template(&template);
assert_eq!(score, 80);
}
#[test]
fn test_score_template_paired_end_capped() {
let template = create_paired_test_template("q1", &[40, 40, 40, 40], &[40, 40, 40, 40]);
let score = score_template(&template);
assert_eq!(score, 120);
}
#[test]
fn test_score_template_paired_end_asymmetric() {
let template = create_paired_test_template("q1", &[5, 5, 5, 5], &[15, 15, 15, 15]);
let score = score_template(&template);
assert_eq!(score, 80);
}
#[test]
fn test_score_template_paired_end_mixed_capping() {
let template = create_paired_test_template("q1", &[10, 20, 5, 30], &[8, 12, 25, 3]);
let score = score_template(&template);
assert_eq!(score, 83);
}
#[test]
fn test_duplicate_rate_calculation() {
let metrics =
DedupMetrics { total_templates: 100, duplicate_templates: 25, ..Default::default() };
assert!((metrics.duplicate_rate() - 0.25).abs() < 0.001);
}
#[test]
fn test_duplicate_rate_zero_templates() {
let metrics = DedupMetrics::default();
assert!((metrics.duplicate_rate() - 0.0).abs() < f64::EPSILON);
}
#[test]
fn test_metrics_merge() {
let mut m1 =
DedupMetrics { total_templates: 10, duplicate_templates: 2, ..Default::default() };
let m2 = DedupMetrics { total_templates: 20, duplicate_templates: 5, ..Default::default() };
m1.merge(&m2);
assert_eq!(m1.total_templates, 30);
assert_eq!(m1.duplicate_templates, 7);
}
fn is_duplicate(template: &Template) -> bool {
template.records.iter().any(|r| (u16::from(r.flags()) & DUPLICATE_FLAG) != 0)
}
#[test]
fn test_mark_duplicates_empty_family() {
let mut metrics = DedupMetrics::default();
let mut templates: Vec<&mut Template> = vec![];
mark_duplicates_in_family(&mut templates, &mut metrics);
assert_eq!(metrics.unique_templates, 0);
assert_eq!(metrics.duplicate_templates, 0);
}
#[test]
fn test_mark_duplicates_single_template() {
let mut metrics = DedupMetrics::default();
let mut t1 = create_test_template("q1", &[30, 30, 30, 30]);
let mut templates: Vec<&mut Template> = vec![&mut t1];
mark_duplicates_in_family(&mut templates, &mut metrics);
assert_eq!(metrics.unique_templates, 1);
assert_eq!(metrics.duplicate_templates, 0);
assert!(!is_duplicate(&t1));
}
#[test]
fn test_mark_duplicates_size_2_first_higher() {
let mut metrics = DedupMetrics::default();
let mut t1 = create_test_template("q1", &[15, 15, 15, 15]); let mut t2 = create_test_template("q2", &[10, 10, 10, 10]); let mut templates: Vec<&mut Template> = vec![&mut t1, &mut t2];
mark_duplicates_in_family(&mut templates, &mut metrics);
assert_eq!(metrics.unique_templates, 1);
assert_eq!(metrics.duplicate_templates, 1);
assert!(!is_duplicate(&t1)); assert!(is_duplicate(&t2)); }
#[test]
fn test_mark_duplicates_size_2_second_higher() {
let mut metrics = DedupMetrics::default();
let mut t1 = create_test_template("q1", &[5, 5, 5, 5]); let mut t2 = create_test_template("q2", &[15, 15, 15, 15]); let mut templates: Vec<&mut Template> = vec![&mut t1, &mut t2];
mark_duplicates_in_family(&mut templates, &mut metrics);
assert_eq!(metrics.unique_templates, 1);
assert_eq!(metrics.duplicate_templates, 1);
assert!(is_duplicate(&t1)); assert!(!is_duplicate(&t2)); }
#[test]
fn test_mark_duplicates_size_3_first_highest() {
let mut metrics = DedupMetrics::default();
let mut t1 = create_test_template("q1", &[15, 15, 15, 15]); let mut t2 = create_test_template("q2", &[10, 10, 10, 10]); let mut t3 = create_test_template("q3", &[5, 5, 5, 5]); let mut templates: Vec<&mut Template> = vec![&mut t1, &mut t2, &mut t3];
mark_duplicates_in_family(&mut templates, &mut metrics);
assert_eq!(metrics.unique_templates, 1);
assert_eq!(metrics.duplicate_templates, 2);
assert!(!is_duplicate(&t1)); assert!(is_duplicate(&t2));
assert!(is_duplicate(&t3));
}
#[test]
fn test_mark_duplicates_size_3_second_highest() {
let mut metrics = DedupMetrics::default();
let mut t1 = create_test_template("q1", &[5, 5, 5, 5]); let mut t2 = create_test_template("q2", &[15, 15, 15, 15]); let mut t3 = create_test_template("q3", &[10, 10, 10, 10]); let mut templates: Vec<&mut Template> = vec![&mut t1, &mut t2, &mut t3];
mark_duplicates_in_family(&mut templates, &mut metrics);
assert_eq!(metrics.unique_templates, 1);
assert_eq!(metrics.duplicate_templates, 2);
assert!(is_duplicate(&t1));
assert!(!is_duplicate(&t2)); assert!(is_duplicate(&t3));
}
#[test]
fn test_mark_duplicates_size_3_third_highest() {
let mut metrics = DedupMetrics::default();
let mut t1 = create_test_template("q1", &[5, 5, 5, 5]); let mut t2 = create_test_template("q2", &[10, 10, 10, 10]); let mut t3 = create_test_template("q3", &[15, 15, 15, 15]); let mut templates: Vec<&mut Template> = vec![&mut t1, &mut t2, &mut t3];
mark_duplicates_in_family(&mut templates, &mut metrics);
assert_eq!(metrics.unique_templates, 1);
assert_eq!(metrics.duplicate_templates, 2);
assert!(is_duplicate(&t1));
assert!(is_duplicate(&t2));
assert!(!is_duplicate(&t3)); }
#[test]
fn test_mark_duplicates_size_4_general_case() {
let mut metrics = DedupMetrics::default();
let mut t1 = create_test_template("q1", &[5, 5, 5, 5]); let mut t2 = create_test_template("q2", &[15, 15, 15, 15]); let mut t3 = create_test_template("q3", &[10, 10, 10, 10]); let mut t4 = create_test_template("q4", &[8, 8, 8, 8]); let mut templates: Vec<&mut Template> = vec![&mut t1, &mut t2, &mut t3, &mut t4];
mark_duplicates_in_family(&mut templates, &mut metrics);
assert_eq!(metrics.unique_templates, 1);
assert_eq!(metrics.duplicate_templates, 3);
assert!(is_duplicate(&t1));
assert!(!is_duplicate(&t2)); assert!(is_duplicate(&t3));
assert!(is_duplicate(&t4));
}
#[test]
fn test_mark_duplicates_counts_duplicate_reads() {
let mut metrics = DedupMetrics::default();
let mut t1 = create_test_template("q1", &[15, 15, 15, 15]); let mut t2 = create_test_template("q2", &[5, 5, 5, 5]); let mut templates: Vec<&mut Template> = vec![&mut t1, &mut t2];
mark_duplicates_in_family(&mut templates, &mut metrics);
assert_eq!(metrics.duplicate_reads, 1); }
#[test]
fn test_mark_duplicates_tie_breaking_size_2() {
let mut metrics = DedupMetrics::default();
let mut t1 = create_test_template("q1", &[10, 10, 10, 10]); let mut t2 = create_test_template("q2", &[10, 10, 10, 10]); let mut templates: Vec<&mut Template> = vec![&mut t1, &mut t2];
mark_duplicates_in_family(&mut templates, &mut metrics);
assert_eq!(metrics.unique_templates, 1);
assert_eq!(metrics.duplicate_templates, 1);
assert!(!is_duplicate(&t1));
assert!(is_duplicate(&t2));
}
#[test]
fn test_mark_duplicates_tie_breaking_size_3_all_equal() {
let mut metrics = DedupMetrics::default();
let mut t1 = create_test_template("q1", &[10, 10, 10, 10]); let mut t2 = create_test_template("q2", &[10, 10, 10, 10]); let mut t3 = create_test_template("q3", &[10, 10, 10, 10]); let mut templates: Vec<&mut Template> = vec![&mut t1, &mut t2, &mut t3];
mark_duplicates_in_family(&mut templates, &mut metrics);
assert_eq!(metrics.unique_templates, 1);
assert_eq!(metrics.duplicate_templates, 2);
assert!(!is_duplicate(&t1));
assert!(is_duplicate(&t2));
assert!(is_duplicate(&t3));
}
#[test]
fn test_mark_duplicates_tie_breaking_size_3_second_and_third_tie() {
let mut metrics = DedupMetrics::default();
let mut t1 = create_test_template("q1", &[5, 5, 5, 5]); let mut t2 = create_test_template("q2", &[10, 10, 10, 10]); let mut t3 = create_test_template("q3", &[10, 10, 10, 10]); let mut templates: Vec<&mut Template> = vec![&mut t1, &mut t2, &mut t3];
mark_duplicates_in_family(&mut templates, &mut metrics);
assert_eq!(metrics.unique_templates, 1);
assert_eq!(metrics.duplicate_templates, 2);
assert!(is_duplicate(&t1));
assert!(!is_duplicate(&t2));
assert!(is_duplicate(&t3));
}
#[test]
fn test_mark_duplicates_tie_breaking_size_4_all_equal() {
let mut metrics = DedupMetrics::default();
let mut t1 = create_test_template("q1", &[10, 10, 10, 10]); let mut t2 = create_test_template("q2", &[10, 10, 10, 10]); let mut t3 = create_test_template("q3", &[10, 10, 10, 10]); let mut t4 = create_test_template("q4", &[10, 10, 10, 10]); let mut templates: Vec<&mut Template> = vec![&mut t1, &mut t2, &mut t3, &mut t4];
mark_duplicates_in_family(&mut templates, &mut metrics);
assert_eq!(metrics.unique_templates, 1);
assert_eq!(metrics.duplicate_templates, 3);
assert!(!is_duplicate(&t1));
assert!(is_duplicate(&t2));
assert!(is_duplicate(&t3));
assert!(is_duplicate(&t4));
}
#[test]
fn test_mark_duplicates_tie_breaking_size_4_partial_tie() {
let mut metrics = DedupMetrics::default();
let mut t1 = create_test_template("q1", &[5, 5, 5, 5]); let mut t2 = create_test_template("q2", &[8, 8, 8, 8]); let mut t3 = create_test_template("q3", &[10, 10, 10, 10]); let mut t4 = create_test_template("q4", &[10, 10, 10, 10]); let mut templates: Vec<&mut Template> = vec![&mut t1, &mut t2, &mut t3, &mut t4];
mark_duplicates_in_family(&mut templates, &mut metrics);
assert_eq!(metrics.unique_templates, 1);
assert_eq!(metrics.duplicate_templates, 3);
assert!(is_duplicate(&t1));
assert!(is_duplicate(&t2));
assert!(!is_duplicate(&t3));
assert!(is_duplicate(&t4));
}
fn default_filter_config() -> DedupFilterConfig {
DedupFilterConfig {
umi_tag: *SamTag::RX,
min_mapq: 20,
include_non_pf: false,
min_umi_length: None,
no_umi: false,
}
}
fn create_mapped_template_with_umi(name: &str, umi: &str, mapq: u8) -> Template {
let record = RecordBuilder::new()
.name(name)
.sequence("ACGT")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(100)
.cigar("4M")
.mapping_quality(mapq)
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT)
.tag("RX", umi.to_string())
.build();
Template::from_records(vec![record]).expect("test template construction should not fail")
}
#[test]
fn test_filter_template_accepts_valid_template() {
let config = default_filter_config();
let mut metrics = FilterMetrics::new();
let template = create_mapped_template_with_umi("q1", "ACGTACGT", 30);
assert!(filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.accepted_templates, 1);
}
#[test]
fn test_filter_template_rejects_low_mapq() {
let config = default_filter_config(); let mut metrics = FilterMetrics::new();
let template = create_mapped_template_with_umi("q1", "ACGTACGT", 10);
assert!(!filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.discarded_poor_alignment, 1);
}
#[test]
fn test_filter_template_rejects_umi_with_n() {
let config = default_filter_config();
let mut metrics = FilterMetrics::new();
let template = create_mapped_template_with_umi("q1", "ACNTACGT", 30);
assert!(!filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.discarded_ns_in_umi, 1);
}
#[test]
fn test_filter_template_rejects_short_umi() {
let mut config = default_filter_config();
config.min_umi_length = Some(8); let mut metrics = FilterMetrics::new();
let template = create_mapped_template_with_umi("q1", "ACGT", 30);
assert!(!filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.discarded_umi_too_short, 1);
}
#[test]
fn test_filter_template_rejects_missing_umi_tag() {
let config = default_filter_config();
let mut metrics = FilterMetrics::new();
let record = RecordBuilder::new()
.name("q1")
.sequence("ACGT")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(100)
.cigar("4M")
.mapping_quality(30)
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT)
.build();
let template = Template::from_records(vec![record])
.expect("test template construction should not fail");
assert!(!filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.discarded_poor_alignment, 1);
}
#[test]
fn test_filter_template_rejects_qc_fail() {
let config = default_filter_config(); let mut metrics = FilterMetrics::new();
let record = RecordBuilder::new()
.name("q1")
.sequence("ACGT")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(100)
.cigar("4M")
.mapping_quality(30)
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT | Flags::QC_FAIL)
.tag("RX", "ACGTACGT".to_string())
.build();
let template = Template::from_records(vec![record])
.expect("test template construction should not fail");
assert!(!filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.discarded_non_pf, 1);
}
#[test]
fn test_filter_template_accepts_qc_fail_when_included() {
let mut config = default_filter_config();
config.include_non_pf = true; let mut metrics = FilterMetrics::new();
let record = RecordBuilder::new()
.name("q1")
.sequence("ACGT")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(100)
.cigar("4M")
.mapping_quality(30)
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT | Flags::QC_FAIL)
.tag("RX", "ACGTACGT".to_string())
.build();
let template = Template::from_records(vec![record])
.expect("test template construction should not fail");
assert!(filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.accepted_templates, 1);
}
#[test]
fn test_filter_template_rejects_unmapped() {
let config = default_filter_config();
let mut metrics = FilterMetrics::new();
let record = RecordBuilder::new()
.name("q1")
.sequence("ACGT")
.qualities(&[30, 30, 30, 30])
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT | Flags::UNMAPPED)
.tag("RX", "ACGTACGT".to_string())
.build();
let template = Template::from_records(vec![record])
.expect("test template construction should not fail");
assert!(!filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.discarded_poor_alignment, 1);
}
#[test]
fn test_filter_template_accepts_paired_umi_with_dash() {
let config = default_filter_config();
let mut metrics = FilterMetrics::new();
let template = create_mapped_template_with_umi("q1", "ACGT-TGCA", 30);
assert!(filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.accepted_templates, 1);
}
#[test]
fn test_umi_for_read_identity_uppercase() {
let assigner = Strategy::Identity.new_assigner_full(0, 1, 100);
let result =
umi_for_read("ACGTACGT", true, assigner.as_ref()).expect("valid UMI should not fail");
assert_eq!(result, "ACGTACGT");
}
#[test]
fn test_umi_for_read_identity_lowercase_gets_uppercased() {
let assigner = Strategy::Identity.new_assigner_full(0, 1, 100);
let result = umi_for_read("acgtacgt", true, assigner.as_ref())
.expect("valid lowercase UMI should not fail");
assert_eq!(result, "ACGTACGT");
}
#[test]
fn test_umi_for_read_paired_r1_earlier() {
let assigner = Strategy::Paired.new_assigner_full(1, 1, 100);
let result = umi_for_read("ACGT-TGCA", true, assigner.as_ref())
.expect("valid paired UMI should not fail");
assert_eq!(result, "aa:ACGT-bb:TGCA");
}
#[test]
fn test_umi_for_read_paired_r2_earlier() {
let assigner = Strategy::Paired.new_assigner_full(1, 1, 100);
let result = umi_for_read("ACGT-TGCA", false, assigner.as_ref())
.expect("valid paired UMI should not fail");
assert_eq!(result, "bb:ACGT-aa:TGCA");
}
#[test]
fn test_umi_for_read_paired_missing_dash_error() {
let assigner = Strategy::Paired.new_assigner_full(1, 1, 100);
let result = umi_for_read("ACGTACGT", true, assigner.as_ref());
assert!(result.is_err());
assert!(
result
.expect_err("should fail for missing dash")
.to_string()
.contains("did not contain 2 segments"),
"Error message should mention missing segments"
);
}
#[test]
fn test_get_pair_orientation_both_forward() {
let r1 = RecordBuilder::new()
.name("q1")
.sequence("ACGT")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(100)
.cigar("4M")
.mapping_quality(30)
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT)
.tag("RX", "ACGT-TGCA".to_string())
.build();
let r2 = RecordBuilder::new()
.name("q1")
.sequence("TGCA")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(200)
.cigar("4M")
.mapping_quality(30)
.flags(Flags::SEGMENTED | Flags::LAST_SEGMENT)
.tag("RX", "ACGT-TGCA".to_string())
.build();
let template = Template::from_records(vec![r1, r2])
.expect("test template construction should not fail");
let (r1_positive, r2_positive) = get_pair_orientation(&template);
assert!(r1_positive);
assert!(r2_positive);
}
#[test]
fn test_get_pair_orientation_r1_reverse() {
let r1 = RecordBuilder::new()
.name("q1")
.sequence("ACGT")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(100)
.cigar("4M")
.mapping_quality(30)
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT | Flags::REVERSE_COMPLEMENTED)
.tag("RX", "ACGT-TGCA".to_string())
.build();
let r2 = RecordBuilder::new()
.name("q1")
.sequence("TGCA")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(200)
.cigar("4M")
.mapping_quality(30)
.flags(Flags::SEGMENTED | Flags::LAST_SEGMENT)
.tag("RX", "ACGT-TGCA".to_string())
.build();
let template = Template::from_records(vec![r1, r2])
.expect("test template construction should not fail");
let (r1_positive, r2_positive) = get_pair_orientation(&template);
assert!(!r1_positive);
assert!(r2_positive);
}
#[test]
fn test_get_pair_orientation_both_reverse() {
let r1 = RecordBuilder::new()
.name("q1")
.sequence("ACGT")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(100)
.cigar("4M")
.mapping_quality(30)
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT | Flags::REVERSE_COMPLEMENTED)
.tag("RX", "ACGT-TGCA".to_string())
.build();
let r2 = RecordBuilder::new()
.name("q1")
.sequence("TGCA")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(200)
.cigar("4M")
.mapping_quality(30)
.flags(Flags::SEGMENTED | Flags::LAST_SEGMENT | Flags::REVERSE_COMPLEMENTED)
.tag("RX", "ACGT-TGCA".to_string())
.build();
let template = Template::from_records(vec![r1, r2])
.expect("test template construction should not fail");
let (r1_positive, r2_positive) = get_pair_orientation(&template);
assert!(!r1_positive);
assert!(!r2_positive);
}
#[test]
fn test_is_r1_earlier_true() {
let r1 = RecordBuilder::new()
.name("q1")
.sequence("ACGT")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(100)
.cigar("4M")
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT)
.build();
let r2 = RecordBuilder::new()
.name("q1")
.sequence("TGCA")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(200)
.cigar("4M")
.flags(Flags::SEGMENTED | Flags::LAST_SEGMENT)
.build();
assert!(is_r1_genomically_earlier(&r1, &r2).expect("mapped records should have positions"));
}
#[test]
fn test_is_r1_earlier_false() {
let r1 = RecordBuilder::new()
.name("q1")
.sequence("ACGT")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(200)
.cigar("4M")
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT)
.build();
let r2 = RecordBuilder::new()
.name("q1")
.sequence("TGCA")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(100)
.cigar("4M")
.flags(Flags::SEGMENTED | Flags::LAST_SEGMENT)
.build();
assert!(
!is_r1_genomically_earlier(&r1, &r2).expect("mapped records should have positions")
);
}
#[test]
fn test_is_r1_earlier_equal_position() {
let r1 = RecordBuilder::new()
.name("q1")
.sequence("ACGT")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(100)
.cigar("4M")
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT)
.build();
let r2 = RecordBuilder::new()
.name("q1")
.sequence("TGCA")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(100)
.cigar("4M")
.flags(Flags::SEGMENTED | Flags::LAST_SEGMENT)
.build();
assert!(is_r1_genomically_earlier(&r1, &r2).expect("mapped records should have positions"));
}
#[test]
fn test_truncate_umis_no_min_length() {
let umis = vec!["ACGTACGT".to_string(), "TGCATGCA".to_string()];
let result = truncate_umis(umis.clone(), None)
.expect("truncation with no min length should not fail");
assert_eq!(result, umis);
}
#[test]
fn test_truncate_umis_truncates() {
let umis = vec!["ACGTACGT".to_string(), "TGCATGCA".to_string()];
let result =
truncate_umis(umis, Some(4)).expect("truncation of 8-base UMIs to 4 should not fail");
assert_eq!(result, vec!["ACGT".to_string(), "TGCA".to_string()]);
}
#[test]
fn test_truncate_umis_error_too_short() {
let umis = vec!["ACG".to_string(), "TGCATGCA".to_string()];
let result = truncate_umis(umis, Some(4));
assert!(result.is_err());
assert!(
result
.expect_err("should fail for too-short UMI")
.to_string()
.contains("shorter than expected"),
"Error message should mention UMI being too short"
);
}
#[test]
fn test_set_mi_tag_assigned() {
let mut record =
RecordBuilder::new().name("q1").sequence("ACGT").qualities(&[30, 30, 30, 30]).build();
let mi = MoleculeId::Single(42);
let assign_tag = Tag::from(SamTag::MI);
set_mi_tag_on_record(&mut record, mi, assign_tag, 0);
let Some(DataValue::String(val)) = record.data().get(&assign_tag) else {
unreachable!("MI tag should be a string value");
};
assert_eq!(AsRef::<[u8]>::as_ref(val), b"42");
}
#[test]
fn test_set_mi_tag_unassigned() {
let mut record =
RecordBuilder::new().name("q1").sequence("ACGT").qualities(&[30, 30, 30, 30]).build();
let mi = MoleculeId::None;
let assign_tag = Tag::from(SamTag::MI);
set_mi_tag_on_record(&mut record, mi, assign_tag, 0);
let mi_value = record.data().get(&assign_tag);
assert!(mi_value.is_none(), "Unassigned MoleculeId should not set MI tag");
}
#[test]
fn test_set_mi_tag_with_base_offset() {
let mut record =
RecordBuilder::new().name("q1").sequence("ACGT").qualities(&[30, 30, 30, 30]).build();
let mi = MoleculeId::Single(42);
let assign_tag = Tag::from(SamTag::MI);
set_mi_tag_on_record(&mut record, mi, assign_tag, 100);
let Some(DataValue::String(val)) = record.data().get(&assign_tag) else {
unreachable!("MI tag should be a string value");
};
assert_eq!(AsRef::<[u8]>::as_ref(val), b"142");
}
fn create_paired_mapped_template_with_umi(
name: &str,
umi: &str,
r1_mapq: u8,
r2_mapq: u8,
) -> Template {
let r1 = RecordBuilder::new()
.name(name)
.sequence("ACGT")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(100)
.cigar("4M")
.mapping_quality(r1_mapq)
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT)
.tag("RX", umi.to_string())
.build();
let r2 = RecordBuilder::new()
.name(name)
.sequence("TGCA")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(200)
.cigar("4M")
.mapping_quality(r2_mapq)
.flags(Flags::SEGMENTED | Flags::LAST_SEGMENT)
.tag("RX", umi.to_string())
.build();
Template::from_records(vec![r1, r2]).expect("test template construction should not fail")
}
#[test]
fn test_filter_paired_template_accepts_valid() {
let config = default_filter_config();
let mut metrics = FilterMetrics::new();
let template = create_paired_mapped_template_with_umi("q1", "ACGTACGT", 30, 30);
assert!(filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.accepted_templates, 1);
}
#[test]
fn test_filter_paired_template_rejects_r2_low_mapq() {
let config = default_filter_config(); let mut metrics = FilterMetrics::new();
let template = create_paired_mapped_template_with_umi("q1", "ACGTACGT", 30, 10);
assert!(!filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.discarded_poor_alignment, 1);
}
#[test]
fn test_filter_paired_template_rejects_r2_umi_with_n() {
let config = default_filter_config();
let mut metrics = FilterMetrics::new();
let r1 = RecordBuilder::new()
.name("q1")
.sequence("ACGT")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(100)
.cigar("4M")
.mapping_quality(30)
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT)
.tag("RX", "ACGTACGT".to_string())
.build();
let r2 = RecordBuilder::new()
.name("q1")
.sequence("TGCA")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(200)
.cigar("4M")
.mapping_quality(30)
.flags(Flags::SEGMENTED | Flags::LAST_SEGMENT)
.tag("RX", "ACNTACGT".to_string())
.build();
let template = Template::from_records(vec![r1, r2])
.expect("test template construction should not fail");
assert!(!filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.discarded_ns_in_umi, 1);
}
#[test]
fn test_filter_paired_no_reads_rejected() {
let config = default_filter_config();
let mut metrics = FilterMetrics::new();
let template = Template::new(b"empty".to_vec());
assert!(!filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.discarded_poor_alignment, 1);
}
#[test]
fn test_metrics_merge_all_fields() {
let mut m1 = DedupMetrics {
total_templates: 10,
duplicate_templates: 2,
unique_templates: 8,
total_reads: 20,
duplicate_reads: 4,
unique_reads: 16,
secondary_reads: 3,
supplementary_reads: 1,
missing_pa_tag: 1,
..Default::default()
};
let m2 = DedupMetrics {
total_templates: 5,
duplicate_templates: 1,
unique_templates: 4,
total_reads: 10,
duplicate_reads: 2,
unique_reads: 8,
secondary_reads: 2,
supplementary_reads: 3,
missing_pa_tag: 2,
..Default::default()
};
m1.merge(&m2);
assert_eq!(m1.total_templates, 15);
assert_eq!(m1.duplicate_templates, 3);
assert_eq!(m1.unique_templates, 12);
assert_eq!(m1.total_reads, 30);
assert_eq!(m1.duplicate_reads, 6);
assert_eq!(m1.unique_reads, 24);
assert_eq!(m1.secondary_reads, 5);
assert_eq!(m1.supplementary_reads, 4);
assert_eq!(m1.missing_pa_tag, 3);
}
#[test]
fn test_duplicate_rate_all_duplicates() {
let metrics =
DedupMetrics { total_templates: 50, duplicate_templates: 50, ..Default::default() };
assert!((metrics.duplicate_rate() - 1.0).abs() < f64::EPSILON);
}
#[test]
fn test_metrics_default() {
let metrics = DedupMetrics::default();
assert_eq!(metrics.total_templates, 0);
assert_eq!(metrics.duplicate_templates, 0);
assert_eq!(metrics.unique_templates, 0);
assert_eq!(metrics.total_reads, 0);
assert_eq!(metrics.duplicate_reads, 0);
assert_eq!(metrics.unique_reads, 0);
assert_eq!(metrics.secondary_reads, 0);
assert_eq!(metrics.supplementary_reads, 0);
assert_eq!(metrics.missing_pa_tag, 0);
}
#[test]
fn test_dedup_metrics_output_from() {
let metrics = DedupMetrics {
total_templates: 100,
duplicate_templates: 25,
unique_templates: 75,
total_reads: 200,
duplicate_reads: 50,
unique_reads: 150,
secondary_reads: 10,
supplementary_reads: 5,
missing_pa_tag: 2,
..Default::default()
};
let output = DedupMetricsOutput::from(&metrics);
assert_eq!(output.total_templates, 100);
assert_eq!(output.duplicate_templates, 25);
assert_eq!(output.unique_templates, 75);
assert!((output.duplicate_rate - 0.25).abs() < 0.001);
assert_eq!(output.total_reads, 200);
assert_eq!(output.duplicate_reads, 50);
assert_eq!(output.unique_reads, 150);
assert_eq!(output.secondary_reads, 10);
assert_eq!(output.supplementary_reads, 5);
assert_eq!(output.missing_pa_tag, 2);
}
fn make_raw_bam_record_truncated_aux() -> Vec<u8> {
let mut rec = vec![0u8; 40];
rec[8] = 4; rec[9] = 30; rec[12..14].copy_from_slice(&0u16.to_le_bytes()); rec[16..20].copy_from_slice(&1000u32.to_le_bytes()); rec[32..36].copy_from_slice(b"tst\0");
rec
}
#[test]
fn test_filter_template_raw_truncated_record_no_panic() {
let raw = make_raw_bam_record_truncated_aux();
let template = Template::from_raw_records(vec![raw])
.expect("test template construction should not fail");
let config = DedupFilterConfig {
umi_tag: *SamTag::RX,
min_mapq: 20,
include_non_pf: false,
min_umi_length: None,
no_umi: false,
};
let mut metrics = FilterMetrics::new();
let result = filter_template_raw(&template, &config, &mut metrics);
assert!(!result, "Truncated record should be rejected");
assert_eq!(metrics.discarded_poor_alignment, 1);
}
#[test]
fn test_filter_template_raw_mq_tag_type_mismatch_bypasses_filter() {
let mut rec = vec![0u8; 46];
rec[0..4].copy_from_slice(&0i32.to_le_bytes()); rec[4..8].copy_from_slice(&100i32.to_le_bytes()); rec[8] = 4; rec[9] = 30; rec[12..14].copy_from_slice(&1u16.to_le_bytes()); rec[14..16].copy_from_slice(&(crate::sort::bam_fields::flags::PAIRED).to_le_bytes());
rec[16..20].copy_from_slice(&4u32.to_le_bytes()); rec[32..36].copy_from_slice(b"tst\0"); rec[36..40].copy_from_slice(&(4u32 << 4).to_le_bytes());
rec.extend_from_slice(b"RXZACGTACGT\x00");
rec.extend_from_slice(b"MQc"); rec.push(10);
let template = Template::from_raw_records(vec![rec])
.expect("test template construction should not fail");
let config = DedupFilterConfig {
umi_tag: *SamTag::RX,
min_mapq: 20,
include_non_pf: false,
min_umi_length: None,
no_umi: false,
};
let mut metrics = FilterMetrics::new();
let result = filter_template_raw(&template, &config, &mut metrics);
assert!(!result, "Template with low mate MAPQ should be rejected");
assert_eq!(metrics.accepted_templates, 0);
assert_eq!(metrics.discarded_poor_alignment, 1);
}
fn make_raw_bam_for_dedup(
name: &[u8],
flag: u16,
mapq: u8,
seq_len: usize,
qualities: &[u8],
umi: &[u8],
) -> Vec<u8> {
use crate::sort::bam_fields;
let l_read_name = (name.len() + 1) as u8;
let cigar_ops: &[u32] = if (flag & bam_fields::flags::UNMAPPED) == 0 {
&[(seq_len as u32) << 4] } else {
&[]
};
let n_cigar_op = cigar_ops.len() as u16;
let seq_bytes = seq_len.div_ceil(2);
let total = 32 + l_read_name as usize + cigar_ops.len() * 4 + seq_bytes + seq_len;
let mut buf = vec![0u8; total];
buf[0..4].copy_from_slice(&0i32.to_le_bytes()); buf[4..8].copy_from_slice(&100i32.to_le_bytes()); buf[8] = l_read_name;
buf[9] = mapq;
buf[12..14].copy_from_slice(&n_cigar_op.to_le_bytes());
buf[14..16].copy_from_slice(&flag.to_le_bytes());
buf[16..20].copy_from_slice(&(seq_len as u32).to_le_bytes());
buf[20..24].copy_from_slice(&(-1i32).to_le_bytes()); buf[24..28].copy_from_slice(&(-1i32).to_le_bytes());
let name_start = 32;
buf[name_start..name_start + name.len()].copy_from_slice(name);
buf[name_start + name.len()] = 0;
let cigar_start = name_start + l_read_name as usize;
for (i, &op) in cigar_ops.iter().enumerate() {
let off = cigar_start + i * 4;
buf[off..off + 4].copy_from_slice(&op.to_le_bytes());
}
let qo = bam_fields::qual_offset(&buf);
let copy_len = std::cmp::min(qualities.len(), seq_len);
buf[qo..qo + copy_len].copy_from_slice(&qualities[..copy_len]);
buf.extend_from_slice(b"RXZ");
buf.extend_from_slice(umi);
buf.push(0);
buf
}
#[test]
fn test_score_template_raw_basic() {
let raw = make_raw_bam_for_dedup(
b"rea",
crate::sort::bam_fields::flags::PAIRED | crate::sort::bam_fields::flags::FIRST_SEGMENT,
30,
4,
&[20, 20, 20, 20],
b"ACGT",
);
let template = Template::from_raw_records(vec![raw])
.expect("test template construction should not fail");
assert_eq!(score_template_raw(&template), 60);
}
#[test]
fn test_score_template_raw_low_quality() {
let raw = make_raw_bam_for_dedup(
b"rea",
crate::sort::bam_fields::flags::PAIRED | crate::sort::bam_fields::flags::FIRST_SEGMENT,
30,
4,
&[10, 10, 10, 10],
b"ACGT",
);
let template = Template::from_raw_records(vec![raw])
.expect("test template construction should not fail");
assert_eq!(score_template_raw(&template), 40);
}
#[test]
fn test_score_template_raw_paired() {
let r1 = make_raw_bam_for_dedup(
b"rea",
crate::sort::bam_fields::flags::PAIRED | crate::sort::bam_fields::flags::FIRST_SEGMENT,
30,
4,
&[20, 20, 20, 20],
b"ACGT",
);
let r2 = make_raw_bam_for_dedup(
b"rea",
crate::sort::bam_fields::flags::PAIRED | crate::sort::bam_fields::flags::LAST_SEGMENT,
30,
4,
&[10, 10, 10, 10],
b"ACGT",
);
let template = Template::from_raw_records(vec![r1, r2])
.expect("test template construction should not fail");
assert_eq!(score_template_raw(&template), 100);
}
#[test]
fn test_score_template_raw_matches_recordbuf() {
let qualities = &[20, 15, 10, 5];
let template_rb = create_test_template("q1", qualities);
let template_raw = {
let raw = make_raw_bam_for_dedup(
b"q01",
crate::sort::bam_fields::flags::PAIRED
| crate::sort::bam_fields::flags::FIRST_SEGMENT,
30,
4,
qualities,
b"ACGT",
);
Template::from_raw_records(vec![raw])
.expect("test template construction should not fail")
};
assert_eq!(score_template(&template_rb), score_template(&template_raw));
}
#[test]
fn test_filter_template_raw_accepts_valid() {
let raw = make_raw_bam_for_dedup(
b"rea",
crate::sort::bam_fields::flags::PAIRED
| crate::sort::bam_fields::flags::FIRST_SEGMENT
| crate::sort::bam_fields::flags::MATE_UNMAPPED,
30,
4,
&[20, 20, 20, 20],
b"ACGTACGT",
);
let template = Template::from_raw_records(vec![raw])
.expect("test template construction should not fail");
let config = DedupFilterConfig {
umi_tag: *SamTag::RX,
min_mapq: 20,
include_non_pf: false,
min_umi_length: None,
no_umi: false,
};
let mut metrics = FilterMetrics::new();
assert!(filter_template_raw(&template, &config, &mut metrics));
assert_eq!(metrics.accepted_templates, 1);
}
#[test]
fn test_filter_template_raw_rejects_low_mapq() {
let raw = make_raw_bam_for_dedup(
b"rea",
crate::sort::bam_fields::flags::PAIRED
| crate::sort::bam_fields::flags::FIRST_SEGMENT
| crate::sort::bam_fields::flags::MATE_UNMAPPED,
10, 4,
&[20, 20, 20, 20],
b"ACGT",
);
let template = Template::from_raw_records(vec![raw])
.expect("test template construction should not fail");
let config = DedupFilterConfig {
umi_tag: *SamTag::RX,
min_mapq: 20,
include_non_pf: false,
min_umi_length: None,
no_umi: false,
};
let mut metrics = FilterMetrics::new();
assert!(!filter_template_raw(&template, &config, &mut metrics));
assert_eq!(metrics.discarded_poor_alignment, 1);
}
#[test]
fn test_filter_template_raw_rejects_qc_fail() {
let raw = make_raw_bam_for_dedup(
b"rea",
crate::sort::bam_fields::flags::PAIRED
| crate::sort::bam_fields::flags::FIRST_SEGMENT
| crate::sort::bam_fields::flags::MATE_UNMAPPED
| crate::sort::bam_fields::flags::QC_FAIL,
30,
4,
&[20, 20, 20, 20],
b"ACGT",
);
let template = Template::from_raw_records(vec![raw])
.expect("test template construction should not fail");
let config = DedupFilterConfig {
umi_tag: *SamTag::RX,
min_mapq: 0,
include_non_pf: false,
min_umi_length: None,
no_umi: false,
};
let mut metrics = FilterMetrics::new();
assert!(!filter_template_raw(&template, &config, &mut metrics));
assert_eq!(metrics.discarded_non_pf, 1);
}
#[test]
fn test_filter_template_raw_rejects_umi_with_n() {
let raw = make_raw_bam_for_dedup(
b"rea",
crate::sort::bam_fields::flags::PAIRED
| crate::sort::bam_fields::flags::FIRST_SEGMENT
| crate::sort::bam_fields::flags::MATE_UNMAPPED,
30,
4,
&[20, 20, 20, 20],
b"ANGT",
);
let template = Template::from_raw_records(vec![raw])
.expect("test template construction should not fail");
let config = DedupFilterConfig {
umi_tag: *SamTag::RX,
min_mapq: 0,
include_non_pf: false,
min_umi_length: None,
no_umi: false,
};
let mut metrics = FilterMetrics::new();
assert!(!filter_template_raw(&template, &config, &mut metrics));
assert_eq!(metrics.discarded_ns_in_umi, 1);
}
#[test]
fn test_filter_template_raw_rejects_short_umi() {
let raw = make_raw_bam_for_dedup(
b"rea",
crate::sort::bam_fields::flags::PAIRED
| crate::sort::bam_fields::flags::FIRST_SEGMENT
| crate::sort::bam_fields::flags::MATE_UNMAPPED,
30,
4,
&[20, 20, 20, 20],
b"AC",
);
let template = Template::from_raw_records(vec![raw])
.expect("test template construction should not fail");
let config = DedupFilterConfig {
umi_tag: *SamTag::RX,
min_mapq: 0,
include_non_pf: false,
min_umi_length: Some(6),
no_umi: false,
};
let mut metrics = FilterMetrics::new();
assert!(!filter_template_raw(&template, &config, &mut metrics));
assert_eq!(metrics.discarded_umi_too_short, 1);
}
#[test]
fn test_filter_template_raw_rejects_unmapped() {
let raw = make_raw_bam_for_dedup(
b"rea",
crate::sort::bam_fields::flags::UNMAPPED
| crate::sort::bam_fields::flags::MATE_UNMAPPED,
0,
4,
&[20, 20, 20, 20],
b"ACGT",
);
let template = Template::from_raw_records(vec![raw])
.expect("test template construction should not fail");
let config = DedupFilterConfig {
umi_tag: *SamTag::RX,
min_mapq: 0,
include_non_pf: false,
min_umi_length: None,
no_umi: false,
};
let mut metrics = FilterMetrics::new();
assert!(!filter_template_raw(&template, &config, &mut metrics));
assert_eq!(metrics.discarded_poor_alignment, 1);
}
#[test]
fn test_set_duplicate_flag_on_raw_record() {
use crate::sort::bam_fields;
let raw =
make_raw_bam_for_dedup(b"rea", bam_fields::flags::PAIRED, 30, 4, &[20; 4], b"ACGT");
let mut rec = raw;
let orig_flags = bam_fields::flags(&rec);
assert_eq!(orig_flags & bam_fields::flags::DUPLICATE, 0);
bam_fields::set_flags(&mut rec, orig_flags | bam_fields::flags::DUPLICATE);
assert_ne!(bam_fields::flags(&rec) & bam_fields::flags::DUPLICATE, 0);
let flags_with_dup = bam_fields::flags(&rec);
bam_fields::set_flags(&mut rec, flags_with_dup & !bam_fields::flags::DUPLICATE);
assert_eq!(bam_fields::flags(&rec) & bam_fields::flags::DUPLICATE, 0);
}
#[test]
fn test_processed_dedup_group_memory_estimate() {
let template = create_test_template("test", &[30, 30, 30, 30]);
let mut templates = Vec::with_capacity(10);
templates.push(template);
let batch = ProcessedDedupGroup {
templates,
family_sizes: AHashMap::new(),
dedup_metrics: DedupMetrics::default(),
input_record_count: 1,
};
let estimate = batch.estimate_heap_size();
let vec_overhead = 10 * std::mem::size_of::<Template>();
assert!(
estimate >= vec_overhead,
"estimate {estimate} should include Vec<Template> overhead {vec_overhead}"
);
}
#[test]
fn test_filter_template_accepts_missing_umi_when_no_umi_mode() {
let mut config = default_filter_config();
config.no_umi = true;
let mut metrics = FilterMetrics::new();
let record = RecordBuilder::new()
.name("q1")
.sequence("ACGT")
.qualities(&[30, 30, 30, 30])
.reference_sequence_id(0)
.alignment_start(100)
.cigar("4M")
.mapping_quality(30)
.flags(Flags::SEGMENTED | Flags::FIRST_SEGMENT)
.build();
let template = Template::from_records(vec![record])
.expect("test template construction should not fail");
assert!(filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.accepted_templates, 1);
}
#[test]
fn test_filter_template_accepts_umi_with_n_when_no_umi_mode() {
let mut config = default_filter_config();
config.no_umi = true;
let mut metrics = FilterMetrics::new();
let template = create_mapped_template_with_umi("q1", "ACNTACGT", 30);
assert!(filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.accepted_templates, 1);
}
#[test]
fn test_filter_template_accepts_short_umi_when_no_umi_mode() {
let mut config = default_filter_config();
config.min_umi_length = Some(8);
config.no_umi = true;
let mut metrics = FilterMetrics::new();
let template = create_mapped_template_with_umi("q1", "ACGT", 30);
assert!(filter_template(&template, &config, &mut metrics));
assert_eq!(metrics.accepted_templates, 1);
}
fn make_raw_bam_for_dedup_no_tags(
name: &[u8],
flag: u16,
mapq: u8,
seq_len: usize,
qualities: &[u8],
) -> Vec<u8> {
use crate::sort::bam_fields;
let l_read_name = (name.len() + 1) as u8;
let cigar_ops: &[u32] =
if (flag & bam_fields::flags::UNMAPPED) == 0 { &[(seq_len as u32) << 4] } else { &[] };
let n_cigar_op = cigar_ops.len() as u16;
let seq_bytes = seq_len.div_ceil(2);
let total = 32 + l_read_name as usize + cigar_ops.len() * 4 + seq_bytes + seq_len;
let mut buf = vec![0u8; total];
buf[0..4].copy_from_slice(&0i32.to_le_bytes());
buf[4..8].copy_from_slice(&100i32.to_le_bytes());
buf[8] = l_read_name;
buf[9] = mapq;
buf[12..14].copy_from_slice(&n_cigar_op.to_le_bytes());
buf[14..16].copy_from_slice(&flag.to_le_bytes());
buf[16..20].copy_from_slice(&(seq_len as u32).to_le_bytes());
buf[20..24].copy_from_slice(&(-1i32).to_le_bytes());
buf[24..28].copy_from_slice(&(-1i32).to_le_bytes());
let name_start = 32;
buf[name_start..name_start + name.len()].copy_from_slice(name);
buf[name_start + name.len()] = 0;
let cigar_start = name_start + l_read_name as usize;
for (i, &op) in cigar_ops.iter().enumerate() {
let off = cigar_start + i * 4;
buf[off..off + 4].copy_from_slice(&op.to_le_bytes());
}
let qo = bam_fields::qual_offset(&buf);
let copy_len = std::cmp::min(qualities.len(), seq_len);
buf[qo..qo + copy_len].copy_from_slice(&qualities[..copy_len]);
buf
}
#[test]
fn test_filter_template_raw_accepts_missing_umi_when_no_umi_mode() {
let raw = make_raw_bam_for_dedup_no_tags(
b"rea",
crate::sort::bam_fields::flags::PAIRED
| crate::sort::bam_fields::flags::FIRST_SEGMENT
| crate::sort::bam_fields::flags::MATE_UNMAPPED,
30,
4,
&[20, 20, 20, 20],
);
let template = Template::from_raw_records(vec![raw])
.expect("test template construction should not fail");
let config = DedupFilterConfig {
umi_tag: *SamTag::RX,
min_mapq: 20,
include_non_pf: false,
min_umi_length: None,
no_umi: true,
};
let mut metrics = FilterMetrics::new();
assert!(filter_template_raw(&template, &config, &mut metrics));
assert_eq!(metrics.accepted_templates, 1);
}
#[test]
fn test_filter_template_raw_accepts_umi_with_n_when_no_umi_mode() {
let raw = make_raw_bam_for_dedup(
b"rea",
crate::sort::bam_fields::flags::PAIRED
| crate::sort::bam_fields::flags::FIRST_SEGMENT
| crate::sort::bam_fields::flags::MATE_UNMAPPED,
30,
4,
&[20, 20, 20, 20],
b"ANGT",
);
let template = Template::from_raw_records(vec![raw])
.expect("test template construction should not fail");
let config = DedupFilterConfig {
umi_tag: *SamTag::RX,
min_mapq: 0,
include_non_pf: false,
min_umi_length: None,
no_umi: true,
};
let mut metrics = FilterMetrics::new();
assert!(filter_template_raw(&template, &config, &mut metrics));
assert_eq!(metrics.accepted_templates, 1);
}
#[test]
fn test_filter_template_raw_accepts_short_umi_when_no_umi_mode() {
let raw = make_raw_bam_for_dedup(
b"rea",
crate::sort::bam_fields::flags::PAIRED
| crate::sort::bam_fields::flags::FIRST_SEGMENT
| crate::sort::bam_fields::flags::MATE_UNMAPPED,
30,
4,
&[20, 20, 20, 20],
b"AC",
);
let template = Template::from_raw_records(vec![raw])
.expect("test template construction should not fail");
let config = DedupFilterConfig {
umi_tag: *SamTag::RX,
min_mapq: 0,
include_non_pf: false,
min_umi_length: Some(6),
no_umi: true,
};
let mut metrics = FilterMetrics::new();
assert!(filter_template_raw(&template, &config, &mut metrics));
assert_eq!(metrics.accepted_templates, 1);
}
#[test]
fn test_dedup_no_umi_large_position_group() {
use noodles::bam;
use noodles::sam::Header;
use noodles::sam::alignment::io::Write as AlignmentWrite;
use noodles::sam::alignment::record::data::field::Tag;
use noodles::sam::alignment::record_buf::RecordBuf;
use noodles::sam::header::record::value::map::Map as HeaderRecordMap;
use noodles::sam::header::record::value::map::header::tag::Tag as HeaderTag;
use noodles::sam::header::record::value::{
Map, map::Header as HeaderRecord, map::ReferenceSequence,
};
use std::collections::HashSet;
use std::num::NonZeroUsize;
const NUM_TEMPLATES: usize = 5_000;
let temp_dir = tempfile::TempDir::new().unwrap();
let input_bam = temp_dir.path().join("input.bam");
let output_bam = temp_dir.path().join("output.bam");
let header = {
let mut builder = HeaderRecordMap::<HeaderRecord>::builder();
for (tag_bytes, value) in
[(*b"SO", "unsorted"), (*b"GO", "query"), (*b"SS", "template-coordinate")]
{
let HeaderTag::Other(tag) = HeaderTag::from(tag_bytes) else { unreachable!() };
builder = builder.insert(tag, value);
}
Header::builder()
.set_header(builder.build().expect("valid header map"))
.add_reference_sequence(
BString::from("chr1"),
Map::<ReferenceSequence>::new(
NonZeroUsize::new(10_000).expect("non-zero length"),
),
)
.build()
};
{
let mut writer =
bam::io::Writer::new(std::fs::File::create(&input_bam).expect("create input BAM"));
writer.write_header(&header).expect("write header");
for i in 0..NUM_TEMPLATES {
let name = format!("read_{i:05}");
let r1 = RecordBuilder::new()
.name(&name)
.sequence("ACGTACGT")
.qualities(&[30; 8])
.paired(true)
.first_segment(true)
.properly_paired(true)
.reference_sequence_id(0)
.alignment_start(100)
.mapping_quality(60)
.cigar("8M")
.mate_reference_sequence_id(0)
.mate_alignment_start(200)
.template_length(108)
.tag("MC", "8M")
.build();
let r2 = RecordBuilder::new()
.name(&name)
.sequence("ACGTACGT")
.qualities(&[30; 8])
.paired(true)
.first_segment(false)
.properly_paired(true)
.reverse_complement(true)
.reference_sequence_id(0)
.alignment_start(200)
.mapping_quality(60)
.cigar("8M")
.mate_reference_sequence_id(0)
.mate_alignment_start(100)
.template_length(-108)
.tag("MC", "8M")
.build();
writer.write_alignment_record(&header, &r1).expect("write R1");
writer.write_alignment_record(&header, &r2).expect("write R2");
}
writer.try_finish().expect("finish BAM");
}
let cmd = MarkDuplicates::try_parse_from([
"dedup",
"--input",
input_bam.to_str().unwrap(),
"--output",
output_bam.to_str().unwrap(),
"--no-umi",
"--compression-level",
"1",
])
.expect("failed to parse dedup args");
cmd.execute("test").expect("dedup --no-umi should succeed with large position group");
assert!(output_bam.exists(), "output BAM should be created");
let mut reader =
bam::io::Reader::new(std::fs::File::open(&output_bam).expect("open output BAM"));
let out_header = reader.read_header().expect("read output header");
let mut total_records = 0usize;
let mut duplicate_records = 0usize;
let mut non_duplicate_records = 0usize;
let mut mi_values = HashSet::new();
let mut mi_count = 0usize;
let mut non_dup_names = HashSet::new();
let mi_tag = Tag::from(SamTag::MI);
for result in reader.record_bufs(&out_header) {
let record: RecordBuf = result.expect("read record");
total_records += 1;
let is_dup = record.flags().is_duplicate();
if is_dup {
duplicate_records += 1;
} else {
non_duplicate_records += 1;
if let Some(name) = record.name() {
non_dup_names.insert(name.to_owned());
}
}
if let Some(DataValue::String(mi)) = record.data().get(&mi_tag) {
mi_values.insert(mi.to_owned());
mi_count += 1;
}
}
assert_eq!(total_records, NUM_TEMPLATES * 2, "all records should be present in output");
assert_eq!(
non_duplicate_records, 2,
"exactly one pair should be non-duplicate (the best-scoring template)"
);
assert_eq!(
duplicate_records,
(NUM_TEMPLATES - 1) * 2,
"all other pairs should be marked as duplicates"
);
assert_eq!(non_dup_names.len(), 1, "non-duplicate records should be from one template");
assert_eq!(mi_count, NUM_TEMPLATES * 2, "all records should have an MI tag");
assert_eq!(mi_values.len(), 1, "all records should share a single MI tag value");
}
}