use crate::assigner::{PairedUmiAssigner, Strategy, UmiAssigner};
use crate::bam_io::{create_bam_reader_for_pipeline, create_bam_writer, is_stdin_path};
use crate::commands::command::Command;
use crate::commands::common::{
BamIoOptions, CompressionOptions, QueueMemoryOptions, SchedulerOptions, ThreadingOptions,
build_pipeline_config, parse_bool,
};
use crate::grouper::{
FilterMetrics, ProcessedPositionGroup, RawPositionGroup, RecordPositionGrouper,
build_templates_from_records,
};
use crate::logging::{OperationTimer, log_umi_grouping_summary};
use crate::metrics::group::{FamilySizeMetrics, PositionGroupSizeMetrics, UmiGroupingMetrics};
use crate::per_thread_accumulator::PerThreadAccumulator;
use crate::progress::ProgressTracker;
use crate::read_info::LibraryIndex;
use crate::sam::{is_sorted, is_template_coordinate_sorted};
use crate::template::Template;
use crate::umi::parallel_assigner::{
ParallelAdjacencyAssigner, ParallelEditAssigner, ParallelIdentityAssigner,
ParallelPairedAssigner,
};
use crate::umi::{UmiValidation, validate_umi};
use crate::unified_pipeline::DecodedRecord;
use crate::unified_pipeline::compute_group_key_from_raw;
use crate::unified_pipeline::{GroupKeyConfig, Grouper, run_bam_pipeline_from_reader};
use ahash::AHashMap;
use anyhow::{Context, Result, bail};
use clap::Parser;
use fgoxide::io::DelimFile;
use crate::sam::SamTag;
#[cfg(feature = "memory-debug")]
use crate::unified_pipeline::MemoryEstimate;
use crate::validation::validate_file_exists;
use fgumi_raw_bam::{RawBamReader, RawRecord};
use log::{info, warn};
use noodles::sam::Header;
use noodles::sam::alignment::record::data::field::Tag;
use noodles::sam::header::record::value::map::header::sort_order::QUERY_NAME;
use std::path::{Path, PathBuf};
use std::sync::Arc;
#[cfg(feature = "memory-debug")]
/// Estimate the total heap footprint of a slice of templates.
///
/// Small slices (<= 10 templates) are summed exactly; larger slices are
/// extrapolated from the first few templates so the estimate stays cheap
/// even for very large position groups.
fn estimate_templates_heap_size(templates: &[Template]) -> usize {
    const SAMPLE_SIZE: usize = 5;
    let n = templates.len();
    if n <= 10 {
        return templates.iter().map(|t| t.estimate_heap_size()).sum();
    }
    // n > 10 guarantees at least SAMPLE_SIZE templates to sample from.
    let sampled: usize =
        templates[..SAMPLE_SIZE].iter().map(|t| t.estimate_heap_size()).sum();
    (sampled * n) / SAMPLE_SIZE
}
/// Per-thread accumulator for grouping statistics collected while groups
/// flow through the pipeline's serialize stage.
#[derive(Default, Debug)]
struct GroupMetricsAccumulator {
    // Histogram: family size (reads per molecule) -> number of families.
    family_sizes: AHashMap<usize, u64>,
    // Histogram: families produced per position group -> occurrences.
    position_group_sizes: AHashMap<usize, u64>,
    // Running totals of templates accepted/discarded by pre-grouping filters.
    filter_metrics: FilterMetrics,
}
impl GroupMetricsAccumulator {
    /// Fold one processed position group's family-size histogram and filter
    /// counters into this accumulator.
    fn record_group(&mut self, family_sizes: AHashMap<usize, u64>, filter_metrics: &FilterMetrics) {
        // Total number of families this position group produced (sum of the
        // per-size family counts).
        let group_total: u64 = family_sizes.values().sum();
        for (size, count) in family_sizes {
            *self.family_sizes.entry(size).or_default() += count;
        }
        // Empty groups (no families) are not recorded in the histogram.
        if group_total > 0 {
            *self.position_group_sizes.entry(group_total as usize).or_default() += 1;
        }
        self.filter_metrics.merge(filter_metrics);
    }
}
/// Settings applied when filtering templates before UMI grouping.
#[derive(Clone)]
struct GroupFilterConfig {
    // Aux tag holding the raw UMI sequence (set to RX by `execute`).
    umi_tag: [u8; 2],
    // Minimum mapping quality required of mapped reads (and mates via MQ).
    min_mapq: u8,
    // When true, reads flagged QC-fail are kept instead of discarded.
    include_non_pf: bool,
    // Minimum number of UMI bases required, when configured.
    min_umi_length: Option<usize>,
    // Skip all UMI extraction/validation (position-only grouping).
    no_umi: bool,
    // Keep templates whose reads are all unmapped.
    allow_unmapped: bool,
}
/// Decide whether a template passes the pre-grouping filters, recording the
/// reason for any rejection in `metrics`.
///
/// Counters are incremented by the number of primary reads present in the
/// template (1 for a fragment, 2 for a pair), and a single failing read
/// rejects the whole template. Returns `true` when the template is kept.
fn filter_template_raw(
    template: &Template,
    config: &GroupFilterConfig,
    metrics: &mut FilterMetrics,
) -> bool {
    use crate::sort::bam_fields;
    use fgumi_raw_bam::RawRecordView;
    // Drop reads whose raw buffer is too short to be a valid BAM record.
    let raw_r1 = template.r1().filter(|r| r.len() >= bam_fields::MIN_BAM_RECORD_LEN);
    let raw_r2 = template.r2().filter(|r| r.len() >= bam_fields::MIN_BAM_RECORD_LEN);
    let num_primary_reads = raw_r1.is_some() as u64 + raw_r2.is_some() as u64;
    metrics.total_templates += num_primary_reads;
    if raw_r1.is_none() && raw_r2.is_none() {
        metrics.discarded_poor_alignment += num_primary_reads;
        return false;
    }
    // Templates where every present read is unmapped are only kept when
    // --allow-unmapped was given.
    let both_unmapped = raw_r1
        .is_none_or(|r| (RawRecordView::new(r).flags() & bam_fields::flags::UNMAPPED) != 0)
        && raw_r2
            .is_none_or(|r| (RawRecordView::new(r).flags() & bam_fields::flags::UNMAPPED) != 0);
    if both_unmapped && !config.allow_unmapped {
        metrics.discarded_poor_alignment += num_primary_reads;
        return false;
    }
    // First pass: flag- and MAPQ-based filters that need no aux-tag parsing.
    for raw in [raw_r1, raw_r2].into_iter().flatten() {
        let flg = RawRecordView::new(raw).flags();
        if !config.include_non_pf && (flg & bam_fields::flags::QC_FAIL) != 0 {
            metrics.discarded_non_pf += num_primary_reads;
            return false;
        }
        // MAPQ is only meaningful for mapped reads.
        if (flg & bam_fields::flags::UNMAPPED) == 0 && bam_fields::mapq(raw) < config.min_mapq {
            metrics.discarded_poor_alignment += num_primary_reads;
            return false;
        }
    }
    // Second pass: walk each read's aux data once, looking for both the UMI
    // tag (type 'Z') and the mate mapping quality ("MQ") in a single scan.
    for raw in [raw_r1, raw_r2].into_iter().flatten() {
        let flg = RawRecordView::new(raw).flags();
        let aux = bam_fields::aux_data_slice(raw);
        // MQ only matters when the mate is mapped; UMI only when enabled.
        let check_mq = (flg & bam_fields::flags::MATE_UNMAPPED) == 0;
        let check_umi = !config.no_umi;
        let mut found_mq: Option<i64> = None;
        let mut found_umi: Option<&[u8]> = None;
        let mut p = 0;
        // Each aux field starts with a 2-byte tag and a 1-byte value type.
        while p + 3 <= aux.len() {
            let t = [aux[p], aux[p + 1]];
            let val_type = aux[p + 2];
            if check_umi && t == config.umi_tag && val_type == b'Z' {
                let start = p + 3;
                // 'Z' values are NUL-terminated strings; a missing terminator
                // means the aux block is malformed, so stop scanning.
                if let Some(end) = aux[start..].iter().position(|&b| b == 0) {
                    found_umi = Some(&aux[start..start + end]);
                    // Advance past the value and its NUL terminator ourselves,
                    // bypassing the generic tag_value_size advance below.
                    p = start + end + 1;
                } else {
                    break;
                }
                // Stop early once everything we need has been found.
                if !check_mq || found_mq.is_some() {
                    break;
                }
                continue;
            }
            if check_mq && t == *b"MQ" {
                // MQ may be stored as any integer subtype; widen to i64.
                // Each arm bounds-checks the bytes it reads.
                found_mq = match val_type {
                    b'C' if p + 3 < aux.len() => Some(i64::from(aux[p + 3])),
                    b'c' if p + 3 < aux.len() => Some(i64::from(aux[p + 3] as i8)),
                    b'S' if p + 5 <= aux.len() => {
                        Some(i64::from(u16::from_le_bytes([aux[p + 3], aux[p + 4]])))
                    }
                    b's' if p + 5 <= aux.len() => {
                        Some(i64::from(i16::from_le_bytes([aux[p + 3], aux[p + 4]])))
                    }
                    b'I' if p + 7 <= aux.len() => Some(i64::from(u32::from_le_bytes([
                        aux[p + 3],
                        aux[p + 4],
                        aux[p + 5],
                        aux[p + 6],
                    ]))),
                    b'i' if p + 7 <= aux.len() => Some(i64::from(i32::from_le_bytes([
                        aux[p + 3],
                        aux[p + 4],
                        aux[p + 5],
                        aux[p + 6],
                    ]))),
                    _ => None,
                };
            }
            // Generic advance over the current field's value; an unknown or
            // truncated value type aborts the scan.
            if let Some(size) = bam_fields::tag_value_size(val_type, &aux[p + 3..]) {
                p += 3 + size;
            } else {
                break;
            }
            // Stop as soon as all requested fields have been found.
            if (!check_umi || found_umi.is_some()) && (!check_mq || found_mq.is_some()) {
                break;
            }
        }
        // Apply the mate-MAPQ filter when an MQ tag was present.
        if check_mq {
            if let Some(mq) = found_mq {
                if (mq as u8) < config.min_mapq {
                    metrics.discarded_poor_alignment += num_primary_reads;
                    return false;
                }
            }
        }
        if config.no_umi {
            continue;
        }
        // UMI validation: reject Ns and (optionally) too-short UMIs; a read
        // with no UMI tag at all is also rejected.
        if let Some(umi_bytes) = found_umi {
            match validate_umi(umi_bytes) {
                UmiValidation::ContainsN => {
                    metrics.discarded_ns_in_umi += num_primary_reads;
                    return false;
                }
                UmiValidation::Valid(base_count) => {
                    if let Some(min_len) = config.min_umi_length {
                        if base_count < min_len {
                            metrics.discarded_umi_too_short += num_primary_reads;
                            return false;
                        }
                    }
                }
            }
        } else {
            metrics.discarded_poor_alignment += num_primary_reads;
            return false;
        }
    }
    metrics.accepted_templates += num_primary_reads;
    true
}
/// Return `true` when R1 maps at or before R2 in genome order, comparing
/// reference id first and then the unclipped 5' position (ties favor R1).
fn is_r1_genomically_earlier_raw(r1: &[u8], r2: &[u8]) -> bool {
    use crate::sort::bam_fields;
    let (ref1, ref2) = (bam_fields::ref_id(r1), bam_fields::ref_id(r2));
    if ref1 == ref2 {
        // Same reference: order by unclipped 5' coordinate, R1 wins ties.
        bam_fields::unclipped_5prime_from_raw_bam(r1)
            <= bam_fields::unclipped_5prime_from_raw_bam(r2)
    } else {
        ref1 < ref2
    }
}
/// Return the (R1, R2) strand orientation of a template, where `true` means
/// forward strand. A missing read is reported as forward so orientation keys
/// remain stable for fragments.
fn get_pair_orientation_raw(template: &Template) -> (bool, bool) {
    use crate::sort::bam_fields;
    use fgumi_raw_bam::RawRecordView;
    let forward = |r| (RawRecordView::new(r).flags() & bam_fields::flags::REVERSE) == 0;
    (template.r1().is_none_or(forward), template.r2().is_none_or(forward))
}
/// Normalize a template's UMI string for assignment.
///
/// For assigners that split templates by pair orientation (presumably the
/// non-paired strategies — confirm against `UmiAssigner` impls), the UMI is
/// simply upper-cased. Otherwise the UMI must be a two-segment "A-B" pair;
/// for the sequential paired assigner the segments are prefixed and ordered
/// by genomic position so that A-B and B-A reads normalize consistently.
fn umi_for_read_impl(umi: &str, is_r1_earlier: bool, assigner: &dyn UmiAssigner) -> Result<String> {
    if assigner.split_templates_by_pair_orientation() {
        // Avoid the Unicode uppercase pass when the UMI is already uppercase.
        if umi.bytes().all(|b| !b.is_ascii_lowercase()) {
            Ok(umi.to_owned())
        } else {
            Ok(umi.to_uppercase())
        }
    } else {
        let parts: Vec<&str> = umi.split('-').collect();
        if parts.len() != 2 {
            bail!(
                "Paired strategy used but UMI did not contain 2 segments delimited by '-': {umi}"
            );
        }
        // The parallel paired assigner handles orientation internally, so no
        // prefix/reorder normalization is applied here.
        if assigner.as_any().downcast_ref::<ParallelPairedAssigner>().is_some() {
            return Ok(umi.to_uppercase());
        }
        let Some(paired) = assigner.as_any().downcast_ref::<PairedUmiAssigner>() else {
            bail!("Expected PairedUmiAssigner or ParallelPairedAssigner")
        };
        // Prefix each segment by whether it came from the genomically lower
        // or higher read, so A-B and B-A templates produce matching keys.
        let result = if is_r1_earlier {
            format!(
                "{}:{}-{}:{}",
                paired.lower_read_umi_prefix(),
                parts[0],
                paired.higher_read_umi_prefix(),
                parts[1]
            )
        } else {
            format!(
                "{}:{}-{}:{}",
                paired.higher_read_umi_prefix(),
                parts[0],
                paired.lower_read_umi_prefix(),
                parts[1]
            )
        };
        Ok(result)
    }
}
/// Truncate every UMI to the configured minimum length, or pass them through
/// unchanged when no minimum is configured.
///
/// Errors if any UMI is shorter than the configured minimum (filtering is
/// expected to have removed those already).
fn truncate_umis_impl(umis: Vec<String>, min_umi_length: Option<usize>) -> Result<Vec<String>> {
    let Some(min_len) = min_umi_length else {
        return Ok(umis);
    };
    let min_length = umis.iter().map(String::len).min().unwrap_or(0);
    if min_length < min_len {
        bail!("UMI found that had shorter length than expected ({min_length} < {min_len})");
    }
    // NOTE(review): UMIs are truncated to the configured --min-umi-length,
    // not to the shortest observed UMI — confirm this matches the documented
    // "compare the first len bases, len = shortest UMI" semantics.
    Ok(umis.into_iter().map(|u| u[..min_len].to_string()).collect())
}
/// Test-only wrapper delegating to `get_pair_orientation_raw`.
#[cfg(test)]
fn get_pair_orientation_impl(template: &Template) -> (bool, bool) {
    get_pair_orientation_raw(template)
}
/// Assign molecular identifiers (`template.mi`) to every template in the
/// slice using the given assigner.
///
/// Assigners that must not mix templates of different pair orientations
/// (e.g. F1R2 vs F2R1) have the slice partitioned by orientation first;
/// each partition is then assigned independently.
fn assign_umi_groups_impl(
    templates: &mut [Template],
    assigner: &dyn UmiAssigner,
    raw_tag: [u8; 2],
    min_umi_length: Option<usize>,
    no_umi: bool,
) -> Result<()> {
    // Build the index partitions to assign over: one per orientation when the
    // assigner requires splitting, otherwise a single partition of everything.
    let partitions: Vec<Vec<usize>> = if assigner.split_templates_by_pair_orientation() {
        let mut by_orientation: AHashMap<(bool, bool), Vec<usize>> = AHashMap::new();
        for (idx, template) in templates.iter().enumerate() {
            by_orientation.entry(get_pair_orientation_raw(template)).or_default().push(idx);
        }
        by_orientation.into_values().collect()
    } else {
        vec![(0..templates.len()).collect()]
    };
    for indices in &partitions {
        assign_umi_groups_for_indices_impl(
            templates,
            indices,
            assigner,
            raw_tag,
            min_umi_length,
            no_umi,
        )?;
    }
    Ok(())
}
/// Extract, normalize, and assign UMIs for the templates at `indices`,
/// writing the resulting molecular id into each `template.mi`.
///
/// In `no_umi` mode every template gets an empty UMI so the assigner groups
/// purely by position. Otherwise the raw UMI is read from `raw_tag` on R1
/// (falling back to R2), normalized for pair orientation, and optionally
/// truncated before assignment.
fn assign_umi_groups_for_indices_impl(
    templates: &mut [Template],
    indices: &[usize],
    assigner: &dyn UmiAssigner,
    raw_tag: [u8; 2],
    min_umi_length: Option<usize>,
    no_umi: bool,
) -> Result<()> {
    if indices.is_empty() {
        return Ok(());
    }
    // Collect one normalized UMI per template, in `indices` order.
    let mut umis = Vec::with_capacity(indices.len());
    for &idx in indices {
        let template = &templates[idx];
        let processed_umi = if no_umi {
            String::new()
        } else {
            use crate::sort::bam_fields;
            // Prefer R1's UMI tag; fall back to R2 for R2-only templates.
            let umi_bytes = if let Some(r1_raw) = template.r1() {
                let aux = bam_fields::aux_data_slice(r1_raw);
                bam_fields::find_string_tag(aux, &raw_tag)
                    .ok_or_else(|| anyhow::anyhow!("Missing UMI tag"))?
            } else if let Some(r2_raw) = template.r2() {
                let aux = bam_fields::aux_data_slice(r2_raw);
                bam_fields::find_string_tag(aux, &raw_tag)
                    .ok_or_else(|| anyhow::anyhow!("Missing UMI tag"))?
            } else {
                bail!("Template has no reads");
            };
            let umi_s = std::str::from_utf8(umi_bytes)
                .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in UMI: {e}"))?;
            // Single-read templates are treated as R1-earlier by convention.
            let is_r1_earlier = if let (Some(r1), Some(r2)) = (template.r1(), template.r2()) {
                is_r1_genomically_earlier_raw(r1, r2)
            } else {
                true
            };
            umi_for_read_impl(umi_s, is_r1_earlier, assigner)?
        };
        umis.push(processed_umi);
    }
    // Truncation only applies when real UMIs are in play.
    let truncated_umis = if no_umi { umis } else { truncate_umis_impl(umis, min_umi_length)? };
    let assignments = assigner.assign(&truncated_umis);
    // Write each assignment back to its template; `assignments` is parallel
    // to `indices`/`truncated_umis`.
    for (i, &idx) in indices.iter().enumerate() {
        let template = &mut templates[idx];
        template.mi = assignments[i];
    }
    Ok(())
}
// Command-line options for the `group` command. NOTE: field comments below
// deliberately use `//` (not `///`) so they do not become clap help text.
#[derive(Debug, Parser)]
#[command(
    about = "\x1b[38;5;151m[GROUP]\x1b[0m \x1b[36mGroup reads by UMI to identify reads from the same original molecule\x1b[0m",
    long_about = r#"
Groups reads together that appear to have come from the same original molecule. Reads
are grouped by template, and then templates are sorted by the 5' mapping positions of
the reads from the template, used from earliest mapping position to latest. Reads that
have the same end positions are then sub-grouped by UMI sequence.
Accepts reads in any order (including unsorted) and outputs reads sorted by:
1. The lower genome coordinate of the two outer ends of the templates (strand-aware)
2. The sequencing library
3. The cell barcode (CB tag, if present)
4. The assigned UMI tag
5. Read Name
It is recommended to sort the reads into template-coordinate order prior to running
this tool to avoid re-sorting the input. Use `fgumi sort --order template-coordinate`
for the pre-sorting. The output will always be written
in template-coordinate order.
During grouping, reads and templates are filtered out as follows:
1. Templates are filtered if all reads for the template are unmapped
2. Templates are filtered if any non-secondary, non-supplementary read has mapping quality < min-map-q
3. Templates are filtered if any UMI sequence contains one or more N bases
4. Templates are filtered if --min-umi-length is specified and the UMI does not meet the length requirement
5. Records are filtered out if flagged as either secondary or supplementary
Grouping of UMIs is performed by one of four strategies:
1. **identity**: only reads with identical UMI sequences are grouped together. This strategy
may be useful for evaluating data, but should generally be avoided as it will
generate multiple UMI groups per original molecule in the presence of errors.
2. **edit**: reads are clustered into groups such that each read within a group has at least
one other read in the group with <= edits differences and there are inter-group
pairings with <= edits differences. Effective when there are small numbers of
reads per UMI, but breaks down at very high coverage of UMIs.
3. **adjacency**: a version of the directed adjacency method described in umi_tools
(http://dx.doi.org/10.1101/051755) that allows for errors between UMIs but
only when there is a count gradient.
4. **paired**: similar to adjacency but for methods that produce templates such that a read with
A-B is related to but not identical to a read with B-A. Expects the UMI sequences
to be stored in a single SAM tag separated by a hyphen (e.g. ACGT-CCGG) and allows
for one of the two UMIs to be absent (e.g. ACGT- or -ACGT). The molecular IDs
produced have more structure than for single UMI strategies and are of the form
{base}/{A|B}. E.g. two UMI pairs would be mapped as follows:
AAAA-GGGG -> 1/A, GGGG-AAAA -> 1/B.
Strategies edit, adjacency, and paired make use of the --edits parameter to control the matching of
non-identical UMIs.
By default, all UMIs must be the same length. If --min-umi-length=len is specified then reads that
have a UMI shorter than len will be discarded, and when comparing UMIs of different lengths, the first
len bases will be compared, where len is the length of the shortest UMI. The UMI length is the number
of [ACGT] bases in the UMI (i.e. does not count dashes and other non-ACGT characters). This option is
not implemented for reads with UMI pairs (i.e. using the paired assigner).
Note: the --min-map-q parameter defaults to 0 in duplicate marking mode and 1 otherwise, and is
directly settable on the command line.
# Cell Barcodes
If the input data contains cell barcodes (e.g. from single-cell sequencing), reads at the same
genomic position are partitioned by cell barcode before UMI grouping. This ensures that reads from
different cells are never grouped together, even if they share a UMI sequence and mapping position.
The cell barcode is read from the standard `CB` tag. No correction or
error-handling is performed on cell barcodes; they must be corrected upstream.
Multi-threaded operation is supported via --threads N which spawns exactly N threads.
Threads are allocated based on the command's workload profile to optimize performance.
Example: --threads 8 spawns exactly 8 threads (2 reader, 4 workers, 2 writer)
"#
)]
pub struct GroupReadsByUmi {
    // Input/output BAM paths and related shared I/O flags.
    #[command(flatten)]
    pub io: BamIoOptions,
    // Optional path for a family-size histogram TSV.
    #[arg(short = 'f', long = "family-size-histogram")]
    pub family_size_histogram: Option<PathBuf>,
    // Optional path for grouping metrics output.
    #[arg(short = 'g', long = "grouping-metrics")]
    pub grouping_metrics: Option<PathBuf>,
    // Optional path for the main metrics file.
    #[arg(short = 'M', long = "metrics")]
    pub metrics: Option<PathBuf>,
    // Minimum mapping quality; defaults to 1 at runtime when unset.
    #[arg(short = 'm', long = "min-map-q")]
    pub min_map_q: Option<u8>,
    // Keep reads flagged as QC-fail (non-PF) when true.
    #[arg(short = 'n', long = "include-non-pf-reads", default_value = "false", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool)]
    pub include_non_pf_reads: bool,
    // Group fully-unmapped templates by UMI instead of discarding them.
    #[arg(long = "allow-unmapped", default_value = "false", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool)]
    pub allow_unmapped: bool,
    // UMI grouping strategy: identity, edit, adjacency, or paired.
    #[arg(short = 's', long = "strategy", value_enum)]
    pub strategy: Strategy,
    // Maximum edit distance when matching non-identical UMIs.
    #[arg(short = 'e', long = "edits", default_value = "1")]
    pub edits: u32,
    // Minimum UMI length; shorter UMIs are discarded, longer ones truncated.
    #[arg(short = 'l', long = "min-umi-length")]
    pub min_umi_length: Option<usize>,
    // Thread-count options shared across commands.
    #[command(flatten)]
    pub threading: ThreadingOptions,
    // Output compression options.
    #[command(flatten)]
    pub compression: CompressionOptions,
    // Size threshold used by adjacency/paired assigners (see new_assigner_full).
    #[arg(long = "index-threshold", default_value = "100")]
    pub index_threshold: usize,
    // Group by position only, ignoring UMIs entirely.
    #[arg(long = "no-umi", default_value = "false", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool)]
    pub no_umi: bool,
    // Pipeline scheduler options.
    #[command(flatten)]
    pub scheduler_opts: SchedulerOptions,
    // Queue memory-budget options for the pipeline.
    #[command(flatten)]
    pub queue_memory: QueueMemoryOptions,
    // Enable periodic memory-usage reporting (memory-debug builds only).
    #[cfg(feature = "memory-debug")]
    #[arg(long, default_value = "false", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool)]
    pub debug_memory: bool,
    // Seconds between memory reports (memory-debug builds only).
    #[cfg(feature = "memory-debug")]
    #[arg(long, default_value = "1", value_parser = clap::value_parser!(u64).range(1..))]
    pub memory_report_interval: u64,
}
/// Assemble the final `UmiGroupingMetrics` from the aggregated filter
/// counters and the family-size histogram.
fn build_grouping_metrics(
    filter_metrics: &FilterMetrics,
    family_size_counter: &AHashMap<usize, u64>,
) -> UmiGroupingMetrics {
    let mut metrics = UmiGroupingMetrics::new();
    // Copy the template-level filter counters straight through.
    metrics.total_records = filter_metrics.total_templates;
    metrics.accepted_records = filter_metrics.accepted_templates;
    metrics.discarded_non_pf = filter_metrics.discarded_non_pf;
    metrics.discarded_poor_alignment = filter_metrics.discarded_poor_alignment;
    metrics.discarded_ns_in_umi = filter_metrics.discarded_ns_in_umi;
    metrics.discarded_umi_too_short = filter_metrics.discarded_umi_too_short;
    // Each histogram value counts whole families, so their sum is the number
    // of unique molecules observed.
    metrics.total_families = family_size_counter.values().sum::<u64>();
    metrics.unique_molecule_ids = metrics.total_families;
    if metrics.unique_molecule_ids > 0 {
        metrics.avg_reads_per_molecule =
            metrics.accepted_records as f64 / metrics.unique_molecule_ids as f64;
    }
    if !family_size_counter.is_empty() {
        let mut by_size: Vec<(usize, u64)> =
            family_size_counter.iter().map(|(&size, &count)| (size, count)).collect();
        // Keys from the map are unique, so unstable sort is safe.
        by_size.sort_unstable_by_key(|&(size, _)| size);
        // Median: smallest family size whose cumulative family count reaches
        // half of the total.
        let half = metrics.total_families / 2;
        let mut seen = 0u64;
        for &(size, count) in &by_size {
            seen += count;
            if seen >= half {
                metrics.median_reads_per_molecule = size as u64;
                break;
            }
        }
        // by_size is non-empty and sorted ascending.
        metrics.min_reads_per_molecule = by_size[0].0 as u64;
        metrics.max_reads_per_molecule = by_size[by_size.len() - 1].0 as u64;
    }
    metrics
}
impl Command for GroupReadsByUmi {
    /// Run the command end to end: validate options, verify the input's sort
    /// order, then group reads by position and UMI either on a single thread
    /// or via the multi-threaded unified pipeline, and write metrics.
    #[allow(clippy::too_many_lines)]
    fn execute(&self, command_line: &str) -> Result<()> {
        // --- Option validation and normalization ---
        if self.min_umi_length.is_some() && matches!(self.strategy, Strategy::Paired) {
            bail!("Paired strategy cannot be used with --min-umi-length");
        }
        if self.no_umi && matches!(self.strategy, Strategy::Paired) {
            bail!("--no-umi cannot be used with --strategy paired");
        }
        // --no-umi forces the identity strategy (and zero edits below).
        let (effective_strategy, no_umi_edits_override) = if self.no_umi {
            if !matches!(self.strategy, Strategy::Identity) {
                info!("--no-umi mode: overriding strategy to identity");
            }
            (Strategy::Identity, true)
        } else {
            (self.strategy, false)
        };
        if !is_stdin_path(&self.io.input) {
            validate_file_exists(&self.io.input, "input BAM file")?;
        }
        // Default minimum mapping quality is 1 when not supplied.
        let min_mapq: u8 = self.min_map_q.unwrap_or(1);
        // Identity grouping never uses edit distance.
        let effective_edits =
            if no_umi_edits_override || matches!(effective_strategy, Strategy::Identity) {
                0
            } else {
                self.edits
            };
        // --- Startup logging ---
        let timer = OperationTimer::new("Grouping reads by UMI");
        info!("Starting group");
        info!("Input: {}", self.io.input.display());
        info!("Output: {}", self.io.output.display());
        info!("Strategy: {effective_strategy:?}");
        info!("Edits: {effective_edits}");
        if self.no_umi {
            info!("No-UMI mode: grouping by position only");
        }
        if matches!(effective_strategy, Strategy::Adjacency | Strategy::Paired) {
            info!("Index threshold: {}", self.index_threshold);
        }
        if self.allow_unmapped {
            info!("Allow unmapped: enabled (unmapped templates will be grouped by UMI only)");
            warn!(
                "WARNING: All unmapped reads are placed in a single position group. \
                 Reads with identical/similar UMIs will be grouped together even if they \
                 originate from different genomic locations."
            );
            if matches!(self.strategy, Strategy::Edit | Strategy::Adjacency | Strategy::Paired) {
                warn!(
                    "WARNING: For paired UMIs (e.g., ACGT-TGCA), edit distance is computed \
                     on the concatenated sequence with dashes removed. With --edits {}, \
                     only {} mismatch(es) allowed across ALL bases.",
                    self.edits, self.edits
                );
            }
        }
        info!("{}", self.threading.log_message());
        // --- Open the input and validate its sort order ---
        info!("Reading input BAM");
        let (reader, header) = create_bam_reader_for_pipeline(&self.io.input)?;
        let is_tc_sorted = is_template_coordinate_sorted(&header);
        let is_qname_sorted = is_sorted(&header, QUERY_NAME);
        // Template-coordinate order is always accepted; queryname order only
        // with --allow-unmapped.
        if !(is_tc_sorted || self.allow_unmapped && is_qname_sorted) {
            if self.allow_unmapped {
                bail!(
                    "Input BAM must be template-coordinate sorted or queryname sorted \
                     when --allow-unmapped is enabled.\n\n\
                     To queryname sort your BAM file, run:\n \
                     samtools sort -n input.bam -o sorted.bam"
                );
            } else {
                bail!(
                    "Input BAM must be template-coordinate sorted.\n\n\
                     To sort your BAM file, run:\n \
                     fgumi sort -i input.bam -o sorted.bam --order template-coordinate"
                );
            }
        }
        if is_tc_sorted {
            info!("Input is template-coordinate sorted");
        } else {
            info!("Input is queryname sorted (accepted with --allow-unmapped)");
            info!("All unmapped reads will form a single position group per library/cell");
        }
        // --- Header update and tag/filter configuration ---
        let header = crate::commands::common::add_pg_record(header, command_line)?;
        // RX: raw UMI input tag; CB: cell barcode; MI: assigned molecular id.
        let raw_tag: [u8; 2] = *SamTag::RX;
        let cell_tag = Tag::from(SamTag::CB);
        let assign_tag_bytes: [u8; 2] = *SamTag::MI;
        let filter_config = GroupFilterConfig {
            umi_tag: raw_tag,
            min_mapq,
            include_non_pf: self.include_non_pf_reads,
            min_umi_length: self.min_umi_length,
            no_umi: self.no_umi,
            allow_unmapped: self.allow_unmapped,
        };
        // No --threads: run the simpler single-threaded implementation.
        if self.threading.threads.is_none() {
            return self.execute_single_threaded(
                reader,
                &header,
                effective_strategy,
                effective_edits,
                raw_tag,
                assign_tag_bytes,
                cell_tag,
                &filter_config,
                &timer,
            );
        }
        // --- Multi-threaded pipeline setup ---
        let num_threads = self.threading.num_threads();
        // One metrics slot per thread; merged after the pipeline completes.
        let accumulators = PerThreadAccumulator::<GroupMetricsAccumulator>::new(num_threads);
        // Copies moved into the pipeline closures below.
        let strategy = effective_strategy;
        let index_threshold = self.index_threshold;
        let no_umi = self.no_umi;
        let allow_unmapped = self.allow_unmapped;
        let accumulators_clone = Arc::clone(&accumulators);
        // Optional background memory monitor (memory-debug builds only).
        #[cfg(feature = "memory-debug")]
        let debug_memory_flag = self.debug_memory;
        #[cfg(feature = "memory-debug")]
        let (memory_monitor_handle, shared_stats) = if self.debug_memory {
            use crate::unified_pipeline::{PipelineStats, start_memory_monitor};
            use std::sync::atomic::AtomicBool;
            info!("Memory debugging enabled - reporting every {}s", self.memory_report_interval);
            let stats = Arc::new(PipelineStats::new());
            let shutdown_signal = Arc::new(AtomicBool::new(false));
            let shutdown_signal_clone = shutdown_signal.clone();
            let handle = start_memory_monitor(
                stats.clone(),
                shutdown_signal_clone,
                self.memory_report_interval,
            );
            (Some((handle, shutdown_signal)), Some(stats))
        } else {
            (None, None)
        };
        #[cfg(not(feature = "memory-debug"))]
        let shared_stats: Option<Arc<crate::unified_pipeline::PipelineStats>> = None;
        #[cfg(feature = "memory-debug")]
        let stats_for_tracking = shared_stats.clone();
        #[cfg(feature = "memory-debug")]
        let stats_for_serialize = shared_stats.clone();
        let mut pipeline_config = build_pipeline_config(
            &self.scheduler_opts,
            &self.compression,
            &self.queue_memory,
            num_threads,
        )?;
        if let Some(stats) = shared_stats.as_ref() {
            pipeline_config.pipeline = pipeline_config.pipeline.with_shared_stats(stats.clone());
        }
        info!("Scheduler: {:?}", self.scheduler_opts.strategy());
        // Group keys combine library and (optionally) cell barcode.
        let library_index = LibraryIndex::from_header(&header);
        pipeline_config.group_key_config = Some(GroupKeyConfig::new(library_index, cell_tag));
        // FGUMI_SHORT_CIRCUIT truncates the pipeline at a named stage for
        // profiling; output is deliberately invalid in that mode.
        #[cfg(feature = "memory-debug")]
        let short_circuit = std::env::var("FGUMI_SHORT_CIRCUIT").unwrap_or_default();
        #[cfg(feature = "memory-debug")]
        if !short_circuit.is_empty() {
            match short_circuit.as_str() {
                "process" | "serialize" | "compress" => {
                    log::warn!(
                        "SHORT-CIRCUIT mode: pipeline truncated at '{}' — OUTPUT WILL BE INVALID",
                        short_circuit
                    );
                }
                other => {
                    bail!(
                        "Invalid FGUMI_SHORT_CIRCUIT value '{}'. Valid: process, serialize, compress",
                        other
                    );
                }
            }
        }
        #[cfg(feature = "memory-debug")]
        let short_circuit_process = short_circuit == "process";
        #[cfg(feature = "memory-debug")]
        let short_circuit_serialize = short_circuit == "serialize";
        #[cfg(feature = "memory-debug")]
        let short_circuit_compress = short_circuit == "compress";
        // Global molecular-id base: each group reserves a contiguous id range
        // in the serialize stage so final MIs are unique across groups.
        let next_mi_base = std::sync::atomic::AtomicU64::new(0);
        let records_processed = run_bam_pipeline_from_reader(
            pipeline_config,
            reader,
            header,
            &self.io.output,
            None,
            // Grouper factory: batches records into position groups.
            move |_header: &Header| {
                Box::new(RecordPositionGrouper::new())
                    as Box<dyn Grouper<Group = RawPositionGroup> + Send>
            },
            // Process stage (parallel): filter templates, assign UMIs within
            // the position group, and compute per-group statistics.
            move |group: RawPositionGroup| -> std::io::Result<ProcessedPositionGroup> {
                #[cfg(feature = "memory-debug")]
                if short_circuit_process {
                    let input_record_count = group.records.len() as u64;
                    drop(group);
                    return Ok(ProcessedPositionGroup {
                        templates: Vec::new(),
                        family_sizes: AHashMap::new(),
                        filter_metrics: FilterMetrics::new(),
                        input_record_count,
                        distinct_mi_count: 0,
                    });
                }
                let mut filter_metrics = FilterMetrics::new();
                #[cfg(feature = "memory-debug")]
                let initial_group_size = if debug_memory_flag {
                    let size = group.estimate_heap_size();
                    if let Some(stats) = stats_for_tracking.as_ref() {
                        use crate::unified_pipeline::get_or_assign_thread_id;
                        let thread_id = get_or_assign_thread_id();
                        let record_count = group.records.len();
                        stats.track_position_group_memory(size, true);
                        if record_count > 200 {
                            let group_size_gb = size as f64 / 1e9;
                            log::debug!("Processing large position group: {:.2}GB ({} records) on thread {}",
                                group_size_gb, record_count, thread_id);
                        }
                    }
                    size
                } else {
                    0
                };
                // Pair raw records into templates, then apply the filters.
                let all_templates = build_templates_from_records(group.records)?;
                let input_record_count: u64 =
                    all_templates.iter().map(|t| t.read_count() as u64).sum();
                let filtered_templates: Vec<Template> = all_templates
                    .into_iter()
                    .filter(|t| filter_template_raw(t, &filter_config, &mut filter_metrics))
                    .collect();
                #[cfg(feature = "memory-debug")]
                let _template_memory_size = if debug_memory_flag && !filtered_templates.is_empty() {
                    let estimated_size = estimate_templates_heap_size(&filtered_templates);
                    if let Some(stats) = stats_for_tracking.as_ref() {
                        let thread_id = crate::unified_pipeline::get_or_assign_thread_id();
                        stats.track_template_memory(estimated_size, true);
                        if filtered_templates.len() > 50 {
                            let estimated_total_mb = estimated_size as f64 / 1e6;
                            if estimated_total_mb > 10.0 {
                                log::debug!("Filtered templates: ~{:.1}MB ({} templates) on thread {}",
                                    estimated_total_mb, filtered_templates.len(), thread_id);
                            }
                        }
                    }
                    estimated_size
                } else {
                    0
                };
                // Nothing survived filtering: emit an empty group (metrics only).
                if filtered_templates.is_empty() {
                    #[cfg(feature = "memory-debug")]
                    if debug_memory_flag {
                        if let Some(stats) = stats_for_tracking.as_ref() {
                            stats.track_position_group_memory(initial_group_size, false);
                        }
                    }
                    return Ok(ProcessedPositionGroup {
                        templates: Vec::new(),
                        family_sizes: AHashMap::new(),
                        filter_metrics,
                        input_record_count,
                        distinct_mi_count: 0,
                    });
                }
                // With --allow-unmapped a single group can be huge, so use the
                // internally-parallel assigners; otherwise per-group single-
                // threaded assigners are used (the pipeline parallelizes groups).
                let assigner: Box<dyn UmiAssigner> = if allow_unmapped {
                    match strategy {
                        Strategy::Identity => {
                            Box::new(ParallelIdentityAssigner::new(num_threads))
                        }
                        Strategy::Edit => {
                            Box::new(ParallelEditAssigner::new(effective_edits, num_threads))
                        }
                        Strategy::Adjacency => {
                            Box::new(ParallelAdjacencyAssigner::new(effective_edits, num_threads))
                        }
                        Strategy::Paired => {
                            Box::new(ParallelPairedAssigner::new(effective_edits, num_threads))
                        }
                    }
                } else {
                    strategy.new_assigner_full(effective_edits, 1, index_threshold)
                };
                let mut templates = filtered_templates;
                // Assignment failure is non-fatal: log and drop the group so
                // the pipeline keeps running.
                if let Err(e) = assign_umi_groups_impl(
                    &mut templates,
                    assigner.as_ref(),
                    raw_tag,
                    filter_config.min_umi_length,
                    no_umi,
                ) {
                    log::warn!("UMI assignment failed, returning empty group: {e}");
                    #[cfg(feature = "memory-debug")]
                    if debug_memory_flag {
                        if let Some(stats) = stats_for_tracking.as_ref() {
                            stats.track_position_group_memory(initial_group_size, false);
                            stats.track_template_memory(_template_memory_size, false);
                        }
                    }
                    return Ok(ProcessedPositionGroup {
                        templates: Vec::new(),
                        family_sizes: AHashMap::new(),
                        filter_metrics,
                        input_record_count,
                        distinct_mi_count: 0,
                    });
                }
                // Ids within a group are dense from 0, so max+1 is the count
                // this group must reserve from the global MI sequence.
                let distinct_mi_count: u64 = templates
                    .iter()
                    .filter_map(|t| t.mi.id())
                    .max()
                    .map(|max_id| max_id + 1)
                    .unwrap_or(0);
                // Sort by molecular id (then name) so families are contiguous.
                templates.sort_by(|a, b| {
                    let a_idx = a.mi.to_vec_index();
                    let b_idx = b.mi.to_vec_index();
                    a_idx.cmp(&b_idx).then_with(|| a.name.cmp(&b.name))
                });
                // Run-length count the sorted templates into a size histogram,
                // skipping templates without an assigned id.
                let mut family_sizes: AHashMap<usize, u64> = AHashMap::with_capacity(50);
                if !templates.is_empty() {
                    let mut current_mi = templates[0].mi.to_vec_index();
                    let mut current_count = 1usize;
                    for template in templates.iter().skip(1) {
                        let mi = template.mi.to_vec_index();
                        if mi == current_mi {
                            current_count += 1;
                        } else {
                            if current_mi.is_some() {
                                *family_sizes.entry(current_count).or_insert(0) += 1;
                            }
                            current_mi = mi;
                            current_count = 1;
                        }
                    }
                    // Flush the final run.
                    if current_mi.is_some() {
                        *family_sizes.entry(current_count).or_insert(0) += 1;
                    }
                }
                #[cfg(feature = "memory-debug")]
                if debug_memory_flag {
                    if let Some(stats) = stats_for_tracking.as_ref() {
                        stats.track_position_group_memory(initial_group_size, false);
                    }
                }
                Ok(ProcessedPositionGroup {
                    templates,
                    family_sizes,
                    filter_metrics,
                    input_record_count,
                    distinct_mi_count,
                })
            },
            // Serialize stage (ordered): merge metrics, reserve the global MI
            // range for this group, and emit the records with final MI tags.
            move |processed: ProcessedPositionGroup,
                  _header: &Header,
                  output: &mut Vec<u8>|
                  -> std::io::Result<u64> {
                #[cfg(feature = "memory-debug")]
                if short_circuit_serialize {
                    let count = processed.input_record_count;
                    if debug_memory_flag {
                        if let Some(stats) = stats_for_serialize.as_ref() {
                            let tmpl_size = estimate_templates_heap_size(&processed.templates);
                            stats.track_template_memory(tmpl_size, false);
                        }
                    }
                    drop(processed);
                    return Ok(count);
                }
                accumulators_clone.with_slot(|acc| {
                    acc.record_group(processed.family_sizes, &processed.filter_metrics);
                });
                let input_record_count = processed.input_record_count;
                // Reserve a contiguous block of molecular ids for this group.
                let base_mi = next_mi_base.fetch_add(
                    processed.distinct_mi_count,
                    std::sync::atomic::Ordering::Relaxed,
                );
                #[cfg(feature = "memory-debug")]
                if debug_memory_flag {
                    if let Some(stats) = stats_for_serialize.as_ref() {
                        let tmpl_size = estimate_templates_heap_size(&processed.templates);
                        stats.track_template_memory(tmpl_size, false);
                    }
                }
                // Rough pre-allocation: ~400 bytes per read, 2 reads/template.
                output.reserve(processed.templates.len() * 2 * 400);
                let mut scratch = Vec::with_capacity(512);
                let mut mi_buf = String::with_capacity(16);
                emit_templates_raw_with_mi(
                    &processed.templates,
                    base_mi,
                    assign_tag_bytes,
                    &mut scratch,
                    &mut mi_buf,
                    |bytes| {
                        output.extend_from_slice(bytes);
                        Ok(())
                    },
                )?;
                #[cfg(feature = "memory-debug")]
                if short_circuit_compress {
                    output.clear();
                }
                Ok(input_record_count)
            },
        )
        .context("Pipeline execution failed")?;
        // --- Shutdown and metrics aggregation ---
        #[cfg(feature = "memory-debug")]
        if let Some((handle, shutdown_signal)) = memory_monitor_handle {
            use crate::unified_pipeline::log_comprehensive_memory_stats;
            shutdown_signal.store(true, std::sync::atomic::Ordering::Relaxed);
            let _: Result<(), _> = handle.join();
            if let Some(stats) = shared_stats.as_ref() {
                log_comprehensive_memory_stats(stats);
                info!("Memory monitoring stopped - final stats logged above");
            }
        }
        info!("Wrote output to {}", self.io.output.display());
        // Merge the per-thread accumulators into file-level totals.
        let mut family_size_counter: AHashMap<usize, u64> = AHashMap::with_capacity(50);
        let mut position_group_size_counter: AHashMap<usize, u64> = AHashMap::with_capacity(50);
        let mut total_filter_metrics = FilterMetrics::new();
        for slot in accumulators.slots() {
            let acc = slot.lock();
            for (&size, &count) in &acc.family_sizes {
                *family_size_counter.entry(size).or_insert(0) += count;
            }
            for (&size, &count) in &acc.position_group_sizes {
                *position_group_size_counter.entry(size).or_insert(0) += count;
            }
            total_filter_metrics.merge(&acc.filter_metrics);
        }
        let metrics = build_grouping_metrics(&total_filter_metrics, &family_size_counter);
        log_umi_grouping_summary(&metrics);
        self.write_all_metrics(&metrics, &family_size_counter, &position_group_size_counter)?;
        timer.log_completion(metrics.accepted_records);
        info!("group completed successfully");
        info!("Records processed by pipeline: {records_processed}");
        Ok(())
    }
}
impl GroupReadsByUmi {
#[allow(clippy::too_many_arguments)]
/// Single-threaded fallback: streams raw BAM records, buckets them into
/// position groups, and processes/writes each completed group inline.
///
/// The noodles reader is used only to consume the header bytes (the caller
/// already parsed `header`); the underlying stream is then handed to
/// `RawBamReader` so records are read as raw BAM blocks without decoding.
/// Filter/family/position-group metrics are tallied locally and written via
/// `write_all_metrics` at the end.
fn execute_single_threaded(
&self,
reader: Box<dyn std::io::Read + Send>,
header: &Header,
effective_strategy: Strategy,
effective_edits: u32,
raw_tag: [u8; 2],
assign_tag_bytes: [u8; 2],
cell_tag: Tag,
filter_config: &GroupFilterConfig,
timer: &OperationTimer,
) -> Result<()> {
info!("Using single-threaded mode");
let buf_reader = std::io::BufReader::new(reader);
let mut noodles_reader = noodles::bam::io::Reader::new(buf_reader);
// Skip past the header so the raw reader starts at the first record.
let _ = noodles_reader.read_header().context("Failed to skip BAM header")?;
let mut raw_reader = RawBamReader::new(noodles_reader.into_inner());
// One writer thread; compression level comes from the CLI options.
let mut writer =
create_bam_writer(&self.io.output, header, 1, self.compression.compression_level)?;
let library_index = LibraryIndex::from_header(header);
let mut grouper = RecordPositionGrouper::new();
let mut total_filter_metrics = FilterMetrics::new();
let mut family_size_counter: AHashMap<usize, u64> = AHashMap::with_capacity(50);
let mut position_group_size_counter: AHashMap<usize, u64> = AHashMap::with_capacity(50);
// Running offset so MI ids stay unique across position groups.
let mut next_mi_base: u64 = 0;
let progress = ProgressTracker::new("Processed records").with_interval(1_000_000);
let mut raw_rec = RawRecord::new();
loop {
// A zero-byte read signals end of the BAM stream.
let bytes_read = raw_reader.read_record(&mut raw_rec)?;
if bytes_read == 0 {
break; }
let key = compute_group_key_from_raw(&raw_rec, &library_index, Some(cell_tag));
let decoded = DecodedRecord::from_raw_bytes(raw_rec.clone(), key);
// Adding a record may complete the previous position group; if so,
// process and write it immediately.
if let Some(group) = grouper.add_record(decoded)? {
Self::process_and_write_position_group(
group,
filter_config,
effective_strategy,
effective_edits,
self.index_threshold,
1, raw_tag,
assign_tag_bytes,
&mut total_filter_metrics,
&mut family_size_counter,
&mut position_group_size_counter,
&mut next_mi_base,
header,
&mut writer,
)?;
}
progress.log_if_needed(1);
}
// Flush the final in-progress position group, if any.
if let Some(final_group) = grouper.finish()? {
Self::process_and_write_position_group(
final_group,
filter_config,
effective_strategy,
effective_edits,
self.index_threshold,
1, raw_tag,
assign_tag_bytes,
&mut total_filter_metrics,
&mut family_size_counter,
&mut position_group_size_counter,
&mut next_mi_base,
header,
&mut writer,
)?;
}
progress.log_final();
writer.into_inner().finish().context("Failed to finish output BAM")?;
info!("Wrote output to {}", self.io.output.display());
let metrics = build_grouping_metrics(&total_filter_metrics, &family_size_counter);
log_umi_grouping_summary(&metrics);
self.write_all_metrics(&metrics, &family_size_counter, &position_group_size_counter)?;
timer.log_completion(metrics.accepted_records);
info!("group completed successfully");
Ok(())
}
#[allow(clippy::too_many_arguments)]
/// Filters, UMI-assigns, tallies metrics for, and writes one position group.
///
/// `next_mi_base` is a running offset added to this group's per-group MI ids
/// so molecule ids remain unique across position groups in the output.
/// Returns `Ok(())` even when every template in the group is filtered out.
fn process_and_write_position_group(
group: RawPositionGroup,
filter_config: &GroupFilterConfig,
strategy: Strategy,
effective_edits: u32,
index_threshold: usize,
threads: usize,
raw_tag: [u8; 2],
assign_tag_bytes: [u8; 2],
total_filter_metrics: &mut FilterMetrics,
family_size_counter: &mut AHashMap<usize, u64>,
position_group_size_counter: &mut AHashMap<usize, u64>,
next_mi_base: &mut u64,
_header: &Header,
writer: &mut crate::bam_io::BamWriter,
) -> Result<()> {
// Pair raw records into templates, then drop those failing the filters.
let all_templates = build_templates_from_records(group.records)?;
let mut filter_metrics = FilterMetrics::new();
let filtered_templates: Vec<Template> = all_templates
.into_iter()
.filter(|t| filter_template_raw(t, filter_config, &mut filter_metrics))
.collect();
total_filter_metrics.merge(&filter_metrics);
if filtered_templates.is_empty() {
return Ok(());
}
// When unmapped reads are allowed, the Parallel* assigner variants are
// used; otherwise the strategy's standard assigner is built.
// NOTE(review): why allow_unmapped selects the parallel variants is not
// evident from this file -- confirm intent.
let assigner: Box<dyn UmiAssigner> = if filter_config.allow_unmapped {
match strategy {
Strategy::Identity => Box::new(ParallelIdentityAssigner::new(threads)),
Strategy::Edit => Box::new(ParallelEditAssigner::new(effective_edits, threads)),
Strategy::Adjacency => {
Box::new(ParallelAdjacencyAssigner::new(effective_edits, threads))
}
Strategy::Paired => Box::new(ParallelPairedAssigner::new(effective_edits, threads)),
}
} else {
strategy.new_assigner_full(effective_edits, 1, index_threshold)
};
let mut templates = filtered_templates;
// NOTE(review): on assignment failure the whole position group is logged
// and silently dropped from the output.
if let Err(e) = assign_umi_groups_impl(
&mut templates,
assigner.as_ref(),
raw_tag,
filter_config.min_umi_length,
filter_config.no_umi,
) {
log::warn!("Failed to assign UMI groups: {e}");
return Ok(());
}
// Sort so templates with equal MI are adjacent (then by name for
// deterministic output order).
templates.sort_by(|a, b| {
let a_idx = a.mi.to_vec_index();
let b_idx = b.mi.to_vec_index();
a_idx.cmp(&b_idx).then_with(|| a.name.cmp(&b.name))
});
// Tally family sizes as run lengths of equal MI values; runs with no
// assigned MI (None) are excluded from the histogram.
if !templates.is_empty() {
let mut current_mi = templates[0].mi.to_vec_index();
let mut current_count = 1usize;
let mut num_families = 0usize;
for template in templates.iter().skip(1) {
let mi = template.mi.to_vec_index();
if mi == current_mi {
current_count += 1;
} else {
if current_mi.is_some() {
*family_size_counter.entry(current_count).or_insert(0) += 1;
num_families += 1;
}
current_mi = mi;
current_count = 1;
}
}
// Close out the final run.
if current_mi.is_some() {
*family_size_counter.entry(current_count).or_insert(0) += 1;
num_families += 1;
}
if num_families > 0 {
*position_group_size_counter.entry(num_families).or_insert(0) += 1;
}
}
// Advance the MI offset past this group's ids. Assumes assigned ids are
// dense in 0..=max within a group -- TODO confirm against the assigners.
let distinct_mi_count: u64 =
templates.iter().filter_map(|t| t.mi.id()).max().map(|max_id| max_id + 1).unwrap_or(0);
let base_mi = *next_mi_base;
*next_mi_base += distinct_mi_count;
// Bring `write_all` into scope for the writer's inner stream.
use std::io::Write as _;
let mut scratch = Vec::with_capacity(512);
let mut mi_buf = String::with_capacity(16);
let inner = writer.get_mut();
emit_templates_raw_with_mi(
&templates,
base_mi,
assign_tag_bytes,
&mut scratch,
&mut mi_buf,
|bytes| inner.write_all(bytes),
)?;
Ok(())
}
/// Writes the requested metrics outputs.
///
/// The individually-flagged files (`--family-size-histogram`,
/// `--grouping-metrics`) and the `--metrics` prefix mode are independent:
/// any combination of them may be written in one call.
fn write_all_metrics(
    &self,
    grouping_metrics: &UmiGroupingMetrics,
    family_sizes: &AHashMap<usize, u64>,
    position_group_sizes: &AHashMap<usize, u64>,
) -> Result<()> {
    // Materialize histogram rows from the raw size -> count maps.
    let family_rows =
        FamilySizeMetrics::from_size_counts(family_sizes.iter().map(|(&s, &c)| (s, c)));
    let position_rows = PositionGroupSizeMetrics::from_size_counts(
        position_group_sizes.iter().map(|(&s, &c)| (s, c)),
    );
    // Individually requested output files.
    if let Some(path) = self.family_size_histogram.as_ref() {
        write_metrics(path, &family_rows, "family size histogram")?;
    }
    if let Some(path) = self.grouping_metrics.as_ref() {
        write_metrics(path, std::slice::from_ref(grouping_metrics), "grouping metrics")?;
    }
    // Prefix mode: write all three files as `<prefix>.<suffix>`.
    if let Some(prefix) = self.metrics.as_ref() {
        write_metrics(
            &with_extension(prefix, "family_sizes.txt"),
            &family_rows,
            "family size histogram",
        )?;
        write_metrics(
            &with_extension(prefix, "grouping_metrics.txt"),
            std::slice::from_ref(grouping_metrics),
            "grouping metrics",
        )?;
        write_metrics(
            &with_extension(prefix, "position_group_sizes.txt"),
            &position_rows,
            "position group size histogram",
        )?;
    }
    Ok(())
}
}
#[allow(clippy::cast_possible_truncation)]
/// Serializes each template's records as raw BAM blocks, patching the
/// `assign_tag_bytes` string tag with the MI value (offset by `base_mi`)
/// whenever the template has an assigned molecule id.
///
/// `scratch` and `mi_buf` are caller-provided buffers reused across records
/// to avoid per-record allocation. `emit` receives the little-endian block
/// size followed by the record bytes, exactly as BAM encodes a record.
fn emit_templates_raw_with_mi(
    templates: &[Template],
    base_mi: u64,
    assign_tag_bytes: [u8; 2],
    scratch: &mut Vec<u8>,
    mi_buf: &mut String,
    mut emit: impl FnMut(&[u8]) -> std::io::Result<()>,
) -> std::io::Result<()> {
    use crate::sort::bam_fields;
    for template in templates {
        let assigned = template.mi.is_assigned();
        if assigned {
            // Render the offset molecule id once per template.
            template.mi.write_with_offset(base_mi, mi_buf);
        }
        for record in [template.r1(), template.r2()].into_iter().flatten() {
            if assigned {
                // Copy the record and rewrite the MI string tag in place.
                scratch.clear();
                scratch.extend_from_slice(record);
                bam_fields::update_string_tag(scratch, &assign_tag_bytes, mi_buf.as_bytes());
                emit(&(scratch.len() as u32).to_le_bytes())?;
                emit(scratch)?;
            } else {
                // No MI assigned: pass the record through untouched.
                emit(&(record.len() as u32).to_le_bytes())?;
                emit(record)?;
            }
        }
    }
    Ok(())
}
/// Writes `data` as a TSV metrics file at `path`, logging success.
///
/// # Errors
/// Returns an error (annotated with `label` and the path) if the file
/// cannot be written.
fn write_metrics<S: serde::Serialize>(
    path: &Path,
    data: impl IntoIterator<Item = S>,
    label: &str,
) -> Result<()> {
    let written = DelimFile::default().write_tsv(path, data);
    written.with_context(|| format!("Failed to write {label}: {}", path.display()))?;
    info!("Wrote {label} to {}", path.display());
    Ok(())
}
/// Builds `<prefix>.<suffix>` by appending to the full prefix path.
///
/// Unlike `Path::with_extension`, this never replaces an existing extension;
/// it always appends, which is what the metrics-prefix outputs expect.
fn with_extension(prefix: &Path, suffix: &str) -> PathBuf {
    let mut name = prefix.as_os_str().to_os_string();
    name.push(".");
    name.push(suffix);
    name.into()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::assigner::{IdentityUmiAssigner, PairedUmiAssigner, Strategy};
use bstr::BString;
use fgumi_raw_bam::{
SamBuilder as RawSamBuilder, flags, raw_record_to_record_buf, testutil::encode_op,
};
use noodles::bam;
use noodles::sam;
use noodles::sam::alignment::io::Write as AlignmentWrite;
use rstest::rstest;
use std::num::NonZeroUsize;
use tempfile::{NamedTempFile, TempDir};
/// Temp-directory-backed output paths used by the integration-style tests;
/// the `TempDir` handle is kept so the directory outlives each test.
struct TestPaths {
#[allow(dead_code)]
dir: TempDir,
pub output: PathBuf,
pub histogram: PathBuf,
pub grouping_metrics: PathBuf,
pub metrics_prefix: PathBuf,
}
impl TestPaths {
// Creates a fresh temp dir and derives all output paths inside it.
fn new() -> Result<Self> {
let dir = TempDir::new()?;
Ok(Self {
output: dir.path().join("output.bam"),
histogram: dir.path().join("histogram.txt"),
grouping_metrics: dir.path().join("grouping_metrics.txt"),
metrics_prefix: dir.path().join("metrics"),
dir,
})
}
}
/// Builds a `GroupReadsByUmi` with test-friendly defaults; tests override
/// individual fields via struct-update syntax.
fn test_group_cmd(strategy: Strategy, edits: u32) -> GroupReadsByUmi {
GroupReadsByUmi {
io: BamIoOptions {
input: std::path::PathBuf::from("/dev/null"),
output: std::path::PathBuf::from("/dev/null"),
async_reader: false,
},
family_size_histogram: None,
grouping_metrics: None,
metrics: None,
min_map_q: None,
include_non_pf_reads: false,
strategy,
edits,
min_umi_length: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
index_threshold: 100,
no_umi: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions {
queue_memory: "768".to_string(),
queue_memory_per_thread: true,
queue_memory_limit_mb: None,
},
allow_unmapped: false,
#[cfg(feature = "memory-debug")]
debug_memory: false,
#[cfg(feature = "memory-debug")]
memory_report_interval: 1,
}
}
/// Builds a SAM header with SO=unsorted, GO=query, SS=template-coordinate
/// and two reference sequences (chr1, chr2) at human genome lengths.
fn create_test_header() -> sam::Header {
use noodles::sam::header::record::value::{Map, map::ReferenceSequence};
use noodles::sam::header::record::value::{
Map as HeaderRecordMap,
map::{Header as HeaderRecord, Tag as HeaderTag},
};
let mut builder = sam::Header::builder();
// SO/GO/SS are "other" tags on the @HD line; the destructuring cannot fail.
let HeaderTag::Other(so_tag) = HeaderTag::from([b'S', b'O']) else { unreachable!() };
let HeaderTag::Other(go_tag) = HeaderTag::from([b'G', b'O']) else { unreachable!() };
let HeaderTag::Other(ss_tag) = HeaderTag::from([b'S', b'S']) else { unreachable!() };
let map = HeaderRecordMap::<HeaderRecord>::builder()
.insert(so_tag, "unsorted")
.insert(go_tag, "query")
.insert(ss_tag, "template-coordinate")
.build()
.expect("valid header record");
builder = builder.set_header(map);
builder = builder.add_reference_sequence(
BString::from("chr1"),
Map::<ReferenceSequence>::new(
NonZeroUsize::new(248_956_422).expect("non-zero chr1 length"),
),
);
builder = builder.add_reference_sequence(
BString::from("chr2"),
Map::<ReferenceSequence>::new(
NonZeroUsize::new(242_193_529).expect("non-zero chr2 length"),
),
);
builder.build()
}
/// Sets or clears the reverse-strand FLAG bit (0x10) directly in the raw
/// record bytes; bytes 14-15 hold the little-endian BAM FLAG field.
fn set_reverse(rec: &mut fgumi_raw_bam::RawRecord, reverse: bool) {
let v = rec.as_mut_vec();
let flags = u16::from_le_bytes([v[14], v[15]]);
let new_flags = if reverse { flags | 0x10 } else { flags & !0x10 };
let bytes = new_flags.to_le_bytes();
v[14] = bytes[0];
v[15] = bytes[1];
}
#[allow(clippy::trivially_copy_pass_by_ref)]
/// Appends a string-typed auxiliary tag to a raw record in place.
fn append_str_tag(rec: &mut fgumi_raw_bam::RawRecord, tag: &[u8; 2], value: &[u8]) {
fgumi_raw_bam::RawTagsEditor::from_vec(rec.as_mut_vec()).append_string(tag, value);
}
#[allow(clippy::cast_sign_loss)]
/// Builds a single-end 100 bp read with a 100M CIGAR, Q30 base qualities,
/// and an RX (raw UMI) tag. `pos` is 1-based and converted to BAM's 0-based
/// coordinate.
fn build_test_read(
name: &str,
ref_id: usize,
pos: i32,
mapq: u8,
raw_flags: u16,
umi: &str,
) -> fgumi_raw_bam::RawRecord {
let seq = b"ACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGT";
let cigar = encode_op(0, 100); let quals = vec![30u8; 100];
let mut b = RawSamBuilder::new();
b.read_name(name.as_bytes())
.flags(raw_flags)
.ref_id(ref_id as i32)
.pos(pos - 1)
.mapq(mapq)
.cigar_ops(&[cigar])
.sequence(seq)
.qualities(&quals);
b.add_string_tag(b"RX", umi.as_bytes());
b.build()
}
#[allow(clippy::cast_sign_loss)]
/// Builds an FR read pair on the same reference: R1 forward at `pos1`, R2
/// reverse at `pos2`, both 100M with RX (UMI) and MC (mate CIGAR) tags.
/// Positions are 1-based and converted to BAM's 0-based coordinates.
fn build_test_pair(
name: &str,
ref_id: usize,
pos1: i32,
pos2: i32,
mapq1: u8,
mapq2: u8,
umi: &str,
) -> (fgumi_raw_bam::RawRecord, fgumi_raw_bam::RawRecord) {
let seq = vec![b'A'; 100];
let quals = vec![30u8; 100];
let cigar = encode_op(0, 100);
let mut b1 = RawSamBuilder::new();
b1.read_name(name.as_bytes())
.flags(flags::PAIRED | flags::FIRST_SEGMENT | flags::MATE_REVERSE)
.ref_id(ref_id as i32)
.pos(pos1 - 1)
.mapq(mapq1)
.cigar_ops(&[cigar])
.sequence(&seq)
.qualities(&quals)
.mate_ref_id(ref_id as i32)
.mate_pos(pos2 - 1);
b1.add_string_tag(b"RX", umi.as_bytes());
b1.add_string_tag(b"MC", b"100M");
let r1 = b1.build();
let mut b2 = RawSamBuilder::new();
b2.read_name(name.as_bytes())
.flags(flags::PAIRED | flags::LAST_SEGMENT | flags::REVERSE)
.ref_id(ref_id as i32)
.pos(pos2 - 1)
.mapq(mapq2)
.cigar_ops(&[cigar])
.sequence(&seq)
.qualities(&quals)
.mate_ref_id(ref_id as i32)
.mate_pos(pos1 - 1);
b2.add_string_tag(b"RX", umi.as_bytes());
b2.add_string_tag(b"MC", b"100M");
let r2 = b2.build();
(r1, r2)
}
#[allow(clippy::cast_sign_loss)]
/// Builds a pair where R1 is mapped at `pos` and R2 is unmapped but placed
/// at the same coordinate. Each record carries an MQ tag mirroring its
/// mate's MAPQ, and both carry the RX (UMI) tag.
fn build_test_pair_mapped_with_unmapped_mate(
name: &str,
ref_id: usize,
pos: i32,
mapq: u8,
umi: &str,
) -> (fgumi_raw_bam::RawRecord, fgumi_raw_bam::RawRecord) {
let seq = vec![b'A'; 100];
let quals = vec![30u8; 100];
let cigar = encode_op(0, 100);
let mut b1 = RawSamBuilder::new();
b1.read_name(name.as_bytes())
.flags(flags::PAIRED | flags::FIRST_SEGMENT | flags::MATE_UNMAPPED)
.ref_id(ref_id as i32)
.pos(pos - 1)
.mapq(mapq)
.cigar_ops(&[cigar])
.sequence(&seq)
.qualities(&quals);
b1.add_string_tag(b"RX", umi.as_bytes());
// Mate (R2) is unmapped, so its MAPQ is 0.
b1.add_int_tag(b"MQ", 0i32); let r1 = b1.build();
let mut b2 = RawSamBuilder::new();
b2.read_name(name.as_bytes())
.flags(flags::PAIRED | flags::LAST_SEGMENT | flags::UNMAPPED)
.ref_id(ref_id as i32)
.pos(pos - 1)
.mapq(0)
.sequence(&seq)
.qualities(&quals);
b2.add_string_tag(b"RX", umi.as_bytes());
// R2's MQ reflects the mapped mate's MAPQ.
b2.add_int_tag(b"MQ", i32::from(mapq)); let r2 = b2.build();
(r1, r2)
}
/// Writes `records` to a temporary BAM file using the shared test header,
/// converting each raw record to a `RecordBuf` for the noodles writer.
fn create_test_bam(records: Vec<fgumi_raw_bam::RawRecord>) -> Result<NamedTempFile> {
let temp_file = NamedTempFile::new()?;
let header = create_test_header();
let mut writer = bam::io::writer::Builder.build_from_path(temp_file.path())?;
writer.write_header(&header)?;
for record in &records {
let record_buf =
raw_record_to_record_buf(record, &header).map_err(std::io::Error::other)?;
writer.write_alignment_record(&header, &record_buf)?;
}
// Dropping the writer flushes and finalizes the BAM stream.
drop(writer);
Ok(temp_file)
}
/// Reads all alignment records from a BAM file into owned `RecordBuf`s.
fn read_bam_records(path: &std::path::Path) -> Result<Vec<sam::alignment::RecordBuf>> {
let mut reader = bam::io::reader::Builder.build_from_path(path)?;
let header = reader.read_header()?;
let mut records = Vec::new();
for result in reader.records() {
let record = result?;
let record_buf =
sam::alignment::RecordBuf::try_from_alignment_record(&header, &record)?;
records.push(record_buf);
}
Ok(records)
}
/// Collects the MI (molecule id) string-tag values, in record order,
/// skipping records without an MI tag or with a non-string value.
fn get_mi_tags(records: &[sam::alignment::RecordBuf]) -> Vec<String> {
use sam::alignment::record::data::field::Tag;
let mi_tag = Tag::from([b'M', b'I']);
records
.iter()
.filter_map(|r| {
r.data().get(&mi_tag).and_then(|v| {
if let sam::alignment::record_buf::data::field::Value::String(s) = v {
Some(String::from_utf8_lossy(s).to_string())
} else {
None
}
})
})
.collect()
}
/// Returns the number of distinct MI tag values across `records`.
fn count_unique_mi_tags(records: &[sam::alignment::RecordBuf]) -> usize {
    use std::collections::HashSet;
    get_mi_tags(records).into_iter().collect::<HashSet<_>>().len()
}
/// Returns the string value of an arbitrary two-character tag, or `None`
/// if the tag is absent or not string-typed.
fn get_string_tag(record: &sam::alignment::RecordBuf, tag_name: &str) -> Option<String> {
use sam::alignment::record::data::field::Tag;
let tag_bytes = tag_name.as_bytes();
let tag = Tag::from([tag_bytes[0], tag_bytes[1]]);
record.data().get(&tag).and_then(|v| {
if let sam::alignment::record_buf::data::field::Value::String(s) = v {
Some(String::from_utf8_lossy(s).to_string())
} else {
None
}
})
}
/// Paired assigner must prefix the two UMI halves differently depending on
/// which read of the pair maps earlier.
#[test]
fn test_umi_for_read_assigns_ab_prefixes_by_coordinates() {
let assigner: Box<dyn UmiAssigner> = Box::new(PairedUmiAssigner::new(1));
let umi1 = "AAA-TTT";
let result1 = umi_for_read_impl(umi1, true, assigner.as_ref()).expect("Should succeed");
assert!(result1.contains("AAA"));
assert!(result1.contains("TTT"));
assert!(result1.contains('-'));
let result2 = umi_for_read_impl(umi1, false, assigner.as_ref()).expect("Should succeed");
assert_ne!(result1, result2, "Prefixes should differ based on which read is earlier");
}
/// Paired assigner must tolerate an empty half on either side of the `-`
/// separator.
#[test]
fn test_umi_for_read_handles_absent_umi_ends() {
let assigner: Box<dyn UmiAssigner> = Box::new(PairedUmiAssigner::new(1));
let result_left = umi_for_read_impl("-TTT", true, assigner.as_ref())
.expect("Should handle absent left UMI");
assert!(result_left.contains('-') && result_left.contains("TTT"));
let result_right = umi_for_read_impl("AAA-", true, assigner.as_ref())
.expect("Should handle absent right UMI");
assert!(result_right.contains("AAA") && result_right.contains('-'));
}
/// Non-paired assigners normalize UMIs to uppercase.
#[test]
fn test_umi_for_read_uppercase_for_non_paired() {
let assigner: Box<dyn UmiAssigner> = Box::new(IdentityUmiAssigner::new());
let result =
umi_for_read_impl("acgtacgt", true, assigner.as_ref()).expect("Should succeed");
assert_eq!(result, "ACGTACGT");
}
/// UMIs longer than `min_umi_length` are truncated down to exactly it.
#[test]
fn test_truncate_umis_to_minimum_length() {
let tool =
GroupReadsByUmi { min_umi_length: Some(5), ..test_group_cmd(Strategy::Identity, 0) };
let umis = vec!["AAAAAA".to_string(), "AAAAA".to_string(), "AAAAAAA".to_string()];
let truncated =
truncate_umis_impl(umis, tool.min_umi_length).expect("Should truncate successfully");
assert_eq!(truncated.len(), 3);
assert!(truncated.iter().all(|u| u.len() == 5));
}
/// With `min_umi_length: None` the UMIs pass through untouched.
#[test]
fn test_truncate_umis_none_returns_unchanged() {
let tool = test_group_cmd(Strategy::Identity, 0);
let umis = vec!["AAAAAA".to_string(), "AAAAA".to_string()];
let original = umis.clone();
let result = truncate_umis_impl(umis, tool.min_umi_length).expect("Should succeed");
assert_eq!(result, original);
}
/// A UMI shorter than the minimum length is an error, not a silent skip.
#[test]
fn test_truncate_umis_fails_when_too_short() {
let tool =
GroupReadsByUmi { min_umi_length: Some(6), ..test_group_cmd(Strategy::Identity, 0) };
let umis = vec!["AAAAAA".to_string(), "AAAA".to_string()];
let result = truncate_umis_impl(umis, tool.min_umi_length);
assert!(result.is_err());
}
/// End-to-end: Edit strategy (1 edit) merges near-identical UMIs at one
/// position into a single family, and the low-MAPQ pair (c01) is filtered
/// by `min_map_q`.
#[test]
fn test_groups_reads_correctly_basic() -> Result<()> {
let mut records = Vec::new();
let (r1, r2) = build_test_pair("a01", 0, 100, 300, 60, 60, "AAAAAAAA");
records.push(r1);
records.push(r2);
let (r1, r2) = build_test_pair("a02", 0, 100, 300, 60, 60, "AAAAgAAA");
records.push(r1);
records.push(r2);
let (r1, r2) = build_test_pair("a03", 0, 100, 300, 60, 60, "AAAAAAAA");
records.push(r1);
records.push(r2);
let (r1, r2) = build_test_pair("a04", 0, 100, 300, 60, 60, "AAAAAAAt");
records.push(r1);
records.push(r2);
let (r1, r2) = build_test_pair("c01", 0, 100, 300, 5, 5, "AAAAAAAA");
records.push(r1);
records.push(r2);
let input = create_test_bam(records)?;
let paths = TestPaths::new()?;
let cmd = GroupReadsByUmi {
io: BamIoOptions {
input: input.path().to_path_buf(),
output: paths.output.clone(),
async_reader: false,
},
min_map_q: Some(30),
..test_group_cmd(Strategy::Edit, 1)
};
cmd.execute("test")?;
let output_records = read_bam_records(&paths.output)?;
assert_eq!(output_records.len(), 8, "Should have 8 records after filtering");
let unique_groups = count_unique_mi_tags(&output_records);
assert_eq!(unique_groups, 1, "Should have 1 UMI group");
Ok(())
}
/// Reads whose UMI contains an N are discarded and counted in the
/// `discarded_ns_in_umi` metric (2 records: both mates of a02).
#[test]
fn test_filtering_excludes_reads_with_n_in_umi() -> Result<()> {
let mut records = Vec::new();
let (r1, r2) = build_test_pair("a01", 0, 100, 300, 60, 60, "AAAAAA");
records.push(r1);
records.push(r2);
let (r1, r2) = build_test_pair("a02", 0, 100, 300, 60, 60, "AANAAA");
records.push(r1);
records.push(r2);
let (r1, r2) = build_test_pair("a03", 0, 100, 300, 60, 60, "AAAAAA");
records.push(r1);
records.push(r2);
let input = create_test_bam(records)?;
let paths = TestPaths::new()?;
let cmd = GroupReadsByUmi {
io: BamIoOptions {
input: input.path().to_path_buf(),
output: paths.output.clone(),
async_reader: false,
},
grouping_metrics: Some(paths.grouping_metrics.clone()),
..test_group_cmd(Strategy::Identity, 0)
};
cmd.execute("test")?;
let output_records = read_bam_records(&paths.output)?;
assert_eq!(output_records.len(), 4, "Should have 4 records (2 pairs, a02 filtered)");
let metrics: Vec<UmiGroupingMetrics> =
DelimFile::default().read_tsv(&paths.grouping_metrics)?;
assert_eq!(metrics.len(), 1);
assert_eq!(metrics[0].discarded_ns_in_umi, 2);
Ok(())
}
/// Pairs whose MAPQ falls below `min_map_q` are dropped from the output.
#[test]
fn test_filtering_excludes_reads_below_min_mapq() -> Result<()> {
let mut records = Vec::new();
let (r1, r2) = build_test_pair("a01", 0, 100, 300, 60, 60, "AAAAAA");
records.push(r1);
records.push(r2);
let (r1, r2) = build_test_pair("a02", 0, 100, 300, 10, 10, "AAAAAA");
records.push(r1);
records.push(r2);
let input = create_test_bam(records)?;
let paths = TestPaths::new()?;
let cmd = GroupReadsByUmi {
io: BamIoOptions {
input: input.path().to_path_buf(),
output: paths.output.clone(),
async_reader: false,
},
min_map_q: Some(30),
..test_group_cmd(Strategy::Identity, 0)
};
cmd.execute("test")?;
let output_records = read_bam_records(&paths.output)?;
assert_eq!(output_records.len(), 2, "Should have 2 records (1 pair, low MAPQ filtered)");
Ok(())
}
/// Single-end reads group by (position, UMI): 8 reads at 3 positions with
/// the Edit strategy yield 4 distinct families.
#[test]
fn test_correctly_groups_single_end_reads() -> Result<()> {
let records = vec![
build_test_read("a01", 0, 100, 60, 0, "AAAAAAAA"),
build_test_read("a02", 0, 100, 60, 0, "AAAAAAAA"),
build_test_read("a03", 0, 100, 60, 0, "CACACACA"),
build_test_read("a04", 0, 100, 60, 0, "CACACACC"),
build_test_read("a05", 0, 105, 60, 0, "GTAGTAGG"),
build_test_read("a06", 0, 105, 60, 0, "GTAGTAGG"),
build_test_read("a07", 0, 107, 60, 0, "AAAAAAAA"),
build_test_read("a08", 0, 107, 60, 0, "AAAAAAAA"),
];
let input = create_test_bam(records)?;
let paths = TestPaths::new()?;
let cmd = GroupReadsByUmi {
io: BamIoOptions {
input: input.path().to_path_buf(),
output: paths.output.clone(),
async_reader: false,
},
..test_group_cmd(Strategy::Edit, 1)
};
cmd.execute("test")?;
let output_records = read_bam_records(&paths.output)?;
assert_eq!(output_records.len(), 8, "Should have all 8 records");
let unique_groups = count_unique_mi_tags(&output_records);
assert_eq!(unique_groups, 4, "Should have 4 UMI groups");
Ok(())
}
/// The family-size histogram reports one family each of size 1, 2, and 3,
/// with fractions and cumulative (>=) fractions over the 3 families.
#[test]
fn test_outputs_family_size_histogram() -> Result<()> {
let mut records = Vec::new();
for i in 1..=3 {
let (r1, r2) = build_test_pair(&format!("a{i:02}"), 0, 100, 300, 60, 60, "AAAAAAAA");
records.push(r1);
records.push(r2);
}
for i in 1..=2 {
let (r1, r2) = build_test_pair(&format!("b{i:02}"), 0, 200, 400, 60, 60, "CCCCCCCC");
records.push(r1);
records.push(r2);
}
let (r1, r2) = build_test_pair("c01", 0, 300, 500, 60, 60, "GGGGGGGG");
records.push(r1);
records.push(r2);
let input = create_test_bam(records)?;
let paths = TestPaths::new()?;
let cmd = GroupReadsByUmi {
io: BamIoOptions {
input: input.path().to_path_buf(),
output: paths.output.clone(),
async_reader: false,
},
family_size_histogram: Some(paths.histogram.clone()),
..test_group_cmd(Strategy::Identity, 0)
};
cmd.execute("test")?;
assert!(&paths.histogram.exists(), "Histogram file should exist");
let metrics: Vec<FamilySizeMetrics> = DelimFile::default().read_tsv(&paths.histogram)?;
assert_eq!(metrics.len(), 3);
assert_eq!(
metrics[0],
FamilySizeMetrics {
family_size: 1,
count: 1,
fraction: 0.333_333_333_333_333_3,
fraction_gt_or_eq_family_size: 1.0,
}
);
assert_eq!(
metrics[1],
FamilySizeMetrics {
family_size: 2,
count: 1,
fraction: 0.333_333_333_333_333_3,
fraction_gt_or_eq_family_size: 0.666_666_666_666_666_6,
}
);
assert_eq!(
metrics[2],
FamilySizeMetrics {
family_size: 3,
count: 1,
fraction: 0.333_333_333_333_333_3,
fraction_gt_or_eq_family_size: 0.333_333_333_333_333_3,
}
);
Ok(())
}
/// Derives the three metrics file paths produced for a `--metrics` prefix:
/// (family sizes, grouping metrics, position group sizes).
fn metrics_prefix_paths(prefix: &Path) -> (PathBuf, PathBuf, PathBuf) {
    let family = with_extension(prefix, "family_sizes.txt");
    let grouping = with_extension(prefix, "grouping_metrics.txt");
    let position = with_extension(prefix, "position_group_sizes.txt");
    (family, grouping, position)
}
/// The `--metrics` prefix writes all three files, and their contents match
/// the expected family sizes (4x1, 1x2, 1x3 across 6 families) and position
/// group sizes (one group each with 1, 2, and 3 families).
#[test]
fn test_metrics_prefix_writes_all_files() -> Result<()> {
let mut records = Vec::new();
for i in 1..=3 {
let (r1, r2) = build_test_pair(&format!("a{i:02}"), 0, 100, 300, 60, 60, "AAAAAAAA");
records.push(r1);
records.push(r2);
}
let (r1, r2) = build_test_pair("a04", 0, 100, 300, 60, 60, "CCCCCCCC");
records.push(r1);
records.push(r2);
for i in 1..=2 {
let (r1, r2) = build_test_pair(&format!("b{i:02}"), 0, 200, 400, 60, 60, "AAAAAAAA");
records.push(r1);
records.push(r2);
}
let (r1, r2) = build_test_pair("c01", 0, 300, 500, 60, 60, "AAAAAAAA");
records.push(r1);
records.push(r2);
let (r1, r2) = build_test_pair("c02", 0, 300, 500, 60, 60, "CCCCCCCC");
records.push(r1);
records.push(r2);
let (r1, r2) = build_test_pair("c03", 0, 300, 500, 60, 60, "GGGGGGGG");
records.push(r1);
records.push(r2);
let input = create_test_bam(records)?;
let paths = TestPaths::new()?;
let cmd = GroupReadsByUmi {
io: BamIoOptions {
input: input.path().to_path_buf(),
output: paths.output.clone(),
async_reader: false,
},
metrics: Some(paths.metrics_prefix.clone()),
..test_group_cmd(Strategy::Identity, 0)
};
cmd.execute("test")?;
let (family_path, grouping_path, position_path) =
metrics_prefix_paths(&paths.metrics_prefix);
assert!(family_path.exists(), "Family sizes file should exist");
assert!(grouping_path.exists(), "Grouping metrics file should exist");
assert!(position_path.exists(), "Position group sizes file should exist");
let family_metrics: Vec<FamilySizeMetrics> = DelimFile::default().read_tsv(&family_path)?;
assert_eq!(family_metrics.len(), 3);
assert_eq!(
family_metrics[0],
FamilySizeMetrics {
family_size: 1,
count: 4,
fraction: 4.0 / 6.0,
fraction_gt_or_eq_family_size: 1.0,
}
);
assert_eq!(
family_metrics[1],
FamilySizeMetrics {
family_size: 2,
count: 1,
fraction: 1.0 / 6.0,
fraction_gt_or_eq_family_size: 2.0 / 6.0,
}
);
assert_eq!(
family_metrics[2],
FamilySizeMetrics {
family_size: 3,
count: 1,
fraction: 1.0 / 6.0,
fraction_gt_or_eq_family_size: 1.0 / 6.0,
}
);
let grouping: Vec<UmiGroupingMetrics> = DelimFile::default().read_tsv(&grouping_path)?;
assert_eq!(grouping.len(), 1);
assert_eq!(grouping[0].total_records, 18);
assert_eq!(grouping[0].accepted_records, 18);
assert_eq!(grouping[0].total_families, 6);
let position_metrics: Vec<PositionGroupSizeMetrics> =
DelimFile::default().read_tsv(&position_path)?;
assert_eq!(position_metrics.len(), 3);
assert_eq!(
position_metrics[0],
PositionGroupSizeMetrics {
position_group_size: 1,
count: 1,
fraction: 1.0 / 3.0,
fraction_gt_or_eq_position_group_size: 1.0,
}
);
assert_eq!(
position_metrics[1],
PositionGroupSizeMetrics {
position_group_size: 2,
count: 1,
fraction: 1.0 / 3.0,
fraction_gt_or_eq_position_group_size: 2.0 / 3.0,
}
);
assert_eq!(
position_metrics[2],
PositionGroupSizeMetrics {
position_group_size: 3,
count: 1,
fraction: 1.0 / 3.0,
fraction_gt_or_eq_position_group_size: 1.0 / 3.0,
}
);
Ok(())
}
/// Metrics must be byte-for-byte identical across the single-threaded fast
/// path and the 1- and 4-thread pipeline modes, including the discarded
/// N-in-UMI pair (d01) counted but excluded from families.
#[rstest]
#[case::fast_path(ThreadingOptions::none())]
#[case::pipeline_1(ThreadingOptions::new(1))]
#[case::pipeline_4(ThreadingOptions::new(4))]
fn test_metrics_parity_across_threading_modes(
#[case] threading: ThreadingOptions,
) -> Result<()> {
let mut records = Vec::new();
for i in 1..=3 {
let (r1, r2) = build_test_pair(&format!("a{i:02}"), 0, 100, 300, 60, 60, "AAAAAAAA");
records.push(r1);
records.push(r2);
}
let (r1, r2) = build_test_pair("a04", 0, 100, 300, 60, 60, "CCCCCCCC");
records.push(r1);
records.push(r2);
for i in 1..=2 {
let (r1, r2) = build_test_pair(&format!("b{i:02}"), 0, 200, 400, 60, 60, "AAAAAAAA");
records.push(r1);
records.push(r2);
}
let (r1, r2) = build_test_pair("c01", 0, 300, 500, 60, 60, "AAAAAAAA");
records.push(r1);
records.push(r2);
let (r1, r2) = build_test_pair("c02", 0, 300, 500, 60, 60, "CCCCCCCC");
records.push(r1);
records.push(r2);
let (r1, r2) = build_test_pair("c03", 0, 300, 500, 60, 60, "GGGGGGGG");
records.push(r1);
records.push(r2);
let (r1, r2) = build_test_pair("d01", 0, 100, 300, 60, 60, "AANAAAAA");
records.push(r1);
records.push(r2);
let input = create_test_bam(records)?;
let paths = TestPaths::new()?;
let cmd = GroupReadsByUmi {
io: BamIoOptions::new(input.path().to_path_buf(), paths.output.clone()),
metrics: Some(paths.metrics_prefix.clone()),
threading,
..test_group_cmd(Strategy::Identity, 0)
};
cmd.execute("test")?;
let (family_path, grouping_path, position_path) =
metrics_prefix_paths(&paths.metrics_prefix);
let family_metrics: Vec<FamilySizeMetrics> = DelimFile::default().read_tsv(&family_path)?;
assert_eq!(
family_metrics,
vec![
FamilySizeMetrics {
family_size: 1,
count: 4,
fraction: 4.0 / 6.0,
fraction_gt_or_eq_family_size: 1.0,
},
FamilySizeMetrics {
family_size: 2,
count: 1,
fraction: 1.0 / 6.0,
fraction_gt_or_eq_family_size: 2.0 / 6.0,
},
FamilySizeMetrics {
family_size: 3,
count: 1,
fraction: 1.0 / 6.0,
fraction_gt_or_eq_family_size: 1.0 / 6.0,
},
],
);
let grouping: Vec<UmiGroupingMetrics> = DelimFile::default().read_tsv(&grouping_path)?;
assert_eq!(grouping.len(), 1);
assert_eq!(grouping[0].total_records, 20);
assert_eq!(grouping[0].accepted_records, 18);
assert_eq!(grouping[0].total_families, 6);
assert_eq!(grouping[0].discarded_non_pf, 0);
assert_eq!(grouping[0].discarded_poor_alignment, 0);
assert_eq!(grouping[0].discarded_ns_in_umi, 2);
assert_eq!(grouping[0].discarded_umi_too_short, 0);
let position_metrics: Vec<PositionGroupSizeMetrics> =
DelimFile::default().read_tsv(&position_path)?;
assert_eq!(
position_metrics,
vec![
PositionGroupSizeMetrics {
position_group_size: 1,
count: 1,
fraction: 1.0 / 3.0,
fraction_gt_or_eq_position_group_size: 1.0,
},
PositionGroupSizeMetrics {
position_group_size: 2,
count: 1,
fraction: 1.0 / 3.0,
fraction_gt_or_eq_position_group_size: 2.0 / 3.0,
},
PositionGroupSizeMetrics {
position_group_size: 3,
count: 1,
fraction: 1.0 / 3.0,
fraction_gt_or_eq_position_group_size: 1.0 / 3.0,
},
],
);
Ok(())
}
/// When both the `--metrics` prefix and the individual flags are set, both
/// sets of files are written and their contents agree.
#[test]
fn test_metrics_prefix_and_individual_flags_write_same_content() -> Result<()> {
let mut records = Vec::new();
for i in 1..=3 {
let (r1, r2) = build_test_pair(&format!("a{i:02}"), 0, 100, 300, 60, 60, "AAAAAAAA");
records.push(r1);
records.push(r2);
}
let (r1, r2) = build_test_pair("b01", 0, 200, 400, 60, 60, "CCCCCCCC");
records.push(r1);
records.push(r2);
let input = create_test_bam(records)?;
let paths = TestPaths::new()?;
let cmd = GroupReadsByUmi {
io: BamIoOptions {
input: input.path().to_path_buf(),
output: paths.output.clone(),
async_reader: false,
},
family_size_histogram: Some(paths.histogram.clone()),
grouping_metrics: Some(paths.grouping_metrics.clone()),
metrics: Some(paths.metrics_prefix.clone()),
..test_group_cmd(Strategy::Identity, 0)
};
cmd.execute("test")?;
let (prefix_family, prefix_grouping, prefix_position) =
metrics_prefix_paths(&paths.metrics_prefix);
let individual_family: Vec<FamilySizeMetrics> =
DelimFile::default().read_tsv(&paths.histogram)?;
let prefix_family: Vec<FamilySizeMetrics> =
DelimFile::default().read_tsv(&prefix_family)?;
assert_eq!(individual_family, prefix_family);
let individual_grouping: Vec<UmiGroupingMetrics> =
DelimFile::default().read_tsv(&paths.grouping_metrics)?;
let prefix_grouping: Vec<UmiGroupingMetrics> =
DelimFile::default().read_tsv(&prefix_grouping)?;
assert_eq!(individual_grouping.len(), 1);
assert_eq!(prefix_grouping.len(), 1);
assert_eq!(individual_grouping[0].total_records, prefix_grouping[0].total_records);
assert_eq!(individual_grouping[0].accepted_records, prefix_grouping[0].accepted_records);
assert_eq!(individual_grouping[0].total_families, prefix_grouping[0].total_families);
let position: Vec<PositionGroupSizeMetrics> =
DelimFile::default().read_tsv(&prefix_position)?;
assert_eq!(position.len(), 1);
assert_eq!(position[0].position_group_size, 1);
assert_eq!(position[0].count, 2);
assert!((position[0].fraction - 1.0).abs() < f64::EPSILON);
Ok(())
}
/// A single position with two multi-read families (5 reads + 2 reads)
/// yields one position group of size 2 and family sizes {2, 5}.
#[test]
fn test_metrics_position_group_size_with_multi_read_families() -> Result<()> {
let mut records = Vec::new();
for i in 1..=5 {
let (r1, r2) = build_test_pair(&format!("r{i:02}"), 0, 100, 300, 60, 60, "AAAAAAAA");
records.push(r1);
records.push(r2);
}
for i in 6..=7 {
let (r1, r2) = build_test_pair(&format!("r{i:02}"), 0, 100, 300, 60, 60, "CCCCCCCC");
records.push(r1);
records.push(r2);
}
let input = create_test_bam(records)?;
let paths = TestPaths::new()?;
let cmd = GroupReadsByUmi {
io: BamIoOptions {
input: input.path().to_path_buf(),
output: paths.output.clone(),
async_reader: false,
},
metrics: Some(paths.metrics_prefix.clone()),
..test_group_cmd(Strategy::Identity, 0)
};
cmd.execute("test")?;
let (family_path, _, position_path) = metrics_prefix_paths(&paths.metrics_prefix);
let family_metrics: Vec<FamilySizeMetrics> = DelimFile::default().read_tsv(&family_path)?;
assert_eq!(family_metrics.len(), 2);
assert_eq!(family_metrics[0].family_size, 2);
assert_eq!(family_metrics[0].count, 1);
assert_eq!(family_metrics[1].family_size, 5);
assert_eq!(family_metrics[1].count, 1);
let position_metrics: Vec<PositionGroupSizeMetrics> =
DelimFile::default().read_tsv(&position_path)?;
assert_eq!(position_metrics.len(), 1);
assert_eq!(position_metrics[0].position_group_size, 2);
assert_eq!(position_metrics[0].count, 1);
assert!((position_metrics[0].fraction - 1.0).abs() < f64::EPSILON);
assert!(
(position_metrics[0].fraction_gt_or_eq_position_group_size - 1.0).abs() < f64::EPSILON
);
Ok(())
}
#[test]
fn test_outputs_grouping_metrics() -> Result<()> {
    // Three pairs: one clean, one with an N in its UMI, one mapped below the
    // MAPQ cutoff of 30; only the clean pair should be accepted, and the
    // grouping metrics should attribute the discards to the right reasons.
    let mut records = Vec::new();
    for (name, mapq, umi) in [("a01", 60, "AAAAAA"), ("a02", 60, "AANAAA"), ("a03", 5, "AAAAAA")] {
        let (r1, r2) = build_test_pair(name, 0, 100, 300, mapq, mapq, umi);
        records.extend([r1, r2]);
    }
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Identity, 0);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.grouping_metrics = Some(paths.grouping_metrics.clone());
    cmd.min_map_q = Some(30);
    cmd.execute("test")?;
    let metrics: Vec<UmiGroupingMetrics> =
        DelimFile::default().read_tsv(&paths.grouping_metrics)?;
    assert_eq!(metrics.len(), 1);
    assert_eq!(metrics[0].accepted_records, 2);
    assert_eq!(metrics[0].discarded_ns_in_umi, 2);
    assert_eq!(metrics[0].discarded_poor_alignment, 2);
    assert_eq!(metrics[0].discarded_non_pf, 0);
    assert_eq!(metrics[0].discarded_umi_too_short, 0);
    Ok(())
}
#[test]
fn test_rejects_umis_shorter_than_min_length() -> Result<()> {
    // One pair with a 6-base UMI and one with only 5 bases; a minimum UMI
    // length of 6 must discard the short pair and count it in the metrics.
    let mut records = Vec::new();
    for (name, umi) in [("a01", "ACTACT"), ("a02", "ACTAC")] {
        let (r1, r2) = build_test_pair(name, 0, 100, 300, 60, 60, umi);
        records.extend([r1, r2]);
    }
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Edit, 0);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.grouping_metrics = Some(paths.grouping_metrics.clone());
    cmd.min_umi_length = Some(6);
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 2, "Should only have records with UMI length >= 6");
    let metrics: Vec<UmiGroupingMetrics> =
        DelimFile::default().read_tsv(&paths.grouping_metrics)?;
    assert_eq!(metrics.len(), 1);
    assert_eq!(metrics[0].accepted_records, 2);
    assert_eq!(metrics[0].discarded_ns_in_umi, 0);
    assert_eq!(metrics[0].discarded_poor_alignment, 0);
    assert_eq!(metrics[0].discarded_non_pf, 0);
    assert_eq!(metrics[0].discarded_umi_too_short, 2);
    Ok(())
}
#[test]
fn test_truncates_to_min_length() -> Result<()> {
    // UMIs of length 6 and 5 with a minimum length of 5: nothing is
    // discarded, and per the test's expectation the longer UMI is cut down so
    // both pairs end up in a single group.
    let mut records = Vec::new();
    for (name, umi) in [("a01", "ACTACT"), ("a02", "ACTAC")] {
        let (r1, r2) = build_test_pair(name, 0, 100, 300, 60, 60, umi);
        records.extend([r1, r2]);
    }
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Edit, 0);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.min_umi_length = Some(5);
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 4, "Should have all 4 records");
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(unique_groups, 1, "Should have 1 group after truncation");
    Ok(())
}
#[test]
fn test_cross_contig_read_pairs() -> Result<()> {
    // Two sets of read pairs whose mates map to DIFFERENT contigs, all
    // sharing UMI "ACT-ACT":
    //   - "a" pairs: R1 on contig 0 @ pos 99, mate on contig 1 @ pos 299
    //   - "b" pairs: the mirror orientation (R1 on contig 1, mate on contig 0)
    // With the paired strategy the two orientations should share one MI
    // prefix (same molecule id) but carry different suffixes.
    let mut records = Vec::new();
    for i in 1..=4 {
        let seq = vec![b'A'; 100];
        let quals = vec![30u8; 100];
        // op 0, length 100 — consistent with the "100M" mate-cigar tag below.
        let cigar = encode_op(0, 100);
        let name_a = format!("a{i:02}");
        let mut b1 = RawSamBuilder::new();
        b1.read_name(name_a.as_bytes())
            .flags(flags::PAIRED | flags::FIRST_SEGMENT)
            .ref_id(0) .pos(99) .mapq(60)
            .cigar_ops(&[cigar])
            .sequence(&seq)
            .qualities(&quals)
            .mate_ref_id(1) .mate_pos(299); b1.add_string_tag(b"RX", b"ACT-ACT");
        b1.add_string_tag(b"MC", b"100M");
        let r1 = b1.build();
        let mut b2 = RawSamBuilder::new();
        b2.read_name(name_a.as_bytes())
            .flags(flags::PAIRED | flags::LAST_SEGMENT)
            .ref_id(1) .pos(299) .mapq(60)
            .cigar_ops(&[cigar])
            .sequence(&seq)
            .qualities(&quals)
            .mate_ref_id(0) .mate_pos(99); b2.add_string_tag(b"RX", b"ACT-ACT");
        b2.add_string_tag(b"MC", b"100M");
        let r2 = b2.build();
        records.push(r1);
        records.push(r2);
    }
    // "b" pairs mirror the "a" pairs: R1/R2 contigs and positions swapped.
    for i in 1..=4 {
        let seq = vec![b'A'; 100];
        let quals = vec![30u8; 100];
        let cigar = encode_op(0, 100);
        let name_b = format!("b{i:02}");
        let mut b1 = RawSamBuilder::new();
        b1.read_name(name_b.as_bytes())
            .flags(flags::PAIRED | flags::FIRST_SEGMENT)
            .ref_id(1) .pos(299) .mapq(60)
            .cigar_ops(&[cigar])
            .sequence(&seq)
            .qualities(&quals)
            .mate_ref_id(0) .mate_pos(99); b1.add_string_tag(b"RX", b"ACT-ACT");
        b1.add_string_tag(b"MC", b"100M");
        let r1 = b1.build();
        let mut b2 = RawSamBuilder::new();
        b2.read_name(name_b.as_bytes())
            .flags(flags::PAIRED | flags::LAST_SEGMENT)
            .ref_id(0) .pos(99) .mapq(60)
            .cigar_ops(&[cigar])
            .sequence(&seq)
            .qualities(&quals)
            .mate_ref_id(1) .mate_pos(299); b2.add_string_tag(b"RX", b"ACT-ACT");
        b2.add_string_tag(b"MC", b"100M");
        let r2 = b2.build();
        records.push(r1);
        records.push(r2);
    }
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let cmd = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: paths.output.clone(),
            async_reader: false,
        },
        ..test_group_cmd(Strategy::Paired, 1)
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 16, "Should have all 16 records");
    // Collect the distinct MI values seen for each read-name prefix.
    let a_mis: Vec<String> = output_records
        .iter()
        .filter(|r| r.name().is_some_and(|n| String::from_utf8_lossy(n).starts_with('a')))
        .filter_map(|r| get_string_tag(r, "MI"))
        .collect::<std::collections::HashSet<_>>()
        .into_iter()
        .collect();
    let b_mis: Vec<String> = output_records
        .iter()
        .filter(|r| r.name().is_some_and(|n| String::from_utf8_lossy(n).starts_with('b')))
        .filter_map(|r| get_string_tag(r, "MI"))
        .collect::<std::collections::HashSet<_>>()
        .into_iter()
        .collect();
    assert_eq!(a_mis.len(), 1, "Group A should have 1 unique MI");
    assert_eq!(b_mis.len(), 1, "Group B should have 1 unique MI");
    // MI here has the form "<id>/<suffix>"; same id, different suffix.
    let a_prefix = a_mis[0].split('/').next().expect("MI tag should contain '/' separator");
    let b_prefix = b_mis[0].split('/').next().expect("MI tag should contain '/' separator");
    assert_eq!(a_prefix, b_prefix, "Both groups should have same MI prefix");
    assert_ne!(a_mis[0], b_mis[0], "Groups should have different MI suffixes");
    Ok(())
}
#[test]
fn test_pair_orientation_splitting() -> Result<()> {
    // Templates with identical UMIs but different pair orientations (per the
    // read names: F1R2, F2R1, FF, RR, R1F2, R2F1) plus two single-end
    // fragments; each orientation/position combination must land in its own
    // MI group, giving 8 groups for 8 templates.
    let mut records = Vec::new();
    let (r1, r2) = build_test_pair("f1r2", 0, 100, 300, 60, 60, "ACGT-TTGA");
    records.extend([r1, r2]);
    let (mut r1, mut r2) = build_test_pair("f2r1", 0, 300, 100, 60, 60, "ACGT-TTGA");
    set_reverse(&mut r1, true);
    set_reverse(&mut r2, false);
    records.extend([r1, r2]);
    let (r1, mut r2) = build_test_pair("ff", 0, 100, 300, 60, 60, "ACGT-TTGA");
    set_reverse(&mut r2, false);
    records.extend([r1, r2]);
    let (mut r1, mut r2) = build_test_pair("rr", 0, 1, 201, 60, 60, "ACGT-TTGA");
    set_reverse(&mut r1, true);
    set_reverse(&mut r2, true);
    records.extend([r1, r2]);
    let (mut r1, mut r2) = build_test_pair("r1f2", 0, 150, 350, 60, 60, "ACGT-TTGA");
    set_reverse(&mut r1, true);
    set_reverse(&mut r2, false);
    records.extend([r1, r2]);
    let (r1, mut r2) = build_test_pair("r2f1", 0, 400, 600, 60, 60, "ACGT-TTGA");
    set_reverse(&mut r2, true);
    records.extend([r1, r2]);
    // Two fragments: one reversed, one forward, at different positions.
    let mut reversed_frag = build_test_read("Frag", 0, 100, 60, 0, "ACGT-TTGA");
    set_reverse(&mut reversed_frag, true);
    records.push(reversed_frag);
    records.push(build_test_read("fRag", 0, 1, 60, 0, "ACGT-TTGA"));
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Adjacency, 1);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    let unique_mis = count_unique_mi_tags(&output_records);
    assert_eq!(unique_mis, 8, "Should have 8 unique MIs (one per template)");
    Ok(())
}
#[test]
fn test_missing_raw_tag() -> Result<()> {
    // A single well-formed pair that carries an MC tag but NO RX (raw UMI)
    // tag. Records without a UMI should be discarded, so the output is empty.
    let mut records = Vec::new();
    let seq = vec![b'A'; 100];
    let quals = vec![30u8; 100];
    // op 0, length 100 — consistent with the "100M" mate-cigar tag below.
    let cigar = encode_op(0, 100);
    let mut b1 = RawSamBuilder::new();
    b1.read_name(b"a01")
        .flags(flags::PAIRED | flags::FIRST_SEGMENT | flags::MATE_REVERSE)
        .ref_id(0)
        .pos(99)
        .mapq(60)
        .cigar_ops(&[cigar])
        .sequence(&seq)
        .qualities(&quals)
        .mate_ref_id(0)
        .mate_pos(299);
    b1.add_string_tag(b"MC", b"100M");
    let r1 = b1.build();
    let mut b2 = RawSamBuilder::new();
    b2.read_name(b"a01")
        .flags(flags::PAIRED | flags::LAST_SEGMENT | flags::REVERSE)
        .ref_id(0)
        .pos(299)
        .mapq(60)
        .cigar_ops(&[cigar])
        .sequence(&seq)
        .qualities(&quals)
        .mate_ref_id(0)
        .mate_pos(99);
    b2.add_string_tag(b"MC", b"100M");
    let r2 = b2.build();
    records.push(r1);
    records.push(r2);
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let cmd = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: paths.output.clone(),
            async_reader: false,
        },
        ..test_group_cmd(Strategy::Paired, 1)
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 0, "Should have no output records");
    Ok(())
}
#[test]
fn test_no_umi_mode_accepts_missing_umi_tag() -> Result<()> {
    // Same pair shape as test_missing_raw_tag (MC tag present, RX absent),
    // but run with --no-umi: the records must be ACCEPTED and grouped by
    // position alone.
    let mut records = Vec::new();
    let seq = vec![b'A'; 100];
    let quals = vec![30u8; 100];
    // op 0, length 100 — consistent with the "100M" mate-cigar tag below.
    let cigar = encode_op(0, 100);
    let mut b1 = RawSamBuilder::new();
    b1.read_name(b"a01")
        .flags(flags::PAIRED | flags::FIRST_SEGMENT | flags::MATE_REVERSE)
        .ref_id(0)
        .pos(99)
        .mapq(60)
        .cigar_ops(&[cigar])
        .sequence(&seq)
        .qualities(&quals)
        .mate_ref_id(0)
        .mate_pos(299);
    b1.add_string_tag(b"MC", b"100M");
    let r1 = b1.build();
    let mut b2 = RawSamBuilder::new();
    b2.read_name(b"a01")
        .flags(flags::PAIRED | flags::LAST_SEGMENT | flags::REVERSE)
        .ref_id(0)
        .pos(299)
        .mapq(60)
        .cigar_ops(&[cigar])
        .sequence(&seq)
        .qualities(&quals)
        .mate_ref_id(0)
        .mate_pos(99);
    b2.add_string_tag(b"MC", b"100M");
    let r2 = b2.build();
    records.push(r1);
    records.push(r2);
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Identity, 0);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.no_umi = true;
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 2, "Should have 2 output records");
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(unique_groups, 1, "All records at same position should be in 1 group");
    Ok(())
}
#[test]
fn test_no_umi_mode_groups_by_position_only() -> Result<()> {
    // Three pairs at identical coordinates with three distinct UMIs; with
    // --no-umi the UMIs are ignored, so all reads fall into one group.
    let mut records = Vec::new();
    for (name, umi) in [("a01", "AAAAAAAA"), ("a02", "TTTTTTTT"), ("a03", "CCCCCCCC")] {
        let (r1, r2) = build_test_pair(name, 0, 100, 300, 60, 60, umi);
        records.extend([r1, r2]);
    }
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let cmd = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: paths.output.clone(),
            async_reader: false,
        },
        no_umi: true,
        ..test_group_cmd(Strategy::Adjacency, 1)
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 6, "Should have 6 output records (3 pairs)");
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(
        unique_groups, 1,
        "All records at same position should be in 1 group (UMI ignored)"
    );
    Ok(())
}
#[test]
fn test_no_umi_mode_accepts_umi_with_n() -> Result<()> {
    // A UMI containing Ns would normally be discarded; with --no-umi the
    // UMI is not inspected, so the pair passes through.
    let (r1, r2) = build_test_pair("a01", 0, 100, 300, 60, 60, "ACNTNACGT");
    let records = vec![r1, r2];
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let cmd = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: paths.output.clone(),
            async_reader: false,
        },
        no_umi: true,
        ..test_group_cmd(Strategy::Identity, 0)
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 2, "Should have 2 output records");
    Ok(())
}
#[test]
fn test_no_umi_mode_rejects_paired_strategy() {
    // --no-umi is incompatible with the paired strategy: execution must fail
    // and the error must name the conflicting options.
    let cmd = GroupReadsByUmi { no_umi: true, ..test_group_cmd(Strategy::Paired, 1) };
    let result = cmd.execute("test");
    assert!(result.is_err(), "Should fail when --no-umi is used with --strategy paired");
    let error_msg = result.unwrap_err().to_string();
    assert!(
        error_msg.contains("--no-umi cannot be used with --strategy paired"),
        "Error message should mention the conflict"
    );
}
#[test]
fn test_cell_barcode_grouping() -> Result<()> {
    // Single-end reads with a CB (cell barcode) tag. Reads may only be
    // grouped when the barcode also matches: a01/a02 share CB "AA", a05/a06
    // share "GT"; a03 and a04 have similar UMIs but different barcodes and
    // must stay apart — 4 groups in total.
    let mut records = Vec::new();
    let specs = [
        ("a01", 100, "AAAAAAAA", b"AA"),
        ("a02", 100, "AAAAAAAA", b"AA"),
        ("a03", 100, "CACACACA", b"CA"),
        ("a04", 100, "CACACACC", b"NN"),
        ("a05", 105, "GTAGTAGG", b"GT"),
        ("a06", 105, "GTAGTAGG", b"GT"),
    ];
    for (name, pos, umi, barcode) in specs {
        let mut rec = build_test_read(name, 0, pos, 60, 0, umi);
        append_str_tag(&mut rec, b"CB", barcode);
        records.push(rec);
    }
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Edit, 1);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 6, "Should have all 6 records");
    // Bucket read names by their assigned MI value.
    let mut groups: std::collections::HashMap<String, Vec<String>> =
        std::collections::HashMap::new();
    for record in &output_records {
        let mi = get_string_tag(record, "MI").expect("record should have MI tag");
        let name = String::from_utf8_lossy(record.name().expect("record should have a name"))
            .to_string();
        groups.entry(mi).or_default().push(name);
    }
    assert_eq!(groups.len(), 4, "Should have 4 unique MI groups");
    let group_sets: Vec<std::collections::HashSet<String>> =
        groups.values().map(|v| v.iter().cloned().collect()).collect();
    let expected: [&[&str]; 4] = [&["a01", "a02"], &["a03"], &["a04"], &["a05", "a06"]];
    for members in expected {
        let want: std::collections::HashSet<String> =
            members.iter().map(|m| (*m).to_string()).collect();
        assert!(group_sets.contains(&want));
    }
    Ok(())
}
#[test]
fn test_paired_mode_with_absent_umi_on_right() -> Result<()> {
    // Duplex-style UMIs where one side is absent: "a" templates carry
    // "ACT-" and the mirrored "b" templates carry "-ACT". Paired mode should
    // place them in one family with /A and /B strand suffixes.
    let mut records = Vec::new();
    for i in 1..=4 {
        let (r1, r2) = build_test_pair(&format!("a{i:02}"), 0, 100, 300, 60, 60, "ACT-");
        records.extend([r1, r2]);
    }
    for i in 1..=4 {
        let (r1, r2) = build_test_pair(&format!("b{i:02}"), 0, 300, 100, 60, 60, "-ACT");
        records.extend([r1, r2]);
    }
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Paired, 1);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 16, "Should have all 16 records");
    // Distinct MI values for a given read-name prefix.
    let mis_for = |prefix: char| -> Vec<String> {
        output_records
            .iter()
            .filter(|r| r.name().is_some_and(|n| String::from_utf8_lossy(n).starts_with(prefix)))
            .filter_map(|r| get_string_tag(r, "MI"))
            .collect::<std::collections::HashSet<_>>()
            .into_iter()
            .collect()
    };
    let a_mis = mis_for('a');
    let b_mis = mis_for('b');
    assert_eq!(a_mis.len(), 1, "Group A should have 1 unique MI");
    assert_eq!(b_mis.len(), 1, "Group B should have 1 unique MI");
    assert!(a_mis[0].ends_with("/A"), "Group A should have /A suffix");
    assert!(b_mis[0].ends_with("/B"), "Group B should have /B suffix");
    Ok(())
}
#[test]
fn test_paired_mode_with_absent_umi_on_left() -> Result<()> {
    // Mirror of test_paired_mode_with_absent_umi_on_right: the absent half
    // of the duplex UMI is on the LEFT ("-ACT" for "a", "ACT-" for "b").
    let mut records = Vec::new();
    for i in 1..=4 {
        let (r1, r2) = build_test_pair(&format!("a{i:02}"), 0, 100, 300, 60, 60, "-ACT");
        records.extend([r1, r2]);
    }
    for i in 1..=4 {
        let (r1, r2) = build_test_pair(&format!("b{i:02}"), 0, 300, 100, 60, 60, "ACT-");
        records.extend([r1, r2]);
    }
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Paired, 1);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 16, "Should have all 16 records");
    // Distinct MI values for a given read-name prefix.
    let mis_for = |prefix: char| -> Vec<String> {
        output_records
            .iter()
            .filter(|r| r.name().is_some_and(|n| String::from_utf8_lossy(n).starts_with(prefix)))
            .filter_map(|r| get_string_tag(r, "MI"))
            .collect::<std::collections::HashSet<_>>()
            .into_iter()
            .collect()
    };
    let a_mis = mis_for('a');
    let b_mis = mis_for('b');
    assert_eq!(a_mis.len(), 1, "Group A should have 1 unique MI");
    assert_eq!(b_mis.len(), 1, "Group B should have 1 unique MI");
    assert!(a_mis[0].ends_with("/A"), "Group A should have /A suffix");
    assert!(b_mis[0].ends_with("/B"), "Group B should have /B suffix");
    Ok(())
}
#[test]
fn test_discard_secondary_and_supplementary_reads() -> Result<()> {
    // Input: a primary pair (a01, MAPQ 100), a SECONDARY pair, a
    // SUPPLEMENTARY pair, and another primary pair (a02, MAPQ 10).
    // Secondary and supplementary alignments must be dropped from the
    // output; both primary pairs are kept (the assertions below show a02's
    // MAPQ-10 reads survive under this command configuration).
    let mut records = Vec::new();
    let (r1, r2) = build_test_pair("a01", 0, 100, 300, 100, 100, "AAAAAAAA");
    records.push(r1);
    records.push(r2);
    // Secondary-alignment pair at the same coordinates as a01.
    {
        let seq = vec![b'A'; 100];
        let quals = vec![30u8; 100];
        let cigar = encode_op(0, 100);
        let mut b1 = RawSamBuilder::new();
        b1.read_name(b"a01_sec")
            .flags(
                flags::PAIRED | flags::FIRST_SEGMENT | flags::MATE_REVERSE | flags::SECONDARY,
            )
            .ref_id(0)
            .pos(99)
            .mapq(60)
            .cigar_ops(&[cigar])
            .sequence(&seq)
            .qualities(&quals)
            .mate_ref_id(0)
            .mate_pos(299);
        b1.add_string_tag(b"RX", b"AAAAAAAA");
        let r1 = b1.build();
        let mut b2 = RawSamBuilder::new();
        b2.read_name(b"a01_sec")
            .flags(flags::PAIRED | flags::LAST_SEGMENT | flags::REVERSE | flags::SECONDARY)
            .ref_id(0)
            .pos(299)
            .mapq(60)
            .cigar_ops(&[cigar])
            .sequence(&seq)
            .qualities(&quals)
            .mate_ref_id(0)
            .mate_pos(99);
        b2.add_string_tag(b"RX", b"AAAAAAAA");
        let r2 = b2.build();
        records.push(r1);
        records.push(r2);
    }
    // Supplementary-alignment pair at the same coordinates as a01.
    {
        let seq = vec![b'A'; 100];
        let quals = vec![30u8; 100];
        let cigar = encode_op(0, 100);
        let mut b1 = RawSamBuilder::new();
        b1.read_name(b"a01_sup")
            .flags(
                flags::PAIRED
                    | flags::FIRST_SEGMENT
                    | flags::MATE_REVERSE
                    | flags::SUPPLEMENTARY,
            )
            .ref_id(0)
            .pos(99)
            .mapq(60)
            .cigar_ops(&[cigar])
            .sequence(&seq)
            .qualities(&quals)
            .mate_ref_id(0)
            .mate_pos(299);
        b1.add_string_tag(b"RX", b"AAAAAAAA");
        let r1 = b1.build();
        let mut b2 = RawSamBuilder::new();
        b2.read_name(b"a01_sup")
            .flags(flags::PAIRED | flags::LAST_SEGMENT | flags::REVERSE | flags::SUPPLEMENTARY)
            .ref_id(0)
            .pos(299)
            .mapq(60)
            .cigar_ops(&[cigar])
            .sequence(&seq)
            .qualities(&quals)
            .mate_ref_id(0)
            .mate_pos(99);
        b2.add_string_tag(b"RX", b"AAAAAAAA");
        let r2 = b2.build();
        records.push(r1);
        records.push(r2);
    }
    // Second primary pair with low MAPQ (10).
    let (r1, r2) = build_test_pair("a02", 0, 100, 300, 10, 10, "AAAAAAAA");
    records.push(r1);
    records.push(r2);
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let cmd = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: paths.output.clone(),
            async_reader: false,
        },
        ..test_group_cmd(Strategy::Edit, 1)
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(
        output_records.len(),
        4,
        "Should have only 4 records (secondary and supplementary filtered out)"
    );
    for record in &output_records {
        assert!(!record.flags().is_secondary(), "Output should not contain secondary reads");
        assert!(
            !record.flags().is_supplementary(),
            "Output should not contain supplementary reads"
        );
    }
    let a01_count = output_records
        .iter()
        .filter(|r| r.name().is_some_and(|n| String::from_utf8_lossy(n).starts_with("a01")))
        .count();
    let a02_count = output_records
        .iter()
        .filter(|r| r.name().is_some_and(|n| String::from_utf8_lossy(n).starts_with("a02")))
        .count();
    assert_eq!(a01_count, 2, "Should have 2 reads from a01");
    assert_eq!(a02_count, 2, "Should have 2 reads from a02");
    Ok(())
}
#[test]
fn test_adjacency_single_thread() -> Result<()> {
    // Three expected adjacency clusters at one position:
    //   g1 (AAAAAA x4) absorbing g1b (AAAAAT x2),
    //   g2 (GACGAC x9) absorbing g2b (GACGAT x1) and g2c (GACGCC x4),
    //   g3 (TACGAC x7) kept separate (its count is too close to g2's).
    let mut records = Vec::new();
    let clusters = [
        ("g1", 4, "AAAAAA"),
        ("g1b", 2, "AAAAAT"),
        ("g2", 9, "GACGAC"),
        ("g2b", 1, "GACGAT"),
        ("g2c", 4, "GACGCC"),
        ("g3", 7, "TACGAC"),
    ];
    for (prefix, n, umi) in clusters {
        for i in 1..=n {
            let (r1, r2) = build_test_pair(&format!("{prefix}_{i}"), 0, 100, 300, 60, 60, umi);
            records.extend([r1, r2]);
        }
    }
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Adjacency, 2);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 54, "Should have all 54 records (27 pairs)");
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(
        unique_groups, 3,
        "Should have 3 unique groups with adjacency strategy and 1 thread"
    );
    Ok(())
}
#[test]
fn test_adjacency_multi_thread() -> Result<()> {
    // Same input as test_adjacency_single_thread, run with 4 threads: the
    // multi-threaded assigner must produce the identical 3 groups.
    let mut records = Vec::new();
    let clusters = [
        ("g1", 4, "AAAAAA"),
        ("g1b", 2, "AAAAAT"),
        ("g2", 9, "GACGAC"),
        ("g2b", 1, "GACGAT"),
        ("g2c", 4, "GACGCC"),
        ("g3", 7, "TACGAC"),
    ];
    for (prefix, n, umi) in clusters {
        for i in 1..=n {
            let (r1, r2) = build_test_pair(&format!("{prefix}_{i}"), 0, 100, 300, 60, 60, umi);
            records.extend([r1, r2]);
        }
    }
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Adjacency, 2);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.threading = ThreadingOptions::new(4);
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 54, "Should have all 54 records (27 pairs)");
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(
        unique_groups, 3,
        "Should have 3 unique groups with adjacency strategy and 4 threads (same as single thread)"
    );
    Ok(())
}
#[test]
fn test_adjacency_deep_tree_single_thread() -> Result<()> {
    // A chain of UMIs, each one edit away from the previous, with halving
    // counts (256 down to 16) so the adjacency count gradient links the
    // whole chain into a single family.
    let mut records = Vec::new();
    let mut counter = 1;
    for (umi, count) in
        [("AAAAAA", 256), ("TAAAAA", 128), ("TTAAAA", 64), ("TTTAAA", 32), ("TTTTAA", 16)]
    {
        for _ in 0..count {
            let (r1, r2) = build_test_pair(&format!("q{counter}"), 0, 100, 300, 60, 60, umi);
            records.extend([r1, r2]);
            counter += 1;
        }
    }
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Adjacency, 1);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 992, "Should have all records");
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(unique_groups, 1, "Should have 1 group (all UMIs connected in deep tree)");
    Ok(())
}
#[test]
fn test_adjacency_deep_tree_multi_thread() -> Result<()> {
    // Same deep UMI chain as the single-thread variant, run with 4 threads:
    // the result must still be one connected family.
    let mut records = Vec::new();
    let mut counter = 1;
    for (umi, count) in
        [("AAAAAA", 256), ("TAAAAA", 128), ("TTAAAA", 64), ("TTTAAA", 32), ("TTTTAA", 16)]
    {
        for _ in 0..count {
            let (r1, r2) = build_test_pair(&format!("q{counter}"), 0, 100, 300, 60, 60, umi);
            records.extend([r1, r2]);
            counter += 1;
        }
    }
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Adjacency, 1);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.threading = ThreadingOptions::new(4);
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 992, "Should have all records");
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(
        unique_groups, 1,
        "Should have 1 group with multi-threading (same as single thread)"
    );
    Ok(())
}
#[test]
fn test_paired_assigner_explicit_ab_ba_symmetry() -> Result<()> {
    // Duplex UMIs: two templates carry "AAAA-TTTT" and one carries the
    // swapped "TTTT-AAAA". The paired assigner should treat these as the two
    // strands of one molecule, yielding exactly two MI values (the /A and /B
    // halves).
    let mut records = Vec::new();
    for (name, umi) in
        [("read_ab_1", "AAAA-TTTT"), ("read_ab_2", "AAAA-TTTT"), ("read_ba_1", "TTTT-AAAA")]
    {
        let (r1, r2) = build_test_pair(name, 0, 100, 300, 60, 60, umi);
        records.extend([r1, r2]);
    }
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Paired, 0);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 6, "Should have all 6 records");
    let mi_tags = get_mi_tags(&output_records);
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(unique_groups, 2, "Should have 2 groups (A-B vs B-A orientation)");
    assert!(
        mi_tags.iter().all(|t| t.contains("/A") || t.contains("/B")),
        "All MI tags should have /A or /B suffix. Tags: {mi_tags:?}"
    );
    Ok(())
}
#[test]
fn test_edit_strategy_with_exact_edit_distance() -> Result<()> {
    // Two pairs whose UMIs differ by exactly one base ("AAAAAA" vs
    // "TAAAAA"); with an edit distance of 1 they must collapse into one
    // family.
    let (r1_a, r2_a) = build_test_pair("read1", 0, 100, 300, 60, 60, "AAAAAA");
    let (r1_b, r2_b) = build_test_pair("read2", 0, 100, 300, 60, 60, "TAAAAA");
    let records = vec![r1_a, r2_a, r1_b, r2_b];
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Edit, 1);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(unique_groups, 1, "Should group UMIs within edit distance");
    Ok(())
}
#[test]
fn test_edit_strategy_outside_edit_distance() -> Result<()> {
    // Two pairs whose UMIs differ by TWO bases ("AAAAAA" vs "TTAAAA"); with
    // an edit distance of 1 they must remain separate families.
    let (r1_a, r2_a) = build_test_pair("read1", 0, 100, 300, 60, 60, "AAAAAA");
    let (r1_b, r2_b) = build_test_pair("read2", 0, 100, 300, 60, 60, "TTAAAA");
    let records = vec![r1_a, r2_a, r1_b, r2_b];
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Edit, 1);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(unique_groups, 2, "Should keep UMIs outside edit distance separate");
    Ok(())
}
#[test]
fn test_identity_strategy_no_grouping() -> Result<()> {
    // The identity strategy only merges EXACT UMI matches, so three
    // near-identical UMIs stay in three separate groups.
    let mut records = Vec::new();
    for (name, umi) in [("read1", "AAAAAA"), ("read2", "AAAAAC"), ("read3", "AAAAAG")] {
        let (r1, r2) = build_test_pair(name, 0, 100, 300, 60, 60, umi);
        records.extend([r1, r2]);
    }
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Identity, 0);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(unique_groups, 3, "Identity strategy should not group similar UMIs");
    Ok(())
}
#[test]
fn test_adjacency_with_count_gradient() -> Result<()> {
    // 50 pairs with UMI "AAAAAA" against a single pair with "TAAAAA": the
    // steep count gradient lets the adjacency strategy absorb the low-count
    // UMI into the dominant one.
    let mut records = Vec::new();
    for i in 0..50 {
        let (r1, r2) = build_test_pair(&format!("high_{i}"), 0, 100, 300, 60, 60, "AAAAAA");
        records.extend([r1, r2]);
    }
    let (r1_low, r2_low) = build_test_pair("low_1", 0, 100, 300, 60, 60, "TAAAAA");
    records.extend([r1_low, r2_low]);
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Adjacency, 1);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(unique_groups, 1, "Low count UMI should be absorbed by high count");
    Ok(())
}
#[test]
fn test_adjacency_no_gradient_keeps_separate() -> Result<()> {
    // Two UMIs one edit apart but with EQUAL counts (10 vs 10): without a
    // count gradient the adjacency strategy must not merge them.
    let mut records = Vec::new();
    for (prefix, umi) in [("umi1", "AAAAAA"), ("umi2", "TAAAAA")] {
        for i in 0..10 {
            let (r1, r2) = build_test_pair(&format!("{prefix}_{i}"), 0, 100, 300, 60, 60, umi);
            records.extend([r1, r2]);
        }
    }
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Adjacency, 1);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(unique_groups, 2, "Similar counts should not merge in adjacency");
    Ok(())
}
#[test]
fn test_very_long_umis() -> Result<()> {
    // A 20-base UMI should pass through grouping without issue.
    let long_umi = "ACGTACGTACGTACGTACGT";
    let (r1, r2) = build_test_pair("read1", 0, 100, 300, 60, 60, long_umi);
    let records = vec![r1, r2];
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let mut cmd = test_group_cmd(Strategy::Identity, 0);
    cmd.io = BamIoOptions {
        input: input.path().to_path_buf(),
        output: paths.output.clone(),
        async_reader: false,
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 2, "Should process long UMIs correctly");
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(unique_groups, 1, "Should have 1 group");
    Ok(())
}
/// With `min_umi_length = 8`, the 4-bp UMI pair is dropped and only the
/// 10-bp UMI pair survives.
#[test]
fn test_minimum_umi_length_filtering() -> Result<()> {
    let mut reads = Vec::new();
    let (short_a, short_b) = build_test_pair("short", 0, 100, 300, 60, 60, "ACGT");
    let (long_a, long_b) = build_test_pair("long", 0, 100, 300, 60, 60, "ACGTACGTAC");
    reads.extend([short_a, short_b, long_a, long_b]);
    let input = create_test_bam(reads)?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        min_umi_length: Some(8),
        ..test_group_cmd(Strategy::Identity, 0)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(written.len(), 2, "Should filter short UMIs");
    let umis: Vec<String> = written.iter().filter_map(|r| get_string_tag(r, "RX")).collect();
    assert!(umis.iter().all(|umi| umi.len() >= 8), "UMIs should be at least min length");
    Ok(())
}
/// Mapped templates pass straight through the default filter.
// NOTE(review): despite the name, only a mapped pair is constructed here, so
// this only covers the accept path — confirm whether an unmapped fixture was
// intended.
#[test]
fn test_unmapped_templates_filtered() -> Result<()> {
    let (a, b) = build_test_pair("mapped", 0, 200, 400, 60, 60, "TTTTTT");
    let input = create_test_bam(vec![a, b])?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        ..test_group_cmd(Strategy::Identity, 0)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(written.len(), 2, "Should process mapped templates");
    Ok(())
}
/// Edit strategy with edits=2 must chain AAAAAA -> TAAAAA -> TTAAAA into one group.
#[test]
fn test_multiple_edit_distances() -> Result<()> {
    let (a1, a2) = build_test_pair("read1", 0, 100, 300, 60, 60, "AAAAAA");
    let (b1, b2) = build_test_pair("read2", 0, 100, 300, 60, 60, "TAAAAA");
    let (c1, c2) = build_test_pair("read3", 0, 100, 300, 60, 60, "TTAAAA");
    let input = create_test_bam(vec![a1, a2, b1, b2, c1, c2])?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        ..test_group_cmd(Strategy::Edit, 2)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(count_unique_mi_tags(&written), 1, "With edits=2, all UMIs should group");
    Ok(())
}
/// Paired strategy fed records whose duplex UMI is missing on both ends ("-").
/// Only checks that execution does not panic; the Result is intentionally
/// ignored because either success or a clean error is acceptable.
#[test]
fn test_paired_umi_missing_both_ends() -> Result<()> {
    let first = build_test_read("test", 0, 100, 60, 0x41, "-");
    let second = build_test_read("test", 0, 300, 60, 0x81, "-");
    let input = create_test_bam(vec![first, second])?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        ..test_group_cmd(Strategy::Paired, 0)
    };
    let _ = command.execute("test");
    Ok(())
}
/// Adjacency with edits=0 degenerates to exact matching: AAAAAC stays apart
/// from the two AAAAAA pairs.
#[test]
fn test_adjacency_with_edits_0() -> Result<()> {
    let (a1, a2) = build_test_pair("read1", 0, 100, 300, 60, 60, "AAAAAA");
    let (b1, b2) = build_test_pair("read2", 0, 100, 300, 60, 60, "AAAAAA");
    let (c1, c2) = build_test_pair("read3", 0, 100, 300, 60, 60, "AAAAAC");
    let input = create_test_bam(vec![a1, a2, b1, b2, c1, c2])?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        ..test_group_cmd(Strategy::Adjacency, 0)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(
        count_unique_mi_tags(&written),
        2,
        "Adjacency with edits=0 should keep different UMIs separate"
    );
    Ok(())
}
/// Edit strategy with a generous edits=3 threshold merges UMIs up to three
/// substitutions apart into a single group.
#[test]
fn test_edit_strategy_with_high_edit_threshold() -> Result<()> {
    let (a1, a2) = build_test_pair("read1", 0, 100, 300, 60, 60, "AAAAAA");
    let (b1, b2) = build_test_pair("read2", 0, 100, 300, 60, 60, "TTAAAA");
    let (c1, c2) = build_test_pair("read3", 0, 100, 300, 60, 60, "TTTAAA");
    let input = create_test_bam(vec![a1, a2, b1, b2, c1, c2])?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        ..test_group_cmd(Strategy::Edit, 3)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(
        count_unique_mi_tags(&written),
        1,
        "Edit strategy with edits=3 should group all UMIs"
    );
    Ok(())
}
/// Paired strategy must still handle single-end fragments carrying a duplex
/// "A-B" style UMI without losing records.
#[test]
fn test_paired_strategy_single_end_reads() -> Result<()> {
    let frag1 = build_test_read("frag1", 0, 100, 60, 0x0, "AAAA-TTTT");
    let frag2 = build_test_read("frag2", 0, 200, 60, 0x0, "AAAA-TTTT");
    let input = create_test_bam(vec![frag1, frag2])?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        ..test_group_cmd(Strategy::Paired, 0)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(written.len(), 2);
    Ok(())
}
/// Requesting a family-size histogram must produce the histogram file on disk.
#[test]
fn test_family_size_histogram_generation() -> Result<()> {
    let mut reads = Vec::new();
    for i in 0..5 {
        let (a, b) = build_test_pair(&format!("read{i}"), 0, 100, 300, 60, 60, "AAAAAA");
        reads.extend([a, b]);
    }
    for i in 0..2 {
        let (a, b) = build_test_pair(&format!("other{i}"), 0, 100, 300, 60, 60, "TTTTTT");
        reads.extend([a, b]);
    }
    let input = create_test_bam(reads)?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        family_size_histogram: Some(out.histogram.clone()),
        ..test_group_cmd(Strategy::Identity, 0)
    };
    command.execute("test")?;
    assert!(out.histogram.exists());
    Ok(())
}
/// Requesting grouping metrics must produce the metrics file on disk.
#[test]
fn test_grouping_metrics_generation() -> Result<()> {
    let (a, b) = build_test_pair("read1", 0, 100, 300, 60, 60, "AAAAAA");
    let input = create_test_bam(vec![a, b])?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        grouping_metrics: Some(out.grouping_metrics.clone()),
        ..test_group_cmd(Strategy::Identity, 0)
    };
    command.execute("test")?;
    assert!(out.grouping_metrics.exists());
    Ok(())
}
/// `min_map_q = 20` keeps the MAPQ-60 pair and drops the MAPQ-10 pair.
#[test]
fn test_min_mapq_filtering() -> Result<()> {
    let mut reads = Vec::new();
    let (hi_a, hi_b) = build_test_pair("high", 0, 100, 300, 60, 60, "AAAAAA");
    let (lo_a, lo_b) = build_test_pair("low", 0, 100, 300, 10, 10, "TTTTTT");
    reads.extend([hi_a, hi_b, lo_a, lo_b]);
    let input = create_test_bam(reads)?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        min_map_q: Some(20),
        ..test_group_cmd(Strategy::Identity, 0)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(written.len(), 2, "Should filter low mapq reads");
    Ok(())
}
/// A UMI consisting entirely of N bases is filtered, leaving no output.
#[test]
fn test_umi_with_only_n_bases() -> Result<()> {
    let (a, b) = build_test_pair("readn", 0, 100, 300, 60, 60, "NNNNNN");
    let input = create_test_bam(vec![a, b])?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        ..test_group_cmd(Strategy::Identity, 0)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(written.len(), 0, "Should filter UMIs with all N bases");
    Ok(())
}
/// A paired-end pair and a single-end fragment in the same input are both
/// carried through to the output.
#[test]
fn test_mixed_single_and_paired_end() -> Result<()> {
    let mut reads = Vec::new();
    let (pe_a, pe_b) = build_test_pair("paired", 0, 100, 300, 60, 60, "AAAAAA");
    reads.extend([pe_a, pe_b]);
    reads.push(build_test_read("single", 0, 200, 60, 0x0, "TTTTTT"));
    let input = create_test_bam(reads)?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        ..test_group_cmd(Strategy::Identity, 0)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(written.len(), 3, "Should handle mixed single and paired end");
    Ok(())
}
/// Fifty identical-UMI pairs collapse into a single MI group with no records lost.
#[test]
fn test_large_family_grouping() -> Result<()> {
    let mut reads = Vec::new();
    for i in 0..50 {
        let (a, b) = build_test_pair(&format!("read{i}"), 0, 100, 300, 60, 60, "AAAAAA");
        reads.extend([a, b]);
    }
    let input = create_test_bam(reads)?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        ..test_group_cmd(Strategy::Identity, 0)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(count_unique_mi_tags(&written), 1, "Large family should group correctly");
    assert_eq!(written.len(), 100, "Should have all 100 reads");
    Ok(())
}
/// A pair whose mapped read has low MAPQ and whose mate is unmapped is
/// filtered entirely; only the well-mapped "good" pair survives.
#[test]
fn test_filtering_low_mapq_read_with_unmapped_mate() -> Result<()> {
    let mut reads = Vec::new();
    let (good_a, good_b) = build_test_pair("good", 0, 100, 300, 60, 60, "AAAAAA");
    reads.extend([good_a, good_b]);
    let (bad_a, bad_b) = build_test_pair_mapped_with_unmapped_mate("bad", 0, 200, 10, "CCCCCC");
    reads.extend([bad_a, bad_b]);
    let input = create_test_bam(reads)?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        min_map_q: Some(30),
        ..test_group_cmd(Strategy::Identity, 0)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(
        written.len(),
        2,
        "Should have 2 records (good pair only); bad pair with low MAPQ R1 and unmapped R2 should be filtered"
    );
    for rec in &written {
        let name =
            String::from_utf8_lossy(rec.name().expect("record should have a name")).to_string();
        assert_eq!(
            name, "good",
            "Only 'good' reads should remain, but found read with name: {name}"
        );
    }
    Ok(())
}
/// Builds an R1/R2 raw-record pair at 1-based `pos1`/`pos2` with explicit
/// strand flags, so tests can construct F1R2 / F2R1 / tandem orientations.
///
/// Both reads are 100 bp alignments carrying the same `RX` UMI tag and an
/// `MC` (mate CIGAR) tag of "100M"; `pos - 1` converts to BAM's 0-based
/// coordinates.
#[allow(clippy::cast_sign_loss)]
fn build_test_pair_with_orientation(
    name: &str,
    ref_id: usize,
    pos1: i32,
    pos2: i32,
    umi: &str,
    r1_reverse: bool,
    r2_reverse: bool,
) -> (fgumi_raw_bam::RawRecord, fgumi_raw_bam::RawRecord) {
    let seq = vec![b'A'; 100];
    let quals = vec![30u8; 100];
    // Single CIGAR op of length 100 (op code 0 — matches the "100M" MC tag below).
    let cigar = encode_op(0, 100);
    // Each read's REVERSE state is mirrored into the mate's MATE_REVERSE flag.
    let r1_flags = flags::PAIRED
        | flags::FIRST_SEGMENT
        | (if r1_reverse { flags::REVERSE } else { 0 })
        | (if r2_reverse { flags::MATE_REVERSE } else { 0 });
    let r2_flags = flags::PAIRED
        | flags::LAST_SEGMENT
        | (if r2_reverse { flags::REVERSE } else { 0 })
        | (if r1_reverse { flags::MATE_REVERSE } else { 0 });
    let mut b1 = RawSamBuilder::new();
    b1.read_name(name.as_bytes())
        .flags(r1_flags)
        .ref_id(ref_id as i32)
        .pos(pos1 - 1)
        .mapq(60)
        .cigar_ops(&[cigar])
        .sequence(&seq)
        .qualities(&quals)
        .mate_ref_id(ref_id as i32)
        .mate_pos(pos2 - 1);
    b1.add_string_tag(b"RX", umi.as_bytes());
    b1.add_string_tag(b"MC", b"100M");
    let r1 = b1.build();
    let mut b2 = RawSamBuilder::new();
    b2.read_name(name.as_bytes())
        .flags(r2_flags)
        .ref_id(ref_id as i32)
        .pos(pos2 - 1)
        .mapq(60)
        .cigar_ops(&[cigar])
        .sequence(&seq)
        .qualities(&quals)
        .mate_ref_id(ref_id as i32)
        .mate_pos(pos1 - 1);
    b2.add_string_tag(b"RX", umi.as_bytes());
    b2.add_string_tag(b"MC", b"100M");
    let r2 = b2.build();
    (r1, r2)
}
/// R1 forward + R2 reverse is the F1R2 orientation, reported as (true, false).
#[test]
fn test_get_pair_orientation_f1r2() -> Result<()> {
    let (r1, r2) = build_test_pair_with_orientation("test", 0, 100, 200, "AAAAAA", false, true);
    let tmpl = Template::from_records(vec![r1, r2])?;
    assert_eq!(
        get_pair_orientation_impl(&tmpl),
        (true, false),
        "F1R2 should have orientation (true, false)"
    );
    Ok(())
}
/// R1 reverse + R2 forward is the F2R1 orientation, reported as (false, true).
#[test]
fn test_get_pair_orientation_f2r1() -> Result<()> {
    let (r1, r2) = build_test_pair_with_orientation("test", 0, 100, 200, "AAAAAA", true, false);
    let tmpl = Template::from_records(vec![r1, r2])?;
    assert_eq!(
        get_pair_orientation_impl(&tmpl),
        (false, true),
        "F2R1 should have orientation (false, true)"
    );
    Ok(())
}
/// Both reads forward (tandem) is reported as (true, true).
#[test]
fn test_get_pair_orientation_forward_tandem() -> Result<()> {
    let (r1, r2) = build_test_pair_with_orientation("test", 0, 100, 200, "AAAAAA", false, false);
    let tmpl = Template::from_records(vec![r1, r2])?;
    assert_eq!(
        get_pair_orientation_impl(&tmpl),
        (true, true),
        "Forward tandem should have orientation (true, true)"
    );
    Ok(())
}
/// Both reads reverse (tandem) is reported as (false, false).
#[test]
fn test_get_pair_orientation_reverse_tandem() -> Result<()> {
    let (r1, r2) = build_test_pair_with_orientation("test", 0, 100, 200, "AAAAAA", true, true);
    let tmpl = Template::from_records(vec![r1, r2])?;
    assert_eq!(
        get_pair_orientation_impl(&tmpl),
        (false, false),
        "Reverse tandem should have orientation (false, false)"
    );
    Ok(())
}
/// Identity assigner: same UMI and position but opposite pair orientations
/// (F1R2 vs F2R1) must land in DIFFERENT MI groups.
#[test]
fn test_identity_assigner_splits_by_pair_orientation() -> Result<()> {
    let mut reads = Vec::new();
    let (a1, a2) = build_test_pair_with_orientation("f1r2", 0, 100, 100, "AAAAAA", false, true);
    let (b1, b2) = build_test_pair_with_orientation("f2r1", 0, 100, 100, "AAAAAA", true, false);
    reads.extend([a1, a2, b1, b2]);
    let input = create_test_bam(reads)?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        ..test_group_cmd(Strategy::Identity, 0)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(written.len(), 4, "Should have 4 records");
    // Collect each read name's MI tag value.
    let mi_tag = sam::alignment::record::data::field::Tag::from([b'M', b'I']);
    let mut mi_by_name: std::collections::HashMap<String, String> =
        std::collections::HashMap::new();
    for rec in &written {
        let name =
            String::from_utf8_lossy(rec.name().expect("record should have a name")).to_string();
        if let Some(noodles::sam::alignment::record_buf::data::field::Value::String(mi)) =
            rec.data().get(&mi_tag)
        {
            mi_by_name.insert(name, String::from_utf8_lossy(mi).to_string());
        }
    }
    assert_ne!(
        mi_by_name.get("f1r2").expect("f1r2 should have MI tag"),
        mi_by_name.get("f2r1").expect("f2r1 should have MI tag"),
        "F1R2 and F2R1 pairs with same UMI should get DIFFERENT MI values with Identity assigner"
    );
    Ok(())
}
/// Paired assigner: two F1R2 pairs sharing the same duplex UMI must receive
/// the SAME MI value.
#[test]
fn test_paired_assigner_groups_same_orientation_templates() -> Result<()> {
    let mut reads = Vec::new();
    let (a1, a2) = build_test_pair_with_orientation("pair_a", 0, 100, 100, "AA-TT", false, true);
    let (b1, b2) = build_test_pair_with_orientation("pair_b", 0, 100, 100, "AA-TT", false, true);
    reads.extend([a1, a2, b1, b2]);
    let input = create_test_bam(reads)?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        ..test_group_cmd(Strategy::Paired, 0)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(written.len(), 4, "Should have 4 records");
    // Collect each read name's MI tag value.
    let mi_tag = sam::alignment::record::data::field::Tag::from([b'M', b'I']);
    let mut mi_by_name: std::collections::HashMap<String, String> =
        std::collections::HashMap::new();
    for rec in &written {
        let name =
            String::from_utf8_lossy(rec.name().expect("record should have a name")).to_string();
        if let Some(noodles::sam::alignment::record_buf::data::field::Value::String(mi)) =
            rec.data().get(&mi_tag)
        {
            mi_by_name.insert(name, String::from_utf8_lossy(mi).to_string());
        }
    }
    assert_eq!(
        mi_by_name.get("pair_a").expect("pair_a should have MI tag"),
        mi_by_name.get("pair_b").expect("pair_b should have MI tag"),
        "Two F1R2 pairs with same UMI should get SAME MI value with Paired assigner"
    );
    Ok(())
}
/// The same grouping result must come out of the single-threaded fast path and
/// the pipeline with 1 or 2 threads.
#[rstest]
#[case::fast_path(ThreadingOptions::none())]
#[case::pipeline_1(ThreadingOptions::new(1))]
#[case::pipeline_2(ThreadingOptions::new(2))]
fn test_threading_modes(#[case] threading: ThreadingOptions) -> Result<()> {
    let mut reads = Vec::new();
    for i in 1..=3 {
        let (a, b) = build_test_pair(&format!("read_{i}"), 0, 100, 300, 60, 60, "AAAAAA");
        reads.extend([a, b]);
    }
    let input = create_test_bam(reads)?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        threading,
        ..test_group_cmd(Strategy::Adjacency, 1)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(written.len(), 6, "Should have 6 records (3 pairs)");
    assert_eq!(
        count_unique_mi_tags(&written),
        1,
        "All records with same UMI should be in 1 group"
    );
    Ok(())
}
/// Multi-threaded (4-way) grouping across strategies. Families: 5x AAAAAA,
/// 3x AAAAAC (one edit away), 2x TTTTTT — Identity keeps 3 groups, Edit and
/// Adjacency at edits=1 merge the first two into 2 groups.
#[rstest]
#[case::identity(Strategy::Identity, 0, 3)]
#[case::edit(Strategy::Edit, 1, 2)]
#[case::adjacency(Strategy::Adjacency, 1, 2)]
fn test_multi_thread_strategies_raw_byte(
    #[case] strategy: Strategy,
    #[case] edits: u32,
    #[case] expected_groups: usize,
) -> Result<()> {
    let mut reads = Vec::new();
    for (count, prefix, umi) in [(5, "a", "AAAAAA"), (3, "b", "AAAAAC"), (2, "c", "TTTTTT")] {
        for i in 0..count {
            let (x, y) = build_test_pair(&format!("{prefix}{i}"), 0, 100, 300, 60, 60, umi);
            reads.extend([x, y]);
        }
    }
    let input = create_test_bam(reads)?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        threading: ThreadingOptions::new(4),
        ..test_group_cmd(strategy, edits)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(written.len(), 20, "Should have all 20 records (10 pairs)");
    let unique_groups = count_unique_mi_tags(&written);
    assert_eq!(
        unique_groups, expected_groups,
        "Strategy {strategy:?} with edits={edits}: expected {expected_groups} groups, got {unique_groups}"
    );
    Ok(())
}
/// 4-thread run: two clean pairs at different positions survive a MAPQ-30
/// filter; the low-MAPQ pair with an unmapped mate is dropped.
#[test]
fn test_multi_thread_raw_byte_with_unmapped_mate() -> Result<()> {
    let mut reads = Vec::new();
    let (g1, g2) = build_test_pair("good", 0, 100, 300, 60, 60, "AAAAAA");
    let (bad1, bad2) = build_test_pair_mapped_with_unmapped_mate("bad", 0, 200, 10, "CCCCCC");
    let (g3, g4) = build_test_pair("good2", 0, 400, 600, 60, 60, "GGGGGG");
    reads.extend([g1, g2, bad1, bad2, g3, g4]);
    let input = create_test_bam(reads)?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        threading: ThreadingOptions::new(4),
        min_map_q: Some(30),
        ..test_group_cmd(Strategy::Identity, 0)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(
        written.len(),
        4,
        "Should have 4 records (2 good pairs); bad pair filtered by MAPQ"
    );
    assert_eq!(
        count_unique_mi_tags(&written),
        2,
        "Should have 2 groups (different positions)"
    );
    Ok(())
}
/// 4-thread run with four pairs at one position plus three single-end
/// fragments at another: all 11 records survive, grouped into 2 families.
#[test]
fn test_multi_thread_raw_byte_mixed_single_paired() -> Result<()> {
    let mut reads = Vec::new();
    for i in 0..4 {
        let (a, b) = build_test_pair(&format!("pair{i}"), 0, 100, 300, 60, 60, "AAAAAA");
        reads.extend([a, b]);
    }
    for i in 0..3 {
        reads.push(build_test_read(&format!("frag{i}"), 0, 500, 60, 0x0, "TTTTTT"));
    }
    let input = create_test_bam(reads)?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        threading: ThreadingOptions::new(4),
        ..test_group_cmd(Strategy::Identity, 0)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(written.len(), 11, "Should have all 11 records");
    assert_eq!(
        count_unique_mi_tags(&written),
        2,
        "Should have 2 groups (paired vs single-end positions)"
    );
    Ok(())
}
/// Hand-assembles a minimal raw BAM record (ref 0, pos 100, 4 bp) carrying an
/// `RX` string tag, for exercising the group filter on raw bytes.
///
/// Layout: 32-byte fixed header, NUL-terminated read name, CIGAR ops, 4-bit
/// packed sequence, per-base qualities, then the aux `RX:Z:` tag appended at
/// the end. Sequence/quality bytes are left zeroed — the filter under test
/// does not read them.
fn make_raw_bam_for_group(
    name: &[u8],
    flag: u16,
    mapq: u8,
    umi: &[u8],
) -> fgumi_raw_bam::RawRecord {
    use crate::sort::bam_fields;
    let seq_len = 4usize;
    // l_read_name counts the trailing NUL.
    let l_read_name = (name.len() + 1) as u8;
    // Mapped records get one CIGAR op (length << 4, op code 0); unmapped get none.
    let cigar_ops: &[u32] = if (flag & bam_fields::flags::UNMAPPED) == 0 {
        &[(seq_len as u32) << 4]
    } else {
        &[]
    };
    let n_cigar_op = cigar_ops.len() as u16;
    // Sequence is packed two bases per byte.
    let seq_bytes = seq_len.div_ceil(2);
    let total = 32 + l_read_name as usize + cigar_ops.len() * 4 + seq_bytes + seq_len;
    let mut buf = vec![0u8; total];
    buf[0..4].copy_from_slice(&0i32.to_le_bytes()); // ref_id = 0
    buf[4..8].copy_from_slice(&100i32.to_le_bytes()); // pos = 100 (0-based)
    buf[8] = l_read_name;
    buf[9] = mapq;
    // Bytes 10..12 (bin) stay zero — not inspected by the code under test.
    buf[12..14].copy_from_slice(&n_cigar_op.to_le_bytes());
    buf[14..16].copy_from_slice(&flag.to_le_bytes());
    buf[16..20].copy_from_slice(&(seq_len as u32).to_le_bytes());
    buf[20..24].copy_from_slice(&(-1i32).to_le_bytes()); // mate ref_id = -1
    buf[24..28].copy_from_slice(&(-1i32).to_le_bytes()); // mate pos = -1
    let name_start = 32;
    buf[name_start..name_start + name.len()].copy_from_slice(name);
    buf[name_start + name.len()] = 0; // NUL terminator for the read name
    let cigar_start = name_start + l_read_name as usize;
    for (i, &op) in cigar_ops.iter().enumerate() {
        let off = cigar_start + i * 4;
        buf[off..off + 4].copy_from_slice(&op.to_le_bytes());
    }
    // Aux field: two-byte tag "RX", type 'Z' (NUL-terminated string), then the UMI.
    buf.extend_from_slice(b"RXZ");
    buf.extend_from_slice(umi);
    buf.push(0);
    fgumi_raw_bam::RawRecord::from(buf)
}
/// A paired R1 with unmapped mate, MAPQ 30 and a clean 8-bp UMI clears every
/// filter and is counted as accepted.
#[test]
fn test_group_filter_template_raw_accepts_valid() {
    use crate::sort::bam_fields::flags as bf;
    let record = make_raw_bam_for_group(
        b"rea",
        bf::PAIRED | bf::FIRST_SEGMENT | bf::MATE_UNMAPPED,
        30,
        b"ACGTACGT",
    );
    let template =
        Template::from_records(vec![record]).expect("Template::from_raw_records should succeed");
    let filter_cfg = GroupFilterConfig {
        umi_tag: [b'R', b'X'],
        min_mapq: 20,
        include_non_pf: false,
        min_umi_length: None,
        no_umi: false,
        allow_unmapped: false,
    };
    let mut metrics = FilterMetrics::new();
    assert!(filter_template_raw(&template, &filter_cfg, &mut metrics));
    assert_eq!(metrics.accepted_templates, 1);
}
/// MAPQ 10 below the min_mapq=20 threshold is rejected as a poor alignment.
#[test]
fn test_group_filter_template_raw_rejects_low_mapq() {
    use crate::sort::bam_fields::flags as bf;
    let record = make_raw_bam_for_group(
        b"rea",
        bf::PAIRED | bf::FIRST_SEGMENT | bf::MATE_UNMAPPED,
        10,
        b"ACGTACGT",
    );
    let template =
        Template::from_records(vec![record]).expect("Template::from_raw_records should succeed");
    let filter_cfg = GroupFilterConfig {
        umi_tag: [b'R', b'X'],
        min_mapq: 20,
        include_non_pf: false,
        min_umi_length: None,
        no_umi: false,
        allow_unmapped: false,
    };
    let mut metrics = FilterMetrics::new();
    assert!(!filter_template_raw(&template, &filter_cfg, &mut metrics));
    assert_eq!(metrics.discarded_poor_alignment, 1);
}
/// A QC_FAIL record is rejected as non-PF when include_non_pf is false.
#[test]
fn test_group_filter_template_raw_rejects_qc_fail() {
    use crate::sort::bam_fields::flags as bf;
    let record = make_raw_bam_for_group(
        b"rea",
        bf::PAIRED | bf::FIRST_SEGMENT | bf::MATE_UNMAPPED | bf::QC_FAIL,
        30,
        b"ACGT",
    );
    let template =
        Template::from_records(vec![record]).expect("Template::from_raw_records should succeed");
    let filter_cfg = GroupFilterConfig {
        umi_tag: [b'R', b'X'],
        min_mapq: 0,
        include_non_pf: false,
        min_umi_length: None,
        no_umi: false,
        allow_unmapped: false,
    };
    let mut metrics = FilterMetrics::new();
    assert!(!filter_template_raw(&template, &filter_cfg, &mut metrics));
    assert_eq!(metrics.discarded_non_pf, 1);
}
/// An N base anywhere in the UMI causes rejection.
#[test]
fn test_group_filter_template_raw_rejects_umi_with_n() {
    use crate::sort::bam_fields::flags as bf;
    let record = make_raw_bam_for_group(
        b"rea",
        bf::PAIRED | bf::FIRST_SEGMENT | bf::MATE_UNMAPPED,
        30,
        b"ANGT",
    );
    let template =
        Template::from_records(vec![record]).expect("Template::from_raw_records should succeed");
    let filter_cfg = GroupFilterConfig {
        umi_tag: [b'R', b'X'],
        min_mapq: 0,
        include_non_pf: false,
        min_umi_length: None,
        no_umi: false,
        allow_unmapped: false,
    };
    let mut metrics = FilterMetrics::new();
    assert!(!filter_template_raw(&template, &filter_cfg, &mut metrics));
    assert_eq!(metrics.discarded_ns_in_umi, 1);
}
/// A 2-bp UMI is rejected when min_umi_length is 6.
#[test]
fn test_group_filter_template_raw_rejects_short_umi() {
    use crate::sort::bam_fields::flags as bf;
    let record = make_raw_bam_for_group(
        b"rea",
        bf::PAIRED | bf::FIRST_SEGMENT | bf::MATE_UNMAPPED,
        30,
        b"AC",
    );
    let template =
        Template::from_records(vec![record]).expect("Template::from_raw_records should succeed");
    let filter_cfg = GroupFilterConfig {
        umi_tag: [b'R', b'X'],
        min_mapq: 0,
        include_non_pf: false,
        min_umi_length: Some(6),
        no_umi: false,
        allow_unmapped: false,
    };
    let mut metrics = FilterMetrics::new();
    assert!(!filter_template_raw(&template, &filter_cfg, &mut metrics));
    assert_eq!(metrics.discarded_umi_too_short, 1);
}
/// Fully unmapped records are rejected (counted as poor alignment) when
/// allow_unmapped is false.
#[test]
fn test_group_filter_template_raw_rejects_unmapped() {
    use crate::sort::bam_fields::flags as bf;
    let record = make_raw_bam_for_group(b"rea", bf::UNMAPPED | bf::MATE_UNMAPPED, 0, b"ACGT");
    let template =
        Template::from_records(vec![record]).expect("Template::from_raw_records should succeed");
    let filter_cfg = GroupFilterConfig {
        umi_tag: [b'R', b'X'],
        min_mapq: 0,
        include_non_pf: false,
        min_umi_length: None,
        no_umi: false,
        allow_unmapped: false,
    };
    let mut metrics = FilterMetrics::new();
    assert!(!filter_template_raw(&template, &filter_cfg, &mut metrics));
    assert_eq!(metrics.discarded_poor_alignment, 1);
}
/// Sanity-checks the length invariants around MIN_BAM_RECORD_LEN.
// NOTE(review): filter_template_raw is never invoked here — the test only
// verifies that a 16-byte buffer is below the 32-byte minimum while a built
// record is not; confirm whether the truncated path was meant to be exercised.
#[test]
fn test_group_filter_template_raw_truncated_record_treated_as_missing() {
    use crate::sort::bam_fields::{MIN_BAM_RECORD_LEN, flags as bf};
    let short_rec = [0u8; 16];
    let valid_rec = make_raw_bam_for_group(
        b"rea",
        bf::PAIRED | bf::FIRST_SEGMENT | bf::MATE_UNMAPPED,
        30,
        b"ACGT",
    );
    assert_eq!(MIN_BAM_RECORD_LEN, 32);
    assert!(valid_rec.len() >= MIN_BAM_RECORD_LEN);
    assert!(short_rec.len() < MIN_BAM_RECORD_LEN);
}
/// A template containing only a supplementary alignment (no primary read)
/// is rejected by the filter.
#[test]
fn test_group_filter_template_raw_no_primary_reads() {
    use crate::sort::bam_fields::flags as bf;
    let supp = make_raw_bam_for_group(
        b"rea",
        bf::PAIRED | bf::FIRST_SEGMENT | bf::SUPPLEMENTARY | bf::MATE_UNMAPPED,
        30,
        b"ACGT",
    );
    let template =
        Template::from_records(vec![supp]).expect("Template::from_raw_records should succeed");
    let filter_cfg = GroupFilterConfig {
        umi_tag: [b'R', b'X'],
        min_mapq: 0,
        include_non_pf: false,
        min_umi_length: None,
        no_umi: false,
        allow_unmapped: false,
    };
    let mut metrics = FilterMetrics::new();
    assert!(!filter_template_raw(&template, &filter_cfg, &mut metrics));
}
/// Builds a SAM header with `@HD SO:queryname` and a single `chr1` reference
/// sequence, for tests that feed queryname-sorted (unmapped) input.
fn create_queryname_sorted_header() -> sam::Header {
    use noodles::sam::header::record::value::{Map, map::ReferenceSequence};
    use noodles::sam::header::record::value::{
        Map as HeaderRecordMap,
        map::{Header as HeaderRecord, Tag as HeaderTag},
    };
    let mut builder = sam::Header::builder();
    // SO is constructed as an "other" header tag; the pattern is irrefutable
    // for this byte pair, hence the unreachable!() arm.
    let HeaderTag::Other(so_tag) = HeaderTag::from([b'S', b'O']) else { unreachable!() };
    let map = HeaderRecordMap::<HeaderRecord>::builder()
        .insert(so_tag, "queryname")
        .build()
        .expect("valid header record");
    builder = builder.set_header(map);
    // 248,956,422 is the GRCh38 chr1 length; any non-zero value would do here.
    builder = builder.add_reference_sequence(
        BString::from("chr1"),
        Map::<ReferenceSequence>::new(
            NonZeroUsize::new(248_956_422).expect("non-zero chr1 length"),
        ),
    );
    builder.build()
}
/// Builds a fully-unmapped R1/R2 pair (both UNMAPPED and MATE_UNMAPPED set),
/// each 100 bp with quality 30 and the given `RX` UMI tag.
fn build_unmapped_test_pair(
    name: &str,
    umi: &str,
) -> (fgumi_raw_bam::RawRecord, fgumi_raw_bam::RawRecord) {
    use fgumi_raw_bam::UnmappedSamBuilder;
    let bases = vec![b'A'; 100];
    let scores = vec![30u8; 100];
    // The two reads differ only in which segment flag they carry.
    let make = |segment_flag: u16| {
        let mut builder = UnmappedSamBuilder::new();
        builder.build_record(
            name.as_bytes(),
            flags::PAIRED | flags::UNMAPPED | flags::MATE_UNMAPPED | segment_flag,
            &bases,
            &scores,
        );
        builder.append_string_tag(b"RX", umi.as_bytes());
        builder.build()
    };
    (make(flags::FIRST_SEGMENT), make(flags::LAST_SEGMENT))
}
fn create_queryname_sorted_test_bam(
records: Vec<fgumi_raw_bam::RawRecord>,
) -> Result<NamedTempFile> {
let temp_file = NamedTempFile::new()?;
let header = create_queryname_sorted_header();
let mut writer = bam::io::writer::Builder.build_from_path(temp_file.path())?;
writer.write_header(&header)?;
for record in &records {
let record_buf =
raw_record_to_record_buf(record, &header).map_err(std::io::Error::other)?;
writer.write_alignment_record(&header, &record_buf)?;
}
drop(writer);
Ok(temp_file)
}
/// With allow_unmapped=true, an unmapped pair passes the filter and is not
/// counted as a poor alignment.
#[test]
fn test_filter_template_allows_unmapped_when_enabled() -> Result<()> {
    let (r1, r2) = build_unmapped_test_pair("unmapped", "AAAAAA");
    let template = Template::from_records(vec![r1, r2])?;
    let filter_cfg = GroupFilterConfig {
        umi_tag: [b'R', b'X'],
        min_mapq: 1,
        include_non_pf: false,
        min_umi_length: None,
        no_umi: false,
        allow_unmapped: true,
    };
    let mut metrics = FilterMetrics::new();
    let kept = filter_template_raw(&template, &filter_cfg, &mut metrics);
    assert!(kept, "Unmapped template should be kept when allow_unmapped=true");
    assert_eq!(metrics.total_templates, 2, "Should count 2 records");
    assert_eq!(metrics.discarded_poor_alignment, 0, "Should not discard for poor alignment");
    Ok(())
}
/// With allow_unmapped=false (the default), an unmapped pair is rejected and
/// both reads are counted as poor alignments.
#[test]
fn test_filter_template_rejects_unmapped_by_default() -> Result<()> {
    let (r1, r2) = build_unmapped_test_pair("unmapped", "AAAAAA");
    let template = Template::from_records(vec![r1, r2])?;
    let filter_cfg = GroupFilterConfig {
        umi_tag: [b'R', b'X'],
        min_mapq: 1,
        include_non_pf: false,
        min_umi_length: None,
        no_umi: false,
        allow_unmapped: false,
    };
    let mut metrics = FilterMetrics::new();
    let kept = filter_template_raw(&template, &filter_cfg, &mut metrics);
    assert!(!kept, "Unmapped template should be rejected when allow_unmapped=false");
    assert_eq!(metrics.total_templates, 2, "Should count 2 records");
    assert_eq!(
        metrics.discarded_poor_alignment, 2,
        "Should discard both reads for poor alignment"
    );
    Ok(())
}
/// End-to-end with allow_unmapped: three unmapped pairs with UMI AAAAAA and
/// one with TTTTTT group into exactly two families.
#[test]
fn test_allow_unmapped_groups_unmapped_reads() -> Result<()> {
    let mut reads = Vec::new();
    for name in ["read_a", "read_b", "read_c"] {
        let (x, y) = build_unmapped_test_pair(name, "AAAAAA");
        reads.extend([x, y]);
    }
    let (d1, d2) = build_unmapped_test_pair("read_d", "TTTTTT");
    reads.extend([d1, d2]);
    let input = create_queryname_sorted_test_bam(reads)?;
    let out = TestPaths::new()?;
    let command = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: out.output.clone(),
            async_reader: false,
        },
        allow_unmapped: true,
        ..test_group_cmd(Strategy::Identity, 0)
    };
    command.execute("test")?;
    let written = read_bam_records(&out.output)?;
    assert_eq!(written.len(), 8, "Should have all 8 records (4 pairs)");
    assert_eq!(count_unique_mi_tags(&written), 2, "Should have 2 unique UMI groups");
    Ok(())
}
#[test]
fn test_allow_unmapped_adjacency_strategy() -> Result<()> {
    // Two unmapped pairs whose UMIs differ by a single edit ("AAAAAA" vs
    // "TAAAAA"): the adjacency strategy with max-edits = 1 must merge them
    // into one MI group.
    //
    // Fix: the original crammed `records.push(r1_a);` onto the same line as
    // the second `let`, breaking the one-statement-per-line style used by
    // every sibling test.
    let mut records = Vec::new();
    let (r1_a, r2_a) = build_unmapped_test_pair("read_a", "AAAAAA");
    let (r1_b, r2_b) = build_unmapped_test_pair("read_b", "TAAAAA");
    records.push(r1_a);
    records.push(r2_a);
    records.push(r1_b);
    records.push(r2_b);
    let input = create_queryname_sorted_test_bam(records)?;
    let paths = TestPaths::new()?;
    let cmd = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: paths.output.clone(),
            async_reader: false,
        },
        allow_unmapped: true,
        ..test_group_cmd(Strategy::Adjacency, 1)
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 4, "Should have all 4 records (2 pairs)");
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(unique_groups, 1, "UMIs within 1 edit should be in same group");
    Ok(())
}
#[test]
fn test_without_allow_unmapped_rejects_unmapped_reads() -> Result<()> {
    // Mixed input: one unmapped pair plus one well-mapped pair. With the
    // default `allow_unmapped: false`, only the mapped pair should survive.
    let (r1_unmapped, r2_unmapped) = build_unmapped_test_pair("unmapped", "AAAAAA");
    let (r1_mapped, r2_mapped) = build_test_pair("mapped", 0, 100, 300, 60, 60, "TTTTTT");
    let records = vec![r1_unmapped, r2_unmapped, r1_mapped, r2_mapped];
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let cmd = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: paths.output.clone(),
            async_reader: false,
        },
        allow_unmapped: false,
        ..test_group_cmd(Strategy::Identity, 0)
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 2, "Should only have mapped pair (unmapped filtered)");
    Ok(())
}
#[test]
fn test_allow_unmapped_mixed_mapped_unmapped() -> Result<()> {
    // Same UMI on an unmapped pair and a mapped pair. With `allow_unmapped`
    // enabled all four records pass through; whether they land in one or two
    // MI groups depends on positional grouping, so the assertion allows both.
    let (r1_unmapped, r2_unmapped) = build_unmapped_test_pair("unmapped", "AAAAAA");
    let (r1_mapped, r2_mapped) = build_test_pair("mapped", 0, 100, 300, 60, 60, "AAAAAA");
    let records = vec![r1_unmapped, r2_unmapped, r1_mapped, r2_mapped];
    let input = create_test_bam(records)?;
    let paths = TestPaths::new()?;
    let cmd = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: paths.output.clone(),
            async_reader: false,
        },
        allow_unmapped: true,
        ..test_group_cmd(Strategy::Identity, 0)
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 4, "Should have all 4 records");
    let unique_groups = count_unique_mi_tags(&output_records);
    assert!((1..=2).contains(&unique_groups), "Should have 1-2 MI groups");
    Ok(())
}
#[rstest]
#[case::fast_path(ThreadingOptions::none())]
#[case::pipeline_1(ThreadingOptions::new(1))]
#[case::pipeline_2(ThreadingOptions::new(2))]
fn test_allow_unmapped_threading_modes(#[case] threading: ThreadingOptions) -> Result<()> {
    // `allow_unmapped` must behave identically across the single-threaded
    // fast path and the 1- and 2-thread pipeline modes: three unmapped pairs
    // with the same UMI collapse into a single group in every mode.
    let records: Vec<_> = (1..=3)
        .flat_map(|i| {
            let (r1, r2) = build_unmapped_test_pair(&format!("read_{i}"), "AAAAAA");
            [r1, r2]
        })
        .collect();
    let input = create_queryname_sorted_test_bam(records)?;
    let paths = TestPaths::new()?;
    let cmd = GroupReadsByUmi {
        io: BamIoOptions {
            input: input.path().to_path_buf(),
            output: paths.output.clone(),
            async_reader: false,
        },
        allow_unmapped: true,
        threading,
        ..test_group_cmd(Strategy::Identity, 0)
    };
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    assert_eq!(output_records.len(), 6, "Should have 6 records (3 pairs)");
    let unique_groups = count_unique_mi_tags(&output_records);
    assert_eq!(unique_groups, 1, "All records with same UMI should be in 1 group");
    Ok(())
}
}