use crate::bam_io::{
RawBamWriter, create_bam_reader_for_pipeline_with_opts, create_bam_writer,
create_optional_bam_writer, create_raw_bam_reader_with_opts, create_raw_bam_writer,
};
use crate::consensus_caller::{
ConsensusCaller, ConsensusCallingStats, ConsensusOutput, RejectionReason,
};
use crate::logging::{OperationTimer, log_consensus_summary};
use crate::mi_group::{MiGroup, MiGroupBatch, MiGroupIterator, MiGrouper};
use crate::overlapping_consensus::{
AgreementStrategy, CorrectionStats, DisagreementStrategy, OverlappingBasesConsensusCaller,
apply_overlapping_consensus,
};
use crate::progress::ProgressTracker;
use crate::read_info::LibraryIndex;
use crate::unified_pipeline::{
GroupKeyConfig, Grouper, MemoryEstimate, run_bam_pipeline_from_reader,
};
use anyhow::{Context, Result, bail};
use clap::Parser;
use fgoxide::io::DelimFile;
use fgumi_raw_bam::RawRecord;
use crate::per_thread_accumulator::PerThreadAccumulator;
use crate::sam::{SamTag, header_as_unsorted};
use crate::vanilla_consensus_caller::{VanillaUmiConsensusCaller, VanillaUmiConsensusOptions};
use parking_lot::Mutex;
use log::info;
use noodles::sam::Header;
use noodles::sam::alignment::record::data::field::Tag;
use std::io;
use std::io::Write as IoWrite;
use std::sync::Arc;
use crate::commands::command::Command;
use crate::commands::common::{
BamIoOptions, CompressionOptions, ConsensusCallingOptions, OverlappingConsensusOptions,
QueueMemoryOptions, ReadGroupOptions, RejectsOptions, SchedulerOptions, StatsOptions,
ThreadingOptions, build_pipeline_config,
};
use crate::commands::consensus_runner::{
ConsensusStatsOps, create_unmapped_consensus_header, log_overlapping_stats,
};
use super::common::{MethylationRef, load_methylation_reference};
/// Output of processing one batch of MI groups on a worker thread, handed to
/// the serialize stage of the multi-threaded pipeline.
struct SimplexProcessedBatch {
    // Serialized consensus records (raw BAM bytes) plus their count.
    consensus_output: ConsensusOutput,
    // Number of MI groups consumed to produce this batch.
    groups_count: u64,
    // Per-batch consensus-calling statistics, merged globally later.
    stats: ConsensusCallingStats,
    // Overlapping-base correction stats; `Some` only when overlapping
    // consensus calling is enabled.
    overlapping_stats: Option<CorrectionStats>,
}
impl MemoryEstimate for SimplexProcessedBatch {
    /// Approximates this batch's heap usage by the consensus byte buffer's
    /// capacity; the stats structs are not counted (assumed comparatively
    /// small — TODO confirm that is acceptable for queue-memory accounting).
    fn estimate_heap_size(&self) -> usize {
        self.consensus_output.data.capacity()
    }
}
/// Per-thread accumulator slot for metrics gathered during pipeline
/// processing; merged across all threads after the pipeline finishes.
#[derive(Default)]
struct CollectedSimplexMetrics {
    // Consensus-calling statistics accumulated on this thread.
    stats: ConsensusCallingStats,
    // Overlapping correction stats; populated only when the feature is enabled.
    overlapping_stats: Option<CorrectionStats>,
    // Count of MI groups this thread has processed.
    groups_processed: u64,
}
// NOTE: field comments below deliberately use `//` rather than `///` —
// doc comments on clap-derive fields become CLI help text, and adding them
// would change the --help output.
#[derive(Debug, Parser)]
#[command(
    name = "simplex",
    about = "\x1b[38;5;180m[CONSENSUS]\x1b[0m \x1b[36mCall simplex consensus sequences from UMI-grouped reads\x1b[0m",
    long_about = r#"
Calls consensus sequences from reads with the same unique molecular tag.
Reads with the same unique molecular tag are examined base-by-base to assess the likelihood of each base in the
source molecule. The likelihood model is as follows:
1. First, the base qualities are adjusted. The base qualities are assumed to represent the probability of a
sequencing error (i.e. the sequencer observed the wrong base present on the cluster/flowcell/well). The base
quality scores are converted to probabilities incorporating a probability representing the chance of an error
from the time the unique molecular tags were integrated to just prior to sequencing. The resulting probability
is the error rate of all processes from right after integrating the molecular tag through to the end of
sequencing.
2. Next, a consensus sequence is called for all reads with the same unique molecular tag base-by-base. For a
given base position in the reads, the likelihoods that an A, C, G, or T is the base for the underlying
source molecule respectively are computed by multiplying the likelihood of each read observing the base
position being considered. The probability of error (from 1.) is used when the observed base does not match
the hypothesized base for the underlying source molecule, while one minus that probability is used otherwise.
The computed likelihoods are normalized by dividing them by the sum of all four likelihoods to produce a
posterior probability, namely the probability that the source molecule was an A, C, G, or T from just after
integrating molecular tag through to sequencing, given the observations. The base with the maximum posterior
probability as the consensus call, and the posterior probability is used as its raw base quality.
3. Finally, the consensus raw base quality is modified by incorporating the probability of an error prior to
integrating the unique molecular tags. Therefore, the probability used for the final consensus base
quality is the posterior probability of the source molecule having the consensus base given the observed
reads with the same molecular tag, all the way from sample extraction and through sample and library
preparation, through preparing the library for sequencing (e.g. amplification, target selection), and finally,
through sequencing.
This tool assumes that reads with the same tag are grouped together (consecutive in the file). Also, this tool
calls each end of a pair independently, and does not jointly call bases that overlap within a pair. Insertion or
deletion errors in the reads are not considered in the consensus model.
The consensus reads produced are unaligned, due to the difficulty and error-prone nature of inferring the consensus
alignment. Consensus reads should therefore be aligned after, which should not be too expensive as likely there
are far fewer consensus reads than input raw reads.
Particular attention should be paid to setting the --min-reads parameter as this can have a dramatic effect on
both results and runtime. For libraries with low duplication rates (e.g. 100-300X exomes libraries) in which it
is desirable to retain singleton reads while making consensus reads from sets of duplicates, --min-reads=1 is
appropriate. For libraries with high duplication rates where it is desirable to only produce consensus reads
supported by 2+ reads to allow error correction, --min-reads=2 or higher is appropriate. After generation,
consensus reads can be further filtered using the filter tool. As such it is always safe to run
with --min-reads=1 and filter later, but filtering at this step can improve performance significantly.
Consensus reads have a number of additional optional tags set in the resulting BAM file. The tags break down into
those that are single-valued per read:
consensus depth [cD] (int) : the maximum depth of raw reads at any point in the consensus read
consensus min depth [cM] (int) : the minimum depth of raw reads at any point in the consensus read
consensus error rate [cE] (float): the fraction of bases in raw reads disagreeing with the final consensus calls
And those that have a value per base:
consensus depth [cd] (short[]): the count of bases contributing to the consensus read at each position
consensus errors [ce] (short[]): the number of bases from raw reads disagreeing with the final consensus base
The per base depths and errors are both capped at 32,767. In all cases no-calls (Ns) and bases below the
--min-input-base-quality are not counted in tag value calculations.
"#
)]
pub struct Simplex {
    // Input/output BAM paths and reader options.
    #[command(flatten)]
    pub io: BamIoOptions,
    // Optional rejects BAM for reads excluded from consensus calling.
    #[command(flatten)]
    pub rejects_opts: RejectsOptions,
    // Optional statistics TSV output.
    #[command(flatten)]
    pub stats_opts: StatsOptions,
    // Read-group id / read-name prefix configuration for consensus reads.
    #[command(flatten)]
    pub read_group: ReadGroupOptions,
    // Core consensus-calling parameters (error rates, quality cutoffs, trim).
    #[command(flatten)]
    pub consensus: ConsensusCallingOptions,
    // Whether/how to reconcile overlapping bases within pairs before consensus.
    #[command(flatten)]
    pub overlapping: OverlappingConsensusOptions,
    // Thread-count configuration; `threads = Some(n)` selects pipeline mode.
    #[command(flatten)]
    pub threading: ThreadingOptions,
    // Output BAM compression level.
    #[command(flatten)]
    pub compression: CompressionOptions,
    // Minimum raw reads required to emit a consensus read.
    #[arg(short = 'M', long = "min-reads")]
    pub min_reads: usize,
    // Optional cap on raw reads per group (groups above it are downsampled).
    #[arg(long = "max-reads")]
    pub max_reads: Option<usize>,
    // Pipeline scheduler tuning.
    #[command(flatten)]
    pub scheduler_opts: SchedulerOptions,
    // Memory budget for inter-stage queues.
    #[command(flatten)]
    pub queue_memory: QueueMemoryOptions,
    // Optional methylation-aware calling mode.
    #[arg(long = "methylation-mode", value_enum)]
    pub methylation_mode: Option<crate::commands::common::MethylationModeArg>,
    // Reference FASTA; only valid together with --methylation-mode.
    #[arg(long = "ref")]
    pub reference: Option<std::path::PathBuf>,
}
impl Command for Simplex {
    /// Entry point for the `simplex` command.
    ///
    /// Validates options, then runs one of two paths:
    /// - `--threads` set: delegates to the multi-threaded pipeline in
    ///   `execute_threads_mode`.
    /// - otherwise: streams the input single-threaded — groups records by the
    ///   MI tag (and CB cell tag), optionally applies overlapping-base
    ///   consensus, calls one consensus output per group, and writes the
    ///   output BAM, optional rejects BAM, and optional statistics TSV.
    fn execute(&self, command_line: &str) -> Result<()> {
        let timer = OperationTimer::new("Calling simplex consensus");
        self.io.validate()?;
        // --max-reads (downsampling cap) must not undercut --min-reads.
        if let Some(max) = self.max_reads {
            if max < self.min_reads {
                bail!("--max-reads ({}) must be >= --min-reads ({})", max, self.min_reads);
            }
        }
        info!("Starting Simplex");
        info!("Input: {}", self.io.input.display());
        info!("Output: {}", self.io.output.display());
        info!("Min reads: {}", self.min_reads);
        if let Some(max) = self.max_reads {
            info!("Max reads: {max}");
        }
        info!("Error rate pre-UMI: Q{}", self.consensus.error_rate_pre_umi);
        info!("Error rate post-UMI: Q{}", self.consensus.error_rate_post_umi);
        let writer_threads = self.threading.num_threads();
        let cell_tag = Tag::from(SamTag::CB);
        let track_rejects = self.rejects_opts.is_enabled();
        // --ref is only meaningful alongside a methylation mode.
        if self.reference.is_some() && self.methylation_mode.is_none() {
            bail!("--ref requires --methylation-mode to be set");
        }
        let methylation_mode =
            crate::commands::common::resolve_methylation_mode(self.methylation_mode);
        let overlapping_enabled = self.overlapping.is_enabled();
        if overlapping_enabled {
            info!("Overlapping consensus calling enabled");
        }
        info!("Processing reads and calling consensus (streaming)...");
        // Multi-threaded path: hand everything to the pipeline runner.
        if let Some(threads) = self.threading.threads {
            let (reader, header) = create_bam_reader_for_pipeline_with_opts(
                &self.io.input,
                self.io.pipeline_reader_opts(),
            )?;
            let output_header = create_unmapped_consensus_header(
                &header,
                &self.read_group.read_group_id,
                "Read group",
                command_line,
            )?;
            let read_name_prefix = self.read_group.prefix_or_from_header(&header);
            let methylation_ref: MethylationRef =
                load_methylation_reference(methylation_mode, &self.reference, &header)?;
            let result = self.execute_threads_mode(
                threads,
                reader,
                header,
                output_header,
                read_name_prefix,
                track_rejects,
                methylation_ref,
                methylation_mode,
            );
            // Threads mode logs its own totals; the completion count logged
            // here is 0 — presumably intentional, TODO confirm.
            timer.log_completion(0);
            return result;
        }
        // Single-threaded streaming path.
        let (mut raw_reader, header) =
            create_raw_bam_reader_with_opts(&self.io.input, 1, self.io.pipeline_reader_opts())?;
        let output_header = create_unmapped_consensus_header(
            &header,
            &self.read_group.read_group_id,
            "Read group",
            command_line,
        )?;
        let read_name_prefix = self.read_group.prefix_or_from_header(&header);
        let methylation_ref: MethylationRef =
            load_methylation_reference(methylation_mode, &self.reference, &header)?;
        let mut writer = create_bam_writer(
            &self.io.output,
            &output_header,
            writer_threads,
            self.compression.compression_level,
        )?;
        // Rejects are raw input reads, so they are written under the *input*
        // header rather than the consensus output header.
        let mut rejects_writer = create_optional_bam_writer(
            self.rejects_opts.rejects.as_ref(),
            &header,
            writer_threads,
            self.compression.compression_level,
        )?;
        let options = VanillaUmiConsensusOptions {
            tag: "MI".to_string(),
            error_rate_pre_umi: self.consensus.error_rate_pre_umi,
            error_rate_post_umi: self.consensus.error_rate_post_umi,
            min_input_base_quality: self.consensus.min_input_base_quality,
            min_reads: self.min_reads,
            max_reads: self.max_reads,
            produce_per_base_tags: self.consensus.output_per_base_tags,
            // Fixed seed keeps any max_reads downsampling deterministic.
            seed: Some(42),
            trim: self.consensus.trim,
            min_consensus_base_quality: self.consensus.min_consensus_base_quality,
            cell_tag: Some(cell_tag),
            methylation_mode,
        };
        let mut caller = VanillaUmiConsensusCaller::new_with_rejects_tracking(
            read_name_prefix.clone(),
            self.read_group.read_group_id.clone(),
            options.clone(),
            track_rejects,
        );
        if let Some((ref reference, ref ref_names)) = methylation_ref {
            caller.set_reference(Arc::clone(reference), Arc::clone(ref_names));
        }
        let mut merged_overlapping_stats = CorrectionStats::new();
        let mut record_count: usize = 0;
        let progress = ProgressTracker::new("Processed records").with_interval(1_000_000);
        // Adapt the raw reader into an iterator of records; a 0-byte read
        // signals EOF.
        let raw_record_iter = std::iter::from_fn(move || {
            let mut record = RawRecord::new();
            match raw_reader.read_record(&mut record) {
                Ok(0) => None,
                Ok(_) => Some(Ok(record)),
                Err(e) => Some(Err(e.into())),
            }
        });
        let mi_group_iter = MiGroupIterator::new(raw_record_iter, "MI").with_cell_tag(Some(*b"CB"));
        let mut overlapping_caller = if overlapping_enabled {
            Some(OverlappingBasesConsensusCaller::new(
                AgreementStrategy::Consensus,
                DisagreementStrategy::Consensus,
            ))
        } else {
            None
        };
        for result in mi_group_iter {
            let (umi, mut records) = result.context("Failed to read MI group")?;
            // Optionally reconcile overlapping mate bases before consensus.
            if let Some(ref mut oc) = overlapping_caller {
                apply_overlapping_consensus(&mut records, oc)?;
            }
            let output = caller
                .consensus_reads(records)
                .with_context(|| format!("Failed to call consensus for UMI: {umi}"))?;
            let batch_size = output.count;
            record_count += batch_size;
            writer.get_mut().write_all(&output.data).context("Failed to write consensus read")?;
            if let Some(ref mut rw) = rejects_writer {
                // Rejected reads are raw BAM records; each is prefixed with
                // its little-endian block_size, matching the BAM encoding.
                for raw_record in caller.rejected_reads() {
                    let block_size = raw_record.len() as u32;
                    rw.get_mut()
                        .write_all(&block_size.to_le_bytes())
                        .context("Failed to write rejected read block size")?;
                    rw.get_mut().write_all(raw_record).context("Failed to write rejected read")?;
                }
                caller.clear_rejected_reads();
            }
            progress.log_if_needed(batch_size as u64);
        }
        let merged_stats = caller.statistics();
        if let Some(ref oc) = overlapping_caller {
            merged_overlapping_stats.merge(oc.stats());
        }
        progress.log_final();
        writer.into_inner().finish().context("Failed to finish output BAM")?;
        if overlapping_enabled {
            log_overlapping_stats(&merged_overlapping_stats);
        }
        info!("Consensus calling complete");
        info!("Total records processed: {record_count}");
        let metrics = merged_stats.to_metrics();
        let consensus_count = metrics.consensus_reads;
        log_consensus_summary(&metrics);
        // Optional key/value statistics TSV.
        if let Some(stats_path) = &self.stats_opts.stats {
            let kv_metrics = metrics.to_kv_metrics();
            DelimFile::default()
                .write_tsv(stats_path, kv_metrics)
                .with_context(|| format!("Failed to write statistics: {}", stats_path.display()))?;
            info!("Wrote statistics to: {}", stats_path.display());
        }
        timer.log_completion(consensus_count);
        // Finish the rejects writer last so its buffers are flushed to disk.
        if let Some(rw) = rejects_writer {
            rw.into_inner().finish().context("Failed to finish rejects file")?;
            info!("Rejected reads written successfully");
        }
        Ok(())
    }
}
impl Simplex {
    /// Multi-threaded execution path.
    ///
    /// Builds a pipeline that (1) groups input records into MI-group batches
    /// (keyed by MI, with the CB cell tag), (2) calls consensus per batch on
    /// worker threads while accumulating per-thread metrics, and (3) appends
    /// the resulting consensus bytes to the output on the serialize stage.
    /// When rejects tracking is enabled, rejected raw reads are streamed to a
    /// shared mutex-guarded rejects writer during processing.
    #[expect(clippy::too_many_arguments, reason = "pipeline setup needs all configuration")]
    fn execute_threads_mode(
        &self,
        num_threads: usize,
        reader: Box<dyn std::io::Read + Send>,
        input_header: Header,
        output_header: Header,
        read_name_prefix: String,
        track_rejects: bool,
        methylation_ref: MethylationRef,
        methylation_mode: fgumi_consensus::MethylationMode,
    ) -> Result<()> {
        let mut pipeline_config = build_pipeline_config(
            &self.scheduler_opts,
            &self.compression,
            &self.queue_memory,
            num_threads,
        )?;
        // One metrics slot per worker thread; merged after the pipeline ends.
        let collected_metrics = PerThreadAccumulator::<CollectedSimplexMetrics>::new(num_threads);
        let collected_metrics_for_serialize = Arc::clone(&collected_metrics);
        let tag_str = "MI".to_string();
        // Copy configuration into locals so the `move` closures below do not
        // borrow `self`.
        let min_reads = self.min_reads;
        let max_reads = self.max_reads;
        let error_rate_pre_umi = self.consensus.error_rate_pre_umi;
        let error_rate_post_umi = self.consensus.error_rate_post_umi;
        let min_input_base_quality = self.consensus.min_input_base_quality;
        let output_per_base_tags = self.consensus.output_per_base_tags;
        let min_consensus_base_quality = self.consensus.min_consensus_base_quality;
        let trim = self.consensus.trim;
        let overlapping_enabled = self.overlapping.is_enabled();
        let read_group_id = self.read_group.read_group_id.clone();
        let cell_tag = Tag::from(SamTag::CB);
        // Number of MI groups batched per work unit handed to the grouper.
        let batch_size = 50;
        let options = VanillaUmiConsensusOptions {
            tag: tag_str.clone(),
            error_rate_pre_umi,
            error_rate_post_umi,
            min_input_base_quality,
            min_reads,
            max_reads,
            produce_per_base_tags: output_per_base_tags,
            // Fixed seed keeps any max_reads downsampling deterministic.
            seed: Some(42),
            trim,
            min_consensus_base_quality,
            cell_tag: Some(cell_tag),
            methylation_mode,
        };
        // Shared rejects writer: Option inside the mutex allows taking it out
        // for `finish()` after the pipeline completes.
        let rejects_writer: Option<Arc<Mutex<Option<RawBamWriter>>>> = if track_rejects {
            if let Some(path) = self.rejects_opts.rejects.as_ref() {
                let writer_threads = self.threading.num_threads();
                // Rejects keep the input header, marked unsorted.
                let rejects_header = header_as_unsorted(&input_header);
                let w = create_raw_bam_writer(
                    path,
                    &rejects_header,
                    writer_threads,
                    self.compression.compression_level,
                )?;
                Some(Arc::new(Mutex::new(Some(w))))
            } else {
                None
            }
        } else {
            None
        };
        let rejects_writer_for_process = rejects_writer.as_ref().map(Arc::clone);
        let library_index = LibraryIndex::from_header(&input_header);
        pipeline_config.group_key_config = Some(GroupKeyConfig::new(library_index, cell_tag));
        let pipeline_result = run_bam_pipeline_from_reader(
            pipeline_config,
            reader,
            input_header,
            &self.io.output,
            Some(output_header.clone()),
            // Grouper factory: batches records by MI (+ CB cell tag).
            move |_header: &Header| {
                Box::new(MiGrouper::new("MI", batch_size).with_cell_tag(Some(*b"CB")))
                    as Box<dyn Grouper<Group = MiGroupBatch> + Send>
            },
            // Process stage: runs on worker threads, one batch at a time.
            move |batch: MiGroupBatch| -> io::Result<SimplexProcessedBatch> {
                // Caller and overlapping caller are created per batch so no
                // state is shared between worker invocations.
                let mut caller = VanillaUmiConsensusCaller::new_with_rejects_tracking(
                    read_name_prefix.clone(),
                    read_group_id.clone(),
                    options.clone(),
                    track_rejects,
                );
                if let Some((ref reference, ref ref_names)) = methylation_ref {
                    caller.set_reference(Arc::clone(reference), Arc::clone(ref_names));
                }
                let mut overlapping_caller = if overlapping_enabled {
                    Some(OverlappingBasesConsensusCaller::new(
                        AgreementStrategy::Consensus,
                        DisagreementStrategy::Consensus,
                    ))
                } else {
                    None
                };
                let mut all_output = ConsensusOutput::default();
                let mut batch_stats = ConsensusCallingStats::new();
                let mut batch_overlapping = CorrectionStats::new();
                let groups_count = batch.groups.len() as u64;
                // Streams rejected raw records to the shared rejects writer.
                let flush_raw_records = |recs: &[RawRecord]| -> io::Result<()> {
                    if let Some(ref rw_arc) = rejects_writer_for_process {
                        if !recs.is_empty() {
                            let mut guard = rw_arc.lock();
                            if let Some(w) = guard.as_mut() {
                                for raw in recs {
                                    w.write_raw_record(raw.as_ref())?;
                                }
                            }
                        }
                    }
                    Ok(())
                };
                // Same as above, but for rejects held as plain byte buffers.
                let flush_byte_records = |recs: &[Vec<u8>]| -> io::Result<()> {
                    if let Some(ref rw_arc) = rejects_writer_for_process {
                        if !recs.is_empty() {
                            let mut guard = rw_arc.lock();
                            if let Some(w) = guard.as_mut() {
                                for raw in recs {
                                    w.write_raw_record(raw)?;
                                }
                            }
                        }
                    }
                    Ok(())
                };
                for MiGroup { mi, records: mut raw_records } in batch.groups {
                    caller.clear();
                    // Groups below min_reads are rejected up front.
                    if raw_records.len() < min_reads {
                        batch_stats.record_input(raw_records.len());
                        batch_stats.record_rejection(
                            RejectionReason::InsufficientReads,
                            raw_records.len(),
                        );
                        if track_rejects {
                            flush_raw_records(&raw_records)?;
                        }
                        continue;
                    }
                    if let Some(ref mut oc) = overlapping_caller {
                        // Stats are reset per group and merged even when the
                        // correction fails, so partial work is still counted.
                        oc.reset_stats();
                        if apply_overlapping_consensus(&mut raw_records, oc).is_err() {
                            batch_overlapping.merge(oc.stats());
                            batch_stats.record_input(raw_records.len());
                            batch_stats.record_rejection(RejectionReason::Other, raw_records.len());
                            if track_rejects {
                                flush_raw_records(&raw_records)?;
                            }
                            continue;
                        }
                        batch_overlapping.merge(oc.stats());
                    }
                    let batch_output = caller.consensus_reads(raw_records).map_err(|e| {
                        io::Error::other(format!("Consensus error for MI {mi}: {e}"))
                    })?;
                    all_output.merge(batch_output);
                    batch_stats.merge(&caller.statistics());
                    if track_rejects {
                        flush_byte_records(&caller.take_rejected_reads())?;
                    }
                }
                Ok(SimplexProcessedBatch {
                    consensus_output: all_output,
                    groups_count,
                    stats: batch_stats,
                    overlapping_stats: if overlapping_enabled {
                        Some(batch_overlapping)
                    } else {
                        None
                    },
                })
            },
            // Serialize stage: accumulates metrics and appends output bytes.
            move |processed: SimplexProcessedBatch,
                  _header: &Header,
                  output: &mut Vec<u8>|
                  -> io::Result<u64> {
                let batch_stats = processed.stats;
                let batch_overlapping = processed.overlapping_stats;
                let groups_count = processed.groups_count;
                collected_metrics_for_serialize.with_slot(|m| {
                    m.stats.merge(&batch_stats);
                    if let Some(o) = batch_overlapping {
                        m.overlapping_stats.get_or_insert_with(CorrectionStats::new).merge(&o);
                    }
                    m.groups_processed += groups_count;
                });
                let count = processed.consensus_output.count as u64;
                output.extend_from_slice(&processed.consensus_output.data);
                Ok(count)
            },
        );
        // Finish the rejects writer regardless of pipeline outcome, then
        // surface both errors if both failed.
        let rejects_finish_result = rejects_writer
            .and_then(|rw_arc| rw_arc.lock().take())
            .map(|writer| writer.finish().context("Failed to finish rejects file"));
        let groups_processed = match (pipeline_result, rejects_finish_result) {
            (Ok(groups_processed), Some(Ok(()))) => {
                info!("Rejected reads streamed to rejects file during processing");
                groups_processed
            }
            (Ok(groups_processed), None) => groups_processed,
            (Ok(_), Some(Err(finish_err))) => return Err(finish_err),
            (Err(pipeline_err), Some(Err(finish_err))) => {
                return Err(anyhow::anyhow!(
                    "Pipeline error: {pipeline_err}; additionally failed to finish rejects file: {finish_err}"
                ));
            }
            (Err(pipeline_err), _) => {
                return Err(anyhow::anyhow!("Pipeline error: {pipeline_err}"));
            }
        };
        // Merge per-thread metric slots into the global totals.
        let mut total_groups = 0u64;
        let mut merged_stats = ConsensusCallingStats::new();
        let mut merged_overlapping_stats = CorrectionStats::new();
        for slot in collected_metrics.slots() {
            let m = slot.lock();
            total_groups += m.groups_processed;
            merged_stats.merge(&m.stats);
            if let Some(ref ocs) = m.overlapping_stats {
                merged_overlapping_stats.merge(ocs);
            }
        }
        if self.overlapping.is_enabled() {
            log_overlapping_stats(&merged_overlapping_stats);
        }
        info!("Consensus calling complete");
        info!("Total MI groups processed: {total_groups}");
        info!("Total groups processed by pipeline: {groups_processed}");
        let metrics = merged_stats.to_metrics();
        let consensus_count = metrics.consensus_reads;
        log_consensus_summary(&metrics);
        // Optional key/value statistics TSV.
        if let Some(stats_path) = &self.stats_opts.stats {
            let kv_metrics = metrics.to_kv_metrics();
            DelimFile::default()
                .write_tsv(stats_path, kv_metrics)
                .with_context(|| format!("Failed to write statistics: {}", stats_path.display()))?;
            info!("Wrote statistics to: {}", stats_path.display());
        }
        info!("Wrote {consensus_count} consensus reads");
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::metrics::consensus::ConsensusKvMetric;
use noodles::sam::alignment::record::data::field::Tag;
use noodles::sam::alignment::record_buf::RecordBuf;
use rstest::rstest;
use std::path::PathBuf;
/// Builds a `Simplex` command with placeholder input/output paths for tests
/// that never touch the filesystem.
fn create_test_simplex() -> Simplex {
    let input = PathBuf::from("test.bam");
    let output = PathBuf::from("out.bam");
    create_simplex_with_paths(input, output)
}
/// Constructs a `Simplex` command wired to the given BAM paths, with
/// single-threaded, mostly-default options suitable for tests.
fn create_simplex_with_paths(input: PathBuf, output: PathBuf) -> Simplex {
    let consensus = ConsensusCallingOptions {
        output_per_base_tags: false,
        min_consensus_base_quality: 0,
        ..ConsensusCallingOptions::default()
    };
    Simplex {
        io: BamIoOptions { input, output, async_reader: false },
        consensus,
        rejects_opts: RejectsOptions::default(),
        stats_opts: StatsOptions::default(),
        read_group: ReadGroupOptions::default(),
        overlapping: OverlappingConsensusOptions { consensus_call_overlapping_bases: false },
        threading: ThreadingOptions::none(),
        compression: CompressionOptions { compression_level: 1 },
        queue_memory: QueueMemoryOptions::default(),
        scheduler_opts: SchedulerOptions::default(),
        min_reads: 1,
        max_reads: None,
        methylation_mode: None,
        reference: None,
    }
}
/// Confirms the test fixture exposes the documented command defaults.
#[test]
fn test_default_parameters() {
    let cmd = create_test_simplex();
    // Phred-scaled error-rate defaults.
    assert_eq!(cmd.consensus.error_rate_pre_umi, 45);
    assert_eq!(cmd.consensus.error_rate_post_umi, 40);
    assert_eq!(cmd.consensus.min_input_base_quality, 10);
    // Read-count bounds default to "keep everything".
    assert_eq!((cmd.min_reads, cmd.max_reads), (1, None));
    assert!(!cmd.consensus.output_per_base_tags);
    assert!(cmd.threading.is_single_threaded());
    assert!(!cmd.overlapping.consensus_call_overlapping_bases);
}
/// Confirms overridden fields on the fixture stick as set.
#[test]
fn test_custom_parameters() {
    let mut cmd = create_test_simplex();
    // Override the defaults...
    cmd.threading = ThreadingOptions::new(4);
    cmd.consensus.output_per_base_tags = true;
    cmd.max_reads = Some(10);
    cmd.min_reads = 3;
    // ...and verify each override.
    assert_eq!(cmd.min_reads, 3);
    assert_eq!(cmd.max_reads, Some(10));
    assert!(cmd.consensus.output_per_base_tags);
    assert_eq!(cmd.threading.threads, Some(4));
}
/// Executing against a nonexistent input path must fail validation.
#[test]
fn test_missing_input_file_fails() {
    let cmd = create_test_simplex();
    let err = cmd.execute("test").expect_err("missing input should fail");
    assert!(err.to_string().contains("does not exist"));
}
use crate::sam::builder::SamBuilder;
use noodles::sam::alignment::record_buf::data::field::Value as BufValue;
use std::collections::HashMap;
use tempfile::TempDir;
/// Bundle of temp-file paths used by the end-to-end tests. The `TempDir` is
/// retained (even though unread) so the directory outlives the paths.
struct TestPaths {
    #[allow(dead_code)]
    dir: TempDir,
    pub input: PathBuf,
    pub output: PathBuf,
    pub rejects: PathBuf,
    pub stats: PathBuf,
}
impl TestPaths {
    /// Creates a fresh temp dir and derives all fixture paths inside it.
    fn new() -> Result<Self> {
        let dir = TempDir::new()?;
        let root = dir.path();
        let input = root.join("input.bam");
        let output = root.join("output.bam");
        let rejects = root.join("rejects.bam");
        let stats = root.join("stats.txt");
        Ok(Self { dir, input, output, rejects, stats })
    }
}
/// Reads every record from a BAM file into owned `RecordBuf`s.
fn read_bam_records(path: &std::path::Path) -> Result<Vec<RecordBuf>> {
    let mut reader = noodles::bam::io::reader::Builder.build_from_path(path)?;
    let header = reader.read_header()?;
    reader
        .records()
        .map(|result| {
            let record = result?;
            Ok(RecordBuf::try_from_alignment_record(&header, &record)?)
        })
        .collect()
}
/// Looks up a two-character SAM tag on a record and returns its value as a
/// `String` when it is a string-typed field; `None` otherwise.
fn get_string_tag(record: &RecordBuf, tag_name: &str) -> Option<String> {
    use noodles::sam::alignment::record_buf::data::field::Value;
    let bytes = tag_name.as_bytes();
    let tag = Tag::from([bytes[0], bytes[1]]);
    match record.data().get(&tag)? {
        Value::String(s) => Some(String::from_utf8_lossy(s).to_string()),
        _ => None,
    }
}
/// Looks up a two-character SAM tag on a record and widens any integer-typed
/// value to `i64`; `None` for missing or non-integer fields.
fn get_int_tag(record: &RecordBuf, tag_name: &str) -> Option<i64> {
    use noodles::sam::alignment::record_buf::data::field::Value;
    let bytes = tag_name.as_bytes();
    let value = record.data().get(&Tag::from([bytes[0], bytes[1]]))?;
    match value {
        Value::Int8(i) => Some(i64::from(*i)),
        Value::UInt8(i) => Some(i64::from(*i)),
        Value::Int16(i) => Some(i64::from(*i)),
        Value::UInt16(i) => Some(i64::from(*i)),
        Value::Int32(i) => Some(i64::from(*i)),
        Value::UInt32(i) => Some(i64::from(*i)),
        _ => None,
    }
}
/// End-to-end paired-end run: 1000 UMIs with two pairs each should yield one
/// consensus pair per UMI, carrying the expected RG, depth, and MI tags.
#[test]
fn test_end_to_end_paired_end_workflow() -> Result<()> {
    let mut builder = SamBuilder::new_unmapped();
    for idx in 0..1000 {
        let umi = format!("GATTACA:{idx}");
        let mut attrs = HashMap::new();
        attrs.insert("MI", BufValue::from(umi.clone()));
        attrs.insert("RX", BufValue::from("ACGT-TGCA"));
        // Two pairs share each MI so every consensus read has depth 2.
        builder.add_pair_with_attrs(
            &format!("READ:{}", 2 * idx),
            None,
            None,
            true,
            true,
            &attrs,
        );
        builder.add_pair_with_attrs(
            &format!("READ:{}", 2 * idx + 1),
            None,
            None,
            true,
            true,
            &attrs,
        );
    }
    let paths = TestPaths::new()?;
    builder.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.read_group.read_group_id = "ABC".to_string();
    cmd.execute("test")?;
    let records = read_bam_records(&paths.output)?;
    assert_eq!(records.len(), 2000, "Should have 2000 consensus reads");
    // Consensus is called per mate, so R1/R2 counts should each equal the
    // number of UMIs.
    let first_count = records.iter().filter(|r| r.flags().is_first_segment()).count();
    let second_count = records.iter().filter(|r| r.flags().is_last_segment()).count();
    assert_eq!(first_count, 1000, "Should have 1000 first-of-pair reads");
    assert_eq!(second_count, 1000, "Should have 1000 second-of-pair reads");
    for record in &records {
        assert_eq!(record.sequence().len(), 100, "Sequence length should be 100");
        let rg = get_string_tag(record, "RG");
        assert_eq!(rg.as_deref(), Some("ABC"), "Read group should be ABC");
        let cd_tag = get_int_tag(record, "cD");
        assert!(cd_tag.is_some(), "cD tag should be present");
        assert_eq!(
            cd_tag.expect("cD tag should have a value"),
            2,
            "Depth should be 2 (2 reads per UMI)"
        );
        let mi_tag = get_string_tag(record, "MI");
        assert!(mi_tag.is_some(), "MI tag should be preserved");
    }
    Ok(())
}
/// Single-end run, exercised in both single-threaded and 2-thread modes:
/// fragments sharing an MI but differing CB cell barcodes must be grouped
/// separately and yield one consensus read per barcode.
#[rstest]
#[case::fast_path(ThreadingOptions::none())]
#[case::threaded(ThreadingOptions::new(2))]
fn test_end_to_end_single_end_workflow(#[case] threading: ThreadingOptions) -> Result<()> {
    let mut builder = SamBuilder::new_unmapped();
    // Cell barcode AB: three fragments under the shared MI.
    let mut attrs_a = HashMap::new();
    attrs_a.insert("RX", BufValue::from("ACGT"));
    attrs_a.insert("MI", BufValue::from("shared"));
    attrs_a.insert("CB", BufValue::from("AB"));
    builder.add_frag_with_attrs("a1", None, true, &attrs_a);
    builder.add_frag_with_attrs("a2", None, true, &attrs_a);
    builder.add_frag_with_attrs("a3", None, true, &attrs_a);
    // Cell barcode CD: two fragments under the same MI.
    let mut attrs_b = HashMap::new();
    attrs_b.insert("RX", BufValue::from("ACGT"));
    attrs_b.insert("MI", BufValue::from("shared"));
    attrs_b.insert("CB", BufValue::from("CD"));
    builder.add_frag_with_attrs("b1", None, true, &attrs_b);
    builder.add_frag_with_attrs("b2", None, true, &attrs_b);
    let paths = TestPaths::new()?;
    builder.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.read_group.read_group_id = "ABC".to_string();
    cmd.threading = threading;
    cmd.execute("test")?;
    let records = read_bam_records(&paths.output)?;
    assert_eq!(records.len(), 2, "Should have 2 consensus reads (one per cell barcode)");
    assert!(records.iter().all(|r| !r.flags().is_segmented()), "All reads should be unpaired");
    let observed_cbs: std::collections::HashSet<_> =
        records.iter().filter_map(|r| get_string_tag(r, "CB")).collect();
    assert_eq!(
        observed_cbs,
        ["AB".to_string(), "CD".to_string()].into_iter().collect(),
        "Both cell barcodes should produce independent consensus reads"
    );
    for record in &records {
        let rg = get_string_tag(record, "RG");
        assert_eq!(rg.as_deref(), Some("ABC"), "Read group should be ABC");
        let cd_tag = get_int_tag(record, "cD");
        assert!(cd_tag.is_some(), "cD tag should be present");
        assert!(cd_tag.expect("cD tag should have a value") >= 2, "Depth should be at least 2");
        assert_eq!(record.sequence().len(), 100, "Sequence length should be 100");
    }
    Ok(())
}
/// With --min-reads=3, only the 3-read group survives; the 2- and 1-read
/// groups are filtered out.
#[test]
fn test_min_reads_filtering() -> Result<()> {
    let mut builder = SamBuilder::new_unmapped();
    // Three MI groups with 3, 2, and 1 reads respectively, added in the same
    // order as the names listed here.
    for (mi, names) in [
        ("group1", &["a1", "a2", "a3"][..]),
        ("group2", &["b1", "b2"][..]),
        ("group3", &["c1"][..]),
    ] {
        let mut attrs = HashMap::new();
        attrs.insert("MI", BufValue::from(mi));
        for name in names {
            builder.add_frag_with_attrs(name, None, true, &attrs);
        }
    }
    let paths = TestPaths::new()?;
    builder.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.min_reads = 3;
    cmd.execute("test")?;
    let records = read_bam_records(&paths.output)?;
    assert_eq!(records.len(), 1, "Should have 1 consensus read");
    let cd_tag = get_int_tag(&records[0], "cD");
    assert_eq!(
        cd_tag.expect("cD tag should have a value"),
        3,
        "Depth should be 3 for the passing group"
    );
    Ok(())
}
/// With --output-per-base-tags, a 5-read group must carry both the scalar
/// depth tags (cD/cM) and the per-base array tags (cd/ce).
#[test]
fn test_per_base_tags_generation() -> Result<()> {
    let mut builder = SamBuilder::new_unmapped();
    let mut attrs = HashMap::new();
    attrs.insert("MI", BufValue::from("test_umi"));
    for i in 0..5 {
        builder.add_frag_with_attrs(&format!("read{i}"), None, true, &attrs);
    }
    let paths = TestPaths::new()?;
    builder.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.consensus.output_per_base_tags = true;
    cmd.execute("test")?;
    let records = read_bam_records(&paths.output)?;
    assert_eq!(records.len(), 1, "Should have 1 consensus read");
    let record = &records[0];
    // Scalar depth tags: all 5 reads cover every position.
    let cd_tag = get_int_tag(record, "cD");
    assert!(cd_tag.is_some(), "cD tag should be present");
    assert_eq!(cd_tag.expect("cD tag should have a value"), 5, "Max depth should be 5");
    let cm_tag = get_int_tag(record, "cM");
    assert!(cm_tag.is_some(), "cM tag should be present");
    assert_eq!(cm_tag.expect("cM tag should have a value"), 5, "Min depth should be 5");
    // Per-base array tags.
    for (name, message) in [
        ("cd", "cd per-base tag should be present"),
        ("ce", "ce per-base tag should be present"),
    ] {
        let bytes = name.as_bytes();
        let array_tag = Tag::from([bytes[0], bytes[1]]);
        assert!(record.data().get(&array_tag).is_some(), "{message}");
    }
    Ok(())
}
/// Verifies that single-threaded and 4-thread runs over 100 two-read UMI
/// groups produce the same number of consensus reads.
#[test]
fn test_multithreading() -> Result<()> {
    let mut builder = SamBuilder::new_unmapped();
    for idx in 0..100 {
        let mut attrs = HashMap::new();
        attrs.insert("MI", BufValue::from(format!("umi_{idx}")));
        builder.add_frag_with_attrs(&format!("read_{idx}a"), None, true, &attrs);
        builder.add_frag_with_attrs(&format!("read_{idx}b"), None, true, &attrs);
    }
    let paths = TestPaths::new()?;
    // Separate output path so the two runs don't clobber each other.
    let output_multi_path = paths.dir.path().join("output_multi.bam");
    builder.write(&paths.input)?;
    let cmd_single = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd_single.execute("test")?;
    let mut cmd_multi =
        create_simplex_with_paths(paths.input.clone(), output_multi_path.clone());
    cmd_multi.threading = ThreadingOptions::new(4);
    cmd_multi.execute("test")?;
    let records_single = read_bam_records(&paths.output)?;
    let records_multi = read_bam_records(&output_multi_path)?;
    assert_eq!(
        records_single.len(),
        records_multi.len(),
        "Single and multi-threaded should produce same number of records"
    );
    assert_eq!(records_single.len(), 100, "Should have 100 consensus reads");
    Ok(())
}
#[test]
fn test_max_reads_downsampling() -> Result<()> {
    // A 20-read group run with --max-reads 10 is downsampled before consensus
    // calling, so the reported max depth (cD) cannot exceed 10.
    let mut sam = SamBuilder::new_unmapped();
    let mut tag_map = HashMap::new();
    tag_map.insert("MI", BufValue::from("large_group"));
    (0..20).for_each(|i| {
        sam.add_frag_with_attrs(&format!("read{i}"), None, true, &tag_map);
    });
    let paths = TestPaths::new()?;
    sam.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.max_reads = Some(10);
    cmd.execute("test")?;
    let records = read_bam_records(&paths.output)?;
    assert_eq!(records.len(), 1, "Should have 1 consensus read");
    let depth = get_int_tag(&records[0], "cD");
    assert!(depth.is_some(), "cD tag should be present");
    assert!(
        depth.expect("cD tag should have a value") <= 10,
        "Max depth should be <= 10 due to downsampling"
    );
    Ok(())
}
#[test]
fn test_max_reads_less_than_min_reads_fails() {
    // --max-reads below --min-reads is contradictory: execute() must fail
    // with an error that names both flags.
    let sam = SamBuilder::new_unmapped();
    let paths = TestPaths::new().expect("failed to create test paths");
    sam.write(&paths.input).expect("failed to write test BAM");
    let mut cmd = create_simplex_with_paths(paths.input.clone(), PathBuf::from("out.bam"));
    cmd.min_reads = 5;
    cmd.max_reads = Some(3);
    let outcome = cmd.execute("test");
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("--max-reads"));
    assert!(message.contains("--min-reads"));
}
#[test]
fn test_statistics_file_generation() -> Result<()> {
    // One passing group (3 reads) and one failing group (1 read, below
    // --min-reads 2): the stats TSV must be written and contain the key
    // raw/rejected/emitted metrics with a non-zero raw-read count.
    let mut builder = SamBuilder::new_unmapped();
    let mut attrs1 = HashMap::new();
    attrs1.insert("MI", BufValue::from("pass"));
    builder.add_frag_with_attrs("p1", None, true, &attrs1);
    builder.add_frag_with_attrs("p2", None, true, &attrs1);
    builder.add_frag_with_attrs("p3", None, true, &attrs1);
    let mut attrs2 = HashMap::new();
    attrs2.insert("MI", BufValue::from("fail"));
    builder.add_frag_with_attrs("f1", None, true, &attrs2);
    let paths = TestPaths::new()?;
    builder.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.stats_opts.stats = Some(paths.stats.clone());
    cmd.min_reads = 2;
    cmd.execute("test")?;
    // `exists()` already yields a bool; no `&` needed (the original only
    // compiled via `Not for &bool`).
    assert!(paths.stats.exists(), "Stats file should exist");
    let kv_metrics: Vec<ConsensusKvMetric> =
        DelimFile::default().read_tsv(&paths.stats).expect("Failed to read metrics file");
    assert!(!kv_metrics.is_empty(), "Should have metrics");
    let keys: Vec<&str> = kv_metrics.iter().map(|m| m.key.as_str()).collect();
    assert!(keys.contains(&"raw_reads_considered"), "Should have raw_reads_considered");
    assert!(keys.contains(&"raw_reads_rejected"), "Should have raw_reads_rejected");
    assert!(keys.contains(&"consensus_reads_emitted"), "Should have consensus_reads_emitted");
    let raw_reads = kv_metrics
        .iter()
        .find(|m| m.key == "raw_reads_considered")
        .expect("Should have raw_reads_considered");
    let count: u64 = raw_reads.value.parse().expect("Should be a number");
    assert!(count > 0, "Stats should have total reads");
    Ok(())
}
#[test]
fn test_custom_read_name_prefix() -> Result<()> {
    // With read_name_prefix set, the emitted consensus read's name must
    // begin with the supplied prefix.
    let mut sam = SamBuilder::new_unmapped();
    let mut tag_map = HashMap::new();
    tag_map.insert("MI", BufValue::from("test"));
    for name in ["read1", "read2"] {
        sam.add_frag_with_attrs(name, None, true, &tag_map);
    }
    let paths = TestPaths::new()?;
    sam.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.read_group.read_name_prefix = Some("MYCONSENSUS".to_string());
    cmd.execute("test")?;
    let records = read_bam_records(&paths.output)?;
    assert_eq!(records.len(), 1, "Should have 1 consensus read");
    let read_name = match records[0].name() {
        Some(name) => name.to_string(),
        None => String::new(),
    };
    assert!(read_name.starts_with("MYCONSENSUS"), "Read name should start with custom prefix");
    Ok(())
}
#[test]
fn test_rejects_file_generation() -> Result<()> {
    // A 1-read group falls below --min-reads 2 and is rejected: its raw read
    // must land in the rejects BAM with its original MI tag intact.
    let mut builder = SamBuilder::new_unmapped();
    let mut attrs1 = HashMap::new();
    attrs1.insert("MI", BufValue::from("pass"));
    builder.add_frag_with_attrs("p1", None, true, &attrs1);
    builder.add_frag_with_attrs("p2", None, true, &attrs1);
    builder.add_frag_with_attrs("p3", None, true, &attrs1);
    let mut attrs2 = HashMap::new();
    attrs2.insert("MI", BufValue::from("fail"));
    builder.add_frag_with_attrs("f1", None, true, &attrs2);
    let paths = TestPaths::new()?;
    builder.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.rejects_opts.rejects = Some(paths.rejects.clone());
    cmd.min_reads = 2;
    cmd.execute("test")?;
    // `exists()` already yields a bool; no `&` needed (the original only
    // compiled via `Not for &bool`).
    assert!(paths.rejects.exists(), "Rejects file should exist");
    let reject_records = read_bam_records(&paths.rejects)?;
    assert_eq!(reject_records.len(), 1, "Should have 1 rejected read");
    let umi = get_string_tag(&reject_records[0], "MI");
    assert_eq!(umi.as_deref(), Some("fail"), "Rejected read should have 'fail' UMI");
    Ok(())
}
#[test]
fn test_multithreaded_with_rejects() -> Result<()> {
    // Mix of singleton groups (every third index, rejected under
    // --min-reads 2) and two-read groups (accepted); a 4-thread run must
    // route reads to the output and rejects files accordingly.
    let mut builder = SamBuilder::new_unmapped();
    for idx in 0..50 {
        let mut attrs = HashMap::new();
        // The MI value is identical in both branches; only the read count
        // per group differs, so hoist the insert out of the if/else.
        attrs.insert("MI", BufValue::from(format!("umi_{idx}")));
        if idx % 3 == 0 {
            builder.add_frag_with_attrs(&format!("read_{idx}"), None, true, &attrs);
        } else {
            builder.add_frag_with_attrs(&format!("read_{idx}a"), None, true, &attrs);
            builder.add_frag_with_attrs(&format!("read_{idx}b"), None, true, &attrs);
        }
    }
    let paths = TestPaths::new()?;
    builder.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.rejects_opts.rejects = Some(paths.rejects.clone());
    cmd.min_reads = 2;
    cmd.threading = ThreadingOptions::new(4);
    cmd.execute("test")?;
    let output_records = read_bam_records(&paths.output)?;
    let reject_records = read_bam_records(&paths.rejects)?;
    assert!(output_records.len() >= 30, "Should have at least 30 consensus reads");
    assert!(reject_records.len() >= 15, "Should have at least 15 rejected reads");
    Ok(())
}
#[test]
fn test_multithreaded_with_stats() -> Result<()> {
    // 100 two-read groups processed with 4 threads: the stats TSV must still
    // be written and report non-zero raw-read and consensus-read counts.
    let mut builder = SamBuilder::new_unmapped();
    for idx in 0..100 {
        let mut attrs = HashMap::new();
        attrs.insert("MI", BufValue::from(format!("umi_{idx}")));
        builder.add_frag_with_attrs(&format!("read_{idx}a"), None, true, &attrs);
        builder.add_frag_with_attrs(&format!("read_{idx}b"), None, true, &attrs);
    }
    let paths = TestPaths::new()?;
    builder.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.stats_opts.stats = Some(paths.stats.clone());
    cmd.threading = ThreadingOptions::new(4);
    cmd.execute("test")?;
    // `exists()` already yields a bool; no `&` needed (the original only
    // compiled via `Not for &bool`).
    assert!(paths.stats.exists(), "Stats file should exist");
    let kv_metrics: Vec<ConsensusKvMetric> =
        DelimFile::default().read_tsv(&paths.stats).expect("Failed to read metrics file");
    assert!(!kv_metrics.is_empty(), "Should have metrics");
    let raw_reads = kv_metrics
        .iter()
        .find(|m| m.key == "raw_reads_considered")
        .expect("Should have raw_reads_considered");
    let count: u64 = raw_reads.value.parse().expect("Should be a number");
    assert!(count > 0, "Stats should have total reads");
    let consensus = kv_metrics
        .iter()
        .find(|m| m.key == "consensus_reads_emitted")
        .expect("Should have consensus_reads_emitted");
    let count: u64 = consensus.value.parse().expect("Should be a number");
    assert!(count > 0, "Stats should have consensus count");
    Ok(())
}
#[test]
fn test_overlapping_consensus_calling_paired() -> Result<()> {
    // Two read pairs sharing an MI with overlapping-base consensus enabled
    // must still emit exactly one R1 and one R2 consensus record.
    let mut sam = SamBuilder::new_unmapped();
    let mut tag_map = HashMap::new();
    tag_map.insert("MI", BufValue::from("test_umi"));
    for pair_name in ["pair1", "pair2"] {
        sam.add_pair_with_attrs(pair_name, None, None, true, true, &tag_map);
    }
    let paths = TestPaths::new()?;
    sam.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.overlapping.consensus_call_overlapping_bases = true;
    cmd.execute("test")?;
    let records = read_bam_records(&paths.output)?;
    assert_eq!(records.len(), 2, "Should have 2 consensus reads (R1 and R2)");
    Ok(())
}
#[test]
fn test_overlapping_consensus_unpaired_read() -> Result<()> {
    // Enabling overlapping-base consensus must not break unpaired input:
    // three fragments in one group still collapse to a single consensus read.
    let mut sam = SamBuilder::new_unmapped();
    let mut tag_map = HashMap::new();
    tag_map.insert("MI", BufValue::from("test_umi"));
    for name in ["frag1", "frag2", "frag3"] {
        sam.add_frag_with_attrs(name, None, true, &tag_map);
    }
    let paths = TestPaths::new()?;
    sam.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.overlapping.consensus_call_overlapping_bases = true;
    cmd.execute("test")?;
    let records = read_bam_records(&paths.output)?;
    assert_eq!(records.len(), 1, "Should have 1 consensus read from unpaired fragments");
    Ok(())
}
#[test]
fn test_trim_enabled() -> Result<()> {
    // Smoke test: turning on consensus trimming must not prevent a
    // two-read group from producing its consensus read.
    let mut sam = SamBuilder::new_unmapped();
    let mut tag_map = HashMap::new();
    tag_map.insert("MI", BufValue::from("test_umi"));
    for name in ["read1", "read2"] {
        sam.add_frag_with_attrs(name, None, true, &tag_map);
    }
    let paths = TestPaths::new()?;
    sam.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.consensus.trim = true;
    cmd.execute("test")?;
    let records = read_bam_records(&paths.output)?;
    assert_eq!(records.len(), 1, "Should have 1 consensus read");
    Ok(())
}
#[test]
fn test_min_consensus_base_quality() -> Result<()> {
    // Smoke test: raising min_consensus_base_quality to 30 must not prevent
    // a two-read group from producing its consensus read.
    let mut sam = SamBuilder::new_unmapped();
    let mut tag_map = HashMap::new();
    tag_map.insert("MI", BufValue::from("test_umi"));
    for name in ["read1", "read2"] {
        sam.add_frag_with_attrs(name, None, true, &tag_map);
    }
    let paths = TestPaths::new()?;
    sam.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.consensus.min_consensus_base_quality = 30;
    cmd.execute("test")?;
    let records = read_bam_records(&paths.output)?;
    assert_eq!(records.len(), 1, "Should have 1 consensus read");
    Ok(())
}
#[test]
fn test_reads_without_umi_tag_skipped() -> Result<()> {
    // Reads without an MI tag take no part in grouping: only the tagged pair
    // should contribute, yielding a single consensus read.
    let mut sam = SamBuilder::new_unmapped();
    let mut tagged = HashMap::new();
    tagged.insert("MI", BufValue::from("has_umi"));
    for name in ["with_umi1", "with_umi2"] {
        sam.add_frag_with_attrs(name, None, true, &tagged);
    }
    let untagged = HashMap::new();
    for name in ["no_umi1", "no_umi2"] {
        sam.add_frag_with_attrs(name, None, true, &untagged);
    }
    let paths = TestPaths::new()?;
    sam.write(&paths.input)?;
    let cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.execute("test")?;
    let records = read_bam_records(&paths.output)?;
    assert_eq!(records.len(), 1, "Should have 1 consensus read from reads with UMI only");
    Ok(())
}
#[rstest]
#[case::fast_path(ThreadingOptions::none())]
#[case::pipeline_1(ThreadingOptions::new(1))]
#[case::pipeline_2(ThreadingOptions::new(2))]
fn test_threading_modes(#[case] threading: ThreadingOptions) -> Result<()> {
    // The same three-read group is run under each threading configuration;
    // every mode must produce the single expected consensus read.
    let paths = TestPaths::new()?;
    let mut sam = SamBuilder::with_single_ref("chr1", 100);
    let mut tag_map = HashMap::new();
    tag_map.insert("MI", BufValue::from("1"));
    for name in ["read1", "read2", "read3"] {
        sam.add_frag_with_attrs(name, None, true, &tag_map);
    }
    sam.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.threading = threading;
    cmd.execute("test")?;
    let records = read_bam_records(&paths.output)?;
    assert_eq!(records.len(), 1, "Should have 1 consensus read");
    Ok(())
}
#[test]
fn test_simplex_processed_batch_memory_estimate() {
    // The heap estimate reflects the output buffer's *capacity*, not its
    // length: 100 bytes written into a 1024-byte allocation reports 1024.
    let mut data = Vec::with_capacity(1024);
    data.resize(100, 0u8);
    let batch = SimplexProcessedBatch {
        consensus_output: ConsensusOutput { data, count: 1 },
        groups_count: 1,
        stats: ConsensusCallingStats::default(),
        overlapping_stats: None,
    };
    assert_eq!(
        batch.estimate_heap_size(),
        1024,
        "estimate should match consensus_output capacity"
    );
}
#[rstest]
#[case::single_threaded(ThreadingOptions::none())]
#[case::multi_threaded(ThreadingOptions::new(2))]
fn test_simplex_em_seq_command(#[case] threading: ThreadingOptions) -> Result<()> {
    // Block-scoped `use` items are in scope for the whole body regardless of
    // position; hoisted here for readability (resolution is unchanged).
    use crate::sam::builder::{Strand, create_test_fasta};
    use noodles::sam::alignment::record_buf::data::field::value::{
        Array as BufArray, Value as BufValue,
    };
    // All-C reference and three all-C reads in one MI group: with EM-Seq
    // methylation mode the single consensus read must carry the per-base
    // i16 count arrays `cu` (some non-zero here) and `ct` (all zero here).
    let ref_seq = "C".repeat(200);
    let ref_fasta = create_test_fasta(&[("chr1", &ref_seq)])?;
    let mut builder = SamBuilder::with_single_ref("chr1", 200);
    // The MI tag is attached per-read via the builder; the original also
    // built an unused HashMap of attrs, removed here as dead code.
    for i in 0..3 {
        let _ = builder
            .add_frag()
            .name(&format!("r{i}"))
            .start(1)
            .strand(Strand::Plus)
            .bases("CCCCCCCCCC")
            .attr("MI", BufValue::from("1"))
            .build();
    }
    let paths = TestPaths::new()?;
    builder.write(&paths.input)?;
    let mut cmd = create_simplex_with_paths(paths.input.clone(), paths.output.clone());
    cmd.methylation_mode = Some(crate::commands::common::MethylationModeArg::EmSeq);
    cmd.reference = Some(ref_fasta.path().to_path_buf());
    cmd.threading = threading;
    cmd.execute("test")?;
    let records = read_bam_records(&paths.output)?;
    assert_eq!(records.len(), 1, "Should have 1 consensus read");
    let record = &records[0];
    let cu_tag = Tag::from([b'c', b'u']);
    let cu_value = record.data().get(&cu_tag).expect("cu tag should be present with EM-Seq");
    if let BufValue::Array(BufArray::Int16(cu_vals)) = cu_value {
        assert!(
            cu_vals.iter().any(|&v| v > 0),
            "cu should have non-zero values for methylated reads"
        );
    } else {
        panic!("cu tag should be an i16 array");
    }
    let ct_tag = Tag::from([b'c', b't']);
    let ct_value = record.data().get(&ct_tag).expect("ct tag should be present with EM-Seq");
    if let BufValue::Array(BufArray::Int16(ct_vals)) = ct_value {
        assert!(
            ct_vals.iter().all(|&v| v == 0),
            "ct should be all zeros for fully methylated reads"
        );
    } else {
        panic!("ct tag should be an i16 array");
    }
    Ok(())
}
}