use std::path::PathBuf;
#[cfg(feature = "simplex")]
use std::sync::Arc;
#[cfg(feature = "simplex")]
use crate::logging::OperationTimer;
use crate::unified_pipeline::{BamPipelineConfig, SchedulerStrategy};
use crate::validation::validate_file_exists;
use bytesize::ByteSize;
use clap::Args;
use fgumi_bam_io::is_stdin_path;
#[cfg(feature = "simplex")]
use fgumi_consensus::methylation::RefBaseProvider;
#[cfg(feature = "simplex")]
use log::info;
use noodles::sam::Header;
// Command-line selector for the methylation chemistry. clap exposes the
// variants as the values `em-seq` and `taps`; the `From` impl in this file
// maps them onto `fgumi_consensus::MethylationMode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
pub enum MethylationModeArg {
// Selected on the command line with the value `em-seq`.
#[value(name = "em-seq")]
EmSeq,
// Selected on the command line with the value `taps`.
#[value(name = "taps")]
Taps,
}
impl From<MethylationModeArg> for fgumi_consensus::MethylationMode {
fn from(arg: MethylationModeArg) -> Self {
match arg {
MethylationModeArg::EmSeq => Self::EmSeq,
MethylationModeArg::Taps => Self::Taps,
}
}
}
/// Turns the optional CLI selection into a concrete methylation mode.
///
/// Returns `MethylationMode::Disabled` when no mode was supplied; otherwise
/// the supplied selector converted through its `From` implementation.
pub fn resolve_methylation_mode(
    arg: Option<MethylationModeArg>,
) -> fgumi_consensus::MethylationMode {
    match arg {
        Some(selected) => selected.into(),
        None => fgumi_consensus::MethylationMode::Disabled,
    }
}
/// Optional reference bundle returned by `load_methylation_reference`.
///
/// `None` when methylation is disabled. Otherwise a pair of
/// (shared, thread-safe reference-base provider, shared list of reference
/// names taken from the BAM header).
#[cfg(feature = "simplex")]
pub type MethylationRef = Option<(
Arc<dyn fgumi_consensus::methylation::RefBaseProvider + Send + Sync>,
Arc<Vec<String>>,
)>;
/// Loads the reference needed for methylation-aware processing.
///
/// Returns `Ok(None)` immediately when `methylation_mode` is not enabled.
/// Otherwise opens the reference at `reference`, checks that every reference
/// sequence named in `header` can be found in it, and returns the reader
/// together with the header's reference names.
///
/// # Errors
///
/// Fails when the mode is enabled but `reference` is `None`, when the
/// reference reader cannot be created, or when the reference lacks one or
/// more contigs listed in the BAM header.
#[cfg(feature = "simplex")]
pub fn load_methylation_reference(
    methylation_mode: fgumi_consensus::MethylationMode,
    reference: &Option<PathBuf>,
    header: &Header,
) -> anyhow::Result<MethylationRef> {
    use fgumi_consensus::MethylationMode as Mode;

    if !methylation_mode.is_enabled() {
        return Ok(None);
    }

    // Human-readable label used only in the final log line.
    let mode_name = match methylation_mode {
        Mode::EmSeq => "EM-Seq",
        Mode::Taps => "TAPs",
        Mode::Disabled => unreachable!(),
    };

    let ref_path = match reference {
        Some(path) => path,
        None => anyhow::bail!("--ref is required when --methylation-mode is set"),
    };

    let load_timer = OperationTimer::new("Loading reference FASTA");
    let fasta = Arc::new(crate::reference::ReferenceReader::new(ref_path)?);
    load_timer.log_completion(0);

    let contig_names: Vec<String> =
        header.reference_sequences().keys().map(|contig| contig.to_string()).collect();

    // Collect every header contig the reference cannot serve so the error
    // reports all of them at once.
    let absent: Vec<&str> = contig_names
        .iter()
        .filter(|name| fasta.sequence_for(name).is_none())
        .map(String::as_str)
        .collect();
    if !absent.is_empty() {
        anyhow::bail!(
            "Reference FASTA is missing {} contig(s) from the BAM header: {}",
            absent.len(),
            absent.join(", ")
        );
    }

    info!("{mode_name} mode enabled with {} reference contigs", contig_names.len());
    Ok(Some((fasta, Arc::new(contig_names))))
}
/// Adds a `@PG` record for this tool to `header`.
///
/// Delegates to `fgumi_bam_io::header::add_pg_record`, supplying this
/// crate's version string and the given `command_line`.
pub fn add_pg_record(header: Header, command_line: &str) -> anyhow::Result<Header> {
    let version = crate::version::VERSION.as_str();
    fgumi_bam_io::header::add_pg_record(header, version, command_line)
}
/// Adds a `@PG` record for this tool to a header builder.
///
/// Delegates to `fgumi_bam_io::header::add_pg_to_builder`, supplying this
/// crate's version string and the given `command_line`.
pub fn add_pg_to_builder(
    builder: noodles::sam::header::Builder,
    command_line: &str,
) -> anyhow::Result<noodles::sam::header::Builder> {
    let version = crate::version::VERSION.as_str();
    fgumi_bam_io::header::add_pg_to_builder(builder, version, command_line)
}
// Flags for EM-Seq processing together with the reference path it depends on.
#[derive(Debug, Clone, Default, Args)]
pub struct EmSeqOptions {
// `--em-seq`: boolean switch, off by default. clap rejects it unless the
// `reference` argument (`--ref`) is supplied as well.
#[arg(long = "em-seq", default_value_t = false, requires = "reference")]
pub em_seq: bool,
// `--ref`: optional reference path; `None` when the flag is absent.
#[arg(long = "ref")]
pub reference: Option<PathBuf>,
}
// Input/output paths shared by BAM-processing commands, plus a hidden reader toggle.
#[derive(Debug, Clone, Args)]
pub struct BamIoOptions {
// `-i` / `--input`: required input path. `validate` skips the existence
// check when `is_stdin_path` reports true for it.
#[arg(short = 'i', long = "input")]
pub input: PathBuf,
// `-o` / `--output`: required output path.
#[arg(short = 'o', long = "output")]
pub output: PathBuf,
// `--async-reader`: hidden from help, off by default; forwarded to the
// reader through `pipeline_reader_opts`.
#[arg(long = "async-reader", default_value_t = false, hide = true)]
pub async_reader: bool,
}
impl Default for BamIoOptions {
    /// Empty input and output paths with the async reader switched off.
    fn default() -> Self {
        let (input, output) = (PathBuf::new(), PathBuf::new());
        Self { async_reader: false, input, output }
    }
}
impl BamIoOptions {
    /// Builds options from an input and an output path; the async reader
    /// stays disabled.
    pub fn new(input: impl Into<PathBuf>, output: impl Into<PathBuf>) -> Self {
        let (input, output) = (input.into(), output.into());
        Self { input, output, async_reader: false }
    }

    /// Packages the reader-related settings for the BAM I/O layer.
    pub fn pipeline_reader_opts(&self) -> fgumi_bam_io::PipelineReaderOpts {
        let async_reader = self.async_reader;
        fgumi_bam_io::PipelineReaderOpts { async_reader }
    }

    /// Checks that the input exists, unless the input path denotes stdin.
    ///
    /// # Errors
    ///
    /// Propagates the error from `validate_file_exists` for a non-stdin input.
    pub fn validate(&self) -> anyhow::Result<()> {
        let input = &self.input;
        if is_stdin_path(input) {
            // Nothing on disk to check when reading from stdin.
            return Ok(());
        }
        validate_file_exists(input, "Input BAM")?;
        Ok(())
    }
}
// Optional destination for rejected records.
#[derive(Debug, Clone, Default, Args)]
pub struct RejectsOptions {
// `-r` / `--rejects`: optional path; `None` (the default) means the
// feature is off, as reported by `is_enabled`.
#[arg(short = 'r', long = "rejects")]
pub rejects: Option<PathBuf>,
}
impl RejectsOptions {
    /// Returns `true` when a rejects path was supplied.
    #[must_use]
    pub fn is_enabled(&self) -> bool {
        let Self { rejects } = self;
        rejects.is_some()
    }
}
// Optional destination for statistics output.
#[derive(Debug, Clone, Default, Args)]
pub struct StatsOptions {
// `-s` / `--stats`: optional path; `None` (the default) means the
// feature is off, as reported by `is_enabled`.
#[arg(short = 's', long = "stats")]
pub stats: Option<PathBuf>,
}
impl StatsOptions {
    /// Returns `true` when a stats path was supplied.
    #[must_use]
    pub fn is_enabled(&self) -> bool {
        let Self { stats } = self;
        stats.is_some()
    }
}
// Quality and error-rate knobs for consensus calling. The numeric values are
// Phred scores: `validate` rejects anything above `MAX_PHRED`.
#[derive(Debug, Clone, Args)]
pub struct ConsensusCallingOptions {
// `-1` / `--error-rate-pre-umi`: defaults to 45.
#[arg(short = '1', long = "error-rate-pre-umi", default_value = "45")]
pub error_rate_pre_umi: u8,
// `-2` / `--error-rate-post-umi`: defaults to 40.
#[arg(short = '2', long = "error-rate-post-umi", default_value = "40")]
pub error_rate_post_umi: u8,
// `-m` / `--min-input-base-quality`: defaults to 10.
#[arg(short = 'm', long = "min-input-base-quality", default_value = "10")]
pub min_input_base_quality: u8,
// `-B` / `--output-per-base-tags`: defaults to true. Takes an optional
// value parsed by `parse_bool`; the bare flag means true.
#[arg(short = 'B', long = "output-per-base-tags", default_value = "true", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool)]
pub output_per_base_tags: bool,
// `--trim`: defaults to false. Takes an optional value parsed by
// `parse_bool`; the bare flag means true.
#[arg(long = "trim", default_value = "false", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool)]
pub trim: bool,
// `--min-consensus-base-quality`: defaults to 2, which is also the
// smallest value `validate` accepts.
#[arg(long = "min-consensus-base-quality", default_value = "2")]
pub min_consensus_base_quality: u8,
}
impl Default for ConsensusCallingOptions {
    /// Mirrors the defaults declared on the command-line arguments.
    fn default() -> Self {
        Self {
            // Boolean switches.
            trim: false,
            output_per_base_tags: true,
            // Phred-scaled thresholds.
            min_consensus_base_quality: 2,
            min_input_base_quality: 10,
            error_rate_post_umi: 40,
            error_rate_pre_umi: 45,
        }
    }
}
impl ConsensusCallingOptions {
    /// Largest Phred score accepted for any quality or error-rate option.
    const MAX_PHRED: u8 = 93;

    /// Checks that every Phred-valued option lies in its allowed range.
    ///
    /// # Errors
    ///
    /// Reports the first violation found, in this order: the two error
    /// rates and the minimum input base quality against `MAX_PHRED`, then
    /// the minimum consensus base quality against its lower bound of 2 and
    /// against `MAX_PHRED`.
    pub fn validate(&self) -> anyhow::Result<()> {
        // Options that only have an upper bound, in reporting order.
        let upper_bounded = [
            ("error-rate-pre-umi", self.error_rate_pre_umi),
            ("error-rate-post-umi", self.error_rate_post_umi),
            ("min-input-base-quality", self.min_input_base_quality),
        ];
        for (flag, value) in upper_bounded {
            if value > Self::MAX_PHRED {
                anyhow::bail!(
                    "{flag} ({value}) exceeds maximum Phred score ({})",
                    Self::MAX_PHRED
                );
            }
        }

        let min_consensus = self.min_consensus_base_quality;
        if min_consensus < 2 {
            anyhow::bail!(
                "min-consensus-base-quality ({min_consensus}) must be at least 2 (MIN_PHRED)"
            );
        }
        if min_consensus > Self::MAX_PHRED {
            anyhow::bail!(
                "min-consensus-base-quality ({min_consensus}) exceeds maximum Phred score ({})",
                Self::MAX_PHRED
            );
        }
        Ok(())
    }
}
// Naming options for emitted reads: read-name prefix and read-group ID.
#[derive(Debug, Clone, Args)]
pub struct ReadGroupOptions {
// `-p` / `--read-name-prefix`: optional; when absent,
// `prefix_or_from_header` derives a prefix from the SAM header instead.
#[arg(short = 'p', long = "read-name-prefix")]
pub read_name_prefix: Option<String>,
// `-R` / `--read-group-id`: defaults to "A".
#[arg(short = 'R', long = "read-group-id", default_value = "A")]
pub read_group_id: String,
}
impl ReadGroupOptions {
    /// Returns the configured read-name prefix, or one derived from `header`
    /// via `make_prefix_from_header` when none was configured.
    #[must_use]
    pub fn prefix_or_from_header(&self, header: &noodles::sam::Header) -> String {
        match &self.read_name_prefix {
            Some(prefix) => prefix.clone(),
            None => crate::consensus_caller::make_prefix_from_header(header),
        }
    }
}
impl Default for ReadGroupOptions {
    /// No explicit prefix and the read-group ID "A", matching the CLI default.
    fn default() -> Self {
        let read_group_id = String::from("A");
        Self { read_group_id, read_name_prefix: None }
    }
}
// Switch controlling consensus calling of overlapping bases.
#[derive(Debug, Clone, Args)]
pub struct OverlappingConsensusOptions {
// `--consensus-call-overlapping-bases`: defaults to true. Takes an optional
// value parsed by `parse_bool`; the bare flag means true.
#[arg(long = "consensus-call-overlapping-bases", default_value = "true", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool)]
pub consensus_call_overlapping_bases: bool,
}
impl Default for OverlappingConsensusOptions {
    /// Overlapping-base consensus calling is on by default, as on the CLI.
    fn default() -> Self {
        let consensus_call_overlapping_bases = true;
        Self { consensus_call_overlapping_bases }
    }
}
impl OverlappingConsensusOptions {
    /// Returns whether overlapping-base consensus calling is switched on.
    #[must_use]
    pub fn is_enabled(&self) -> bool {
        let Self { consensus_call_overlapping_bases: enabled } = self;
        *enabled
    }
}
/// How a command executes: on a single thread, or with an explicit thread count.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThreadingMode {
    /// No thread count was requested.
    SingleThreaded,
    /// An explicit thread count was requested (a count of 1 still counts as
    /// parallel mode).
    Threads(usize),
}

impl ThreadingMode {
    /// Returns `true` for every `Threads(_)` value, regardless of the count.
    #[must_use]
    pub fn is_parallel(&self) -> bool {
        match self {
            Self::Threads(_) => true,
            Self::SingleThreaded => false,
        }
    }

    /// Returns the thread count: 1 for `SingleThreaded`, otherwise the
    /// stored count unchanged.
    #[must_use]
    pub fn num_threads(&self) -> usize {
        if let Self::Threads(count) = *self { count } else { 1 }
    }
}
// Thread-count option shared by commands.
#[derive(Debug, Clone, Args)]
pub struct ThreadingOptions {
// `--threads`: optional. `None` maps to `ThreadingMode::SingleThreaded`
// and `Some(n)` to `ThreadingMode::Threads(n)` (see `mode`).
#[arg(long = "threads")]
pub threads: Option<usize>,
}
// Output compression setting.
// NOTE(review): the derived `Default` yields `compression_level == 0`, while
// the command-line default below is 1 — confirm the mismatch is intended.
#[derive(Debug, Clone, Default, Args)]
pub struct CompressionOptions {
// `--compression-level`: defaults to 1; clap restricts it to 0..=12.
#[arg(long, default_value_t = 1, value_parser = clap::value_parser!(u32).range(0..=12))]
pub compression_level: u32,
}
// Hidden tuning flags for the pipeline scheduler (all are `hide = true`).
// NOTE(review): the derived `Default` yields `deadlock_timeout == 0`, while
// the command-line default below is 10 — confirm the mismatch is intended.
#[derive(Debug, Clone, Default, Args)]
pub struct SchedulerOptions {
// `--scheduler`: scheduling strategy; defaults to `SchedulerStrategy::default()`.
#[arg(long = "scheduler", value_enum, default_value_t = SchedulerStrategy::default(), hide = true)]
pub scheduler: SchedulerStrategy,
// `--pipeline-stats`: defaults to false. Takes an optional value parsed by
// `parse_bool`; the bare flag means true.
#[arg(long = "pipeline-stats", default_value = "false", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool, hide = true)]
pub pipeline_stats: bool,
// `--deadlock-timeout`: defaults to 10; exposed in seconds through
// `deadlock_timeout_secs`.
#[arg(long = "deadlock-timeout", default_value_t = 10, hide = true)]
pub deadlock_timeout: u64,
// `--deadlock-recover`: defaults to false. Takes an optional value parsed
// by `parse_bool`; the bare flag means true.
#[arg(long = "deadlock-recover", default_value = "false", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool, hide = true)]
pub deadlock_recover: bool,
}
impl SchedulerOptions {
    /// Returns the selected scheduling strategy.
    #[must_use]
    pub fn strategy(&self) -> SchedulerStrategy {
        let Self { scheduler, .. } = self;
        *scheduler
    }

    /// Returns whether pipeline statistics collection was requested.
    #[must_use]
    pub fn collect_stats(&self) -> bool {
        let Self { pipeline_stats, .. } = self;
        *pipeline_stats
    }

    /// Returns the deadlock timeout, in seconds.
    #[must_use]
    pub fn deadlock_timeout_secs(&self) -> u64 {
        let Self { deadlock_timeout, .. } = self;
        *deadlock_timeout
    }

    /// Returns whether deadlock recovery was requested.
    #[must_use]
    pub fn deadlock_recover_enabled(&self) -> bool {
        let Self { deadlock_recover, .. } = self;
        *deadlock_recover
    }
}
impl ThreadingOptions {
    /// Default batch size constant exposed alongside the threading options.
    pub const DEFAULT_BATCH_SIZE: usize = 100;

    /// Creates options with an explicit thread count.
    #[must_use]
    pub fn new(threads: usize) -> Self {
        let threads = Some(threads);
        Self { threads }
    }

    /// Creates options with no thread count, i.e. single-threaded mode.
    #[must_use]
    pub fn none() -> Self {
        Self { threads: None }
    }

    /// Translates the optional thread count into a `ThreadingMode`.
    #[must_use]
    pub fn mode(&self) -> ThreadingMode {
        self.threads.map_or(ThreadingMode::SingleThreaded, ThreadingMode::Threads)
    }

    /// Returns the thread count, or 1 when none was configured.
    #[must_use]
    pub fn num_threads(&self) -> usize {
        self.threads.unwrap_or(1)
    }

    /// Returns `true` whenever a thread count was configured, even a count of 1.
    #[must_use]
    pub fn is_parallel(&self) -> bool {
        self.threads.is_some()
    }

    /// Returns `true` only when no thread count was configured.
    #[must_use]
    pub fn is_single_threaded(&self) -> bool {
        self.threads.is_none()
    }

    /// Returns twice the thread count.
    #[must_use]
    pub fn queue_len(&self) -> usize {
        2 * self.num_threads()
    }

    /// Returns a one-line description of the threading setup for logging.
    #[must_use]
    pub fn log_message(&self) -> String {
        match self.threads {
            Some(n) => format!("Using {n} threads"),
            None => String::from("Single-threaded mode"),
        }
    }
}
/// Memory limit as given on the command line (see `parse_memory`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryLimit {
/// Derive the budget from host memory minus the reserve
/// (see `resolve_memory_budget_with_total`).
Auto,
/// An explicit size in bytes.
Fixed(usize),
}
/// Memory to hold back from an `Auto` budget (see `parse_memory_reserve`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryReserve {
/// The smaller of `AUTO_RESERVE_CAP` and half of total memory
/// (see `resolve_reserve`).
Auto,
/// An explicit size in bytes.
Fixed(usize),
}
/// Per-thread floor (256 MiB) that the auto budget in
/// `resolve_memory_budget_with_total` targets before capping to available memory.
pub(crate) const MIN_MEMORY_PER_THREAD: usize = 256 * 1024 * 1024;
/// Upper bound (10 GiB) on the automatically chosen reserve; see `resolve_reserve`.
// NOTE(review): this product exceeds `u32::MAX`, so it presumably assumes a
// 64-bit `usize` — confirm 32-bit targets are out of scope.
pub(crate) const AUTO_RESERVE_CAP: usize = 10 * 1024 * 1024 * 1024;
/// Parses a memory size with `parse_memory_size` and narrows it to `usize`.
///
/// `label` names the option in the "too large" error message.
///
/// # Errors
///
/// Returns the parser's error rendered as a string, or a "too large"
/// message when the value does not fit in `usize`.
fn parse_memory_bytes(s: &str, label: &str) -> Result<usize, String> {
    match parse_memory_size(s) {
        Ok(bytes) => usize::try_from(bytes).map_err(|_| format!("{label} too large: {bytes}")),
        Err(err) => Err(err.to_string()),
    }
}
/// clap value parser for a memory limit.
///
/// Surrounding whitespace is ignored. The word `auto` (any ASCII case)
/// yields `MemoryLimit::Auto`; anything else must be a size that
/// `parse_memory_bytes` accepts.
pub(crate) fn parse_memory(s: &str) -> Result<MemoryLimit, String> {
    let trimmed = s.trim();
    if trimmed.eq_ignore_ascii_case("auto") {
        Ok(MemoryLimit::Auto)
    } else {
        parse_memory_bytes(trimmed, "Memory size").map(MemoryLimit::Fixed)
    }
}
/// clap value parser for a memory reserve.
///
/// Surrounding whitespace is ignored. The word `auto` (any ASCII case)
/// yields `MemoryReserve::Auto`; anything else must be a size that
/// `parse_memory_bytes` accepts.
pub(crate) fn parse_memory_reserve(s: &str) -> Result<MemoryReserve, String> {
    let trimmed = s.trim();
    if trimmed.eq_ignore_ascii_case("auto") {
        Ok(MemoryReserve::Auto)
    } else {
        parse_memory_bytes(trimmed, "Memory reserve").map(MemoryReserve::Fixed)
    }
}
/// Converts a reserve setting into a byte count.
///
/// A fixed reserve is returned unchanged. An automatic reserve is half of
/// `total_memory`, but never more than `AUTO_RESERVE_CAP`.
pub(crate) fn resolve_reserve(reserve: MemoryReserve, total_memory: usize) -> usize {
    match reserve {
        MemoryReserve::Auto => std::cmp::min(total_memory / 2, AUTO_RESERVE_CAP),
        MemoryReserve::Fixed(bytes) => bytes,
    }
}
/// Resolves the memory budget in bytes against the detected total memory.
///
/// Thin wrapper that feeds `detect_total_memory()` into
/// `resolve_memory_budget_with_total`; see that function for the rules and
/// error conditions.
pub(crate) fn resolve_memory_budget(
    limit: MemoryLimit,
    reserve: MemoryReserve,
    threads: usize,
    per_thread: bool,
) -> anyhow::Result<usize> {
    let host_total = detect_total_memory();
    resolve_memory_budget_with_total(limit, reserve, threads, per_thread, host_total)
}
/// Resolves the memory budget in bytes for an explicitly supplied host total.
///
/// Taking `total` as a parameter lets the tests in this file exercise the
/// rules without depending on the machine they run on.
///
/// * `MemoryLimit::Fixed(bytes)` — `bytes` multiplied by `threads` when
///   `per_thread` is set, otherwise `bytes` unchanged. `reserve` is not
///   consulted.
/// * `MemoryLimit::Auto` — starts from `total` minus the resolved reserve and
///   never returns more than that amount.
///
/// # Errors
///
/// Fails when `threads` is 0 or when a per-thread multiplication overflows
/// `usize`.
fn resolve_memory_budget_with_total(
limit: MemoryLimit,
reserve: MemoryReserve,
threads: usize,
per_thread: bool,
total: usize,
) -> anyhow::Result<usize> {
// Rejecting zero up front also protects the divisions by `threads` below.
if threads == 0 {
anyhow::bail!("--threads must be at least 1");
}
let budget = match limit {
MemoryLimit::Fixed(bytes) => {
if per_thread {
bytes
.checked_mul(threads)
.ok_or_else(|| anyhow::anyhow!("memory limit × {threads} threads overflowed"))?
} else {
bytes
}
}
MemoryLimit::Auto => {
// Memory left once the reserve is set aside; saturates at 0 when
// the reserve is larger than the host total.
let margin = resolve_reserve(reserve, total);
let available = total.saturating_sub(margin);
// Desired budget before capping: an even split of `available`, with
// each thread's share raised to at least `MIN_MEMORY_PER_THREAD`.
let target = if per_thread {
(available / threads)
.max(MIN_MEMORY_PER_THREAD)
.checked_mul(threads)
.ok_or_else(|| anyhow::anyhow!("auto memory budget overflowed"))?
} else {
available.max(MIN_MEMORY_PER_THREAD)
};
// The floor may push `target` past what is available; cap it so the
// budget never exceeds `available`.
let budget = target.min(available);
if budget < target {
log::warn!(
"Auto memory: capping budget to host-available {} ({}/thread target × {} threads \
exceeds it after reserve {}); throughput may drop but the run stays within memory",
ByteSize(budget as u64),
ByteSize(MIN_MEMORY_PER_THREAD as u64),
threads,
ByteSize(margin as u64),
);
}
log::debug!(
"Auto memory: {} of {} ({}/thread × {} threads, reserve {})",
ByteSize(budget as u64),
ByteSize(total as u64),
ByteSize((budget / threads) as u64),
threads,
ByteSize(margin as u64),
);
budget
}
};
// Warn only, do not fail. An auto budget is already capped at
// `available <= total`, so this can fire only for fixed limits.
if budget > total {
log::warn!(
"Memory budget {} exceeds total host memory {}; this may cause OOM (or, for sort, earlier spill-to-disk)",
ByteSize(budget as u64),
ByteSize(total as u64),
);
}
Ok(budget)
}
// Memory-budget options for the pipeline queues.
#[derive(Debug, Clone, Args)]
pub struct QueueMemoryOptions {
// `--max-memory`: `auto` or a size, parsed by `parse_memory`. The default
// string "768" corresponds to 768 MiB according to the `Default` impl and
// the CLI parsing tests in this file.
#[arg(long = "max-memory", default_value = "768", value_parser = parse_memory)]
pub max_memory: MemoryLimit,
// `--memory-reserve`: `auto` (the default) or a size, parsed by
// `parse_memory_reserve`.
#[arg(long = "memory-reserve", default_value = "auto", value_parser = parse_memory_reserve)]
pub memory_reserve: MemoryReserve,
// `--memory-per-thread`: defaults to true. Takes an optional value parsed
// by `parse_bool`; the bare flag means true. When true, a fixed
// `max_memory` is multiplied by the thread count.
#[arg(long = "memory-per-thread", default_value = "true", num_args = 0..=1, default_missing_value = "true", action = clap::ArgAction::Set, value_parser = parse_bool)]
pub memory_per_thread: bool,
}
impl Default for QueueMemoryOptions {
    /// A fixed 768 MiB limit applied per thread, with an automatic reserve.
    fn default() -> Self {
        let max_memory = MemoryLimit::Fixed(768 * 1024 * 1024);
        Self { memory_per_thread: true, memory_reserve: MemoryReserve::Auto, max_memory }
    }
}
impl QueueMemoryOptions {
    /// Resolves these options into a queue memory budget in bytes for
    /// `num_threads` threads.
    ///
    /// # Errors
    ///
    /// Propagates any error from `resolve_memory_budget`.
    pub fn calculate_memory_limit(&self, num_threads: usize) -> anyhow::Result<u64> {
        let Self { max_memory, memory_reserve, memory_per_thread } = *self;
        resolve_memory_budget(max_memory, memory_reserve, num_threads, memory_per_thread)
            .map(|bytes| bytes as u64)
    }

    /// Logs the resolved budget at info level.
    ///
    /// The per-thread breakdown is shown only in per-thread mode with more
    /// than one thread; otherwise only the total is logged.
    pub fn log_memory_config(&self, num_threads: usize, total_memory: u64) {
        let show_breakdown = self.memory_per_thread && num_threads > 1;
        if !show_breakdown {
            log::info!("Queue memory budget: {} total", ByteSize(total_memory));
            return;
        }
        // `num_threads > 1` here, so the division cannot be by zero.
        log::info!(
            "Queue memory budget: {} total ({}/thread × {} threads)",
            ByteSize(total_memory),
            ByteSize(total_memory / num_threads as u64),
            num_threads
        );
    }
}
/// Appends `records` to `output`, each preceded by its length as a
/// little-endian `u32` (the BAM `block_size` prefix).
///
/// All lengths are validated and summed before anything is written, so
/// `output` is left untouched when an error is returned and is grown at most
/// once. Existing content of `output` is preserved.
///
/// Returns the number of records written.
///
/// # Errors
///
/// Returns `InvalidData` when a record is longer than `u32::MAX` bytes or
/// when the total framed size overflows `usize`.
pub(crate) fn serialize_raw_bam_records<R: AsRef<[u8]>>(
    records: &[R],
    output: &mut Vec<u8>,
) -> std::io::Result<u64> {
    use std::io::{Error, ErrorKind};

    // Pass 1: validate every length and accumulate the framed size.
    let mut framed_total = 0usize;
    for record in records {
        let len = record.as_ref().len();
        if u32::try_from(len).is_err() {
            return Err(Error::new(
                ErrorKind::InvalidData,
                format!("BAM record too large ({len} bytes) for u32 block_size"),
            ));
        }
        framed_total = match len.checked_add(4).and_then(|frame| framed_total.checked_add(frame)) {
            Some(sum) => sum,
            None => {
                return Err(Error::new(
                    ErrorKind::InvalidData,
                    "serialized BAM batch size overflowed usize",
                ));
            }
        };
    }

    // Pass 2: a single reservation, then length prefix + body per record.
    output.reserve(framed_total);
    for record in records {
        let body: &[u8] = record.as_ref();
        let block_size = u32::try_from(body.len()).expect("body length pre-validated to fit u32");
        output.extend_from_slice(&block_size.to_le_bytes());
        output.extend_from_slice(body);
    }
    Ok(records.len() as u64)
}
/// clap value parser for boolean options.
///
/// Accepts `true`/`t`/`yes`/`y` and `false`/`f`/`no`/`n`, ignoring ASCII
/// case. The input is not trimmed, so surrounding whitespace is rejected.
///
/// # Errors
///
/// Returns a message listing the accepted spellings for any other input.
pub(crate) fn parse_bool(s: &str) -> Result<bool, String> {
    const TRUTHY: &[&str] = &["true", "t", "yes", "y"];
    const FALSY: &[&str] = &["false", "f", "no", "n"];

    let is_one_of =
        |spellings: &[&str]| spellings.iter().any(|&spelling| s.eq_ignore_ascii_case(spelling));

    if is_one_of(TRUTHY) {
        Ok(true)
    } else if is_one_of(FALSY) {
        Ok(false)
    } else {
        Err(format!("Invalid boolean value '{s}'. Expected: true|false|yes|no|y|n|t|f"))
    }
}
pub(crate) use crate::system::detect_total_memory;
pub use crate::validation::parse_memory_size;
/// Builds a `BamPipelineConfig` from the shared command-line option groups.
///
/// Starts from the auto-tuned configuration for `num_threads` and the chosen
/// compression level, applies the scheduler settings, then resolves, stores
/// and logs the queue memory limit.
///
/// # Errors
///
/// Fails when the queue memory limit cannot be resolved.
pub fn build_pipeline_config(
    scheduler_opts: &SchedulerOptions,
    compression: &CompressionOptions,
    queue_memory: &QueueMemoryOptions,
    num_threads: usize,
) -> anyhow::Result<BamPipelineConfig> {
    let level = compression.compression_level;
    let mut config = BamPipelineConfig::auto_tuned(num_threads, level);

    // Scheduler settings, read straight from the option fields.
    config.pipeline.scheduler_strategy = scheduler_opts.scheduler;
    if scheduler_opts.pipeline_stats {
        config.pipeline = config.pipeline.with_stats(true);
    }
    config.pipeline.deadlock_timeout_secs = scheduler_opts.deadlock_timeout;
    config.pipeline.deadlock_recover_enabled = scheduler_opts.deadlock_recover;

    // Queue memory: resolve once, then store and log the same value.
    let queue_limit = queue_memory.calculate_memory_limit(num_threads)?;
    config.pipeline.queue_memory_limit = queue_limit;
    queue_memory.log_memory_config(num_threads, queue_limit);

    Ok(config)
}
// Unit tests for the shared option types, the memory-budget helpers, the
// boolean parser and the raw BAM record serializer defined in this file.
#[cfg(test)]
mod tests {
use super::*;
// Installs a trace-level test logger; the result is ignored so repeated
// calls from several tests are harmless.
fn enable_logging() {
let _ =
env_logger::builder().is_test(true).filter_level(log::LevelFilter::Trace).try_init();
}
// ---- ThreadingOptions / ThreadingMode ----
#[test]
fn test_none_is_single_threaded() {
let opts = ThreadingOptions::none();
assert!(opts.is_single_threaded());
assert!(!opts.is_parallel());
assert_eq!(opts.mode(), ThreadingMode::SingleThreaded);
assert_eq!(opts.threads, None);
}
#[test]
fn test_mode_detection() {
assert_eq!(ThreadingOptions::none().mode(), ThreadingMode::SingleThreaded);
assert_eq!(ThreadingOptions::new(1).mode(), ThreadingMode::Threads(1));
assert_eq!(ThreadingOptions::new(8).mode(), ThreadingMode::Threads(8));
}
#[test]
fn test_num_threads() {
assert_eq!(ThreadingOptions::none().num_threads(), 1);
assert_eq!(ThreadingOptions::new(1).num_threads(), 1);
assert_eq!(ThreadingOptions::new(8).num_threads(), 8);
}
#[test]
fn test_queue_len() {
assert_eq!(ThreadingOptions::new(1).queue_len(), 2);
assert_eq!(ThreadingOptions::new(8).queue_len(), 16);
}
#[test]
fn test_log_message() {
let opts = ThreadingOptions::new(8);
let msg = opts.log_message();
assert!(msg.contains("8 threads"));
let opts = ThreadingOptions::none();
let msg = opts.log_message();
assert!(msg.contains("Single-threaded"));
}
// An explicit count of 1 is still treated as parallel (pipeline) mode.
#[test]
fn test_new_uses_pipeline() {
let opts = ThreadingOptions::new(1);
assert!(!opts.is_single_threaded());
assert!(opts.is_parallel());
assert_eq!(opts.threads, Some(1));
}
// ---- ConsensusCallingOptions::validate ----
#[test]
fn test_consensus_calling_options_validate_defaults() {
let opts = ConsensusCallingOptions::default();
assert!(opts.validate().is_ok());
}
#[test]
fn test_consensus_calling_options_validate_valid() {
let opts = ConsensusCallingOptions {
error_rate_pre_umi: 45,
error_rate_post_umi: 40,
min_input_base_quality: 10,
output_per_base_tags: true,
trim: false,
min_consensus_base_quality: 13,
};
assert!(opts.validate().is_ok());
}
// 94 is one above MAX_PHRED (93).
#[test]
fn test_consensus_calling_options_validate_error_rate_pre_umi_too_high() {
let opts = ConsensusCallingOptions {
error_rate_pre_umi: 94, ..ConsensusCallingOptions::default()
};
let err = opts.validate().unwrap_err();
assert!(err.to_string().contains("error-rate-pre-umi"));
}
#[test]
fn test_consensus_calling_options_validate_error_rate_post_umi_too_high() {
let opts = ConsensusCallingOptions {
error_rate_post_umi: 94, ..ConsensusCallingOptions::default()
};
let err = opts.validate().unwrap_err();
assert!(err.to_string().contains("error-rate-post-umi"));
}
// 1 is one below the accepted minimum of 2.
#[test]
fn test_consensus_calling_options_validate_min_consensus_too_low() {
let opts = ConsensusCallingOptions {
min_consensus_base_quality: 1, ..ConsensusCallingOptions::default()
};
let err = opts.validate().unwrap_err();
assert!(err.to_string().contains("min-consensus-base-quality"));
}
#[test]
fn test_consensus_calling_options_validate_min_consensus_at_min() {
let opts = ConsensusCallingOptions {
min_consensus_base_quality: 2, ..ConsensusCallingOptions::default()
};
assert!(opts.validate().is_ok());
}
// ---- SchedulerOptions accessors ----
#[test]
fn test_scheduler_options_default() {
let opts = SchedulerOptions::default();
assert_eq!(opts.strategy(), SchedulerStrategy::BalancedChaseDrain);
assert!(!opts.collect_stats());
}
#[test]
fn test_scheduler_options_strategy() {
let opts = SchedulerOptions {
scheduler: SchedulerStrategy::FixedPriority,
pipeline_stats: false,
deadlock_timeout: 10,
deadlock_recover: false,
};
assert_eq!(opts.strategy(), SchedulerStrategy::FixedPriority);
}
#[test]
fn test_scheduler_options_collect_stats() {
let opts = SchedulerOptions {
scheduler: SchedulerStrategy::default(),
pipeline_stats: true,
deadlock_timeout: 10,
deadlock_recover: false,
};
assert!(opts.collect_stats());
}
#[test]
fn test_scheduler_options_deadlock_timeout() {
let opts = SchedulerOptions {
scheduler: SchedulerStrategy::default(),
pipeline_stats: false,
deadlock_timeout: 30,
deadlock_recover: false,
};
assert_eq!(opts.deadlock_timeout_secs(), 30);
}
#[test]
fn test_scheduler_options_deadlock_recover() {
let opts = SchedulerOptions {
scheduler: SchedulerStrategy::default(),
pipeline_stats: false,
deadlock_timeout: 10,
deadlock_recover: true,
};
assert!(opts.deadlock_recover_enabled());
}
// ---- QueueMemoryOptions / memory budget resolution ----
// The default is a fixed per-thread limit, so it scales with the thread count.
#[test]
fn test_queue_memory_default_is_768_mib_per_thread() {
let opts = QueueMemoryOptions::default();
let result = opts
.calculate_memory_limit(4)
.expect("calculate_memory_limit should succeed for the default");
assert_eq!(result, 768 * 1024 * 1024 * 4);
}
#[test]
fn test_queue_memory_max_memory_per_thread_scaling() {
let opts = QueueMemoryOptions {
max_memory: MemoryLimit::Fixed(100 * 1024 * 1024),
..QueueMemoryOptions::default()
};
let result = opts.calculate_memory_limit(4).expect("should succeed");
assert_eq!(result, 100 * 1024 * 1024 * 4);
}
#[test]
fn test_queue_memory_fixed_total_does_not_scale() {
let opts = QueueMemoryOptions {
max_memory: MemoryLimit::Fixed(200 * 1024 * 1024),
memory_per_thread: false,
..QueueMemoryOptions::default()
};
let result = opts.calculate_memory_limit(8).expect("should succeed");
assert_eq!(result, 200 * 1024 * 1024);
}
#[test]
fn test_queue_memory_zero_threads_is_error() {
assert!(QueueMemoryOptions::default().calculate_memory_limit(0).is_err());
}
// "2GB" is expected to parse as a decimal size (2 * 1000^3 bytes).
#[test]
fn test_queue_memory_human_readable_via_parse() {
let opts = QueueMemoryOptions {
max_memory: parse_memory("2GB").expect("parse 2GB"),
memory_per_thread: false,
..QueueMemoryOptions::default()
};
let result = opts.calculate_memory_limit(4).expect("should succeed");
assert_eq!(result, 2 * 1000 * 1000 * 1000);
}
// Runs against the real host, so only bounds are asserted.
#[test]
fn test_queue_memory_auto_is_bounded_by_host() {
let total = detect_total_memory() as u64;
let opts =
QueueMemoryOptions { max_memory: MemoryLimit::Auto, ..QueueMemoryOptions::default() };
let result = opts.calculate_memory_limit(4).expect("auto should resolve");
assert!(result > 0, "auto resolved to a zero budget");
assert!(result <= total, "auto budget {result} exceeded host total {total}");
}
// 16 threads at the per-thread floor would need more than a 4 GiB host
// offers; the budget must be capped instead.
#[test]
fn test_auto_never_oversubscribes_small_host() {
enable_logging(); let total = 4 * 1024 * 1024 * 1024; let margin = resolve_reserve(MemoryReserve::Auto, total); let available = total - margin;
let budget = resolve_memory_budget_with_total(
MemoryLimit::Auto,
MemoryReserve::Auto,
16,
true,
total,
)
.expect("should resolve");
assert!(budget <= available, "budget {budget} oversubscribed available {available}");
assert!(budget <= total, "budget {budget} oversubscribed host {total}");
}
// On a simulated 256 GiB host the budget stays between the per-thread
// floor and the available memory.
#[test]
fn test_auto_uses_floor_when_host_is_ample() {
let total = 256 * 1024 * 1024 * 1024;
let margin = resolve_reserve(MemoryReserve::Auto, total); let available = total - margin;
let budget = resolve_memory_budget_with_total(
MemoryLimit::Auto,
MemoryReserve::Auto,
4,
true,
total,
)
.expect("should resolve");
assert!(budget >= MIN_MEMORY_PER_THREAD * 4, "budget {budget} fell below the floor");
assert!(budget <= available, "budget {budget} exceeded available {available}");
}
// A fixed limit is returned as-is even when it exceeds the host total.
#[test]
fn test_fixed_budget_independent_of_host() {
enable_logging(); let tiny_host = 512 * 1024 * 1024;
let budget = resolve_memory_budget_with_total(
MemoryLimit::Fixed(2 * 1024 * 1024 * 1024),
MemoryReserve::Auto,
4,
false,
tiny_host,
)
.expect("should resolve");
assert_eq!(budget, 2 * 1024 * 1024 * 1024);
}
// A larger fixed reserve must never yield a larger auto budget.
#[test]
fn test_queue_memory_auto_reserve_shrinks_budget() {
let large_reserve = QueueMemoryOptions {
max_memory: MemoryLimit::Auto,
memory_reserve: MemoryReserve::Fixed(512 * 1024 * 1024),
..QueueMemoryOptions::default()
}
.calculate_memory_limit(4)
.expect("should succeed");
let small_reserve = QueueMemoryOptions {
max_memory: MemoryLimit::Auto,
memory_reserve: MemoryReserve::Fixed(128 * 1024 * 1024),
..QueueMemoryOptions::default()
}
.calculate_memory_limit(4)
.expect("should succeed");
assert!(large_reserve <= small_reserve);
}
#[test]
fn test_fixed_per_thread_overflow_is_error() {
let opts = QueueMemoryOptions {
max_memory: MemoryLimit::Fixed(usize::MAX),
..QueueMemoryOptions::default()
};
assert!(opts.calculate_memory_limit(4).is_err());
}
#[test]
fn test_auto_per_thread_overflow_is_error() {
let result = resolve_memory_budget_with_total(
MemoryLimit::Auto,
MemoryReserve::Auto,
usize::MAX,
true,
1024 * 1024 * 1024,
);
assert!(result.is_err());
}
// Smoke test: only checks that both logging branches run without panicking.
#[test]
fn test_log_memory_config_exercises_both_branches() {
let per_thread = QueueMemoryOptions::default();
per_thread.log_memory_config(8, 8 * 768 * 1024 * 1024); per_thread.log_memory_config(1, 768 * 1024 * 1024);
let fixed_total =
QueueMemoryOptions { memory_per_thread: false, ..QueueMemoryOptions::default() };
fixed_total.log_memory_config(8, 1024 * 1024 * 1024); }
// Compiled only with the `memory-debug` feature.
#[test]
#[cfg(feature = "memory-debug")]
fn test_sysinfo_returns_reasonable_values() {
use sysinfo::System;
let mut system = System::new();
system.refresh_memory();
let total = system.total_memory();
let available = system.available_memory();
assert!(total > 100_000_000); assert!(available > 0);
assert!(available <= total);
}
// ---- clap parsing of the optional-value boolean flags ----
use clap::Parser;
// Minimal parser that flattens the option groups under test.
#[derive(Debug, Parser)]
#[command(name = "test")]
struct TestBoolFlags {
#[command(flatten)]
consensus: ConsensusCallingOptions,
#[command(flatten)]
overlapping: OverlappingConsensusOptions,
#[command(flatten)]
queue_memory: QueueMemoryOptions,
}
use rstest::rstest;
// Covers: flag absent, bare flag, separate value, and `=value` forms.
#[rstest]
#[case(&["test"], true)]
#[case(&["test", "--output-per-base-tags"], true)]
#[case(&["test", "--output-per-base-tags", "true"], true)]
#[case(&["test", "--output-per-base-tags", "false"], false)]
#[case(&["test", "--output-per-base-tags=true"], true)]
#[case(&["test", "--output-per-base-tags=false"], false)]
fn test_output_per_base_tags_parsing(#[case] args: &[&str], #[case] expected: bool) {
let cmd = TestBoolFlags::try_parse_from(args).expect("valid CLI args should parse");
assert_eq!(cmd.consensus.output_per_base_tags, expected);
}
#[rstest]
#[case(&["test"], false)]
#[case(&["test", "--trim"], true)]
#[case(&["test", "--trim", "true"], true)]
#[case(&["test", "--trim", "false"], false)]
#[case(&["test", "--trim=true"], true)]
#[case(&["test", "--trim=false"], false)]
fn test_trim_parsing(#[case] args: &[&str], #[case] expected: bool) {
let cmd = TestBoolFlags::try_parse_from(args).expect("valid CLI args should parse");
assert_eq!(cmd.consensus.trim, expected);
}
#[rstest]
#[case(&["test"], true)]
#[case(&["test", "--consensus-call-overlapping-bases"], true)]
#[case(&["test", "--consensus-call-overlapping-bases", "true"], true)]
#[case(&["test", "--consensus-call-overlapping-bases", "false"], false)]
#[case(&["test", "--consensus-call-overlapping-bases=true"], true)]
#[case(&["test", "--consensus-call-overlapping-bases=false"], false)]
fn test_overlapping_bases_parsing(#[case] args: &[&str], #[case] expected: bool) {
let cmd = TestBoolFlags::try_parse_from(args).expect("valid CLI args should parse");
assert_eq!(cmd.overlapping.consensus_call_overlapping_bases, expected);
}
#[rstest]
#[case(&["test"], true)]
#[case(&["test", "--memory-per-thread"], true)]
#[case(&["test", "--memory-per-thread", "true"], true)]
#[case(&["test", "--memory-per-thread", "false"], false)]
#[case(&["test", "--memory-per-thread=true"], true)]
#[case(&["test", "--memory-per-thread=false"], false)]
fn test_memory_per_thread_parsing(#[case] args: &[&str], #[case] expected: bool) {
let cmd = TestBoolFlags::try_parse_from(args).expect("valid CLI args should parse");
assert_eq!(cmd.queue_memory.memory_per_thread, expected);
}
// The CLI default must agree with `QueueMemoryOptions::default()`; "GiB" is
// expected to be binary and "M" decimal.
#[rstest]
#[case(&["test"], MemoryLimit::Fixed(768 * 1024 * 1024))]
#[case(&["test", "--max-memory", "auto"], MemoryLimit::Auto)]
#[case(&["test", "--max-memory", "2GiB"], MemoryLimit::Fixed(2 * 1024 * 1024 * 1024))]
#[case(&["test", "--max-memory=512M"], MemoryLimit::Fixed(512 * 1000 * 1000))]
fn test_max_memory_parsing(#[case] args: &[&str], #[case] expected: MemoryLimit) {
let cmd = TestBoolFlags::try_parse_from(args).expect("valid CLI args should parse");
assert_eq!(cmd.queue_memory.max_memory, expected);
}
// ---- parse_bool ----
// Every accepted spelling, in lower, upper and mixed case.
#[rstest]
#[case("true", true)]
#[case("false", false)]
#[case("yes", true)]
#[case("no", false)]
#[case("t", true)]
#[case("f", false)]
#[case("y", true)]
#[case("n", false)]
#[case("True", true)]
#[case("TRUE", true)]
#[case("False", false)]
#[case("FALSE", false)]
#[case("Yes", true)]
#[case("YES", true)]
#[case("No", false)]
#[case("NO", false)]
#[case("T", true)]
#[case("F", false)]
#[case("Y", true)]
#[case("N", false)]
#[case("tRuE", true)]
#[case("fAlSe", false)]
#[case("yEs", true)]
fn test_parse_bool_valid(#[case] input: &str, #[case] expected: bool) {
assert_eq!(parse_bool(input).expect("should parse"), expected);
}
// Near misses, numeric and on/off forms, and untrimmed input are all rejected.
#[rstest]
#[case("")]
#[case("tru")]
#[case("fals")]
#[case("truee")]
#[case("noo")]
#[case("yess")]
#[case("maybe")]
#[case("0")]
#[case("1")]
#[case("on")]
#[case("off")]
#[case(" true")]
#[case("true ")]
fn test_parse_bool_invalid(#[case] input: &str) {
assert!(parse_bool(input).is_err(), "expected error for input: {input:?}");
}
// The extended spellings must also work end-to-end through clap.
#[rstest]
#[case(&["test", "--trim", "yes"], true)]
#[case(&["test", "--trim", "no"], false)]
#[case(&["test", "--trim", "y"], true)]
#[case(&["test", "--trim", "n"], false)]
#[case(&["test", "--trim", "t"], true)]
#[case(&["test", "--trim", "f"], false)]
#[case(&["test", "--trim", "YES"], true)]
#[case(&["test", "--trim", "NO"], false)]
#[case(&["test", "--trim=yes"], true)]
#[case(&["test", "--trim=no"], false)]
fn test_extended_bool_values_in_cli(#[case] args: &[&str], #[case] expected: bool) {
let cmd = TestBoolFlags::try_parse_from(args).expect("valid CLI args should parse");
assert_eq!(cmd.consensus.trim, expected);
}
#[rstest]
#[case(&["test", "--trim", "maybe"])]
#[case(&["test", "--trim", "0"])]
#[case(&["test", "--trim", "1"])]
#[case(&["test", "--trim", "on"])]
#[case(&["test", "--trim", "off"])]
fn test_extended_bool_values_in_cli_invalid(#[case] args: &[&str]) {
assert!(TestBoolFlags::try_parse_from(args).is_err());
}
// ---- detect_total_memory ----
#[test]
fn test_detect_total_memory_returns_nonzero() {
let total = detect_total_memory();
assert!(total > 0, "detect_total_memory returned 0");
assert!(total >= 64 * 1024 * 1024, "detect_total_memory returned < 64 MiB: {total}");
}
// The detected total must not exceed what sysinfo reports for the host.
#[test]
fn test_detect_total_memory_bounded_by_sysinfo() {
let total = detect_total_memory();
let mut system = sysinfo::System::new();
system.refresh_memory();
let sysinfo_total = usize::try_from(system.total_memory()).unwrap_or(usize::MAX);
assert!(
total <= sysinfo_total,
"cgroup-limited total {total} exceeded sysinfo total {sysinfo_total}"
);
}
// ---- serialize_raw_bam_records ----
#[test]
fn test_serialize_raw_bam_records_empty() {
let records: Vec<Vec<u8>> = Vec::new();
let mut output = Vec::new();
let count = serialize_raw_bam_records(&records, &mut output).unwrap();
assert_eq!(count, 0);
assert!(output.is_empty());
}
// A 4-byte record is framed as a little-endian length (4) followed by the body.
#[test]
fn test_serialize_raw_bam_records_single_frames_correctly() {
let records = vec![vec![0xDEu8, 0xAD, 0xBE, 0xEF]];
let mut output = Vec::new();
let count = serialize_raw_bam_records(&records, &mut output).unwrap();
assert_eq!(count, 1);
assert_eq!(output, vec![0x04, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF]);
}
#[test]
fn test_serialize_raw_bam_records_multiple_frames_concatenated() {
let records = vec![vec![0x11u8, 0x22], vec![0x33u8, 0x44, 0x55]];
let mut output = Vec::new();
let count = serialize_raw_bam_records(&records, &mut output).unwrap();
assert_eq!(count, 2);
assert_eq!(
output,
vec![0x02, 0x00, 0x00, 0x00, 0x11, 0x22, 0x03, 0x00, 0x00, 0x00, 0x33, 0x44, 0x55],
);
}
// 32 records of 64 bytes each: length and capacity must cover the framed size.
#[test]
fn test_serialize_raw_bam_records_reserves_capacity_upfront() {
let records: Vec<Vec<u8>> = (0..32).map(|i| vec![i as u8; 64]).collect();
let expected_size: usize = records.iter().map(|r| 4 + r.len()).sum();
let mut output = Vec::new();
serialize_raw_bam_records(&records, &mut output).unwrap();
assert_eq!(output.len(), expected_size);
assert!(
output.capacity() >= expected_size,
"capacity {} should be >= expected serialized size {expected_size}",
output.capacity(),
);
}
// Serialization appends to the buffer; bytes already present are preserved.
#[test]
fn test_serialize_raw_bam_records_preserves_existing_output_content() {
let mut output = vec![0xAAu8, 0xBB];
let records = vec![vec![0x01u8]];
serialize_raw_bam_records(&records, &mut output).unwrap();
assert_eq!(output, vec![0xAA, 0xBB, 0x01, 0x00, 0x00, 0x00, 0x01]);
}
}