use crate::alignment_tags::regenerate_alignment_tags_raw;
use crate::bam_io::create_bam_reader_for_pipeline;
use crate::consensus_filter::{
FilterConfig, FilterResult, compute_read_stats_raw, filter_duplex_read_raw, filter_read_raw,
is_duplex_consensus_raw, mask_bases_raw, mask_duplex_bases_raw, template_passes_raw,
};
use crate::grouper::{SingleRawRecordGrouper, TemplateGrouper};
use crate::logging::OperationTimer;
use crate::read_info::LibraryIndex;
use crate::reference::ReferenceReader;
use crate::sort::bam_fields;
use crate::tag_reversal::reverse_per_base_tags_raw;
use crate::template::TemplateBatch;
use crate::unified_pipeline::{
BamPipelineConfig, BatchWeight, GroupKeyConfig, Grouper, MemoryEstimate,
run_bam_pipeline_from_reader, run_bam_pipeline_from_reader_with_secondary,
};
use crate::validation::validate_file_exists;
use ahash::AHashMap;
use anyhow::{Result, bail};
use clap::Parser;
use crossbeam_queue::SegQueue;
use log::info;
use noodles::sam::Header;
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use crate::commands::command::Command;
use crate::commands::common::{
BamIoOptions, CompressionOptions, QueueMemoryOptions, SchedulerOptions, ThreadingOptions,
build_pipeline_config, parse_bool,
};
#[derive(Debug, Parser)]
#[command(
name = "filter",
about = "\x1b[38;5;173m[POST-CONSENSUS]\x1b[0m \x1b[36mFilter consensus reads based on quality metrics\x1b[0m",
long_about = r#"
Filters consensus reads generated by simplex or duplex commands.
Two kinds of filtering are performed:
1. Masking/filtering of individual bases in reads
2. Filtering out of reads (i.e. not writing them to the output file)
Base-level filtering/masking is only applied if per-base tags are present (see duplex and simplex for
descriptions of these tags). Read-level filtering is always applied. When filtering reads, secondary alignments
and supplementary records may be removed independently if they fail one or more filters; if either R1 or R2
primary alignments fail a filter then all records for the template will be filtered out.
The filters applied are as follows:
1. Reads with fewer than min-reads contributing reads are filtered out
2. Reads with an average consensus error rate higher than max-read-error-rate are filtered out
3. Reads with mean base quality of the consensus read, prior to any masking, less than min-mean-base-quality
are filtered out (if specified)
4. Bases with quality scores below min-base-quality are masked to Ns
5. Bases with fewer than min-reads contributing raw reads are masked to Ns
6. Bases with a consensus error rate (defined as the fraction of contributing reads that voted for a different
base than the consensus call) higher than max-base-error-rate are masked to Ns
7. Reads with a fraction or count of Ns higher than max-no-call-fraction after per-base filtering are filtered out.
When filtering single-umi consensus reads generated by simplex, a single value each
should be supplied for --min-reads, --max-read-error-rate, and --max-base-error-rate.
When filtering duplex consensus reads generated by duplex, each of the three parameters
may independently take 1-3 values. For example:
fgumi filter ... --min-reads 10,5,3 --max-base-error-rate 0.1
In each case if fewer than three values are supplied, the last value is repeated (i.e. `80,40` -> `80 40 40`
and `0.1` -> `0.1 0.1 0.1`). The first value applies to the final consensus read, the second value to one
single-strand consensus, and the last value to the other single-strand consensus. It is required that if
values two and three differ, the more stringent value comes earlier.
In order to correctly filter reads in or out by template, the input BAM must be either queryname sorted or
query grouped. If your BAM is not already in an appropriate order, this can be done in streaming fashion with:
fgumi sort -i in.bam --order queryname | fgumi filter -i /dev/stdin ...
The output sort order may be specified with --sort-order. If not given, then the output will be in the same
order as input.
The --reverse-per-base-tags option controls whether per-base tags should be reversed before being used on reads
marked as being mapped to the negative strand. This is necessary if the reads have been mapped and the
bases/quals reversed but the consensus tags have not. If true, the tags written to the output BAM will be
reversed where necessary in order to line up with the bases and quals.
"#
)]
#[allow(clippy::struct_excessive_bools)]
pub struct Filter {
#[command(flatten)]
pub io: BamIoOptions,
#[arg(short = 'r', long = "ref")]
pub reference: Option<PathBuf>,
#[arg(short = 'M', long = "min-reads", value_delimiter = ',')]
pub min_reads: Vec<usize>,
#[arg(
short = 'E',
long = "max-read-error-rate",
value_delimiter = ',',
default_value = "0.025"
)]
pub max_read_error_rate: Vec<f64>,
#[arg(short = 'e', long = "max-base-error-rate", value_delimiter = ',', default_value = "0.1")]
pub max_base_error_rate: Vec<f64>,
#[arg(short = 'N', long = "min-base-quality")]
pub min_base_quality: Option<u8>,
#[arg(short = 'q', long = "min-mean-base-quality")]
pub min_mean_base_quality: Option<f64>,
#[arg(short = 'n', long = "max-no-call-fraction", default_value = "0.2")]
pub max_no_call_fraction: f64,
#[arg(short = 'R', long = "reverse-per-base-tags", default_value = "false", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool)]
pub reverse_per_base_tags: bool,
#[command(flatten)]
pub threading: ThreadingOptions,
#[arg(long = "filter-by-template", default_value = "true", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool)]
pub filter_by_template: bool,
#[arg(long = "rejects")]
pub rejects: Option<PathBuf>,
#[arg(long = "stats")]
pub stats: Option<PathBuf>,
#[arg(short = 's', long = "require-single-strand-agreement", default_value = "false", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool)]
pub require_single_strand_agreement: bool,
#[command(flatten)]
pub compression: CompressionOptions,
#[command(flatten)]
pub scheduler_opts: SchedulerOptions,
#[command(flatten)]
pub queue_memory: QueueMemoryOptions,
}
struct FilterProcessedBatchRaw {
kept_records: Vec<Vec<u8>>,
rejected_records: Vec<Vec<u8>>,
records_count: u64,
passed_count: u64,
bases_masked: u64,
}
impl MemoryEstimate for FilterProcessedBatchRaw {
fn estimate_heap_size(&self) -> usize {
let vec_overhead = std::mem::size_of::<Vec<u8>>();
let kept_outer = self.kept_records.capacity() * vec_overhead;
let kept_inner: usize = self.kept_records.iter().map(|v| v.capacity()).sum();
let rejected_outer = self.rejected_records.capacity() * vec_overhead;
let rejected_inner: usize = self.rejected_records.iter().map(|v| v.capacity()).sum();
kept_outer + kept_inner + rejected_outer + rejected_inner
}
}
fn serialize_raw_records(records: &[Vec<u8>], output: &mut Vec<u8>) -> io::Result<u64> {
for record in records {
let len = u32::try_from(record.len()).map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("BAM record too large ({} bytes) for u32 block_size", record.len()),
)
})?;
output.extend_from_slice(&len.to_le_bytes());
output.extend_from_slice(record);
}
Ok(records.len() as u64)
}
#[derive(Default)]
struct CollectedFilterMetrics {
total_records: u64,
passed_records: u64,
failed_records: u64,
total_bases_masked: u64,
}
struct FilterPipelineSetup {
pipeline_config: BamPipelineConfig,
config: Arc<FilterConfig>,
reference: Option<Arc<ReferenceReader>>,
collected_metrics: Arc<SegQueue<CollectedFilterMetrics>>,
progress_counter: Arc<AtomicU64>,
}
struct FilterProcessCaptures {
config: Arc<FilterConfig>,
reference: Option<Arc<ReferenceReader>>,
min_base_quality: Option<u8>,
should_reverse_tags: bool,
min_mean_base_quality: Option<f64>,
max_no_call_fraction: f64,
require_single_strand_agreement: bool,
progress: Arc<AtomicU64>,
header: Header,
}
impl Command for Filter {
fn execute(&self, command_line: &str) -> Result<()> {
validate_file_exists(&self.io.input, "Input BAM")?;
if let Some(ref reference) = self.reference {
validate_file_exists(reference, "Reference FASTA")?;
}
self.validate_parameters()?;
let timer = OperationTimer::new("Filtering consensus reads");
info!("Starting Filter");
info!("Input: {}", self.io.input.display());
info!("Output: {}", self.io.output.display());
match &self.reference {
Some(r) => info!("Reference: {}", r.display()),
None => info!("Reference: <none> (tag regeneration disabled)"),
}
info!("Min reads: {:?}", self.min_reads);
info!("Max read error rate: {:?}", self.max_read_error_rate);
info!("Max base error rate: {:?}", self.max_base_error_rate);
if let Some(q) = self.min_base_quality {
info!("Min base quality: {q}");
}
if let Some(q) = self.min_mean_base_quality {
info!("Min mean base quality: {q}");
}
info!("Max no-call fraction: {}", self.max_no_call_fraction);
let (reader, header) = create_bam_reader_for_pipeline(&self.io.input)?;
let header = crate::commands::common::add_pg_record(header, command_line)?;
let track_rejects = self.rejects.is_some();
let threads = self.threading.threads.unwrap_or(1);
let total_reads = if self.filter_by_template {
self.execute_threads_mode_template(threads, reader, header, track_rejects)?
} else {
self.execute_threads_mode_single_read(threads, reader, header, track_rejects)?
};
timer.log_completion(total_reads);
Ok(())
}
}
impl Filter {
fn setup_pipeline(&self, num_threads: usize, header: &Header) -> Result<FilterPipelineSetup> {
let mut pipeline_config = build_pipeline_config(
&self.scheduler_opts,
&self.compression,
&self.queue_memory,
num_threads,
)?;
let library_index = LibraryIndex::from_header(header);
pipeline_config.group_key_config = Some(GroupKeyConfig::new_raw_no_cell(library_index));
let config = Arc::new(FilterConfig::new(
&self.min_reads,
&self.max_read_error_rate,
&self.max_base_error_rate,
self.min_base_quality,
self.min_mean_base_quality,
self.max_no_call_fraction,
));
let reference: Option<Arc<ReferenceReader>> = match &self.reference {
Some(ref_path) => {
info!("Loading reference genome into memory...");
let ref_load_start = Instant::now();
let r = Arc::new(ReferenceReader::new(ref_path)?);
info!("Reference loaded in {:.1}s", ref_load_start.elapsed().as_secs_f64());
Some(r)
}
None => None,
};
let collected_metrics: Arc<SegQueue<CollectedFilterMetrics>> = Arc::new(SegQueue::new());
let progress_counter = Arc::new(AtomicU64::new(0));
Ok(FilterPipelineSetup {
pipeline_config,
config,
reference,
collected_metrics,
progress_counter,
})
}
fn process_captures(
&self,
setup: &FilterPipelineSetup,
header: &Header,
) -> FilterProcessCaptures {
FilterProcessCaptures {
config: Arc::clone(&setup.config),
reference: setup.reference.clone(),
min_base_quality: self.min_base_quality,
should_reverse_tags: self.reverse_per_base_tags,
min_mean_base_quality: self.min_mean_base_quality,
max_no_call_fraction: self.max_no_call_fraction,
require_single_strand_agreement: self.require_single_strand_agreement,
progress: Arc::clone(&setup.progress_counter),
header: header.clone(),
}
}
fn run_filter_pipeline<G, GrouperFn, ProcessFn>(
&self,
setup: FilterPipelineSetup,
reader: Box<dyn std::io::Read + Send>,
header: Header,
grouper_fn: GrouperFn,
process_fn: ProcessFn,
) -> Result<u64>
where
G: Send + BatchWeight + MemoryEstimate + 'static,
GrouperFn: FnOnce(&Header) -> Box<dyn Grouper<Group = G> + Send>,
ProcessFn: Fn(G) -> io::Result<FilterProcessedBatchRaw> + Send + Sync + 'static,
{
let collected_for_serialize = Arc::clone(&setup.collected_metrics);
let serialize_fn = move |processed: FilterProcessedBatchRaw,
_header: &Header,
output: &mut Vec<u8>|
-> io::Result<u64> {
collected_for_serialize.push(CollectedFilterMetrics {
total_records: processed.records_count,
passed_records: processed.passed_count,
failed_records: processed.records_count - processed.passed_count,
total_bases_masked: processed.bases_masked,
});
serialize_raw_records(&processed.kept_records, output)
};
if let Some(rejects_path) = &self.rejects {
let secondary_serialize_fn =
|batch: &FilterProcessedBatchRaw, buf: &mut Vec<u8>| -> io::Result<u64> {
serialize_raw_records(&batch.rejected_records, buf)
};
run_bam_pipeline_from_reader_with_secondary(
setup.pipeline_config,
reader,
header,
&self.io.output,
None,
rejects_path,
grouper_fn,
process_fn,
serialize_fn,
secondary_serialize_fn,
)?;
} else {
run_bam_pipeline_from_reader(
setup.pipeline_config,
reader,
header,
&self.io.output,
None,
grouper_fn,
process_fn,
serialize_fn,
)?;
}
let mut total_reads = 0u64;
let mut passed_reads = 0u64;
let mut failed_reads = 0u64;
let mut total_bases_masked = 0u64;
while let Some(metrics) = setup.collected_metrics.pop() {
total_reads += metrics.total_records;
passed_reads += metrics.passed_records;
failed_reads += metrics.failed_records;
total_bases_masked += metrics.total_bases_masked;
}
if let Some(stats_path) = &self.stats {
self.write_filter_stats(stats_path, total_reads, passed_reads, failed_reads)?;
}
info!("Processed {total_reads} reads; kept {passed_reads} and rejected {failed_reads}");
if self.rejects.is_some() && failed_reads > 0 {
info!("Wrote {failed_reads} rejected records to rejects file");
}
info!("Total bases masked: {total_bases_masked}");
Ok(total_reads)
}
fn execute_threads_mode_single_read(
&self,
num_threads: usize,
reader: Box<dyn std::io::Read + Send>,
header: Header,
track_rejects: bool,
) -> Result<u64> {
let setup = self.setup_pipeline(num_threads, &header)?;
let ctx = self.process_captures(&setup, &header);
let grouper_fn = move |_header: &Header| {
Box::new(SingleRawRecordGrouper::new()) as Box<dyn Grouper<Group = Vec<u8>> + Send>
};
let process_fn = move |mut record: Vec<u8>| -> io::Result<FilterProcessedBatchRaw> {
let mut kept_records = Vec::new();
let mut rejected_records = Vec::new();
let mut passed_count = 0u64;
let (bases_masked, pass) = Self::process_record_raw(
&mut record,
&ctx.config,
ctx.reference.as_deref(),
&ctx.header,
ctx.should_reverse_tags,
ctx.min_base_quality,
ctx.require_single_strand_agreement,
ctx.min_mean_base_quality,
ctx.max_no_call_fraction,
)
.map_err(io::Error::other)?;
if pass {
passed_count = 1;
kept_records.push(record);
} else if track_rejects {
rejected_records.push(record);
}
let count = ctx.progress.fetch_add(1, Ordering::Relaxed);
if (count + 1).is_multiple_of(1_000_000) {
info!("Processed {} records", count + 1);
}
Ok(FilterProcessedBatchRaw {
kept_records,
rejected_records,
records_count: 1,
passed_count,
bases_masked,
})
};
self.run_filter_pipeline(setup, reader, header, grouper_fn, process_fn)
}
fn execute_threads_mode_template(
&self,
num_threads: usize,
reader: Box<dyn std::io::Read + Send>,
header: Header,
track_rejects: bool,
) -> Result<u64> {
let setup = self.setup_pipeline(num_threads, &header)?;
let ctx = self.process_captures(&setup, &header);
#[cfg(test)]
const BATCH_SIZE: usize = 50;
#[cfg(not(test))]
const BATCH_SIZE: usize = 1000;
let grouper_fn = move |_header: &Header| {
Box::new(TemplateGrouper::new(BATCH_SIZE))
as Box<dyn Grouper<Group = TemplateBatch> + Send>
};
let process_fn = move |batch: TemplateBatch| -> io::Result<FilterProcessedBatchRaw> {
let mut kept_records: Vec<Vec<u8>> = Vec::new();
let mut rejected_records: Vec<Vec<u8>> = Vec::new();
let mut total_records = 0u64;
let mut passed_count = 0u64;
let mut bases_masked = 0u64;
for template in batch {
let mut template_records = template
.into_raw_records()
.ok_or_else(|| io::Error::other("template has no raw records"))?;
let mut pass_map: AHashMap<usize, bool> = AHashMap::new();
for (idx, record) in template_records.iter_mut().enumerate() {
total_records += 1;
let (masked, pass) = Self::process_record_raw(
record,
&ctx.config,
ctx.reference.as_deref(),
&ctx.header,
ctx.should_reverse_tags,
ctx.min_base_quality,
ctx.require_single_strand_agreement,
ctx.min_mean_base_quality,
ctx.max_no_call_fraction,
)
.map_err(io::Error::other)?;
bases_masked += masked;
pass_map.insert(idx, pass);
}
let template_pass = template_passes_raw(&template_records, &pass_map);
for (idx, record) in template_records.into_iter().enumerate() {
let flags = bam_fields::flags(&record);
let is_primary = (flags & bam_fields::flags::SECONDARY) == 0
&& (flags & bam_fields::flags::SUPPLEMENTARY) == 0;
if is_primary {
if template_pass {
passed_count += 1;
kept_records.push(record);
} else if track_rejects {
rejected_records.push(record);
}
} else {
let record_pass = pass_map.get(&idx).copied().unwrap_or(false);
if template_pass && record_pass {
passed_count += 1;
kept_records.push(record);
} else if track_rejects {
rejected_records.push(record);
}
}
}
}
let count = ctx.progress.fetch_add(total_records, Ordering::Relaxed);
if (count + total_records) / 1_000_000 > count / 1_000_000 {
info!("Processed {} records", count + total_records);
}
Ok(FilterProcessedBatchRaw {
kept_records,
rejected_records,
records_count: total_records,
passed_count,
bases_masked,
})
};
self.run_filter_pipeline(setup, reader, header, grouper_fn, process_fn)
}
fn write_filter_stats(
&self,
path: &std::path::Path,
total: u64,
passed: u64,
failed: u64,
) -> Result<()> {
use std::fs::File;
use std::io::Write;
let mut file = File::create(path)?;
writeln!(file, "total_reads\t{total}")?;
writeln!(file, "passed_reads\t{passed}")?;
writeln!(file, "failed_reads\t{failed}")?;
#[allow(clippy::cast_precision_loss)]
let pass_rate = if total > 0 { passed as f64 / total as f64 } else { 0.0 };
writeln!(file, "pass_rate\t{pass_rate:.4}")?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn process_record_raw(
record: &mut Vec<u8>,
config: &FilterConfig,
reference: Option<&ReferenceReader>,
header: &Header,
reverse_tags: bool,
min_base_quality: Option<u8>,
require_ss_agreement: bool,
min_mean_base_quality: Option<f64>,
max_no_call_fraction: f64,
) -> Result<(u64, bool)> {
if reference.is_none() {
let flags = bam_fields::flags(record);
if (flags & bam_fields::flags::UNMAPPED) == 0 {
bail!(
"--ref is required when filtering mapped reads \
to keep NM/UQ/MD tags consistent"
);
}
}
if reverse_tags {
reverse_per_base_tags_raw(record)?;
}
let is_duplex = {
let aux = bam_fields::aux_data_slice(record);
is_duplex_consensus_raw(aux)
};
let masked_count = if is_duplex {
let (cc_thresh, ab_thresh, ba_thresh) = config
.duplex_thresholds()
.ok_or_else(|| anyhow::anyhow!("No duplex thresholds configured"))?;
mask_duplex_bases_raw(
record,
cc_thresh,
ab_thresh,
ba_thresh,
min_base_quality,
require_ss_agreement,
)?
} else {
let thresholds = config
.effective_single_strand_thresholds()
.ok_or_else(|| anyhow::anyhow!("No thresholds configured"))?;
mask_bases_raw(record, thresholds, min_base_quality)?
};
if let Some(reference) = reference {
regenerate_alignment_tags_raw(record, header, reference)?;
}
let pass = {
let aux = bam_fields::aux_data_slice(record);
if is_duplex {
let (cc_thresh, ab_thresh, ba_thresh) = config
.duplex_thresholds()
.ok_or_else(|| anyhow::anyhow!("No duplex thresholds configured"))?;
Self::check_duplex_filters_raw(
record,
aux,
cc_thresh,
ab_thresh,
ba_thresh,
min_mean_base_quality,
max_no_call_fraction,
)?
} else {
let thresholds = config
.effective_single_strand_thresholds()
.ok_or_else(|| anyhow::anyhow!("No thresholds configured"))?;
Self::check_filters_raw(
record,
aux,
thresholds,
min_mean_base_quality,
max_no_call_fraction,
)?
}
};
Ok((masked_count as u64, pass))
}
fn check_no_call_and_quality(
bam: &[u8],
min_mean_qual: Option<f64>,
max_no_call_frac: f64,
) -> bool {
let (no_calls, mean_qual) = compute_read_stats_raw(bam);
if let Some(min_qual) = min_mean_qual {
if mean_qual < min_qual {
return false;
}
}
let seq_len = bam_fields::l_seq(bam) as usize;
if max_no_call_frac >= 1.0 {
(no_calls as f64) <= max_no_call_frac
} else {
let no_call_frac = if seq_len > 0 { no_calls as f64 / seq_len as f64 } else { 0.0 };
no_call_frac <= max_no_call_frac
}
}
fn check_filters_raw(
bam: &[u8],
aux_data: &[u8],
thresholds: &crate::consensus_filter::FilterThresholds,
min_mean_qual: Option<f64>,
max_no_call_frac: f64,
) -> Result<bool> {
let filter_result = filter_read_raw(aux_data, thresholds)?;
if filter_result != FilterResult::Pass {
return Ok(false);
}
Ok(Self::check_no_call_and_quality(bam, min_mean_qual, max_no_call_frac))
}
fn check_duplex_filters_raw(
bam: &[u8],
aux_data: &[u8],
cc_thresholds: &crate::consensus_filter::FilterThresholds,
ab_thresholds: &crate::consensus_filter::FilterThresholds,
ba_thresholds: &crate::consensus_filter::FilterThresholds,
min_mean_qual: Option<f64>,
max_no_call_frac: f64,
) -> Result<bool> {
let filter_result =
filter_duplex_read_raw(aux_data, cc_thresholds, ab_thresholds, ba_thresholds)?;
if filter_result != FilterResult::Pass {
return Ok(false);
}
Ok(Self::check_no_call_and_quality(bam, min_mean_qual, max_no_call_frac))
}
fn validate_parameters(&self) -> Result<()> {
if self.min_reads.is_empty() || self.min_reads.len() > 3 {
bail!("--min-reads must have 1-3 values, got {}", self.min_reads.len());
}
if self.max_read_error_rate.len() > 3 {
bail!(
"--max-read-error-rate must have 1-3 values, got {}",
self.max_read_error_rate.len()
);
}
for &rate in &self.max_read_error_rate {
if !(0.0..=1.0).contains(&rate) {
bail!("--max-read-error-rate must be between 0.0 and 1.0, got {rate}");
}
}
if self.max_base_error_rate.len() > 3 {
bail!(
"--max-base-error-rate must have 1-3 values, got {}",
self.max_base_error_rate.len()
);
}
for &rate in &self.max_base_error_rate {
if !(0.0..=1.0).contains(&rate) {
bail!("--max-base-error-rate must be between 0.0 and 1.0, got {rate}");
}
}
if self.max_no_call_fraction < 0.0 {
bail!("--max-no-call-fraction must be >= 0.0, got {}", self.max_no_call_fraction);
}
if self.max_no_call_fraction >= 1.0 && self.max_no_call_fraction.fract() != 0.0 {
bail!(
"--max-no-call-fraction >= 1.0 must be an integer (count of bases), got {}",
self.max_no_call_fraction
);
}
if self.min_reads.len() >= 2 {
let cc_min = self.min_reads[0];
let ab_min = self.min_reads[1];
if ab_min > cc_min {
bail!(
"min-reads values must be specified high to low (duplex >= AB), got {cc_min} < {ab_min}"
);
}
}
if self.min_reads.len() == 3 {
let ab_min = self.min_reads[1];
let ba_min = self.min_reads[2];
if ba_min > ab_min {
bail!(
"min-reads values must be specified high to low (AB >= BA), got {ab_min} < {ba_min}"
);
}
}
if self.max_read_error_rate.len() >= 3 {
let ab_error = self.max_read_error_rate[1];
let ba_error = self.max_read_error_rate[2];
if ab_error > ba_error {
bail!(
"max-read-error-rate for AB must be <= BA (more stringent), got AB={ab_error} > BA={ba_error}"
);
}
}
if self.max_base_error_rate.len() >= 3 {
let ab_error = self.max_base_error_rate[1];
let ba_error = self.max_base_error_rate[2];
if ab_error > ba_error {
bail!(
"max-base-error-rate for AB must be <= BA (more stringent), got AB={ab_error} > BA={ba_error}"
);
}
}
Ok(())
}
}
#[cfg(test)]
#[allow(clippy::float_cmp)]
mod tests {
use super::*;
use noodles::sam::alignment::io::Write as AlignmentWrite;
use noodles::sam::alignment::record_buf::RecordBuf;
use rstest::rstest;
fn create_filter_with_paths(input: PathBuf, output: PathBuf, reference: PathBuf) -> Filter {
Filter {
io: BamIoOptions { input, output },
reference: Some(reference),
min_reads: vec![1],
max_read_error_rate: vec![0.025],
max_base_error_rate: vec![0.1],
min_base_quality: None,
min_mean_base_quality: None,
max_no_call_fraction: 0.2,
reverse_per_base_tags: false,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
}
}
#[test]
fn test_parameter_validation() {
let mut cmd = create_filter_with_paths(
PathBuf::from("input.bam"),
PathBuf::from("output.bam"),
PathBuf::from("ref.fa"),
);
cmd.min_reads = vec![3];
cmd.max_read_error_rate = vec![0.1];
cmd.max_base_error_rate = vec![0.2];
cmd.min_base_quality = Some(13);
cmd.reverse_per_base_tags = true;
cmd.max_no_call_fraction = 0.1;
assert!(cmd.validate_parameters().is_ok());
}
#[test]
fn test_invalid_error_rate() {
let mut cmd = create_filter_with_paths(
PathBuf::from("input.bam"),
PathBuf::from("output.bam"),
PathBuf::from("ref.fa"),
);
cmd.min_reads = vec![3];
cmd.max_read_error_rate = vec![1.5]; cmd.max_base_error_rate = vec![0.2];
cmd.min_base_quality = Some(13);
cmd.reverse_per_base_tags = true;
cmd.max_no_call_fraction = 0.1;
assert!(cmd.validate_parameters().is_err());
}
#[test]
fn test_default_filter_parameters() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 0.1,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert_eq!(filter.min_base_quality, Some(13));
assert!(filter.threading.is_single_threaded());
assert!(filter.filter_by_template);
}
#[test]
fn test_multithreaded_filter_configuration() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 0.1,
reverse_per_base_tags: true,
threading: ThreadingOptions::new(8),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert_eq!(filter.threading.threads, Some(8));
}
#[test]
fn test_filter_with_optional_outputs() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: Some(20.0),
max_no_call_fraction: 0.05,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: Some(PathBuf::from("rejects.bam")),
stats: Some(PathBuf::from("stats.txt")),
require_single_strand_agreement: true,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert_eq!(filter.min_mean_base_quality, Some(20.0));
assert_eq!(filter.rejects, Some(PathBuf::from("rejects.bam")));
assert_eq!(filter.stats, Some(PathBuf::from("stats.txt")));
assert!(filter.require_single_strand_agreement);
}
#[test]
fn test_validate_parameters_too_many_values() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![1, 2, 3, 4], max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 0.1,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert!(filter.validate_parameters().is_err());
}
#[test]
fn test_validate_parameters_invalid_stringency_order() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![1, 10], max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 0.1,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert!(filter.validate_parameters().is_err());
}
use crate::sam::builder::RecordBuilder;
use noodles::sam::alignment::record_buf::data::field::Value;
use noodles::sam::alignment::record_buf::data::field::value::Array;
fn create_filter_test_record(
name: &str,
sequence: &str,
qualities: &[u8],
read_depth: Option<u8>,
read_error: Option<f32>,
base_depths: Option<Vec<u16>>,
base_errors: Option<Vec<u16>>,
) -> RecordBuf {
let mut builder = RecordBuilder::new()
.name(name)
.sequence(sequence)
.qualities(qualities)
.reference_sequence_id(0)
.alignment_start(1)
.mapping_quality(60);
if let Some(depth) = read_depth {
builder = builder.tag("cD", Value::UInt8(depth));
}
if let Some(error) = read_error {
builder = builder.tag("cE", Value::Float(error));
}
if let Some(depths) = base_depths {
builder = builder.tag("cd", Value::Array(Array::UInt16(depths)));
}
if let Some(errors) = base_errors {
builder = builder.tag("ce", Value::Array(Array::UInt16(errors)));
}
builder.build()
}
#[test]
fn test_count_no_calls_empty_sequence() {
let record = create_filter_test_record("test", "", &[], None, None, None, None);
assert_eq!(crate::consensus_filter::count_no_calls(&record), 0);
}
#[test]
fn test_count_no_calls_no_ns() {
let record =
create_filter_test_record("test", "ACGT", &[30, 30, 30, 30], None, None, None, None);
assert_eq!(crate::consensus_filter::count_no_calls(&record), 0);
}
#[test]
fn test_count_no_calls_with_ns() {
let record =
create_filter_test_record("test", "ACNTN", &[30, 30, 0, 30, 0], None, None, None, None);
assert_eq!(crate::consensus_filter::count_no_calls(&record), 2);
}
#[test]
fn test_count_no_calls_all_ns() {
let record =
create_filter_test_record("test", "NNNN", &[0, 0, 0, 0], None, None, None, None);
assert_eq!(crate::consensus_filter::count_no_calls(&record), 4);
}
#[test]
fn test_mean_base_quality_empty() {
let record = create_filter_test_record("test", "", &[], None, None, None, None);
assert!((crate::consensus_filter::mean_base_quality(&record) - 0.0).abs() < f64::EPSILON);
}
#[test]
fn test_mean_base_quality_uniform() {
let record =
create_filter_test_record("test", "ACGT", &[30, 30, 30, 30], None, None, None, None);
assert!((crate::consensus_filter::mean_base_quality(&record) - 30.0).abs() < f64::EPSILON);
}
#[test]
fn test_mean_base_quality_mixed() {
let record =
create_filter_test_record("test", "ACGT", &[10, 20, 30, 40], None, None, None, None);
assert!((crate::consensus_filter::mean_base_quality(&record) - 25.0).abs() < f64::EPSILON);
}
#[test]
fn test_mask_bases_low_quality() {
use crate::consensus_filter::{FilterThresholds, mask_bases};
let mut record = create_filter_test_record(
"test",
"ACGT",
&[10, 30, 5, 30],
None,
None,
Some(vec![10, 10, 10, 10]), Some(vec![0, 0, 0, 0]), );
let thresholds =
FilterThresholds { min_reads: 1, max_read_error_rate: 1.0, max_base_error_rate: 1.0 };
mask_bases(&mut record, &thresholds, Some(20)).expect("mask_bases should succeed");
let seq = std::str::from_utf8(record.sequence().as_ref())
.expect("sequence should be valid UTF-8");
assert_eq!(seq, "NCNT");
let quals = record.quality_scores();
assert_eq!(quals.as_ref(), &[2, 30, 2, 30]);
}
#[test]
fn test_mask_bases_low_depth() {
use crate::consensus_filter::{FilterThresholds, mask_bases};
let mut record = create_filter_test_record(
"test",
"ACGT",
&[30, 30, 30, 30],
None,
None,
Some(vec![1, 10, 4, 10]), None,
);
let thresholds =
FilterThresholds { min_reads: 5, max_read_error_rate: 1.0, max_base_error_rate: 1.0 };
mask_bases(&mut record, &thresholds, Some(10)).expect("mask_bases should succeed");
let seq = std::str::from_utf8(record.sequence().as_ref())
.expect("sequence should be valid UTF-8");
assert_eq!(seq, "NCNT");
}
#[test]
fn test_mask_bases_high_error_count() {
use crate::consensus_filter::{FilterThresholds, mask_bases};
let mut record = create_filter_test_record(
"test",
"ACGT",
&[30, 30, 30, 30],
None,
None,
Some(vec![10, 10, 10, 10]),
Some(vec![1, 3, 2, 0]), );
let thresholds = FilterThresholds {
min_reads: 1,
max_read_error_rate: 1.0,
max_base_error_rate: 0.2, };
mask_bases(&mut record, &thresholds, Some(10)).expect("mask_bases should succeed");
let seq = std::str::from_utf8(record.sequence().as_ref())
.expect("sequence should be valid UTF-8");
assert_eq!(seq, "ANGT");
}
#[test]
fn test_filter_read_passes_all_thresholds() {
use crate::consensus_filter::{FilterResult, FilterThresholds, filter_read};
let record = create_filter_test_record(
"test",
"ACGT",
&[30, 30, 30, 30],
Some(10), Some(0.05), None,
None,
);
let thresholds =
FilterThresholds { min_reads: 5, max_read_error_rate: 0.1, max_base_error_rate: 0.2 };
let result = filter_read(&record, &thresholds).expect("filter_read should succeed");
assert_eq!(result, FilterResult::Pass);
}
#[test]
fn test_filter_read_fails_low_depth() {
use crate::consensus_filter::{FilterResult, FilterThresholds, filter_read};
let record = create_filter_test_record(
"test",
"ACGT",
&[30, 30, 30, 30],
Some(3), Some(0.05),
None,
None,
);
let thresholds =
FilterThresholds { min_reads: 5, max_read_error_rate: 0.1, max_base_error_rate: 0.2 };
let result = filter_read(&record, &thresholds).expect("filter_read should succeed");
assert_eq!(result, FilterResult::InsufficientReads);
}
#[test]
fn test_filter_read_fails_high_error_rate() {
use crate::consensus_filter::{FilterResult, FilterThresholds, filter_read};
let record = create_filter_test_record(
"test",
"ACGT",
&[30, 30, 30, 30],
Some(10),
Some(0.3), None,
None,
);
let thresholds =
FilterThresholds { min_reads: 5, max_read_error_rate: 0.1, max_base_error_rate: 0.2 };
let result = filter_read(&record, &thresholds).expect("filter_read should succeed");
assert_eq!(result, FilterResult::ExcessiveErrorRate);
}
#[test]
fn test_filter_read_without_tags() {
use crate::consensus_filter::{FilterResult, FilterThresholds, filter_read};
let record =
create_filter_test_record("test", "ACGT", &[30, 30, 30, 30], None, None, None, None);
let thresholds =
FilterThresholds { min_reads: 5, max_read_error_rate: 0.1, max_base_error_rate: 0.2 };
let result = filter_read(&record, &thresholds).expect("filter_read should succeed");
assert_eq!(result, FilterResult::Pass);
}
#[test]
fn test_template_passes_all_pass() {
use crate::consensus_filter::template_passes;
use ahash::AHashMap;
let r1 =
create_filter_test_record("read1", "ACGT", &[30, 30, 30, 30], None, None, None, None);
let r2 =
create_filter_test_record("read1", "GGGG", &[30, 30, 30, 30], None, None, None, None);
let records = vec![r1, r2];
let mut pass_map = AHashMap::new();
pass_map.insert(0, true);
pass_map.insert(1, true);
assert!(template_passes(&records, &pass_map));
}
#[test]
fn test_template_passes_one_fails() {
use crate::consensus_filter::template_passes;
use ahash::AHashMap;
let r1 =
create_filter_test_record("read1", "ACGT", &[30, 30, 30, 30], None, None, None, None);
let r2 =
create_filter_test_record("read1", "GGGG", &[30, 30, 30, 30], None, None, None, None);
let records = vec![r1, r2];
let mut pass_map = AHashMap::new();
pass_map.insert(0, true);
pass_map.insert(1, false);
assert!(!template_passes(&records, &pass_map));
}
#[test]
fn test_is_duplex_consensus_simplex() {
use crate::consensus_filter::is_duplex_consensus;
let record =
create_filter_test_record("test", "ACGT", &[30, 30, 30, 30], None, None, None, None);
assert!(!is_duplex_consensus(&record));
}
#[test]
fn test_is_duplex_consensus_with_tag() {
use crate::consensus_filter::is_duplex_consensus;
let mut record =
create_filter_test_record("test", "ACGT", &[30, 30, 30, 30], None, None, None, None);
record.data_mut().insert(crate::consensus_tags::per_read::tag("aD"), Value::UInt8(10));
assert!(is_duplex_consensus(&record));
}
#[test]
fn test_validate_max_no_call_fraction_integer() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 5.5, reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
let result = filter.validate_parameters();
assert!(result.is_err(), "Should reject non-integer max_no_call_fraction >= 1.0");
if let Err(e) = result {
assert!(e.to_string().contains("integer"), "Error should mention integer requirement");
}
}
#[test]
fn test_validate_max_no_call_fraction_integer_valid() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 5.0, reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert!(
filter.validate_parameters().is_ok(),
"Should accept integer max_no_call_fraction >= 1.0"
);
}
#[test]
fn test_filter_with_rejects_output() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 0.1,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: Some(PathBuf::from("rejects.bam")),
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert!(filter.rejects.is_some());
assert_eq!(filter.rejects.expect("rejects should be set"), PathBuf::from("rejects.bam"));
}
#[test]
fn test_filter_with_stats_output() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 0.1,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: Some(PathBuf::from("stats.txt")),
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert!(filter.stats.is_some());
assert_eq!(filter.stats.expect("stats should be set"), PathBuf::from("stats.txt"));
}
#[test]
fn test_filter_multithreaded() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 0.1,
reverse_per_base_tags: true,
threading: ThreadingOptions::new(8),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert_eq!(filter.threading.threads, Some(8));
}
#[test]
fn test_filter_require_single_strand_agreement() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 0.1,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: true,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert!(filter.require_single_strand_agreement);
}
#[test]
fn test_filter_by_read_not_template() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 0.1,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: false,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert!(!filter.filter_by_template);
}
#[test]
fn test_filter_reverse_per_base_tags_disabled() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 0.1,
reverse_per_base_tags: false,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert!(!filter.reverse_per_base_tags);
}
#[test]
fn test_filter_high_min_base_quality() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(30),
min_mean_base_quality: None,
max_no_call_fraction: 0.1,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert_eq!(filter.min_base_quality, Some(30));
}
#[test]
fn test_filter_with_min_mean_base_quality() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: Some(25.0),
max_no_call_fraction: 0.1,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert_eq!(filter.min_mean_base_quality, Some(25.0));
}
#[test]
fn test_filter_strict_error_rates() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.01],
max_base_error_rate: vec![0.005],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 0.01,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert_eq!(filter.max_read_error_rate[0], 0.01);
assert_eq!(filter.max_base_error_rate[0], 0.005);
assert_eq!(filter.max_no_call_fraction, 0.01);
}
#[test]
fn test_filter_lenient_error_rates() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.5],
max_base_error_rate: vec![0.5],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert_eq!(filter.max_read_error_rate[0], 0.5);
assert_eq!(filter.max_base_error_rate[0], 0.5);
assert_eq!(filter.max_no_call_fraction, 0.5);
}
#[test]
fn test_filter_high_min_reads_threshold() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![10],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 0.1,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert_eq!(filter.min_reads[0], 10);
}
#[test]
fn test_filter_duplex_different_thresholds() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![5, 3, 3], max_read_error_rate: vec![0.05, 0.1, 0.1],
max_base_error_rate: vec![0.05, 0.1, 0.1],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 0.1,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert_eq!(filter.min_reads.len(), 3);
assert_eq!(filter.min_reads[0], 5); assert_eq!(filter.min_reads[1], 3); assert_eq!(filter.min_reads[2], 3); }
#[test]
fn test_filter_all_optional_outputs() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: Some(20.0),
max_no_call_fraction: 0.1,
reverse_per_base_tags: true,
threading: ThreadingOptions::new(4),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: Some(PathBuf::from("rejects.bam")),
stats: Some(PathBuf::from("stats.txt")),
require_single_strand_agreement: true,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert!(filter.rejects.is_some());
assert!(filter.stats.is_some());
assert!(filter.require_single_strand_agreement);
assert!(filter.min_mean_base_quality.is_some());
}
#[test]
fn test_filter_zero_no_call_fraction() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(13),
min_mean_base_quality: None,
max_no_call_fraction: 0.0,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert_eq!(filter.max_no_call_fraction, 0.0);
}
#[test]
fn test_filter_low_min_base_quality() {
let filter = Filter {
io: BamIoOptions {
input: PathBuf::from("input.bam"),
output: PathBuf::from("output.bam"),
},
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![3],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.2],
min_base_quality: Some(2),
min_mean_base_quality: None,
max_no_call_fraction: 0.1,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
assert_eq!(filter.min_base_quality, Some(2));
}
use crate::sam::builder::SamBuilder;
use std::io::Write;
use tempfile::TempDir;
fn create_test_reference(dir: &TempDir) -> PathBuf {
let ref_path = dir.path().join("ref.fa");
let mut file = std::fs::File::create(&ref_path).expect("failed to create reference file");
writeln!(file, ">chr1").expect("failed to write FASTA header");
writeln!(file, "{}", "A".repeat(1000)).expect("failed to write FASTA sequence");
file.flush().expect("failed to flush reference file");
ref_path
}
fn read_bam_records(path: &std::path::Path) -> Result<Vec<RecordBuf>> {
let mut reader = noodles::bam::io::reader::Builder.build_from_path(path)?;
let header = reader.read_header()?;
let mut records = Vec::new();
for result in reader.records() {
let record = result?;
let record_buf = RecordBuf::try_from_alignment_record(&header, &record)?;
records.push(record_buf);
}
Ok(records)
}
#[allow(clippy::too_many_arguments)]
fn create_simplex_consensus_record(
builder: &mut SamBuilder,
name: &str,
pos: usize,
bases: &str,
quals: &[u8],
read_depth: u8,
read_error: f32,
base_depths: &[i16],
base_errors: &[i16],
) {
let _ = builder
.add_frag()
.name(name)
.bases(bases)
.quals(quals)
.contig(0) .start(pos)
.attr("cD", read_depth)
.attr("cM", read_depth)
.attr("cE", read_error)
.attr("cd", base_depths.to_vec()) .attr("ce", base_errors.to_vec()) .build();
}
fn get_read_name(record: &RecordBuf) -> Option<String> {
record.name().map(|n| String::from_utf8_lossy(n.as_ref()).to_string())
}
#[allow(clippy::too_many_arguments)]
fn run_filter_command(
input: &std::path::Path,
output: &std::path::Path,
reference: &std::path::Path,
threads: usize,
min_reads: Vec<usize>,
max_read_error_rate: Vec<f64>,
max_base_error_rate: Vec<f64>,
min_base_quality: Option<u8>,
) -> Result<()> {
let cmd = Filter {
io: BamIoOptions { input: input.to_path_buf(), output: output.to_path_buf() },
reference: Some(reference.to_path_buf()),
min_reads,
max_read_error_rate,
max_base_error_rate,
min_base_quality,
min_mean_base_quality: None,
max_no_call_fraction: 0.2,
reverse_per_base_tags: false,
threading: ThreadingOptions::new(threads),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")
}
#[test]
fn test_filter_execute_basic_simplex() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
create_simplex_consensus_record(
&mut builder,
"read1",
100,
"AAAA",
&[30, 30, 30, 30],
10, 0.01, &[10, 10, 10, 10],
&[0, 0, 0, 0], );
create_simplex_consensus_record(
&mut builder,
"read2",
200,
"AAAA",
&[30, 30, 30, 30],
2, 0.01,
&[2, 2, 2, 2],
&[0, 0, 0, 0],
);
create_simplex_consensus_record(
&mut builder,
"read3",
300,
"AAAA",
&[30, 30, 30, 30],
10,
0.5, &[10, 10, 10, 10],
&[5, 5, 5, 5], );
builder.write(&input_path)?;
run_filter_command(
&input_path,
&output_path,
&ref_path,
1, vec![5], vec![0.1], vec![0.3], Some(10), )?;
let records = read_bam_records(&output_path)?;
assert_eq!(records.len(), 1, "Only read1 should pass filtering");
assert_eq!(get_read_name(&records[0]), Some("read1".to_string()));
Ok(())
}
#[test]
fn test_filter_execute_base_masking() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
create_simplex_consensus_record(
&mut builder,
"read1",
100,
"AAAAAAAAAA", &[30, 5, 30, 30, 30, 30, 30, 30, 30, 30], 10,
0.01,
&[10, 10, 10, 10, 10, 10, 10, 10, 10, 10],
&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
);
builder.write(&input_path)?;
let cmd = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_path.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![1],
max_read_error_rate: vec![0.5],
max_base_error_rate: vec![0.5],
min_base_quality: Some(20), min_mean_base_quality: None,
max_no_call_fraction: 0.2, reverse_per_base_tags: false,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
let records = read_bam_records(&output_path)?;
assert_eq!(records.len(), 1);
let record = &records[0];
let seq: Vec<u8> = record.sequence().as_ref().to_vec();
assert_eq!(seq, b"ANAAAAAAAA");
let quals: Vec<u8> = record.quality_scores().as_ref().to_vec();
assert_eq!(quals, vec![30, 2, 30, 30, 30, 30, 30, 30, 30, 30]);
Ok(())
}
#[test]
fn test_filter_execute_single_vs_multi_threaded() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_single = dir.path().join("output_single.bam");
let output_multi = dir.path().join("output_multi.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
for i in 0..50 {
let depth: u8 = if i % 3 == 0 { 3 } else { 10 }; let error: f32 = if i % 5 == 0 { 0.3 } else { 0.01 };
create_simplex_consensus_record(
&mut builder,
&format!("read{i}"),
(i * 10 + 100) % 800 + 1, "AAAA",
&[30, 30, 30, 30],
depth,
error,
&[depth as i16, depth as i16, depth as i16, depth as i16],
&[0, 0, 0, 0],
);
}
builder.write(&input_path)?;
run_filter_command(
&input_path,
&output_single,
&ref_path,
1, vec![5],
vec![0.1],
vec![0.3],
Some(10),
)?;
run_filter_command(
&input_path,
&output_multi,
&ref_path,
4, vec![5],
vec![0.1],
vec![0.3],
Some(10),
)?;
let records_single = read_bam_records(&output_single)?;
let records_multi = read_bam_records(&output_multi)?;
assert_eq!(
records_single.len(),
records_multi.len(),
"Single and multi-threaded should produce same number of records"
);
let names_single: std::collections::HashSet<_> =
records_single.iter().filter_map(get_read_name).collect();
let names_multi: std::collections::HashSet<_> =
records_multi.iter().filter_map(get_read_name).collect();
assert_eq!(
names_single, names_multi,
"Single and multi-threaded should produce same reads"
);
Ok(())
}
#[test]
fn test_filter_execute_with_rejects() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let rejects_path = dir.path().join("rejects.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
create_simplex_consensus_record(
&mut builder,
"pass",
100,
"AAAA",
&[30, 30, 30, 30],
10,
0.01,
&[10, 10, 10, 10],
&[0, 0, 0, 0],
);
create_simplex_consensus_record(
&mut builder,
"fail",
200,
"AAAA",
&[30, 30, 30, 30],
2,
0.01,
&[2, 2, 2, 2],
&[0, 0, 0, 0],
);
builder.write(&input_path)?;
let cmd = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_path.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![5],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.3],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.2,
reverse_per_base_tags: false,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: Some(rejects_path.clone()),
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
let passed = read_bam_records(&output_path)?;
assert_eq!(passed.len(), 1);
assert_eq!(get_read_name(&passed[0]), Some("pass".to_string()));
let rejected = read_bam_records(&rejects_path)?;
assert_eq!(rejected.len(), 1);
assert_eq!(get_read_name(&rejected[0]), Some("fail".to_string()));
Ok(())
}
#[test]
fn test_filter_execute_max_no_call_filtering() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
create_simplex_consensus_record(
&mut builder,
"pass",
100,
"AAAAAAAAAA", &[30, 30, 5, 30, 30, 30, 30, 30, 30, 30], 10,
0.01,
&[10, 10, 10, 10, 10, 10, 10, 10, 10, 10],
&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
);
create_simplex_consensus_record(
&mut builder,
"fail",
200,
"AAAAAAAAAA", &[5, 5, 5, 5, 5, 30, 30, 30, 30, 30], 10,
0.01,
&[10, 10, 10, 10, 10, 10, 10, 10, 10, 10],
&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
);
builder.write(&input_path)?;
let cmd = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_path.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![1],
max_read_error_rate: vec![1.0],
max_base_error_rate: vec![1.0],
min_base_quality: Some(20), min_mean_base_quality: None,
max_no_call_fraction: 0.2, reverse_per_base_tags: false,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
let records = read_bam_records(&output_path)?;
assert_eq!(records.len(), 1, "Only read with <=20% Ns should pass");
assert_eq!(get_read_name(&records[0]), Some("pass".to_string()));
Ok(())
}
#[test]
fn test_filter_execute_min_mean_base_quality() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
create_simplex_consensus_record(
&mut builder,
"high_qual",
100,
"AAAA",
&[30, 30, 30, 30],
10,
0.01,
&[10, 10, 10, 10],
&[0, 0, 0, 0],
);
create_simplex_consensus_record(
&mut builder,
"low_qual",
200,
"AAAA",
&[15, 15, 15, 15],
10,
0.01,
&[10, 10, 10, 10],
&[0, 0, 0, 0],
);
builder.write(&input_path)?;
let cmd = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_path.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![1],
max_read_error_rate: vec![1.0],
max_base_error_rate: vec![1.0],
min_base_quality: Some(10),
min_mean_base_quality: Some(20.0), max_no_call_fraction: 1.0,
reverse_per_base_tags: false,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
let records = read_bam_records(&output_path)?;
assert_eq!(records.len(), 1);
assert_eq!(get_read_name(&records[0]), Some("high_qual".to_string()));
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn create_duplex_consensus_record(
builder: &mut SamBuilder,
name: &str,
pos: usize,
bases: &str,
quals: &[u8],
ab_depth: u8,
ba_depth: u8,
ab_error: f32,
ba_error: f32,
ab_base_depths: &[i16],
ba_base_depths: &[i16],
) {
let _ = builder
.add_frag()
.name(name)
.bases(bases)
.quals(quals)
.contig(0) .start(pos)
.attr("aD", ab_depth)
.attr("bD", ba_depth)
.attr("aM", ab_depth)
.attr("bM", ba_depth)
.attr("aE", ab_error)
.attr("bE", ba_error)
.attr("ad", ab_base_depths.to_vec())
.attr("bd", ba_base_depths.to_vec())
.attr("ae", vec![0i16; bases.len()]) .attr("be", vec![0i16; bases.len()]) .build();
}
#[test]
fn test_filter_execute_duplex_consensus() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
create_duplex_consensus_record(
&mut builder,
"duplex_pass",
100,
"AAAA",
&[30, 30, 30, 30],
10,
10, 0.01,
0.01, &[10, 10, 10, 10],
&[10, 10, 10, 10],
);
create_duplex_consensus_record(
&mut builder,
"duplex_fail_ab",
200,
"AAAA",
&[30, 30, 30, 30],
2,
10, 0.01,
0.01,
&[2, 2, 2, 2],
&[10, 10, 10, 10],
);
create_duplex_consensus_record(
&mut builder,
"duplex_fail_ba",
300,
"AAAA",
&[30, 30, 30, 30],
10,
2, 0.01,
0.01,
&[10, 10, 10, 10],
&[2, 2, 2, 2],
);
builder.write(&input_path)?;
let cmd = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_path.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![5, 5, 5], max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.3],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: false,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
let records = read_bam_records(&output_path)?;
assert_eq!(records.len(), 1, "Only duplex_pass should pass filtering");
assert_eq!(get_read_name(&records[0]), Some("duplex_pass".to_string()));
Ok(())
}
#[test]
fn test_filter_execute_non_template_mode() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
create_simplex_consensus_record(
&mut builder,
"pair1",
100,
"AAAA",
&[30, 30, 30, 30],
10,
0.01,
&[10, 10, 10, 10],
&[0, 0, 0, 0],
);
create_simplex_consensus_record(
&mut builder,
"pair1",
200,
"AAAA",
&[30, 30, 30, 30],
2, 0.01,
&[2, 2, 2, 2],
&[0, 0, 0, 0],
);
builder.write(&input_path)?;
let cmd = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_path.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![5],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.3],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: false,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: false, rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
let records = read_bam_records(&output_path)?;
assert_eq!(records.len(), 1);
Ok(())
}
#[test]
fn test_filter_execute_with_stats_output() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let stats_path = dir.path().join("stats.txt");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
create_simplex_consensus_record(
&mut builder,
"pass1",
100,
"AAAA",
&[30, 30, 30, 30],
10,
0.01,
&[10, 10, 10, 10],
&[0, 0, 0, 0],
);
create_simplex_consensus_record(
&mut builder,
"fail1",
200,
"AAAA",
&[30, 30, 30, 30],
2,
0.01,
&[2, 2, 2, 2],
&[0, 0, 0, 0],
);
builder.write(&input_path)?;
let cmd = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_path.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![5],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.3],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: false,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: Some(stats_path.clone()),
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
assert!(stats_path.exists(), "Stats file should be created");
let stats_content = std::fs::read_to_string(&stats_path)?;
assert!(
stats_content.contains("passed")
|| stats_content.contains("failed")
|| stats_content.contains('1'),
"Stats should contain filtering results"
);
Ok(())
}
#[test]
fn test_filter_execute_parallel_with_many_templates() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_single = dir.path().join("output_single.bam");
let output_multi = dir.path().join("output_multi.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
for i in 0..125 {
let depth: u8 = if i % 4 == 0 { 3 } else { 10 };
let error: f32 = if i % 7 == 0 { 0.25 } else { 0.02 };
create_simplex_consensus_record(
&mut builder,
&format!("read{i}"),
(i * 2 + 10) % 900 + 1,
"AAAAAAAA",
&[30, 30, 30, 30, 30, 30, 30, 30],
depth,
error,
&[depth as i16; 8],
&[0; 8],
);
}
builder.write(&input_path)?;
let cmd_single = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_single.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![5],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.3],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: false,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd_single.execute("test")?;
let cmd_multi = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_multi.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![5],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.3],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: false,
threading: ThreadingOptions::new(4),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd_multi.execute("test")?;
let records_single = read_bam_records(&output_single)?;
let records_multi = read_bam_records(&output_multi)?;
assert_eq!(
records_single.len(),
records_multi.len(),
"Single and multi-threaded should produce same number of records"
);
assert!(!records_single.is_empty(), "Should have some passing reads");
assert!(records_single.len() < 125, "Should have filtered out some reads");
Ok(())
}
#[test]
fn test_filter_execute_regenerates_tags() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
create_simplex_consensus_record(
&mut builder,
"read1",
100,
"AAAA",
&[30, 30, 30, 30],
10,
0.01,
&[10, 10, 10, 10],
&[0, 0, 0, 0],
);
builder.write(&input_path)?;
let cmd = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_path.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![1],
max_read_error_rate: vec![1.0],
max_base_error_rate: vec![1.0],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 1.0,
reverse_per_base_tags: false,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
let records = read_bam_records(&output_path)?;
assert_eq!(records.len(), 1);
Ok(())
}
#[test]
fn test_filter_execute_reverse_per_base_tags() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
create_simplex_consensus_record(
&mut builder,
"read1",
100,
"AAAA",
&[30, 30, 30, 30],
10,
0.01,
&[10, 10, 10, 10],
&[0, 0, 0, 0],
);
builder.write(&input_path)?;
let cmd = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_path.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![1],
max_read_error_rate: vec![1.0],
max_base_error_rate: vec![1.0],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 1.0,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
let records = read_bam_records(&output_path)?;
assert_eq!(records.len(), 1);
Ok(())
}
#[test]
fn test_filter_execute_parallel_with_rejects() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let rejects_path = dir.path().join("rejects.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
for i in 0..75 {
let depth: u8 = if i % 3 == 0 { 2 } else { 10 }; create_simplex_consensus_record(
&mut builder,
&format!("read{i}"),
(i * 5 + 10) % 900 + 1,
"AAAA",
&[30, 30, 30, 30],
depth,
0.01,
&[depth as i16; 4],
&[0; 4],
);
}
builder.write(&input_path)?;
let cmd = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_path.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![5],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.3],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: false,
threading: ThreadingOptions::new(4), compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: Some(rejects_path.clone()), stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
let passed = read_bam_records(&output_path)?;
let rejected = read_bam_records(&rejects_path)?;
assert!(!passed.is_empty(), "Should have passing reads");
assert!(!rejected.is_empty(), "Should have rejected reads");
assert_eq!(passed.len() + rejected.len(), 75, "Total should match input");
Ok(())
}
#[test]
fn test_filter_execute_duplex_single_vs_multi_threaded() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_single = dir.path().join("output_single.bam");
let output_multi = dir.path().join("output_multi.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
for i in 0..50 {
let ab_depth: u8 = if i % 3 == 0 { 3 } else { 10 };
let ba_depth: u8 = if i % 5 == 0 { 2 } else { 8 };
create_duplex_consensus_record(
&mut builder,
&format!("duplex{i}"),
(i * 10 + 100) % 800 + 1,
"AAAA",
&[30, 30, 30, 30],
ab_depth,
ba_depth,
0.01,
0.01,
&[ab_depth as i16; 4],
&[ba_depth as i16; 4],
);
}
builder.write(&input_path)?;
let cmd_single = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_single.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![5, 5, 5], max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.3],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: false,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd_single.execute("test")?;
let cmd_multi = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_multi.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![5, 5, 5],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.3],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: false,
threading: ThreadingOptions::new(4),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd_multi.execute("test")?;
let records_single = read_bam_records(&output_single)?;
let records_multi = read_bam_records(&output_multi)?;
assert_eq!(
records_single.len(),
records_multi.len(),
"Single and multi-threaded duplex filtering should match"
);
Ok(())
}
#[test]
fn test_filter_execute_non_template_mode_duplex() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let rejects_path = dir.path().join("rejects.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
for i in 0..5 {
create_duplex_consensus_record(
&mut builder,
&format!("duplex_pass{i}"),
100 + i * 10,
"AAAA",
&[30, 30, 30, 30],
10, 8, 0.01,
0.01,
&[10, 10, 10, 10],
&[8, 8, 8, 8],
);
}
for i in 0..5 {
create_duplex_consensus_record(
&mut builder,
&format!("duplex_fail{i}"),
200 + i * 10,
"AAAA",
&[30, 30, 30, 30],
2, 8,
0.01,
0.01,
&[2, 2, 2, 2],
&[8, 8, 8, 8],
);
}
builder.write(&input_path)?;
let cmd = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_path.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![5, 5, 5], max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.3],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: true,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: false, rejects: Some(rejects_path.clone()),
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
let passed = read_bam_records(&output_path)?;
let rejected = read_bam_records(&rejects_path)?;
assert_eq!(passed.len(), 5, "5 duplex reads should pass");
assert_eq!(rejected.len(), 5, "5 duplex reads should fail");
Ok(())
}
#[test]
fn test_filter_execute_with_supplementary_records() -> Result<()> {
use crate::sam::builder::RecordBuilder;
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let rejects_path = dir.path().join("rejects.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
create_simplex_consensus_record(
&mut builder,
"read_with_supp",
100,
"AAAA",
&[30, 30, 30, 30],
10,
0.01,
&[10, 10, 10, 10],
&[0, 0, 0, 0],
);
builder.write(&input_path)?;
let bam_path = input_path.clone();
let mut reader = noodles::bam::io::reader::Builder.build_from_path(&bam_path)?;
let header = reader.read_header()?;
let records: Vec<_> = reader.record_bufs(&header).collect::<std::io::Result<Vec<_>>>()?;
let mut writer = noodles::bam::io::writer::Builder.build_from_path(&input_path)?;
writer.write_header(&header)?;
for record in &records {
writer.write_alignment_record(&header, record)?;
}
let supp = RecordBuilder::new()
.name("read_with_supp")
.sequence("CCCC")
.qualities(&[30, 30, 30, 30])
.supplementary(true)
.reference_sequence_id(0)
.alignment_start(500)
.tag("cD", 10u8)
.tag("cM", 10u8)
.tag("cE", 0.01f32)
.tag("cd", vec![10i16, 10, 10, 10])
.tag("ce", vec![0i16, 0, 0, 0])
.build();
writer.write_alignment_record(&header, &supp)?;
let supp_fail = RecordBuilder::new()
.name("read_with_supp")
.sequence("GGGG")
.qualities(&[30, 30, 30, 30])
.supplementary(true)
.reference_sequence_id(0)
.alignment_start(600)
.tag("cD", 2u8) .tag("cM", 2u8)
.tag("cE", 0.5f32) .tag("cd", vec![2i16, 2, 2, 2])
.tag("ce", vec![0i16, 0, 0, 0])
.build();
writer.write_alignment_record(&header, &supp_fail)?;
drop(writer);
let cmd = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_path.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![5],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.3],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: false,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: Some(rejects_path.clone()),
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
let passed = read_bam_records(&output_path)?;
let rejected = read_bam_records(&rejects_path)?;
assert_eq!(passed.len(), 2, "Primary + passing supplementary should pass");
assert_eq!(rejected.len(), 1, "Failing supplementary should be rejected");
Ok(())
}
#[test]
fn test_filter_execute_parallel_with_supplementary() -> Result<()> {
use crate::sam::builder::RecordBuilder;
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let rejects_path = dir.path().join("rejects.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
for i in 0..75 {
create_simplex_consensus_record(
&mut builder,
&format!("read{i}"),
100 + i * 10,
"AAAA",
&[30, 30, 30, 30],
10,
0.01,
&[10, 10, 10, 10],
&[0, 0, 0, 0],
);
}
builder.write(&input_path)?;
let bam_path = input_path.clone();
let mut reader = noodles::bam::io::reader::Builder.build_from_path(&bam_path)?;
let header = reader.read_header()?;
let records: Vec<_> = reader.record_bufs(&header).collect::<std::io::Result<Vec<_>>>()?;
let mut writer = noodles::bam::io::writer::Builder.build_from_path(&input_path)?;
writer.write_header(&header)?;
for record in &records {
writer.write_alignment_record(&header, record)?;
let name = record.name().map(|n| String::from_utf8_lossy(n.as_ref()).to_string());
if let Some(ref n) = name {
if n.ends_with('0') || n.ends_with('5') {
let supp = RecordBuilder::new()
.name(n)
.sequence("CCCC")
.qualities(&[30, 30, 30, 30])
.supplementary(true)
.reference_sequence_id(0)
.alignment_start(800)
.tag("cD", 10u8)
.tag("cM", 10u8)
.tag("cE", 0.01f32)
.tag("cd", vec![10i16, 10, 10, 10])
.tag("ce", vec![0i16, 0, 0, 0])
.build();
writer.write_alignment_record(&header, &supp)?;
let supp_fail = RecordBuilder::new()
.name(n)
.sequence("GGGG")
.qualities(&[30, 30, 30, 30])
.supplementary(true)
.reference_sequence_id(0)
.alignment_start(900)
.tag("cD", 2u8) .tag("cM", 2u8)
.tag("cE", 0.5f32)
.tag("cd", vec![2i16, 2, 2, 2])
.tag("ce", vec![0i16, 0, 0, 0])
.build();
writer.write_alignment_record(&header, &supp_fail)?;
}
}
}
drop(writer);
let cmd = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_path.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![5],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.3],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: false,
threading: ThreadingOptions::new(4),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: Some(rejects_path.clone()),
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
let passed = read_bam_records(&output_path)?;
let rejected = read_bam_records(&rejects_path)?;
assert!(passed.len() > 75, "Should have primaries + passing supplementaries");
assert!(!rejected.is_empty(), "Should have rejected supplementaries");
Ok(())
}
#[test]
fn test_validate_error_rate_ordering_ab_ba() {
let cmd = Filter {
io: BamIoOptions { input: PathBuf::from("test.bam"), output: PathBuf::from("out.bam") },
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![5, 5, 5],
max_read_error_rate: vec![0.1, 0.2, 0.1], max_base_error_rate: vec![0.3],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: false,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
let result = cmd.validate_parameters();
assert!(result.is_err());
assert!(
result.unwrap_err().to_string().contains("max-read-error-rate for AB must be <= BA")
);
}
#[test]
fn test_validate_base_error_rate_ordering_ab_ba() {
let cmd = Filter {
io: BamIoOptions { input: PathBuf::from("test.bam"), output: PathBuf::from("out.bam") },
reference: Some(PathBuf::from("ref.fa")),
min_reads: vec![5, 5, 5],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.1, 0.3, 0.2], min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: false,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
let result = cmd.validate_parameters();
assert!(result.is_err());
assert!(
result.unwrap_err().to_string().contains("max-base-error-rate for AB must be <= BA")
);
}
#[test]
fn test_filter_execute_parallel_reverse_per_base_tags() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
for i in 0..75 {
create_simplex_consensus_record(
&mut builder,
&format!("read{i}"),
100 + i * 10,
"AAAA",
&[30, 30, 30, 30],
10,
0.01,
&[10, 10, 10, 10],
&[0, 0, 0, 0],
);
}
builder.write(&input_path)?;
let cmd = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_path.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![5],
max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.3],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: true,
threading: ThreadingOptions::new(4),
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
let records = read_bam_records(&output_path)?;
assert_eq!(records.len(), 75);
Ok(())
}
#[test]
fn test_filter_execute_duplex_parallel_processing() -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
for i in 0..75 {
let ab_depth: u8 = if i % 3 == 0 { 3 } else { 10 }; let ba_depth: u8 = if i % 5 == 0 { 2 } else { 8 };
create_duplex_consensus_record(
&mut builder,
&format!("duplex{i}"),
100 + i * 10,
"AAAA",
&[30, 30, 30, 30],
ab_depth,
ba_depth,
0.01,
0.01,
&[ab_depth as i16; 4],
&[ba_depth as i16; 4],
);
}
builder.write(&input_path)?;
let cmd = Filter {
io: BamIoOptions { input: input_path.clone(), output: output_path.clone() },
reference: Some(ref_path.clone()),
min_reads: vec![5, 5, 5], max_read_error_rate: vec![0.1],
max_base_error_rate: vec![0.3],
min_base_quality: Some(10),
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: false,
threading: ThreadingOptions::new(4), compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
let records = read_bam_records(&output_path)?;
assert!(
!records.is_empty() && records.len() < 75,
"Some duplex records should pass, some fail"
);
Ok(())
}
#[rstest]
#[case::default(ThreadingOptions::none())]
#[case::pipeline_1(ThreadingOptions::new(1))]
#[case::pipeline_2(ThreadingOptions::new(2))]
fn test_threading_modes(#[case] threading: ThreadingOptions) -> Result<()> {
let dir = TempDir::new()?;
let ref_path = create_test_reference(&dir);
let input_path = dir.path().join("input.bam");
let output_path = dir.path().join("output.bam");
let mut builder = SamBuilder::with_single_ref("chr1", 1000);
create_simplex_consensus_record(
&mut builder,
"read1",
100,
"AAAAAAAAAA",
&[30, 30, 30, 30, 30, 30, 30, 30, 30, 30],
10,
0.01,
&[10, 10, 10, 10, 10, 10, 10, 10, 10, 10],
&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
);
builder.write(&input_path)?;
let cmd = Filter {
io: BamIoOptions { input: input_path, output: output_path.clone() },
reference: Some(ref_path),
min_reads: vec![1],
max_read_error_rate: vec![0.5],
max_base_error_rate: vec![0.5],
min_base_quality: None,
min_mean_base_quality: None,
max_no_call_fraction: 0.5,
reverse_per_base_tags: false,
threading,
compression: CompressionOptions { compression_level: 1 },
filter_by_template: true,
rejects: None,
stats: None,
require_single_strand_agreement: false,
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
};
cmd.execute("test")?;
let records = read_bam_records(&output_path)?;
assert_eq!(records.len(), 1, "Should have 1 record");
Ok(())
}
#[test]
fn test_check_filters_raw_no_call_fraction_mode() -> Result<()> {
use crate::consensus_filter::FilterThresholds;
use crate::sam::builder::RecordBuilder;
use crate::vendored::bam_codec::encoder::encode_record_buf;
use noodles::sam::Header;
let mut record = RecordBuilder::new()
.sequence("AANNTTGGCC") .qualities(&[30, 30, 30, 30, 30, 30, 30, 30, 30, 30])
.build();
record.data_mut().insert(
noodles::sam::alignment::record::data::field::Tag::from([b'c', b'D']),
noodles::sam::alignment::record_buf::data::field::Value::from(10u8),
);
record.data_mut().insert(
noodles::sam::alignment::record::data::field::Tag::from([b'c', b'E']),
noodles::sam::alignment::record_buf::data::field::Value::from(0.01f32),
);
let header = Header::default();
let mut raw = Vec::new();
encode_record_buf(&mut raw, &header, &record)?;
let aux = crate::sort::bam_fields::aux_data_slice(&raw);
let thresholds =
FilterThresholds { min_reads: 5, max_read_error_rate: 0.1, max_base_error_rate: 0.1 };
let result = Filter::check_filters_raw(&raw, aux, &thresholds, None, 0.2)?;
assert!(result, "Should pass with 2/10 Ns and threshold 0.2");
let result = Filter::check_filters_raw(&raw, aux, &thresholds, None, 0.19)?;
assert!(!result, "Should fail with 2/10 Ns and threshold 0.19");
Ok(())
}
#[test]
fn test_check_filters_raw_no_call_count_mode_pass() -> Result<()> {
use crate::consensus_filter::FilterThresholds;
use crate::sam::builder::RecordBuilder;
use crate::vendored::bam_codec::encoder::encode_record_buf;
use noodles::sam::Header;
let mut record = RecordBuilder::new()
.sequence("AANNNTTGGC") .qualities(&[30, 30, 30, 30, 30, 30, 30, 30, 30, 30])
.build();
record.data_mut().insert(
noodles::sam::alignment::record::data::field::Tag::from([b'c', b'D']),
noodles::sam::alignment::record_buf::data::field::Value::from(10u8),
);
record.data_mut().insert(
noodles::sam::alignment::record::data::field::Tag::from([b'c', b'E']),
noodles::sam::alignment::record_buf::data::field::Value::from(0.01f32),
);
let header = Header::default();
let mut raw = Vec::new();
encode_record_buf(&mut raw, &header, &record)?;
let aux = crate::sort::bam_fields::aux_data_slice(&raw);
let thresholds =
FilterThresholds { min_reads: 5, max_read_error_rate: 0.1, max_base_error_rate: 0.1 };
let result = Filter::check_filters_raw(&raw, aux, &thresholds, None, 5.0)?;
assert!(result, "Should pass with 3 Ns and count threshold 5.0");
let result = Filter::check_filters_raw(&raw, aux, &thresholds, None, 3.0)?;
assert!(result, "Should pass with 3 Ns and count threshold 3.0 (boundary)");
Ok(())
}
#[test]
fn test_check_filters_raw_no_call_count_mode_fail() -> Result<()> {
use crate::consensus_filter::FilterThresholds;
use crate::sam::builder::RecordBuilder;
use crate::vendored::bam_codec::encoder::encode_record_buf;
use noodles::sam::Header;
let mut record = RecordBuilder::new()
.sequence("AANNNTTGGC") .qualities(&[30, 30, 30, 30, 30, 30, 30, 30, 30, 30])
.build();
record.data_mut().insert(
noodles::sam::alignment::record::data::field::Tag::from([b'c', b'D']),
noodles::sam::alignment::record_buf::data::field::Value::from(10u8),
);
record.data_mut().insert(
noodles::sam::alignment::record::data::field::Tag::from([b'c', b'E']),
noodles::sam::alignment::record_buf::data::field::Value::from(0.01f32),
);
let header = Header::default();
let mut raw = Vec::new();
encode_record_buf(&mut raw, &header, &record)?;
let aux = crate::sort::bam_fields::aux_data_slice(&raw);
let thresholds =
FilterThresholds { min_reads: 5, max_read_error_rate: 0.1, max_base_error_rate: 0.1 };
let result = Filter::check_filters_raw(&raw, aux, &thresholds, None, 2.0)?;
assert!(!result, "Should fail with 3 Ns and count threshold 2.0");
Ok(())
}
#[test]
fn test_check_duplex_filters_raw_no_call_count_mode() -> Result<()> {
use crate::consensus_filter::FilterThresholds;
use crate::sam::builder::RecordBuilder;
use crate::vendored::bam_codec::encoder::encode_record_buf;
use noodles::sam::Header;
let mut record = RecordBuilder::new()
.sequence("AANNNTTGGC") .qualities(&[30, 30, 30, 30, 30, 30, 30, 30, 30, 30])
.build();
record.data_mut().insert(
noodles::sam::alignment::record::data::field::Tag::from([b'a', b'D']),
noodles::sam::alignment::record_buf::data::field::Value::from(10u8),
);
record.data_mut().insert(
noodles::sam::alignment::record::data::field::Tag::from([b'b', b'D']),
noodles::sam::alignment::record_buf::data::field::Value::from(8u8),
);
record.data_mut().insert(
noodles::sam::alignment::record::data::field::Tag::from([b'a', b'E']),
noodles::sam::alignment::record_buf::data::field::Value::from(0.01f32),
);
record.data_mut().insert(
noodles::sam::alignment::record::data::field::Tag::from([b'b', b'E']),
noodles::sam::alignment::record_buf::data::field::Value::from(0.01f32),
);
record.data_mut().insert(
noodles::sam::alignment::record::data::field::Tag::from([b'a', b'M']),
noodles::sam::alignment::record_buf::data::field::Value::from(10u8),
);
record.data_mut().insert(
noodles::sam::alignment::record::data::field::Tag::from([b'b', b'M']),
noodles::sam::alignment::record_buf::data::field::Value::from(8u8),
);
let header = Header::default();
let mut raw = Vec::new();
encode_record_buf(&mut raw, &header, &record)?;
let aux = crate::sort::bam_fields::aux_data_slice(&raw);
let thresholds =
FilterThresholds { min_reads: 5, max_read_error_rate: 0.1, max_base_error_rate: 0.1 };
let result = Filter::check_duplex_filters_raw(
&raw,
aux,
&thresholds,
&thresholds,
&thresholds,
None,
5.0,
)?;
assert!(result, "Should pass with 3 Ns and count threshold 5.0");
let result = Filter::check_duplex_filters_raw(
&raw,
aux,
&thresholds,
&thresholds,
&thresholds,
None,
2.0,
)?;
assert!(!result, "Should fail with 3 Ns and count threshold 2.0");
Ok(())
}
#[test]
fn test_process_record_raw_no_reference() -> Result<()> {
use crate::vendored::bam_codec::encoder::encode_record_buf;
let record = RecordBuilder::new()
.name("unmapped_read")
.sequence("ACGTACGT")
.qualities(&[35, 35, 35, 35, 35, 35, 35, 35])
.unmapped(true)
.consensus_tags(
crate::sam::builder::ConsensusTagsBuilder::new()
.per_base_depths(&[10; 8])
.per_base_errors(&[0; 8]),
)
.build();
let header = Header::default();
let mut raw = Vec::new();
encode_record_buf(&mut raw, &header, &record)?;
let config = FilterConfig::new(&[1], &[0.025], &[0.1], None, None, 0.2);
let (bases_masked, pass) = Filter::process_record_raw(
&mut raw, &config, None, &header, false, None, false, None, 0.2, )?;
assert_eq!(bases_masked, 0, "No bases should be masked with good tags");
assert!(pass, "Unmapped record should pass filtering without reference");
Ok(())
}
#[test]
fn test_process_record_raw_no_reference_mapped_fails() -> Result<()> {
use crate::vendored::bam_codec::encoder::encode_record_buf;
let sam_builder = crate::sam::builder::SamBuilder::with_single_ref("chr1", 1000);
let record = RecordBuilder::new()
.name("mapped_read")
.sequence("ACGTACGT")
.qualities(&[35; 8])
.reference_sequence_id(0)
.alignment_start(100)
.mapping_quality(60)
.cigar("8M")
.consensus_tags(
crate::sam::builder::ConsensusTagsBuilder::new()
.per_base_depths(&[10; 8])
.per_base_errors(&[0; 8]),
)
.build();
let mut raw = Vec::new();
encode_record_buf(&mut raw, &sam_builder.header, &record)?;
let config = FilterConfig::new(&[1], &[0.025], &[0.1], None, None, 0.2);
let result = Filter::process_record_raw(
&mut raw,
&config,
None, &sam_builder.header,
false,
None,
false,
None,
0.2,
);
assert!(result.is_err(), "Should fail for mapped reads without reference");
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("--ref is required"),
"Error should mention --ref requirement, got: {err_msg}"
);
Ok(())
}
#[rstest]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1"], false)]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1", "--reverse-per-base-tags"], true)]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1", "--reverse-per-base-tags", "true"], true)]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1", "--reverse-per-base-tags", "false"], false)]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1", "--reverse-per-base-tags=true"], true)]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1", "--reverse-per-base-tags=false"], false)]
fn test_reverse_per_base_tags_parsing(#[case] args: &[&str], #[case] expected: bool) {
let cmd = Filter::try_parse_from(args).expect("valid CLI args should parse");
assert_eq!(cmd.reverse_per_base_tags, expected);
}
#[rstest]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1"], true)]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1", "--filter-by-template"], true)]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1", "--filter-by-template", "true"], true)]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1", "--filter-by-template", "false"], false)]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1", "--filter-by-template=true"], true)]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1", "--filter-by-template=false"], false)]
fn test_filter_by_template_parsing(#[case] args: &[&str], #[case] expected: bool) {
let cmd = Filter::try_parse_from(args).expect("valid CLI args should parse");
assert_eq!(cmd.filter_by_template, expected);
}
#[rstest]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1"], false)]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1", "--require-single-strand-agreement"], true)]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1", "--require-single-strand-agreement", "true"], true)]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1", "--require-single-strand-agreement", "false"], false)]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1", "--require-single-strand-agreement=true"], true)]
#[case(&["filter", "-i", "in.bam", "-o", "out.bam", "-M", "1", "--require-single-strand-agreement=false"], false)]
fn test_require_single_strand_agreement_parsing(#[case] args: &[&str], #[case] expected: bool) {
let cmd = Filter::try_parse_from(args).expect("valid CLI args should parse");
assert_eq!(cmd.require_single_strand_agreement, expected);
}
}