use std::borrow::Cow;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use anyhow::Result;
use crossbeam::channel::{bounded, unbounded, Receiver, Sender};
use crate::cli::SplitConfig;
use crate::correction::{CorrectionBuffers, CorrectionConfig, CorrectionStats, OverlapCorrector};
use crate::filter::{apply_filters, FilterConfig};
use crate::io::direct_reader::{DirectFastqReader, DirectPairedFastqReader};
use crate::io::{
CompressionType, FastqReader, FastqWriter, OwnedRecord, PairedFastqReader, PairedFastqWriter,
PairedSplitWriter, ReadPool, SplitWriter,
};
use crate::merge::{MergeConfig, MergeStats};
use crate::qc::{FastQcStats, InsertSizeEstimator, Mode as QcMode, QcStats};
use crate::trim::{Mode, TrimConfig};
use crate::umi::{UmiConfig, UmiProcessor};
/// Per-worker channel capacity factor: the reader→worker and worker→main
/// queues are bounded to `threads * CHANNEL_BUFFER_MULTIPLIER` batches so the
/// reader cannot run arbitrarily far ahead of processing (back-pressure).
const CHANNEL_BUFFER_MULTIPLIER: usize = 4;
/// Top-level configuration for one pipeline run: threading, input/output
/// layout, and the per-stage (trim / filter / UMI / merge / correction)
/// sub-configurations.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
/// Number of worker threads; `1` selects the single-threaded code path.
pub threads: usize,
/// Number of records read per batch handed to a worker.
pub batch_size: usize,
/// Inputs as (R1, optional R2) pairs; a `None` R2 means single-end.
pub input_files: Vec<(PathBuf, Option<PathBuf>)>,
/// Prefix used to derive output file names; `None` disables file output.
pub output_prefix: Option<PathBuf>,
/// Write processed records to stdout instead of files.
pub use_stdout: bool,
/// Read-trimming settings.
pub trim_config: TrimConfig,
/// Read-filtering settings.
pub filter_config: FilterConfig,
/// Short- vs long-read processing mode (also selects the QC mode).
pub mode: Mode,
/// Optional output-splitting settings; `None` writes a single output.
pub split_config: Option<SplitConfig>,
/// UMI extraction settings.
pub umi_config: UmiConfig,
/// Read-merging settings (stats surfaced via `PipelineResult::merge_stats`).
pub merge_config: MergeConfig,
/// Overlap-based base-correction settings (used on paired-end paths only).
pub correction_config: CorrectionConfig,
/// Treat the single input file as interleaved R1/R2 records.
pub interleaved_in: bool,
/// Stop after this many reads; `0` means no limit.
/// NOTE(review): currently only honored by the single-threaded paths.
pub reads_to_process: usize,
/// Input qualities are Phred+64 and should be converted while reading.
pub phred64: bool,
/// Rewrite MGI-style read IDs while reading.
pub fix_mgi_id: bool,
/// Enable duplication-rate evaluation in QC.
pub eval_duplication: bool,
/// Enable overrepresented-sequence analysis in QC.
pub overrepresentation_analysis: bool,
/// Sampling rate for overrepresentation analysis (1 = every read).
pub overrepresentation_sampling: u32,
}
/// Defaults: all logical CPUs, 5000-record batches, short-read mode, QC
/// extras (duplication + overrepresentation) enabled, and every optional
/// stage (split / UMI / merge / correction) disabled or inert.
impl Default for PipelineConfig {
fn default() -> Self {
Self {
// One worker per logical CPU by default.
threads: num_cpus::get(),
batch_size: 5000,
input_files: Vec::new(),
output_prefix: None,
use_stdout: false,
trim_config: TrimConfig::default(),
filter_config: FilterConfig::default(),
mode: Mode::Short,
split_config: None,
// Optional stages start disabled.
umi_config: UmiConfig::disabled(),
merge_config: MergeConfig::disabled(),
correction_config: CorrectionConfig::new(),
interleaved_in: false,
// 0 = process every read.
reads_to_process: 0,
phred64: false,
fix_mgi_id: false,
eval_duplication: true,
overrepresentation_analysis: true,
// Sample 1 in 20 reads for overrepresentation analysis.
overrepresentation_sampling: 20,
}
}
}
impl PipelineConfig {
pub fn new() -> Self {
Self::default()
}
pub fn short_read() -> Self {
Self {
trim_config: TrimConfig::short_read(),
filter_config: FilterConfig::short_read(),
mode: Mode::Short,
..Self::default()
}
}
pub fn long_read() -> Self {
Self {
trim_config: TrimConfig::long_read(),
filter_config: FilterConfig::long_read(),
mode: Mode::Long,
..Self::default()
}
}
pub fn with_threads(mut self, threads: usize) -> Self {
self.threads = threads.max(1);
self
}
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size.max(1);
self
}
pub fn with_input(mut self, path: PathBuf) -> Self {
self.input_files.push((path, None));
self
}
pub fn with_paired_input(mut self, r1: PathBuf, r2: PathBuf) -> Self {
self.input_files.push((r1, Some(r2)));
self
}
pub fn with_output_prefix(mut self, prefix: PathBuf) -> Self {
self.output_prefix = Some(prefix);
self
}
pub fn with_stdout(mut self) -> Self {
self.use_stdout = true;
self
}
pub fn with_trim_config(mut self, config: TrimConfig) -> Self {
self.trim_config = config;
self
}
pub fn with_filter_config(mut self, config: FilterConfig) -> Self {
self.filter_config = config;
self
}
pub fn with_mode(mut self, mode: Mode) -> Self {
self.mode = mode;
self
}
pub fn with_split_config(mut self, config: SplitConfig) -> Self {
self.split_config = Some(config);
self
}
pub fn with_umi_config(mut self, config: UmiConfig) -> Self {
self.umi_config = config;
self
}
pub fn with_merge_config(mut self, config: MergeConfig) -> Self {
self.merge_config = config;
self
}
pub fn with_correction_config(mut self, config: CorrectionConfig) -> Self {
self.correction_config = config;
self
}
pub fn with_reads_to_process(mut self, reads_to_process: usize) -> Self {
self.reads_to_process = reads_to_process;
self
}
pub fn with_interleaved_in(mut self, interleaved: bool) -> Self {
self.interleaved_in = interleaved;
self
}
pub fn with_eval_duplication(mut self, enabled: bool) -> Self {
self.eval_duplication = enabled;
self
}
pub fn with_overrepresentation_analysis(mut self, enabled: bool) -> Self {
self.overrepresentation_analysis = enabled;
self
}
pub fn with_overrepresentation_sampling(mut self, rate: u32) -> Self {
self.overrepresentation_sampling = rate.max(1);
self
}
pub fn is_paired_end(&self) -> bool {
self.interleaved_in || self.input_files
.first()
.map(|(_, r2)| r2.is_some())
.unwrap_or(false)
}
}
/// Read/base counters accumulated by workers and merged on the main thread.
#[derive(Debug, Clone, Default)]
pub struct WorkerStats {
/// Total reads seen (input side).
pub reads_processed: u64,
/// Reads that survived trimming and filtering.
pub reads_passed: u64,
/// Total input bases before trimming.
pub bases_before: u64,
/// Total bases remaining after trimming, over passing reads.
pub bases_after: u64,
/// Pair-merging statistics.
pub merge_stats: MergeStats,
/// Overlap-correction statistics.
pub correction_stats: CorrectionStats,
}
impl WorkerStats {
pub fn new() -> Self {
Self::default()
}
pub fn merge(&mut self, other: &WorkerStats) {
self.reads_processed += other.reads_processed;
self.reads_passed += other.reads_passed;
self.bases_before += other.bases_before;
self.bases_after += other.bases_after;
self.merge_stats.merge(&other.merge_stats);
self.correction_stats.merge(&other.correction_stats);
}
pub fn pass_rate(&self) -> f64 {
if self.reads_processed == 0 {
0.0
} else {
(self.reads_passed as f64 / self.reads_processed as f64) * 100.0
}
}
pub fn base_retention_rate(&self) -> f64 {
if self.bases_before == 0 {
0.0
} else {
(self.bases_after as f64 / self.bases_before as f64) * 100.0
}
}
}
/// Aggregated outcome of a pipeline run across all input files.
#[derive(Debug, Clone)]
pub struct PipelineResult {
/// QC statistics over the raw input reads.
pub qc_before: QcStats,
/// QC statistics over the trimmed/filtered output reads.
pub qc_after: QcStats,
/// Merged per-worker read/base counters.
pub worker_stats: WorkerStats,
/// All files written (empty for stdout or stats-only runs).
pub output_files: Vec<PathBuf>,
}
impl PipelineResult {
    /// Number of reads removed by filtering (saturating, never underflows).
    pub fn reads_filtered(&self) -> u64 {
        let stats = &self.worker_stats;
        stats.reads_processed.saturating_sub(stats.reads_passed)
    }

    /// Percentage of reads that passed, delegated to the worker stats.
    pub fn pass_rate(&self) -> f64 {
        self.worker_stats.pass_rate()
    }

    /// Borrow the pair-merging statistics.
    pub fn merge_stats(&self) -> &MergeStats {
        &self.worker_stats.merge_stats
    }

    /// Percentage of pairs merged, delegated to the merge stats.
    pub fn merge_rate(&self) -> f64 {
        self.worker_stats.merge_stats.merge_rate()
    }
}
/// A batch of reads flowing through the channel pipeline.
#[derive(Debug)]
enum ReadBatch {
/// Single-end records.
Single(Vec<OwnedRecord>),
/// (R1, R2) record pairs.
Paired(Vec<(OwnedRecord, OwnedRecord)>),
}
/// One worker's output for one input batch: the surviving reads plus the
/// per-batch statistics the main thread merges.
#[derive(Debug)]
struct ProcessedBatch {
/// Reads that passed trimming/filtering, ready for the writer.
output: ReadBatch,
/// QC accumulated over the batch's raw input reads.
qc_before: FastQcStats,
/// QC accumulated over the surviving, trimmed reads.
qc_after: FastQcStats,
/// Read/base counters for this batch.
stats: WorkerStats,
/// Insert-size samples (populated by paired-end workers only).
insert_size_estimator: Option<InsertSizeEstimator>,
}
/// Runs a configured pipeline over all queued input files.
#[derive(Debug)]
pub struct PipelineExecutor {
/// The run configuration; fixed for the executor's lifetime.
config: PipelineConfig,
}
impl Default for PipelineExecutor {
fn default() -> Self {
Self::new(PipelineConfig::default())
}
}
impl PipelineExecutor {
pub fn new(config: PipelineConfig) -> Self {
Self { config }
}
/// Run the pipeline over every queued input file.
///
/// Dispatches to the single-threaded path when `threads == 1` and the
/// multi-threaded path otherwise. Fails fast when no inputs are queued.
pub fn run(&self) -> Result<PipelineResult> {
    if self.config.input_files.is_empty() {
        anyhow::bail!("No input files specified");
    }
    match self.config.threads {
        1 => {
            log::info!(
                "Running pipeline in single-threaded mode, batch size {}",
                self.config.batch_size
            );
            self.run_single_threaded()
        }
        n => {
            log::info!(
                "Running optimized pipeline with {} threads, batch size {}",
                n,
                self.config.batch_size
            );
            self.run_optimized_multi()
        }
    }
}
/// Multi-threaded processing of one single-end FASTQ file.
///
/// Thread topology (all wired with crossbeam channels):
/// reader -> [bounded] -> workers -> [bounded] -> main thread -> [unbounded] -> writer.
/// The bounded queues apply back-pressure so the reader cannot outrun the
/// workers. The main thread merges per-batch QC/worker statistics and
/// forwards surviving records to the optional writer thread.
///
/// Fix: removed a dead re-declaration of `num_workers`/`channel_capacity`
/// that shadowed the identical bindings made before channel creation.
fn run_single(&self, input_path: &Path, file_idx: usize) -> Result<PipelineResult> {
    let qc_mode = match self.config.mode {
        Mode::Short => QcMode::Short,
        Mode::Long => QcMode::Long,
    };
    let num_workers = self.config.threads;
    let channel_capacity = num_workers * CHANNEL_BUFFER_MULTIPLIER;
    let (read_tx, read_rx) = bounded::<ReadBatch>(channel_capacity);
    let (processed_tx, processed_rx) = bounded::<ProcessedBatch>(channel_capacity);
    let (write_tx, write_rx) = unbounded::<ReadBatch>();
    let shutdown = Arc::new(AtomicBool::new(false));
    let reads_sent = Arc::new(AtomicU64::new(0));
    let reads_received = Arc::new(AtomicU64::new(0));
    // Clone per-read configuration once; each worker clones from these.
    let trim_config = self.config.trim_config.clone();
    let filter_config = self.config.filter_config.clone();
    let umi_config = self.config.umi_config.clone();
    let eval_duplication = self.config.eval_duplication;
    let overrep_analysis = self.config.overrepresentation_analysis;
    let overrep_sampling = self.config.overrepresentation_sampling;
    // When several inputs are queued, embed the file index in the name.
    let output_path = self.config.output_prefix.as_ref().map(|prefix| {
        if self.config.input_files.len() > 1 {
            prefix.with_extension(format!("{}.fastq.gz", file_idx))
        } else {
            prefix.with_extension("fastq.gz")
        }
    });
    let reader_shutdown = Arc::clone(&shutdown);
    let reader_reads_sent = Arc::clone(&reads_sent);
    let input_path_clone = input_path.to_path_buf();
    let batch_size = self.config.batch_size;
    let phred64 = self.config.phred64;
    let fix_mgi_id = self.config.fix_mgi_id;
    // NOTE(review): `reads_to_process` is honored only by the single-threaded
    // paths; this reader consumes the whole file. TODO confirm intended.
    let reader_handle = thread::spawn(move || -> Result<()> {
        let mut reader = FastqReader::new(&input_path_clone)?
            .with_phred64_conversion(phred64)
            .with_mgi_id_conversion(fix_mgi_id);
        let mut pool = ReadPool::new(300);
        loop {
            if reader_shutdown.load(Ordering::Relaxed) {
                break;
            }
            let batch = reader.read_batch_pooled(batch_size, &mut pool)?;
            if batch.is_empty() {
                break;
            }
            reader_reads_sent.fetch_add(batch.len() as u64, Ordering::Relaxed);
            // A send error means every receiver is gone; stop reading.
            if read_tx.send(ReadBatch::Single(batch)).is_err() {
                break;
            }
        }
        drop(read_tx);
        Ok(())
    });
    let worker_handles: Vec<_> = (0..num_workers)
        .map(|_| {
            let rx = read_rx.clone();
            let tx = processed_tx.clone();
            let trim = trim_config.clone();
            let filter = filter_config.clone();
            let umi = umi_config.clone();
            let shutdown = Arc::clone(&shutdown);
            let reads_recv = Arc::clone(&reads_received);
            thread::spawn(move || -> Result<()> {
                process_worker_single(
                    rx, tx, &trim, &filter, &umi, qc_mode, eval_duplication,
                    overrep_analysis, overrep_sampling, shutdown, reads_recv,
                )
            })
        })
        .collect();
    let split_config = self.config.split_config.clone();
    let use_stdout = self.config.use_stdout;
    let writer_handle = if use_stdout {
        let rx = write_rx;
        Some(thread::spawn(move || -> Result<Vec<PathBuf>> {
            use crate::io::create_stdout_writer;
            let mut writer = create_stdout_writer(false, 4)?;
            for batch in rx {
                if let ReadBatch::Single(records) = batch {
                    writer.write_batch(&records)?;
                }
            }
            writer.flush()?;
            Ok(vec![])
        }))
    } else if let Some(ref path) = output_path {
        let path = path.clone();
        let rx = write_rx;
        Some(thread::spawn(move || -> Result<Vec<PathBuf>> {
            if let Some(config) = split_config {
                // Splitting needs the total record count up front, so all
                // records are buffered in memory before writing.
                let mut all_records = Vec::new();
                for batch in rx {
                    if let ReadBatch::Single(records) = batch {
                        all_records.extend(records);
                    }
                }
                let total_records = all_records.len();
                let mut split_writer = SplitWriter::new(&path, config, Some(total_records))?;
                split_writer.write_batch(&all_records)?;
                split_writer.finish()?;
                Ok(split_writer.get_output_files())
            } else {
                let compression = CompressionType::from_path(&path);
                let mut writer = FastqWriter::new(&path, compression)?;
                for batch in rx {
                    if let ReadBatch::Single(records) = batch {
                        writer.write_batch(&records)?;
                    }
                }
                writer.flush()?;
                Ok(vec![path])
            }
        }))
    } else {
        // No output requested: close the channel now; the main loop below
        // never sends on it in this case.
        drop(write_rx);
        None
    };
    // Release the main thread's handles so channel disconnection propagates
    // once the reader and workers finish.
    drop(read_rx);
    drop(processed_tx);
    let mut fast_qc_before = FastQcStats::with_full_config(
        qc_mode,
        self.config.eval_duplication,
        self.config.overrepresentation_analysis,
        self.config.overrepresentation_sampling,
    );
    let mut fast_qc_after = FastQcStats::with_full_config(
        qc_mode,
        self.config.eval_duplication,
        self.config.overrepresentation_analysis,
        self.config.overrepresentation_sampling,
    );
    let mut worker_stats = WorkerStats::new();
    // Merge statistics from every processed batch and forward the surviving
    // records to the writer when an output sink is configured.
    for batch in processed_rx {
        fast_qc_before.merge(batch.qc_before);
        fast_qc_after.merge(batch.qc_after);
        worker_stats.merge(&batch.stats);
        if self.config.output_prefix.is_some() || self.config.use_stdout {
            if write_tx.send(batch.output).is_err() {
                break;
            }
        }
    }
    // Closing write_tx lets the writer's receive loop terminate.
    drop(write_tx);
    shutdown.store(true, Ordering::Relaxed);
    reader_handle
        .join()
        .map_err(|_| anyhow::anyhow!("Reader thread panicked"))??;
    for handle in worker_handles {
        handle
            .join()
            .map_err(|_| anyhow::anyhow!("Worker thread panicked"))??;
    }
    let output_files = if let Some(handle) = writer_handle {
        handle
            .join()
            .map_err(|_| anyhow::anyhow!("Writer thread panicked"))??
    } else {
        Vec::new()
    };
    let qc_before = fast_qc_before.to_qc_stats();
    let qc_after = fast_qc_after.to_qc_stats();
    Ok(PipelineResult {
        qc_before,
        qc_after,
        worker_stats,
        output_files,
    })
}
/// Multi-threaded processing of one paired-end input: separate R1/R2 files,
/// or a single interleaved file when `interleaved_in` is set (then only
/// `r1_path` is opened).
///
/// Same thread topology as the single-end path: one reader, `threads`
/// workers (which additionally run overlap correction and insert-size
/// sampling), and an optional writer. The main thread merges statistics,
/// pools insert-size samples, and forwards surviving pairs to the writer.
///
/// Fix: removed a dead re-declaration of `num_workers` that shadowed the
/// identical binding made before channel creation.
fn run_paired(
    &self,
    r1_path: &Path,
    r2_path: &Path,
    file_idx: usize,
) -> Result<PipelineResult> {
    let qc_mode = match self.config.mode {
        Mode::Short => QcMode::Short,
        Mode::Long => QcMode::Long,
    };
    let num_workers = self.config.threads;
    let channel_capacity = num_workers * CHANNEL_BUFFER_MULTIPLIER;
    let (read_tx, read_rx) = bounded::<ReadBatch>(channel_capacity);
    let (processed_tx, processed_rx) = bounded::<ProcessedBatch>(channel_capacity);
    let (write_tx, write_rx) = unbounded::<ReadBatch>();
    let shutdown = Arc::new(AtomicBool::new(false));
    let reads_sent = Arc::new(AtomicU64::new(0));
    let reads_received = Arc::new(AtomicU64::new(0));
    // Clone per-read configuration once; each worker clones from these.
    let trim_config = self.config.trim_config.clone();
    let filter_config = self.config.filter_config.clone();
    let umi_config = self.config.umi_config.clone();
    let eval_duplication = self.config.eval_duplication;
    let overrep_analysis = self.config.overrepresentation_analysis;
    let overrep_sampling = self.config.overrepresentation_sampling;
    // When several inputs are queued, embed the file index in the names.
    let output_paths = self.config.output_prefix.as_ref().map(|prefix| {
        if self.config.input_files.len() > 1 {
            (
                prefix.with_extension(format!("{}.R1.fastq.gz", file_idx)),
                prefix.with_extension(format!("{}.R2.fastq.gz", file_idx)),
            )
        } else {
            (
                prefix.with_extension("R1.fastq.gz"),
                prefix.with_extension("R2.fastq.gz"),
            )
        }
    });
    let reader_shutdown = Arc::clone(&shutdown);
    let reader_reads_sent = Arc::clone(&reads_sent);
    let r1_clone = r1_path.to_path_buf();
    let r2_clone = r2_path.to_path_buf();
    let batch_size = self.config.batch_size;
    let interleaved = self.config.interleaved_in;
    let phred64 = self.config.phred64;
    let fix_mgi_id = self.config.fix_mgi_id;
    // NOTE(review): `reads_to_process` is honored only by the single-threaded
    // paths; this reader consumes the whole input. TODO confirm intended.
    let reader_handle = thread::spawn(move || -> Result<()> {
        let mut pool1 = ReadPool::new(300);
        let mut pool2 = ReadPool::new(300);
        if interleaved {
            // One file with alternating R1/R2 records.
            let mut reader = FastqReader::new(&r1_clone)?
                .with_phred64_conversion(phred64)
                .with_mgi_id_conversion(fix_mgi_id);
            loop {
                if reader_shutdown.load(Ordering::Relaxed) {
                    break;
                }
                let batch = reader.read_batch_interleaved_pooled(batch_size, &mut pool1, &mut pool2)?;
                if batch.is_empty() {
                    break;
                }
                reader_reads_sent.fetch_add(batch.len() as u64, Ordering::Relaxed);
                if read_tx.send(ReadBatch::Paired(batch)).is_err() {
                    break;
                }
            }
        } else {
            let mut reader = PairedFastqReader::new(&r1_clone, &r2_clone)?
                .with_phred64_conversion(phred64)
                .with_mgi_id_conversion(fix_mgi_id);
            loop {
                if reader_shutdown.load(Ordering::Relaxed) {
                    break;
                }
                let batch = reader.read_batch_pooled(batch_size, &mut pool1, &mut pool2)?;
                if batch.is_empty() {
                    break;
                }
                reader_reads_sent.fetch_add(batch.len() as u64, Ordering::Relaxed);
                if read_tx.send(ReadBatch::Paired(batch)).is_err() {
                    break;
                }
            }
        }
        drop(read_tx);
        Ok(())
    });
    let correction_config = self.config.correction_config.clone();
    let worker_handles: Vec<_> = (0..num_workers)
        .map(|_| {
            let rx = read_rx.clone();
            let tx = processed_tx.clone();
            let trim = trim_config.clone();
            let filter = filter_config.clone();
            let umi = umi_config.clone();
            let correction = correction_config.clone();
            let shutdown = Arc::clone(&shutdown);
            let reads_recv = Arc::clone(&reads_received);
            thread::spawn(move || -> Result<()> {
                process_worker_paired(
                    rx, tx, &trim, &filter, &umi, &correction, qc_mode,
                    eval_duplication, overrep_analysis, overrep_sampling,
                    shutdown, reads_recv,
                )
            })
        })
        .collect();
    let split_config = self.config.split_config.clone();
    let use_stdout = self.config.use_stdout;
    let writer_handle = if use_stdout {
        let rx = write_rx;
        Some(thread::spawn(move || -> Result<Vec<PathBuf>> {
            use crate::io::create_stdout_writer;
            let mut writer = create_stdout_writer(false, 4)?;
            for batch in rx {
                if let ReadBatch::Paired(pairs) = batch {
                    // Stdout output is interleaved: R1 then R2 per pair.
                    for (r1, r2) in pairs {
                        writer.write_record(&r1)?;
                        writer.write_record(&r2)?;
                    }
                }
            }
            writer.flush()?;
            Ok(vec![])
        }))
    } else if let Some((ref r1_output, ref r2_output)) = output_paths {
        let r1_path = r1_output.clone();
        let r2_path = r2_output.clone();
        let rx = write_rx;
        Some(thread::spawn(move || -> Result<Vec<PathBuf>> {
            if let Some(config) = split_config {
                // Splitting needs the total pair count up front, so all
                // pairs are buffered in memory before writing.
                let mut all_pairs = Vec::new();
                for batch in rx {
                    if let ReadBatch::Paired(pairs) = batch {
                        all_pairs.extend(pairs);
                    }
                }
                let total_pairs = all_pairs.len();
                let mut split_writer = PairedSplitWriter::new(&r1_path, config, Some(total_pairs))?;
                split_writer.write_batch(&all_pairs)?;
                split_writer.finish()?;
                Ok(split_writer.get_output_files())
            } else {
                let compression = CompressionType::from_path(&r1_path);
                let mut writer = PairedFastqWriter::new(&r1_path, &r2_path, compression)?;
                for batch in rx {
                    if let ReadBatch::Paired(pairs) = batch {
                        writer.write_batch(&pairs)?;
                    }
                }
                writer.flush()?;
                Ok(vec![r1_path, r2_path])
            }
        }))
    } else {
        // No output requested: close the channel now; the main loop below
        // never sends on it in this case.
        drop(write_rx);
        None
    };
    // Release the main thread's handles so channel disconnection propagates.
    drop(read_rx);
    drop(processed_tx);
    let mut fast_qc_before = FastQcStats::with_full_config(
        qc_mode,
        self.config.eval_duplication,
        self.config.overrepresentation_analysis,
        self.config.overrepresentation_sampling,
    );
    let mut fast_qc_after = FastQcStats::with_full_config(
        qc_mode,
        self.config.eval_duplication,
        self.config.overrepresentation_analysis,
        self.config.overrepresentation_sampling,
    );
    let mut worker_stats = WorkerStats::new();
    let mut combined_insert_size = InsertSizeEstimator::new();
    for batch in processed_rx {
        fast_qc_before.merge(batch.qc_before);
        fast_qc_after.merge(batch.qc_after);
        worker_stats.merge(&batch.stats);
        // Pool the per-worker insert-size samples.
        if let Some(ref est) = batch.insert_size_estimator {
            combined_insert_size.merge(est);
        }
        if self.config.output_prefix.is_some() || self.config.use_stdout {
            if write_tx.send(batch.output).is_err() {
                break;
            }
        }
    }
    let qc_before = fast_qc_before.to_qc_stats();
    let mut qc_after = fast_qc_after.to_qc_stats();
    // Attach insert-size stats to the post-QC only when samples were taken.
    let insert_size_stats = combined_insert_size.finalize();
    if insert_size_stats.has_data() {
        qc_after.set_insert_size(insert_size_stats);
    }
    // Closing write_tx lets the writer's receive loop terminate.
    drop(write_tx);
    shutdown.store(true, Ordering::Relaxed);
    reader_handle
        .join()
        .map_err(|_| anyhow::anyhow!("Reader thread panicked"))??;
    for handle in worker_handles {
        handle
            .join()
            .map_err(|_| anyhow::anyhow!("Worker thread panicked"))??;
    }
    let output_files = if let Some(handle) = writer_handle {
        handle
            .join()
            .map_err(|_| anyhow::anyhow!("Writer thread panicked"))??
    } else {
        Vec::new()
    };
    Ok(PipelineResult {
        qc_before,
        qc_after,
        worker_stats,
        output_files,
    })
}
/// Single-threaded driver: process each queued input in order and fold the
/// per-file results (QC, worker stats, output files) into one result.
fn run_single_threaded(&self) -> Result<PipelineResult> {
    let mut combined: Option<PipelineResult> = None;
    for (idx, (r1_path, r2_opt)) in self.config.input_files.iter().enumerate() {
        log::info!(
            "Processing file {} of {}: {}",
            idx + 1,
            self.config.input_files.len(),
            r1_path.display()
        );
        let result = match r2_opt {
            Some(r2_path) => self.run_single_threaded_paired(r1_path, r2_path, idx)?,
            None => self.run_single_threaded_single(r1_path, idx)?,
        };
        combined = Some(match combined {
            None => result,
            Some(mut acc) => {
                acc.qc_before.merge(result.qc_before);
                acc.qc_after.merge(result.qc_after);
                acc.worker_stats.merge(&result.worker_stats);
                acc.output_files.extend(result.output_files);
                acc
            }
        });
    }
    combined.ok_or_else(|| anyhow::anyhow!("No results produced"))
}
/// Single-threaded processing of one single-end FASTQ file.
///
/// Streams records one at a time through optional UMI extraction, trimming,
/// max-length truncation and filtering, accumulating before/after QC stats,
/// and writes survivors through a 64 KiB buffer to a file, stdout, or
/// nowhere (stats-only run). The UMI and non-UMI branches intentionally
/// repeat the same trim/filter/write sequence on different working buffers.
fn run_single_threaded_single(&self, input_path: &Path, file_idx: usize) -> Result<PipelineResult> {
// Map trimming mode onto the matching QC mode.
let qc_mode = match self.config.mode {
Mode::Short => QcMode::Short,
Mode::Long => QcMode::Long,
};
// NOTE(review): unlike the threaded path, no phred64 conversion is applied
// here -- confirm DirectFastqReader handles it, or that phred64 input is
// not expected on this path.
let mut reader = DirectFastqReader::new(input_path)?
.with_mgi_id_conversion(self.config.fix_mgi_id);
let mut fast_qc_before = FastQcStats::with_full_config(
qc_mode,
self.config.eval_duplication,
self.config.overrepresentation_analysis,
self.config.overrepresentation_sampling,
);
let mut fast_qc_after = FastQcStats::with_full_config(
qc_mode,
self.config.eval_duplication,
self.config.overrepresentation_analysis,
self.config.overrepresentation_sampling,
);
let mut worker_stats = WorkerStats::new();
let umi_processor = UmiProcessor::new(self.config.umi_config.clone());
// When several inputs are queued, embed the file index in the output name.
let output_path = self.config.output_prefix.as_ref().map(|prefix| {
if self.config.input_files.len() > 1 {
prefix.with_extension(format!("{}.fastq.gz", file_idx))
} else {
prefix.with_extension("fastq.gz")
}
});
// Output sink: a compressed file writer or a locked stdout handle.
enum WriterType {
File(FastqWriter),
Stdout(std::io::StdoutLock<'static>),
}
let mut writer = if self.config.use_stdout {
Some(WriterType::Stdout(std::io::stdout().lock()))
} else if let Some(ref path) = output_path {
let compression = CompressionType::from_path(path);
Some(WriterType::File(FastqWriter::new(path, compression)?))
} else {
None
};
// Survivors are serialized here and flushed once the buffer reaches 64 KiB.
let mut output_buffer = Vec::with_capacity(65536);
// One record buffer reused across the whole file.
let mut record = OwnedRecord::with_capacity(300);
let mut total_reads = 0usize;
while reader.read_into(&mut record)? {
// Optional cap on the number of reads processed (0 = unlimited).
if self.config.reads_to_process > 0 && total_reads >= self.config.reads_to_process {
break;
}
total_reads += 1;
worker_stats.reads_processed += 1;
worker_stats.bases_before += record.seq.len() as u64;
fast_qc_before.update_fast(&record.seq, &record.qual);
// UMI branch: extract the UMI first, then run the trim/truncate/filter/
// write sequence on the adjusted name/seq/qual.
if umi_processor.is_enabled() {
let (working_name, working_seq, working_qual) = match umi_processor.process_read(&record.name, &record.seq, &record.qual, None) {
Some((name, seq, qual)) => (name, seq, qual),
// Reads rejected by UMI processing are dropped.
None => continue,
};
let trim_result = self.config.trim_config.apply(&working_seq, &working_qual);
// Drop reads trimmed away entirely or failing the length check.
if trim_result.is_empty() {
continue;
}
if !self.config.trim_config.check_length(trim_result.len()) {
continue;
}
let mut trimmed_seq = trim_result.apply(&working_seq).to_vec();
let mut trimmed_qual = trim_result.apply(&working_qual).to_vec();
// Hard cap on read length, applied after trimming.
if let Some(max_len1) = self.config.filter_config.length_config.max_len_r1 {
crate::trim::length::truncate_to_max_len(&mut trimmed_seq, &mut trimmed_qual, max_len1);
}
let decision = apply_filters(&working_name, &trimmed_seq, &trimmed_qual, &self.config.filter_config);
if decision.is_fail() {
continue;
}
worker_stats.reads_passed += 1;
worker_stats.bases_after += trimmed_seq.len() as u64;
fast_qc_after.update_fast(&trimmed_seq, &trimmed_qual);
if writer.is_some() {
OwnedRecord::append_slices_to_buffer(
&mut output_buffer,
&working_name,
&trimmed_seq,
&trimmed_qual,
);
// Flush once the buffer crosses the 64 KiB threshold.
if output_buffer.len() >= 65536 {
if let Some(ref mut w) = writer {
use std::io::Write;
match w {
WriterType::File(f) => f.write_raw(&output_buffer)?,
WriterType::Stdout(s) => s.write_all(&output_buffer)?,
}
output_buffer.clear();
}
}
}
continue;
}
// Non-UMI branch: borrow the record in place (no copies before trimming).
let working_name: Cow<'_, [u8]> = Cow::Borrowed(&record.name[..]);
let seq_ref: &[u8] = &record.seq[..];
let qual_ref: &[u8] = &record.qual[..];
let trim_result = self.config.trim_config.apply(seq_ref, qual_ref);
if trim_result.is_empty() {
continue;
}
if !self.config.trim_config.check_length(trim_result.len()) {
continue;
}
let mut trimmed_seq = trim_result.apply(seq_ref).to_vec();
let mut trimmed_qual = trim_result.apply(qual_ref).to_vec();
if let Some(max_len1) = self.config.filter_config.length_config.max_len_r1 {
crate::trim::length::truncate_to_max_len(&mut trimmed_seq, &mut trimmed_qual, max_len1);
}
let decision = apply_filters(&working_name, &trimmed_seq, &trimmed_qual, &self.config.filter_config);
if decision.is_fail() {
continue;
}
worker_stats.reads_passed += 1;
worker_stats.bases_after += trimmed_seq.len() as u64;
fast_qc_after.update_fast(&trimmed_seq, &trimmed_qual);
if writer.is_some() {
OwnedRecord::append_slices_to_buffer(
&mut output_buffer,
&working_name,
&trimmed_seq,
&trimmed_qual,
);
if output_buffer.len() >= 65536 {
if let Some(ref mut w) = writer {
use std::io::Write;
match w {
WriterType::File(f) => f.write_raw(&output_buffer)?,
WriterType::Stdout(s) => s.write_all(&output_buffer)?,
}
output_buffer.clear();
}
}
}
}
// Drain whatever remains in the buffer, then flush the sink.
if let Some(ref mut w) = writer {
use std::io::Write;
if !output_buffer.is_empty() {
match w {
WriterType::File(f) => f.write_raw(&output_buffer)?,
WriterType::Stdout(s) => s.write_all(&output_buffer)?,
}
}
match w {
WriterType::File(f) => f.flush()?,
WriterType::Stdout(s) => s.flush()?,
}
}
// Zero or one output file depending on whether a prefix was configured.
let output_files = output_path.into_iter().collect();
Ok(PipelineResult {
qc_before: fast_qc_before.to_qc_stats(),
qc_after: fast_qc_after.to_qc_stats(),
worker_stats,
output_files,
})
}
/// Single-threaded processing of one paired-end input (separate R1/R2 files).
///
/// Reads the two files in lock-step, runs optional UMI extraction, per-mate
/// trimming, optional overlap correction, max-length truncation and
/// filtering; a pair survives only when BOTH mates pass. Survivors are
/// buffered and written to paired output files, or interleaved to stdout.
/// Insert sizes are sampled from the raw pairs for QC. The UMI and non-UMI
/// branches intentionally repeat the same processing sequence.
fn run_single_threaded_paired(
&self,
r1_path: &Path,
r2_path: &Path,
file_idx: usize,
) -> Result<PipelineResult> {
// Map trimming mode onto the matching QC mode.
let qc_mode = match self.config.mode {
Mode::Short => QcMode::Short,
Mode::Long => QcMode::Long,
};
// NOTE(review): as in the single-end path, no phred64 conversion is applied
// here -- confirm DirectPairedFastqReader covers it.
let mut reader = DirectPairedFastqReader::new(r1_path, r2_path)?
.with_mgi_id_conversion(self.config.fix_mgi_id);
let mut fast_qc_before = FastQcStats::with_full_config(
qc_mode,
self.config.eval_duplication,
self.config.overrepresentation_analysis,
self.config.overrepresentation_sampling,
);
let mut fast_qc_after = FastQcStats::with_full_config(
qc_mode,
self.config.eval_duplication,
self.config.overrepresentation_analysis,
self.config.overrepresentation_sampling,
);
let mut worker_stats = WorkerStats::new();
let mut insert_size_est = InsertSizeEstimator::new();
let umi_processor = UmiProcessor::new(self.config.umi_config.clone());
// Overlap correction is optional; the corrector is built once and reused.
let corrector = if self.config.correction_config.enabled {
Some(OverlapCorrector::new(self.config.correction_config.clone()))
} else {
None
};
// When several inputs are queued, embed the file index in the names.
let output_paths = self.config.output_prefix.as_ref().map(|prefix| {
if self.config.input_files.len() > 1 {
(
prefix.with_extension(format!("{}.R1.fastq.gz", file_idx)),
prefix.with_extension(format!("{}.R2.fastq.gz", file_idx)),
)
} else {
(
prefix.with_extension("R1.fastq.gz"),
prefix.with_extension("R2.fastq.gz"),
)
}
});
// Output sink: a paired file writer, or interleaved records on stdout.
enum PairedWriterType {
File(PairedFastqWriter),
Stdout(std::io::StdoutLock<'static>),
}
let mut writer = if self.config.use_stdout {
Some(PairedWriterType::Stdout(std::io::stdout().lock()))
} else if let Some((ref r1_out, ref r2_out)) = output_paths {
let compression = CompressionType::from_path(r1_out);
Some(PairedWriterType::File(PairedFastqWriter::new(r1_out, r2_out, compression)?))
} else {
None
};
// Per-mate serialization buffers; flushed when R1's crosses 64 KiB.
let mut r1_buffer = Vec::with_capacity(65536);
let mut r2_buffer = Vec::with_capacity(65536);
// Record buffers reused across the whole file.
let mut r1 = OwnedRecord::with_capacity(300);
let mut r2 = OwnedRecord::with_capacity(300);
let mut total_pairs = 0usize;
let mut correction_buffers = CorrectionBuffers::new();
loop {
// Read both mates in lock-step; a record-count mismatch is an error.
let has_r1 = reader.reader1.read_into(&mut r1)?;
let has_r2 = reader.reader2.read_into(&mut r2)?;
match (has_r1, has_r2) {
(true, true) => {}
(false, false) => break,
(true, false) => return Err(anyhow::anyhow!("R1 file has more records than R2")),
(false, true) => return Err(anyhow::anyhow!("R2 file has more records than R1")),
}
// `reads_to_process` counts reads, not pairs (hence the `* 2`).
if self.config.reads_to_process > 0 && total_pairs * 2 >= self.config.reads_to_process {
break;
}
total_pairs += 1;
// Sample insert sizes from the raw (untrimmed) pair.
if insert_size_est.is_sampling() {
insert_size_est.estimate_from_pair(&r1.seq, &r2.seq);
}
worker_stats.reads_processed += 2;
worker_stats.bases_before += r1.seq.len() as u64 + r2.seq.len() as u64;
fast_qc_before.update_fast(&r1.seq, &r1.qual);
fast_qc_before.update_fast(&r2.seq, &r2.qual);
// UMI branch: extract UMIs, then the same trim/correct/filter/write
// sequence as the non-UMI branch below.
if umi_processor.is_enabled() {
let (r1_name, r1_seq, r1_qual, r2_name, r2_seq, r2_qual) = match umi_processor.process_paired_reads(
&r1.name, &r1.seq, &r1.qual,
&r2.name, &r2.seq, &r2.qual,
None,
) {
Some(((n1, s1, q1), (n2, s2, q2))) => (n1, s1, q1, n2, s2, q2),
// Pairs rejected by UMI processing are dropped.
None => continue,
};
let trim_r1 = self.config.trim_config.apply_with_read_type(&r1_seq, &r1_qual, false);
let trim_r2 = self.config.trim_config.apply_with_read_type(&r2_seq, &r2_qual, true);
// Drop the whole pair when either mate is trimmed away or too short.
if trim_r1.is_empty() || trim_r2.is_empty() {
continue;
}
if !self.config.trim_config.check_length(trim_r1.len())
|| !self.config.trim_config.check_length(trim_r2.len()) {
continue;
}
let mut trimmed_r1_seq = trim_r1.apply(&r1_seq).to_vec();
let mut trimmed_r1_qual = trim_r1.apply(&r1_qual).to_vec();
let mut trimmed_r2_seq = trim_r2.apply(&r2_seq).to_vec();
let mut trimmed_r2_qual = trim_r2.apply(&r2_qual).to_vec();
// Overlap correction writes into scratch buffers; on success the
// corrected data is swapped back into the trimmed vectors.
if let Some(ref corr) = corrector {
if let Some(stats) = corr.correct_pair_into(
&trimmed_r1_seq, &trimmed_r1_qual,
&trimmed_r2_seq, &trimmed_r2_qual,
&mut correction_buffers,
) {
std::mem::swap(&mut trimmed_r1_seq, &mut correction_buffers.r1_seq);
std::mem::swap(&mut trimmed_r1_qual, &mut correction_buffers.r1_qual);
std::mem::swap(&mut trimmed_r2_seq, &mut correction_buffers.r2_seq);
std::mem::swap(&mut trimmed_r2_qual, &mut correction_buffers.r2_qual);
worker_stats.correction_stats.merge(&stats);
}
}
// Per-mate hard length caps, applied after trimming/correction.
if let Some(max_len1) = self.config.filter_config.length_config.max_len_r1 {
crate::trim::length::truncate_to_max_len(&mut trimmed_r1_seq, &mut trimmed_r1_qual, max_len1);
}
if let Some(max_len2) = self.config.filter_config.length_config.max_len_r2 {
crate::trim::length::truncate_to_max_len(&mut trimmed_r2_seq, &mut trimmed_r2_qual, max_len2);
}
let decision_r1 = apply_filters(&r1_name, &trimmed_r1_seq, &trimmed_r1_qual, &self.config.filter_config);
let decision_r2 = apply_filters(&r2_name, &trimmed_r2_seq, &trimmed_r2_qual, &self.config.filter_config);
// Both mates must pass or the pair is dropped.
if decision_r1.is_fail() || decision_r2.is_fail() {
continue;
}
worker_stats.reads_passed += 2;
worker_stats.bases_after += trimmed_r1_seq.len() as u64 + trimmed_r2_seq.len() as u64;
fast_qc_after.update_fast(&trimmed_r1_seq, &trimmed_r1_qual);
fast_qc_after.update_fast(&trimmed_r2_seq, &trimmed_r2_qual);
// NOTE(review): the is_some() guard makes the inner if-let always
// succeed; redundant but harmless.
if writer.is_some() {
if let Some(ref mut w) = writer {
use std::io::Write;
match w {
PairedWriterType::File(f) => {
OwnedRecord::append_slices_to_buffer(&mut r1_buffer, &r1_name, &trimmed_r1_seq, &trimmed_r1_qual);
OwnedRecord::append_slices_to_buffer(&mut r2_buffer, &r2_name, &trimmed_r2_seq, &trimmed_r2_qual);
// Flush both buffers once R1's crosses the threshold.
if r1_buffer.len() >= 65536 {
f.write_raw(&r1_buffer, &r2_buffer)?;
r1_buffer.clear();
r2_buffer.clear();
}
}
PairedWriterType::Stdout(s) => {
// Stdout output is interleaved and written per record.
OwnedRecord::append_slices_to_buffer(&mut r1_buffer, &r1_name, &trimmed_r1_seq, &trimmed_r1_qual);
s.write_all(&r1_buffer)?;
r1_buffer.clear();
OwnedRecord::append_slices_to_buffer(&mut r2_buffer, &r2_name, &trimmed_r2_seq, &trimmed_r2_qual);
s.write_all(&r2_buffer)?;
r2_buffer.clear();
}
}
}
}
continue;
}
// Non-UMI branch: borrow both records in place.
let r1_name: Cow<'_, [u8]> = Cow::Borrowed(&r1.name[..]);
let r2_name: Cow<'_, [u8]> = Cow::Borrowed(&r2.name[..]);
let r1_seq_ref: &[u8] = &r1.seq[..];
let r1_qual_ref: &[u8] = &r1.qual[..];
let r2_seq_ref: &[u8] = &r2.seq[..];
let r2_qual_ref: &[u8] = &r2.qual[..];
let trim_r1 = self.config.trim_config.apply_with_read_type(r1_seq_ref, r1_qual_ref, false);
let trim_r2 = self.config.trim_config.apply_with_read_type(r2_seq_ref, r2_qual_ref, true);
if trim_r1.is_empty() || trim_r2.is_empty() {
continue;
}
if !self.config.trim_config.check_length(trim_r1.len())
|| !self.config.trim_config.check_length(trim_r2.len()) {
continue;
}
let mut trimmed_r1_seq = trim_r1.apply(r1_seq_ref).to_vec();
let mut trimmed_r1_qual = trim_r1.apply(r1_qual_ref).to_vec();
let mut trimmed_r2_seq = trim_r2.apply(r2_seq_ref).to_vec();
let mut trimmed_r2_qual = trim_r2.apply(r2_qual_ref).to_vec();
if let Some(ref corr) = corrector {
if let Some(stats) = corr.correct_pair_into(
&trimmed_r1_seq, &trimmed_r1_qual,
&trimmed_r2_seq, &trimmed_r2_qual,
&mut correction_buffers,
) {
std::mem::swap(&mut trimmed_r1_seq, &mut correction_buffers.r1_seq);
std::mem::swap(&mut trimmed_r1_qual, &mut correction_buffers.r1_qual);
std::mem::swap(&mut trimmed_r2_seq, &mut correction_buffers.r2_seq);
std::mem::swap(&mut trimmed_r2_qual, &mut correction_buffers.r2_qual);
worker_stats.correction_stats.merge(&stats);
}
}
if let Some(max_len1) = self.config.filter_config.length_config.max_len_r1 {
crate::trim::length::truncate_to_max_len(&mut trimmed_r1_seq, &mut trimmed_r1_qual, max_len1);
}
if let Some(max_len2) = self.config.filter_config.length_config.max_len_r2 {
crate::trim::length::truncate_to_max_len(&mut trimmed_r2_seq, &mut trimmed_r2_qual, max_len2);
}
let decision_r1 = apply_filters(&r1_name, &trimmed_r1_seq, &trimmed_r1_qual, &self.config.filter_config);
let decision_r2 = apply_filters(&r2_name, &trimmed_r2_seq, &trimmed_r2_qual, &self.config.filter_config);
if decision_r1.is_fail() || decision_r2.is_fail() {
continue;
}
worker_stats.reads_passed += 2;
worker_stats.bases_after += trimmed_r1_seq.len() as u64 + trimmed_r2_seq.len() as u64;
fast_qc_after.update_fast(&trimmed_r1_seq, &trimmed_r1_qual);
fast_qc_after.update_fast(&trimmed_r2_seq, &trimmed_r2_qual);
if writer.is_some() {
if let Some(ref mut w) = writer {
use std::io::Write;
match w {
PairedWriterType::File(f) => {
OwnedRecord::append_slices_to_buffer(
&mut r1_buffer,
&r1_name,
&trimmed_r1_seq,
&trimmed_r1_qual,
);
OwnedRecord::append_slices_to_buffer(
&mut r2_buffer,
&r2_name,
&trimmed_r2_seq,
&trimmed_r2_qual,
);
if r1_buffer.len() >= 65536 {
f.write_raw(&r1_buffer, &r2_buffer)?;
r1_buffer.clear();
r2_buffer.clear();
}
}
PairedWriterType::Stdout(s) => {
OwnedRecord::append_slices_to_buffer(&mut r1_buffer, &r1_name, &trimmed_r1_seq, &trimmed_r1_qual);
s.write_all(&r1_buffer)?;
r1_buffer.clear();
OwnedRecord::append_slices_to_buffer(&mut r2_buffer, &r2_name, &trimmed_r2_seq, &trimmed_r2_qual);
s.write_all(&r2_buffer)?;
r2_buffer.clear();
}
}
}
}
}
// Drain any buffered records, then flush the sink.
if let Some(ref mut w) = writer {
use std::io::Write;
match w {
PairedWriterType::File(f) => {
if !r1_buffer.is_empty() {
f.write_raw(&r1_buffer, &r2_buffer)?;
}
f.flush()?;
}
PairedWriterType::Stdout(s) => {
s.flush()?;
}
}
}
// Attach insert-size stats to the post-QC only when samples were taken.
let mut qc_after = fast_qc_after.to_qc_stats();
let insert_stats = insert_size_est.finalize();
if insert_stats.has_data() {
qc_after.set_insert_size(insert_stats);
}
let output_files = output_paths
.map(|(r1, r2)| vec![r1, r2])
.unwrap_or_default();
Ok(PipelineResult {
qc_before: fast_qc_before.to_qc_stats(),
qc_after,
worker_stats,
output_files,
})
}
/// Runs the optimized pipeline over every configured input, dispatching to
/// the paired- or single-end variant per file and folding all results into
/// one combined `PipelineResult`.
///
/// QC stats and worker counters are merged across files; output file lists
/// are concatenated. Errors if the input list produced no result at all.
fn run_optimized_multi(&self) -> Result<PipelineResult> {
    let total_files = self.config.input_files.len();
    let mut accumulated: Option<PipelineResult> = None;
    for (file_idx, (r1_path, maybe_r2)) in self.config.input_files.iter().enumerate() {
        log::info!(
            "Processing file {} of {}: {}",
            file_idx + 1,
            total_files,
            r1_path.display()
        );
        // Paired-end when an R2 path is present, single-end otherwise.
        let file_result = match maybe_r2 {
            Some(r2_path) => self.run_optimized_paired(r1_path, r2_path, file_idx)?,
            None => self.run_optimized_single(r1_path, file_idx)?,
        };
        match accumulated.as_mut() {
            Some(acc) => {
                acc.qc_before.merge(file_result.qc_before);
                acc.qc_after.merge(file_result.qc_after);
                acc.worker_stats.merge(&file_result.worker_stats);
                acc.output_files.extend(file_result.output_files);
            }
            None => accumulated = Some(file_result),
        }
    }
    accumulated.ok_or_else(|| anyhow::anyhow!("No results produced"))
}
/// Runs the optimized single-end pipeline for one input file.
///
/// Thread topology: one reader thread -> bounded channel -> `threads` worker
/// threads (trim/filter/UMI + QC) -> unbounded channel -> one writer thread
/// (file, stdout, or a draining no-op). Returns merged QC stats, worker
/// counters, and the list of files written.
///
/// Fix vs. previous version: the stdout branch used to build a writer with
/// `create_stdout_writer(false, 4)` but never wrote through it — each buffer
/// was written raw to a freshly acquired `std::io::stdout()` handle while the
/// unused writer was flushed per buffer. It now locks stdout once and streams
/// the buffers directly, matching the stdout path of `run_optimized_paired`.
fn run_optimized_single(&self, input_path: &Path, file_idx: usize) -> Result<PipelineResult> {
    let qc_mode = match self.config.mode {
        Mode::Short => QcMode::Short,
        Mode::Long => QcMode::Long,
    };
    let num_workers = self.config.threads;
    let batch_size = self.config.batch_size;
    // Bounded read channel gives the reader back-pressure; output channel is
    // unbounded so workers never block on the writer.
    let channel_capacity = num_workers * CHANNEL_BUFFER_MULTIPLIER;
    let (read_tx, read_rx) = bounded::<Vec<OwnedRecord>>(channel_capacity);
    let (output_tx, output_rx) = unbounded::<Vec<u8>>();
    let shutdown = Arc::new(AtomicBool::new(false));
    // Clone the per-thread configuration up front so the spawned closures can
    // be 'static.
    let trim_config = self.config.trim_config.clone();
    let filter_config = self.config.filter_config.clone();
    let umi_config = self.config.umi_config.clone();
    let eval_duplication = self.config.eval_duplication;
    let overrep_analysis = self.config.overrepresentation_analysis;
    let overrep_sampling = self.config.overrepresentation_sampling;
    // With multiple inputs, disambiguate outputs with the file index.
    let output_path = self.config.output_prefix.as_ref().map(|prefix| {
        if self.config.input_files.len() > 1 {
            prefix.with_extension(format!("{}.fastq.gz", file_idx))
        } else {
            prefix.with_extension("fastq.gz")
        }
    });
    let reader_shutdown = Arc::clone(&shutdown);
    let input_path_clone = input_path.to_path_buf();
    let fix_mgi_id = self.config.fix_mgi_id;
    // Reader thread: pulls pooled batches until EOF, shutdown, or all workers
    // have hung up on the channel.
    let reader_handle = thread::spawn(move || -> Result<()> {
        let mut reader = DirectFastqReader::new(&input_path_clone)?
            .with_mgi_id_conversion(fix_mgi_id);
        let mut pool = ReadPool::new(300);
        loop {
            if reader_shutdown.load(Ordering::Relaxed) {
                break;
            }
            let batch = reader.read_batch_pooled(batch_size, &mut pool)?;
            if batch.is_empty() {
                break; // EOF
            }
            if read_tx.send(batch).is_err() {
                break; // all receivers dropped
            }
        }
        drop(read_tx);
        Ok(())
    });
    let worker_handles: Vec<_> = (0..num_workers)
        .map(|_| {
            let rx = read_rx.clone();
            let tx = output_tx.clone();
            let trim = trim_config.clone();
            let filter = filter_config.clone();
            let umi = umi_config.clone();
            let shutdown = Arc::clone(&shutdown);
            thread::spawn(move || -> Result<(FastQcStats, FastQcStats, WorkerStats)> {
                let mut qc_before = FastQcStats::with_full_config(qc_mode, eval_duplication, overrep_analysis, overrep_sampling);
                let mut qc_after = FastQcStats::with_full_config(qc_mode, eval_duplication, overrep_analysis, overrep_sampling);
                let mut stats = WorkerStats::new();
                let umi_processor = UmiProcessor::new(umi);
                // Serialized FASTQ bytes accumulate here and are shipped to
                // the writer in ~64 KiB chunks.
                let mut output_buffer = Vec::with_capacity(65536);
                for batch in rx {
                    if shutdown.load(Ordering::Relaxed) {
                        break;
                    }
                    for record in batch {
                        stats.reads_processed += 1;
                        stats.bases_before += record.seq.len() as u64;
                        qc_before.update_fast(&record.seq, &record.qual);
                        if umi_processor.is_enabled() {
                            // UMI path: extraction may rewrite name/seq/qual;
                            // None drops the read.
                            let (working_name, working_seq, working_qual) =
                                match umi_processor.process_read(&record.name, &record.seq, &record.qual, None) {
                                    Some((n, s, q)) => (n, s, q),
                                    None => continue,
                                };
                            let trim_result = trim.apply(&working_seq, &working_qual);
                            if trim_result.is_empty() || !trim.check_length(trim_result.len()) {
                                continue;
                            }
                            let trimmed_seq = trim_result.apply(&working_seq);
                            let trimmed_qual = trim_result.apply(&working_qual);
                            let decision = apply_filters(&working_name, trimmed_seq, trimmed_qual, &filter);
                            if decision.is_fail() {
                                continue;
                            }
                            stats.reads_passed += 1;
                            stats.bases_after += trimmed_seq.len() as u64;
                            qc_after.update_fast(trimmed_seq, trimmed_qual);
                            OwnedRecord::append_slices_to_buffer(
                                &mut output_buffer,
                                &working_name,
                                trimmed_seq,
                                trimmed_qual,
                            );
                            if output_buffer.len() >= 65536 {
                                let _ = tx.send(std::mem::take(&mut output_buffer));
                                output_buffer = Vec::with_capacity(65536);
                            }
                            continue;
                        }
                        // Non-UMI path: borrow the record's fields directly —
                        // no copies until the trimmed slices are serialized.
                        let working_name: Cow<'_, [u8]> = Cow::Borrowed(&record.name[..]);
                        let seq_ref: &[u8] = &record.seq[..];
                        let qual_ref: &[u8] = &record.qual[..];
                        let trim_result = trim.apply(seq_ref, qual_ref);
                        if trim_result.is_empty() || !trim.check_length(trim_result.len()) {
                            continue;
                        }
                        let trimmed_seq = trim_result.apply(seq_ref);
                        let trimmed_qual = trim_result.apply(qual_ref);
                        let decision = apply_filters(&working_name, trimmed_seq, trimmed_qual, &filter);
                        if decision.is_fail() {
                            continue;
                        }
                        stats.reads_passed += 1;
                        stats.bases_after += trimmed_seq.len() as u64;
                        qc_after.update_fast(trimmed_seq, trimmed_qual);
                        OwnedRecord::append_slices_to_buffer(
                            &mut output_buffer,
                            &working_name,
                            trimmed_seq,
                            trimmed_qual,
                        );
                        if output_buffer.len() >= 65536 {
                            let _ = tx.send(std::mem::take(&mut output_buffer));
                            output_buffer = Vec::with_capacity(65536);
                        }
                    }
                }
                // Flush any partial buffer before the worker exits.
                if !output_buffer.is_empty() {
                    let _ = tx.send(output_buffer);
                }
                Ok((qc_before, qc_after, stats))
            })
        })
        .collect();
    // Drop the main thread's channel handles so the channels actually close
    // once the reader and workers finish.
    drop(read_rx);
    drop(output_tx);
    let use_stdout = self.config.use_stdout;
    let writer_handle = if use_stdout {
        // Workers already serialize records into raw FASTQ bytes, so stream
        // the buffers straight to a single locked stdout handle.
        Some(thread::spawn(move || -> Result<Vec<PathBuf>> {
            use std::io::Write;
            let stdout = std::io::stdout();
            let mut handle = stdout.lock();
            for buffer in output_rx {
                handle.write_all(&buffer)?;
            }
            handle.flush()?;
            Ok(vec![])
        }))
    } else if let Some(ref path) = output_path {
        let path = path.clone();
        Some(thread::spawn(move || -> Result<Vec<PathBuf>> {
            let compression = CompressionType::from_path(&path);
            let mut writer = FastqWriter::new(&path, compression)?;
            for buffer in output_rx {
                writer.write_raw(&buffer)?;
            }
            writer.flush()?;
            Ok(vec![path])
        }))
    } else {
        // No output requested: drain so workers never block on send.
        for _ in output_rx {}
        None
    };
    reader_handle.join().map_err(|_| anyhow::anyhow!("Reader panicked"))??;
    let mut fast_qc_before = FastQcStats::with_full_config(
        qc_mode,
        self.config.eval_duplication,
        self.config.overrepresentation_analysis,
        self.config.overrepresentation_sampling,
    );
    let mut fast_qc_after = FastQcStats::with_full_config(
        qc_mode,
        self.config.eval_duplication,
        self.config.overrepresentation_analysis,
        self.config.overrepresentation_sampling,
    );
    let mut worker_stats = WorkerStats::new();
    for handle in worker_handles {
        let (qc_b, qc_a, stats) = handle.join().map_err(|_| anyhow::anyhow!("Worker panicked"))??;
        fast_qc_before.merge(qc_b);
        fast_qc_after.merge(qc_a);
        worker_stats.merge(&stats);
    }
    shutdown.store(true, Ordering::Relaxed);
    let output_files = if let Some(handle) = writer_handle {
        handle.join().map_err(|_| anyhow::anyhow!("Writer panicked"))??
    } else {
        Vec::new()
    };
    Ok(PipelineResult {
        qc_before: fast_qc_before.to_qc_stats(),
        qc_after: fast_qc_after.to_qc_stats(),
        worker_stats,
        output_files,
    })
}
/// Runs the optimized paired-end pipeline for one R1/R2 file pair.
///
/// Thread topology: one synchronized paired reader -> bounded channel ->
/// `threads` worker threads (trim/filter/UMI/overlap-correction + QC and
/// insert-size sampling) -> unbounded channel of (R1 bytes, R2 bytes) ->
/// one writer thread (paired files, interleaved stdout, or a drain).
fn run_optimized_paired(
    &self,
    r1_path: &Path,
    r2_path: &Path,
    file_idx: usize,
) -> Result<PipelineResult> {
    use crate::correction::OverlapCorrector;
    let qc_mode = match self.config.mode {
        Mode::Short => QcMode::Short,
        Mode::Long => QcMode::Long,
    };
    let num_workers = self.config.threads;
    let batch_size = self.config.batch_size;
    // Bounded read channel back-pressures the reader; output is unbounded so
    // workers never block on the writer.
    let channel_capacity = num_workers * CHANNEL_BUFFER_MULTIPLIER;
    let (read_tx, read_rx) = bounded::<Vec<(OwnedRecord, OwnedRecord)>>(channel_capacity);
    let (output_tx, output_rx) = unbounded::<(Vec<u8>, Vec<u8>)>();
    let shutdown = Arc::new(AtomicBool::new(false));
    // Clone per-thread configuration so the spawned closures can be 'static.
    let trim_config = self.config.trim_config.clone();
    let filter_config = self.config.filter_config.clone();
    let umi_config = self.config.umi_config.clone();
    let correction_config = self.config.correction_config.clone();
    let eval_duplication = self.config.eval_duplication;
    let overrep_analysis = self.config.overrepresentation_analysis;
    let overrep_sampling = self.config.overrepresentation_sampling;
    // With multiple inputs, disambiguate outputs with the file index.
    let output_paths = self.config.output_prefix.as_ref().map(|prefix| {
        if self.config.input_files.len() > 1 {
            (
                prefix.with_extension(format!("{}.R1.fastq.gz", file_idx)),
                prefix.with_extension(format!("{}.R2.fastq.gz", file_idx)),
            )
        } else {
            (prefix.with_extension("R1.fastq.gz"), prefix.with_extension("R2.fastq.gz"))
        }
    });
    let reader_shutdown = Arc::clone(&shutdown);
    let r1_clone = r1_path.to_path_buf();
    let r2_clone = r2_path.to_path_buf();
    let fix_mgi_id = self.config.fix_mgi_id;
    // Reader thread: pulls synchronized R1/R2 batches until EOF, shutdown,
    // or every worker has dropped its receiver.
    let reader_handle = thread::spawn(move || -> Result<()> {
        let mut reader = DirectPairedFastqReader::new(&r1_clone, &r2_clone)?
            .with_mgi_id_conversion(fix_mgi_id);
        let mut pool1 = ReadPool::new(300);
        let mut pool2 = ReadPool::new(300);
        loop {
            if reader_shutdown.load(Ordering::Relaxed) {
                break;
            }
            let batch = reader.read_batch_pooled(batch_size, &mut pool1, &mut pool2)?;
            if batch.is_empty() {
                break; // EOF
            }
            if read_tx.send(batch).is_err() {
                break; // all receivers dropped
            }
        }
        drop(read_tx);
        Ok(())
    });
    let worker_handles: Vec<_> = (0..num_workers)
        .map(|_| {
            let rx = read_rx.clone();
            let tx = output_tx.clone();
            let trim = trim_config.clone();
            let filter = filter_config.clone();
            let umi = umi_config.clone();
            let correction = correction_config.clone();
            let shutdown = Arc::clone(&shutdown);
            thread::spawn(move || -> Result<(FastQcStats, FastQcStats, WorkerStats, InsertSizeEstimator)> {
                let mut qc_before = FastQcStats::with_full_config(qc_mode, eval_duplication, overrep_analysis, overrep_sampling);
                let mut qc_after = FastQcStats::with_full_config(qc_mode, eval_duplication, overrep_analysis, overrep_sampling);
                let mut stats = WorkerStats::new();
                let mut insert_size_est = InsertSizeEstimator::new();
                let umi_processor = UmiProcessor::new(umi);
                // Overlap corrector is only constructed when enabled.
                let corrector = if correction.enabled {
                    Some(OverlapCorrector::new(correction))
                } else {
                    None
                };
                // Serialized FASTQ bytes accumulate here; R1 and R2 are
                // flushed together in ~64 KiB chunks to keep pairs in sync.
                let mut r1_buffer = Vec::with_capacity(65536);
                let mut r2_buffer = Vec::with_capacity(65536);
                let mut correction_buffers = CorrectionBuffers::new();
                for batch in rx {
                    if shutdown.load(Ordering::Relaxed) {
                        break;
                    }
                    for (r1, r2) in batch {
                        // Insert size is sampled on the raw (untrimmed) pair.
                        if insert_size_est.is_sampling() {
                            insert_size_est.estimate_from_pair(&r1.seq, &r2.seq);
                        }
                        stats.reads_processed += 2;
                        stats.bases_before += r1.seq.len() as u64 + r2.seq.len() as u64;
                        qc_before.update_fast(&r1.seq, &r1.qual);
                        qc_before.update_fast(&r2.seq, &r2.qual);
                        if umi_processor.is_enabled() {
                            // UMI path: extraction may rewrite names/seqs;
                            // None drops the whole pair.
                            let (r1_name, r1_seq, r1_qual, r2_name, r2_seq, r2_qual) =
                                match umi_processor.process_paired_reads(
                                    &r1.name, &r1.seq, &r1.qual,
                                    &r2.name, &r2.seq, &r2.qual,
                                    None,
                                ) {
                                    Some(((n1, s1, q1), (n2, s2, q2))) => (n1, s1, q1, n2, s2, q2),
                                    None => continue,
                                };
                            let trim_r1 = trim.apply_with_read_type(&r1_seq, &r1_qual, false);
                            let trim_r2 = trim.apply_with_read_type(&r2_seq, &r2_qual, true);
                            if trim_r1.is_empty() || trim_r2.is_empty() {
                                continue;
                            }
                            if !trim.check_length(trim_r1.len()) || !trim.check_length(trim_r2.len()) {
                                continue;
                            }
                            let mut trimmed_r1_seq = trim_r1.apply(&r1_seq).to_vec();
                            let mut trimmed_r1_qual = trim_r1.apply(&r1_qual).to_vec();
                            let mut trimmed_r2_seq = trim_r2.apply(&r2_seq).to_vec();
                            let mut trimmed_r2_qual = trim_r2.apply(&r2_qual).to_vec();
                            if let Some(ref corr) = corrector {
                                // On a correction, the corrected copies live in
                                // the scratch buffers — swap them in.
                                if let Some(cstats) = corr.correct_pair_into(
                                    &trimmed_r1_seq, &trimmed_r1_qual,
                                    &trimmed_r2_seq, &trimmed_r2_qual,
                                    &mut correction_buffers,
                                ) {
                                    std::mem::swap(&mut trimmed_r1_seq, &mut correction_buffers.r1_seq);
                                    std::mem::swap(&mut trimmed_r1_qual, &mut correction_buffers.r1_qual);
                                    std::mem::swap(&mut trimmed_r2_seq, &mut correction_buffers.r2_seq);
                                    std::mem::swap(&mut trimmed_r2_qual, &mut correction_buffers.r2_qual);
                                    stats.correction_stats.merge(&cstats);
                                }
                            }
                            // A filter failure on either mate drops the pair.
                            let d1 = apply_filters(&r1_name, &trimmed_r1_seq, &trimmed_r1_qual, &filter);
                            let d2 = apply_filters(&r2_name, &trimmed_r2_seq, &trimmed_r2_qual, &filter);
                            if d1.is_fail() || d2.is_fail() {
                                continue;
                            }
                            stats.reads_passed += 2;
                            stats.bases_after += trimmed_r1_seq.len() as u64 + trimmed_r2_seq.len() as u64;
                            qc_after.update_fast(&trimmed_r1_seq, &trimmed_r1_qual);
                            qc_after.update_fast(&trimmed_r2_seq, &trimmed_r2_qual);
                            OwnedRecord::append_slices_to_buffer(&mut r1_buffer, &r1_name, &trimmed_r1_seq, &trimmed_r1_qual);
                            OwnedRecord::append_slices_to_buffer(&mut r2_buffer, &r2_name, &trimmed_r2_seq, &trimmed_r2_qual);
                            if r1_buffer.len() >= 65536 {
                                let _ = tx.send((std::mem::take(&mut r1_buffer), std::mem::take(&mut r2_buffer)));
                                r1_buffer = Vec::with_capacity(65536);
                                r2_buffer = Vec::with_capacity(65536);
                            }
                            continue;
                        }
                        // Non-UMI path: borrow record fields directly.
                        let r1_name: Cow<'_, [u8]> = Cow::Borrowed(&r1.name[..]);
                        let r2_name: Cow<'_, [u8]> = Cow::Borrowed(&r2.name[..]);
                        let r1_seq_ref: &[u8] = &r1.seq[..];
                        let r1_qual_ref: &[u8] = &r1.qual[..];
                        let r2_seq_ref: &[u8] = &r2.seq[..];
                        let r2_qual_ref: &[u8] = &r2.qual[..];
                        let trim_r1 = trim.apply_with_read_type(r1_seq_ref, r1_qual_ref, false);
                        let trim_r2 = trim.apply_with_read_type(r2_seq_ref, r2_qual_ref, true);
                        if trim_r1.is_empty() || trim_r2.is_empty() {
                            continue;
                        }
                        if !trim.check_length(trim_r1.len()) || !trim.check_length(trim_r2.len()) {
                            continue;
                        }
                        let mut trimmed_r1_seq = trim_r1.apply(r1_seq_ref).to_vec();
                        let mut trimmed_r1_qual = trim_r1.apply(r1_qual_ref).to_vec();
                        let mut trimmed_r2_seq = trim_r2.apply(r2_seq_ref).to_vec();
                        let mut trimmed_r2_qual = trim_r2.apply(r2_qual_ref).to_vec();
                        if let Some(ref corr) = corrector {
                            if let Some(cstats) = corr.correct_pair_into(
                                &trimmed_r1_seq, &trimmed_r1_qual,
                                &trimmed_r2_seq, &trimmed_r2_qual,
                                &mut correction_buffers,
                            ) {
                                std::mem::swap(&mut trimmed_r1_seq, &mut correction_buffers.r1_seq);
                                std::mem::swap(&mut trimmed_r1_qual, &mut correction_buffers.r1_qual);
                                std::mem::swap(&mut trimmed_r2_seq, &mut correction_buffers.r2_seq);
                                std::mem::swap(&mut trimmed_r2_qual, &mut correction_buffers.r2_qual);
                                stats.correction_stats.merge(&cstats);
                            }
                        }
                        let d1 = apply_filters(&r1_name, &trimmed_r1_seq, &trimmed_r1_qual, &filter);
                        let d2 = apply_filters(&r2_name, &trimmed_r2_seq, &trimmed_r2_qual, &filter);
                        if d1.is_fail() || d2.is_fail() {
                            continue;
                        }
                        stats.reads_passed += 2;
                        stats.bases_after += trimmed_r1_seq.len() as u64 + trimmed_r2_seq.len() as u64;
                        qc_after.update_fast(&trimmed_r1_seq, &trimmed_r1_qual);
                        qc_after.update_fast(&trimmed_r2_seq, &trimmed_r2_qual);
                        OwnedRecord::append_slices_to_buffer(&mut r1_buffer, &r1_name, &trimmed_r1_seq, &trimmed_r1_qual);
                        OwnedRecord::append_slices_to_buffer(&mut r2_buffer, &r2_name, &trimmed_r2_seq, &trimmed_r2_qual);
                        if r1_buffer.len() >= 65536 {
                            let _ = tx.send((std::mem::take(&mut r1_buffer), std::mem::take(&mut r2_buffer)));
                            r1_buffer = Vec::with_capacity(65536);
                            r2_buffer = Vec::with_capacity(65536);
                        }
                    }
                }
                // Flush any partial pair of buffers before exiting.
                if !r1_buffer.is_empty() {
                    let _ = tx.send((r1_buffer, r2_buffer));
                }
                Ok((qc_before, qc_after, stats, insert_size_est))
            })
        })
        .collect();
    // Drop the main thread's channel handles so the channels close once the
    // reader and workers finish.
    drop(read_rx);
    drop(output_tx);
    let use_stdout = self.config.use_stdout;
    let writer_handle = if use_stdout {
        // Stdout path: within each buffer pair, split the serialized FASTQ
        // back into 4-line records and emit them interleaved (R1 record, then
        // its matching R2 record). Buffers are built pairwise by the workers,
        // so chunk counts match; the trailing empty slice from the final
        // newline is skipped by the is_empty check.
        Some(thread::spawn(move || -> Result<Vec<PathBuf>> {
            use std::io::Write;
            let stdout = std::io::stdout();
            let mut handle = stdout.lock();
            for (r1_buf, r2_buf) in output_rx {
                let r1_lines: Vec<&[u8]> = r1_buf.split(|&b| b == b'\n').collect();
                let r2_lines: Vec<&[u8]> = r2_buf.split(|&b| b == b'\n').collect();
                let r1_records = r1_lines.chunks(4);
                let r2_records = r2_lines.chunks(4);
                for (r1_rec, r2_rec) in r1_records.zip(r2_records) {
                    for line in r1_rec {
                        if !line.is_empty() {
                            handle.write_all(line)?;
                            handle.write_all(b"\n")?;
                        }
                    }
                    for line in r2_rec {
                        if !line.is_empty() {
                            handle.write_all(line)?;
                            handle.write_all(b"\n")?;
                        }
                    }
                }
            }
            handle.flush()?;
            Ok(vec![])
        }))
    } else if let Some((ref r1_out, ref r2_out)) = output_paths {
        let r1_path = r1_out.clone();
        let r2_path = r2_out.clone();
        Some(thread::spawn(move || -> Result<Vec<PathBuf>> {
            // Compression type is inferred from the R1 path and applied to
            // both outputs by the paired writer.
            let compression = CompressionType::from_path(&r1_path);
            let mut writer = PairedFastqWriter::new(&r1_path, &r2_path, compression)?;
            for (r1_buf, r2_buf) in output_rx {
                writer.write_raw(&r1_buf, &r2_buf)?;
            }
            writer.flush()?;
            Ok(vec![r1_path, r2_path])
        }))
    } else {
        // No output requested: drain so workers never block on send.
        for _ in output_rx {}
        None
    };
    reader_handle.join().map_err(|_| anyhow::anyhow!("Reader panicked"))??;
    // Merge per-worker QC, counters, and insert-size samples.
    let mut fast_qc_before = FastQcStats::with_full_config(
        qc_mode,
        self.config.eval_duplication,
        self.config.overrepresentation_analysis,
        self.config.overrepresentation_sampling,
    );
    let mut fast_qc_after = FastQcStats::with_full_config(
        qc_mode,
        self.config.eval_duplication,
        self.config.overrepresentation_analysis,
        self.config.overrepresentation_sampling,
    );
    let mut worker_stats = WorkerStats::new();
    let mut combined_insert_size = InsertSizeEstimator::new();
    for handle in worker_handles {
        let (qc_b, qc_a, stats, ist) = handle.join().map_err(|_| anyhow::anyhow!("Worker panicked"))??;
        fast_qc_before.merge(qc_b);
        fast_qc_after.merge(qc_a);
        worker_stats.merge(&stats);
        combined_insert_size.merge(&ist);
    }
    shutdown.store(true, Ordering::Relaxed);
    let mut qc_after = fast_qc_after.to_qc_stats();
    let insert_stats = combined_insert_size.finalize();
    if insert_stats.has_data() {
        qc_after.set_insert_size(insert_stats);
    }
    let output_files = if let Some(handle) = writer_handle {
        handle.join().map_err(|_| anyhow::anyhow!("Writer panicked"))??
    } else {
        Vec::new()
    };
    Ok(PipelineResult {
        qc_before: fast_qc_before.to_qc_stats(),
        qc_after,
        worker_stats,
        output_files,
    })
}
}
/// Worker loop for single-end batches: receives `ReadBatch::Single` batches,
/// processes each through `process_single_batch`, and forwards the result.
/// Paired batches on the channel are ignored. Exits on shutdown, channel
/// close, or a dropped result receiver.
fn process_worker_single(
    rx: Receiver<ReadBatch>,
    tx: Sender<ProcessedBatch>,
    trim_config: &TrimConfig,
    filter_config: &FilterConfig,
    umi_config: &UmiConfig,
    qc_mode: QcMode,
    eval_duplication: bool,
    overrep_analysis: bool,
    overrep_sampling: u32,
    shutdown: Arc<AtomicBool>,
    reads_received: Arc<AtomicU64>,
) -> Result<()> {
    let umi = UmiProcessor::new(umi_config.clone());
    for incoming in rx {
        if shutdown.load(Ordering::Relaxed) {
            break;
        }
        // Only single-end payloads belong here; skip anything else.
        let records = if let ReadBatch::Single(r) = incoming {
            r
        } else {
            continue;
        };
        reads_received.fetch_add(records.len() as u64, Ordering::Relaxed);
        let (processed, qc_before, qc_after, stats) = process_single_batch(
            &records,
            trim_config,
            filter_config,
            &umi,
            qc_mode,
            eval_duplication,
            overrep_analysis,
            overrep_sampling,
        );
        let outcome = ProcessedBatch {
            output: ReadBatch::Single(processed),
            qc_before,
            qc_after,
            stats,
            // Insert size only applies to paired-end data.
            insert_size_estimator: None,
        };
        if tx.send(outcome).is_err() {
            break;
        }
    }
    Ok(())
}
/// Worker loop for paired-end batches: receives `ReadBatch::Paired` batches,
/// processes each through `process_paired_batch`, and forwards the result
/// together with its insert-size estimator. Single-end batches on the channel
/// are ignored. Exits on shutdown, channel close, or a dropped receiver.
fn process_worker_paired(
    rx: Receiver<ReadBatch>,
    tx: Sender<ProcessedBatch>,
    trim_config: &TrimConfig,
    filter_config: &FilterConfig,
    umi_config: &UmiConfig,
    correction_config: &CorrectionConfig,
    qc_mode: QcMode,
    eval_duplication: bool,
    overrep_analysis: bool,
    overrep_sampling: u32,
    shutdown: Arc<AtomicBool>,
    reads_received: Arc<AtomicU64>,
) -> Result<()> {
    let umi = UmiProcessor::new(umi_config.clone());
    for incoming in rx {
        if shutdown.load(Ordering::Relaxed) {
            break;
        }
        // Only paired payloads belong here; skip anything else.
        let pairs = if let ReadBatch::Paired(p) = incoming {
            p
        } else {
            continue;
        };
        reads_received.fetch_add(pairs.len() as u64, Ordering::Relaxed);
        let (processed, qc_before, qc_after, stats, insert_size_est) = process_paired_batch(
            &pairs,
            trim_config,
            filter_config,
            &umi,
            correction_config,
            qc_mode,
            eval_duplication,
            overrep_analysis,
            overrep_sampling,
        );
        let outcome = ProcessedBatch {
            output: ReadBatch::Paired(processed),
            qc_before,
            qc_after,
            stats,
            insert_size_estimator: Some(insert_size_est),
        };
        if tx.send(outcome).is_err() {
            break;
        }
    }
    Ok(())
}
/// Trims, filters, and (optionally) UMI-processes one batch of single-end
/// reads.
///
/// Returns `(surviving records, QC before, QC after, counters)`. A read is
/// dropped (skipped without output) when UMI extraction returns `None`, the
/// trim result is empty or fails the length check, or any filter rejects it.
fn process_single_batch(
    records: &[OwnedRecord],
    trim_config: &TrimConfig,
    filter_config: &FilterConfig,
    umi_processor: &UmiProcessor,
    qc_mode: QcMode,
    eval_duplication: bool,
    overrep_analysis: bool,
    overrep_sampling: u32,
) -> (Vec<OwnedRecord>, FastQcStats, FastQcStats, WorkerStats) {
    let mut qc_before = FastQcStats::with_full_config(qc_mode, eval_duplication, overrep_analysis, overrep_sampling);
    let mut qc_after = FastQcStats::with_full_config(qc_mode, eval_duplication, overrep_analysis, overrep_sampling);
    let mut stats = WorkerStats::new();
    let mut output = Vec::with_capacity(records.len());
    for record in records {
        // Pre-trim accounting on the raw read.
        stats.reads_processed += 1;
        stats.bases_before += record.seq.len() as u64;
        qc_before.update_fast(record.seq(), record.qual());
        if umi_processor.is_enabled() {
            // UMI path: extraction may rewrite name/seq/qual; None drops the
            // read entirely.
            let (working_name, working_seq, working_qual) =
                match umi_processor.process_read(&record.name, &record.seq, &record.qual, None) {
                    Some((name, seq, qual)) => (name, seq, qual),
                    None => continue,
                };
            let trim_result = trim_config.apply(&working_seq, &working_qual);
            if trim_result.is_empty() {
                continue;
            }
            if !trim_config.check_length(trim_result.len()) {
                continue;
            }
            let trimmed_seq = trim_result.apply(&working_seq).to_vec();
            let trimmed_qual = trim_result.apply(&working_qual).to_vec();
            let decision = apply_filters(&working_name, &trimmed_seq, &trimmed_qual, filter_config);
            if decision.is_fail() {
                continue;
            }
            stats.reads_passed += 1;
            stats.bases_after += trimmed_seq.len() as u64;
            qc_after.update_fast(&trimmed_seq, &trimmed_qual);
            let output_record = OwnedRecord::new(working_name, trimmed_seq, trimmed_qual);
            output.push(output_record);
            continue;
        }
        // Non-UMI path: borrow the record's fields, then copy only the
        // trimmed slices into the output record.
        let working_name: Cow<'_, [u8]> = Cow::Borrowed(&record.name[..]);
        let seq_ref: &[u8] = &record.seq[..];
        let qual_ref: &[u8] = &record.qual[..];
        let trim_result = trim_config.apply(seq_ref, qual_ref);
        if trim_result.is_empty() {
            continue;
        }
        if !trim_config.check_length(trim_result.len()) {
            continue;
        }
        let trimmed_seq = trim_result.apply(seq_ref).to_vec();
        let trimmed_qual = trim_result.apply(qual_ref).to_vec();
        let decision = apply_filters(&working_name, &trimmed_seq, &trimmed_qual, filter_config);
        if decision.is_fail() {
            continue;
        }
        stats.reads_passed += 1;
        stats.bases_after += trimmed_seq.len() as u64;
        qc_after.update_fast(&trimmed_seq, &trimmed_qual);
        let output_record = OwnedRecord::new(working_name.into_owned(), trimmed_seq, trimmed_qual);
        output.push(output_record);
    }
    (output, qc_before, qc_after, stats)
}
/// Trims, filters, optionally UMI-processes, and optionally overlap-corrects
/// one batch of read pairs.
///
/// Returns `(surviving pairs, QC before, QC after, counters, insert-size
/// estimator)`. A pair is dropped as a unit when UMI extraction fails, either
/// mate's trim result is empty or too short, or either mate fails a filter.
fn process_paired_batch(
    pairs: &[(OwnedRecord, OwnedRecord)],
    trim_config: &TrimConfig,
    filter_config: &FilterConfig,
    umi_processor: &UmiProcessor,
    correction_config: &CorrectionConfig,
    qc_mode: QcMode,
    eval_duplication: bool,
    overrep_analysis: bool,
    overrep_sampling: u32,
) -> (
    Vec<(OwnedRecord, OwnedRecord)>,
    FastQcStats,
    FastQcStats,
    WorkerStats,
    InsertSizeEstimator,
) {
    let mut qc_before = FastQcStats::with_full_config(qc_mode, eval_duplication, overrep_analysis, overrep_sampling);
    let mut qc_after = FastQcStats::with_full_config(qc_mode, eval_duplication, overrep_analysis, overrep_sampling);
    let mut stats = WorkerStats::new();
    let mut output = Vec::with_capacity(pairs.len());
    let mut insert_size_est = InsertSizeEstimator::new();
    // Overlap corrector is only constructed when enabled.
    let corrector = if correction_config.enabled {
        Some(OverlapCorrector::new(correction_config.clone()))
    } else {
        None
    };
    // Scratch buffers reused across all corrections in this batch.
    let mut correction_buffers = CorrectionBuffers::new();
    for (r1, r2) in pairs {
        // Insert size is sampled on the raw (untrimmed) pair.
        if insert_size_est.is_sampling() {
            insert_size_est.estimate_from_pair(&r1.seq, &r2.seq);
        }
        stats.reads_processed += 2;
        stats.bases_before += r1.seq.len() as u64 + r2.seq.len() as u64;
        qc_before.update_fast(r1.seq(), r1.qual());
        qc_before.update_fast(r2.seq(), r2.qual());
        if umi_processor.is_enabled() {
            // UMI path: extraction may rewrite names/seqs; None drops the
            // whole pair.
            let (r1_name, r1_seq, r1_qual, r2_name, r2_seq, r2_qual) =
                match umi_processor.process_paired_reads(
                    &r1.name, &r1.seq, &r1.qual,
                    &r2.name, &r2.seq, &r2.qual,
                    None,
                ) {
                    Some(((n1, s1, q1), (n2, s2, q2))) => (n1, s1, q1, n2, s2, q2),
                    None => continue,
                };
            let trim_r1 = trim_config.apply_with_read_type(&r1_seq, &r1_qual, false);
            let trim_r2 = trim_config.apply_with_read_type(&r2_seq, &r2_qual, true);
            if trim_r1.is_empty() || trim_r2.is_empty() {
                continue;
            }
            if !trim_config.check_length(trim_r1.len()) || !trim_config.check_length(trim_r2.len()) {
                continue;
            }
            let mut trimmed_r1_seq = trim_r1.apply(&r1_seq).to_vec();
            let mut trimmed_r1_qual = trim_r1.apply(&r1_qual).to_vec();
            let mut trimmed_r2_seq = trim_r2.apply(&r2_seq).to_vec();
            let mut trimmed_r2_qual = trim_r2.apply(&r2_qual).to_vec();
            if let Some(ref corr) = corrector {
                // On a correction, the corrected copies live in the scratch
                // buffers — swap them into the trimmed vectors.
                if let Some(corr_stats) = corr.correct_pair_into(
                    &trimmed_r1_seq, &trimmed_r1_qual,
                    &trimmed_r2_seq, &trimmed_r2_qual,
                    &mut correction_buffers,
                ) {
                    std::mem::swap(&mut trimmed_r1_seq, &mut correction_buffers.r1_seq);
                    std::mem::swap(&mut trimmed_r1_qual, &mut correction_buffers.r1_qual);
                    std::mem::swap(&mut trimmed_r2_seq, &mut correction_buffers.r2_seq);
                    std::mem::swap(&mut trimmed_r2_qual, &mut correction_buffers.r2_qual);
                    stats.correction_stats.merge(&corr_stats);
                }
            }
            // A filter failure on either mate drops the pair.
            let decision_r1 = apply_filters(&r1_name, &trimmed_r1_seq, &trimmed_r1_qual, filter_config);
            let decision_r2 = apply_filters(&r2_name, &trimmed_r2_seq, &trimmed_r2_qual, filter_config);
            if decision_r1.is_fail() || decision_r2.is_fail() {
                continue;
            }
            stats.reads_passed += 2;
            stats.bases_after += trimmed_r1_seq.len() as u64 + trimmed_r2_seq.len() as u64;
            qc_after.update_fast(&trimmed_r1_seq, &trimmed_r1_qual);
            qc_after.update_fast(&trimmed_r2_seq, &trimmed_r2_qual);
            let output_r1 = OwnedRecord::new(r1_name, trimmed_r1_seq, trimmed_r1_qual);
            let output_r2 = OwnedRecord::new(r2_name, trimmed_r2_seq, trimmed_r2_qual);
            output.push((output_r1, output_r2));
            continue;
        }
        // Non-UMI path: borrow record fields directly.
        let r1_name: Cow<'_, [u8]> = Cow::Borrowed(&r1.name[..]);
        let r2_name: Cow<'_, [u8]> = Cow::Borrowed(&r2.name[..]);
        let r1_seq_ref: &[u8] = &r1.seq[..];
        let r1_qual_ref: &[u8] = &r1.qual[..];
        let r2_seq_ref: &[u8] = &r2.seq[..];
        let r2_qual_ref: &[u8] = &r2.qual[..];
        let trim_r1 = trim_config.apply_with_read_type(r1_seq_ref, r1_qual_ref, false);
        let trim_r2 = trim_config.apply_with_read_type(r2_seq_ref, r2_qual_ref, true);
        if trim_r1.is_empty() || trim_r2.is_empty() {
            continue;
        }
        if !trim_config.check_length(trim_r1.len()) || !trim_config.check_length(trim_r2.len()) {
            continue;
        }
        let mut trimmed_r1_seq = trim_r1.apply(r1_seq_ref).to_vec();
        let mut trimmed_r1_qual = trim_r1.apply(r1_qual_ref).to_vec();
        let mut trimmed_r2_seq = trim_r2.apply(r2_seq_ref).to_vec();
        let mut trimmed_r2_qual = trim_r2.apply(r2_qual_ref).to_vec();
        if let Some(ref corr) = corrector {
            if let Some(corr_stats) = corr.correct_pair_into(
                &trimmed_r1_seq, &trimmed_r1_qual,
                &trimmed_r2_seq, &trimmed_r2_qual,
                &mut correction_buffers,
            ) {
                std::mem::swap(&mut trimmed_r1_seq, &mut correction_buffers.r1_seq);
                std::mem::swap(&mut trimmed_r1_qual, &mut correction_buffers.r1_qual);
                std::mem::swap(&mut trimmed_r2_seq, &mut correction_buffers.r2_seq);
                std::mem::swap(&mut trimmed_r2_qual, &mut correction_buffers.r2_qual);
                stats.correction_stats.merge(&corr_stats);
            }
        }
        let decision_r1 = apply_filters(&r1_name, &trimmed_r1_seq, &trimmed_r1_qual, filter_config);
        let decision_r2 = apply_filters(&r2_name, &trimmed_r2_seq, &trimmed_r2_qual, filter_config);
        if decision_r1.is_fail() || decision_r2.is_fail() {
            continue;
        }
        stats.reads_passed += 2;
        stats.bases_after += trimmed_r1_seq.len() as u64 + trimmed_r2_seq.len() as u64;
        qc_after.update_fast(&trimmed_r1_seq, &trimmed_r1_qual);
        qc_after.update_fast(&trimmed_r2_seq, &trimmed_r2_qual);
        let output_r1 = OwnedRecord::new(r1_name.into_owned(), trimmed_r1_seq, trimmed_r1_qual);
        let output_r2 = OwnedRecord::new(r2_name.into_owned(), trimmed_r2_seq, trimmed_r2_qual);
        output.push((output_r1, output_r2));
    }
    (output, qc_before, qc_after, stats, insert_size_est)
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::{tempdir, NamedTempFile};
/// Writes `contents` to a fresh `.fastq` temp file and returns its handle.
fn create_temp_fastq(contents: &[u8]) -> NamedTempFile {
    let mut tmp = NamedTempFile::with_suffix(".fastq").unwrap();
    tmp.write_all(contents).expect("write temp fastq");
    tmp.flush().expect("flush temp fastq");
    tmp
}
/// Encodes raw Phred scores as ASCII quality bytes (Phred+33 offset).
fn make_qual(scores: &[u8]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(scores.len());
    for &score in scores {
        encoded.push(score + 33);
    }
    encoded
}
// Three 32 bp single-end FASTQ records (name / sequence / '+' / quality).
// The byte string content is test fixture data — keep it byte-exact.
const SAMPLE_SE_FASTQ: &[u8] = b"@read1
ACGTACGTACGTACGTACGTACGTACGTACGT
+
IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII
@read2
TGCATGCATGCATGCATGCATGCATGCATGCA
+
HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH
@read3
GGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGG
+
IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII
";
// Defaults: positive thread count, 5000-record batches, no inputs.
#[test]
fn test_pipeline_config_default() {
    let cfg = PipelineConfig::default();
    assert!(cfg.threads > 0, "default thread count must be positive");
    assert_eq!(cfg.batch_size, 5000);
    assert!(cfg.input_files.is_empty());
}
// The short-read preset selects short-read mode.
#[test]
fn test_pipeline_config_short_read() {
    assert_eq!(PipelineConfig::short_read().mode, Mode::Short);
}
// The long-read preset selects long-read mode.
#[test]
fn test_pipeline_config_long_read() {
    assert_eq!(PipelineConfig::long_read().mode, Mode::Long);
}
// Builder setters each land on the expected field.
#[test]
fn test_pipeline_config_builder() {
    let cfg = PipelineConfig::new()
        .with_threads(4)
        .with_batch_size(500)
        .with_mode(Mode::Long);
    assert_eq!(cfg.threads, 4);
    assert_eq!(cfg.batch_size, 500);
    assert_eq!(cfg.mode, Mode::Long);
}
// A lone R1 input is single-end; an R1/R2 pair is paired-end.
#[test]
fn test_pipeline_config_is_paired_end() {
    let single = PipelineConfig::new().with_input(PathBuf::from("r1.fastq"));
    let paired = PipelineConfig::new()
        .with_paired_input(PathBuf::from("r1.fastq"), PathBuf::from("r2.fastq"));
    assert!(!single.is_paired_end());
    assert!(paired.is_paired_end());
}
// A default WorkerStats starts with zeroed counters.
#[test]
fn test_worker_stats_default() {
    let fresh = WorkerStats::default();
    assert_eq!(fresh.reads_processed, 0);
    assert_eq!(fresh.reads_passed, 0);
}
// merge() sums every counter from the other stats into self.
#[test]
fn test_worker_stats_merge() {
    let mut acc = WorkerStats {
        reads_processed: 100,
        reads_passed: 90,
        bases_before: 10000,
        bases_after: 8500,
        ..Default::default()
    };
    let extra = WorkerStats {
        reads_processed: 50,
        reads_passed: 45,
        bases_before: 5000,
        bases_after: 4500,
        ..Default::default()
    };
    acc.merge(&extra);
    assert_eq!(acc.reads_processed, 150);
    assert_eq!(acc.reads_passed, 135);
    assert_eq!(acc.bases_before, 15000);
    assert_eq!(acc.bases_after, 13000);
}
// 90 of 100 reads passing gives a 90% pass rate.
#[test]
fn test_worker_stats_pass_rate() {
    let stats = WorkerStats {
        reads_processed: 100,
        reads_passed: 90,
        bases_before: 0,
        bases_after: 0,
        ..Default::default()
    };
    let delta = (stats.pass_rate() - 90.0).abs();
    assert!(delta < 0.001);
}
// With no reads processed, pass_rate must be 0 rather than NaN.
#[test]
fn test_worker_stats_pass_rate_zero() {
    assert_eq!(WorkerStats::default().pass_rate(), 0.0);
}
// 8000 of 10000 bases kept gives an 80% retention rate.
#[test]
fn test_worker_stats_base_retention() {
    let stats = WorkerStats {
        reads_processed: 0,
        reads_passed: 0,
        bases_before: 10000,
        bases_after: 8000,
        ..Default::default()
    };
    let delta = (stats.base_retention_rate() - 80.0).abs();
    assert!(delta < 0.001);
}
// With trimming and filtering effectively disabled, both reads survive and
// are counted in the before/after QC.
#[test]
fn test_process_single_batch() {
    let reads = vec![
        OwnedRecord::new(
            b"read1".to_vec(),
            b"ACGTACGTACGTACGTACGTACGTACGTACGT".to_vec(),
            make_qual(&[30; 32]),
        ),
        OwnedRecord::new(
            b"read2".to_vec(),
            b"TGCATGCATGCATGCATGCATGCATGCATGCA".to_vec(),
            make_qual(&[30; 32]),
        ),
    ];
    let trim = TrimConfig::new()
        .with_adapter(crate::trim::AdapterConfig::disabled())
        .with_tail(crate::trim::TailConfig::new().with_min_length(usize::MAX));
    let filters = FilterConfig::new()
        .with_min_length(15)
        .without_quality_filter()
        .without_complexity_filter();
    let umi = UmiProcessor::new(UmiConfig::disabled());
    let (kept, qc_before, qc_after, stats) =
        process_single_batch(&reads, &trim, &filters, &umi, QcMode::Short, true, true, 20);
    assert_eq!(kept.len(), 2);
    assert_eq!(stats.reads_processed, 2);
    assert_eq!(stats.reads_passed, 2);
    assert_eq!(qc_before.total_reads(), 2);
    assert_eq!(qc_after.total_reads(), 2);
}
// A 4 bp read fails the 15 bp minimum-length requirement; only the long
// read survives.
#[test]
fn test_process_single_batch_filtering() {
    let reads = vec![
        OwnedRecord::new(
            b"good".to_vec(),
            b"ACGTACGTACGTACGTACGTACGTACGTACGT".to_vec(),
            make_qual(&[30; 32]),
        ),
        OwnedRecord::new(
            b"short".to_vec(),
            b"ACGT".to_vec(),
            make_qual(&[30; 4]),
        ),
    ];
    let trim = TrimConfig::new()
        .with_adapter(crate::trim::AdapterConfig::disabled())
        .with_tail(crate::trim::TailConfig::new().with_min_length(usize::MAX))
        .with_length(crate::trim::LengthConfig::new().with_min_length(15));
    let filters = FilterConfig::new()
        .with_min_length(15)
        .without_quality_filter()
        .without_complexity_filter();
    let umi = UmiProcessor::new(UmiConfig::disabled());
    let (kept, _, _, stats) =
        process_single_batch(&reads, &trim, &filters, &umi, QcMode::Short, true, true, 20);
    assert_eq!(kept.len(), 1);
    assert_eq!(stats.reads_processed, 2);
    assert_eq!(stats.reads_passed, 1);
}
#[test]
fn test_process_paired_batch() {
    // A single proper pair of 32 bp reads: both mates should pass.
    let pairs = vec![(
        OwnedRecord::new(
            b"read1/1".to_vec(),
            b"ACGTACGTACGTACGTACGTACGTACGTACGT".to_vec(),
            make_qual(&[30; 32]),
        ),
        OwnedRecord::new(
            b"read1/2".to_vec(),
            b"TGCATGCATGCATGCATGCATGCATGCATGCA".to_vec(),
            make_qual(&[30; 32]),
        ),
    )];
    let trim = TrimConfig::new()
        .with_adapter(crate::trim::AdapterConfig::disabled())
        .with_tail(crate::trim::TailConfig::new().with_min_length(usize::MAX));
    let filter = FilterConfig::new()
        .with_min_length(15)
        .without_quality_filter()
        .without_complexity_filter();
    let umi = UmiProcessor::new(UmiConfig::disabled());
    let (output, qc_before, qc_after, stats, _insert_size) = process_paired_batch(
        &pairs,
        &trim,
        &filter,
        &umi,
        &CorrectionConfig::new(),
        QcMode::Short,
        true,
        true,
        20,
    );
    assert_eq!(output.len(), 1);
    // Paired stats count individual reads, so one surviving pair is 2 reads.
    assert_eq!(stats.reads_processed, 2);
    assert_eq!(stats.reads_passed, 2);
    assert_eq!(qc_before.total_reads(), 2);
    assert_eq!(qc_after.total_reads(), 2);
}
#[test]
fn test_process_paired_batch_pair_discard() {
    // R2 is only 4 bp; when one mate fails, the whole pair is discarded.
    let pairs = vec![(
        OwnedRecord::new(
            b"read1/1".to_vec(),
            b"ACGTACGTACGTACGTACGTACGTACGTACGT".to_vec(),
            make_qual(&[30; 32]),
        ),
        OwnedRecord::new(b"read1/2".to_vec(), b"ACGT".to_vec(), make_qual(&[30; 4])),
    )];
    let trim = TrimConfig::new()
        .with_adapter(crate::trim::AdapterConfig::disabled())
        .with_tail(crate::trim::TailConfig::new().with_min_length(usize::MAX))
        .with_length(crate::trim::LengthConfig::new().with_min_length(15));
    let filter = FilterConfig::new()
        .with_min_length(15)
        .without_quality_filter()
        .without_complexity_filter();
    let umi = UmiProcessor::new(UmiConfig::disabled());
    let (output, _, _, stats, _insert_size) = process_paired_batch(
        &pairs,
        &trim,
        &filter,
        &umi,
        &CorrectionConfig::new(),
        QcMode::Short,
        true,
        true,
        20,
    );
    assert_eq!(output.len(), 0);
    assert_eq!(stats.reads_processed, 2);
    assert_eq!(stats.reads_passed, 0);
}
#[test]
fn test_pipeline_executor_no_input() {
    // A config with no input files must fail up front, not hang or panic.
    let executor = PipelineExecutor::new(PipelineConfig::new());
    assert!(executor.run().is_err());
}
#[test]
fn test_pipeline_executor_single_end() {
    // End-to-end single-end run over the 3-read sample fixture.
    let input = create_temp_fastq(SAMPLE_SE_FASTQ);
    let out_dir = tempdir().unwrap();
    // Keep only a 15 bp length filter; disable adapter/tail trimming.
    let filter = FilterConfig::new()
        .with_min_length(15)
        .without_quality_filter()
        .without_complexity_filter();
    let trim = TrimConfig::new()
        .with_adapter(crate::trim::AdapterConfig::disabled())
        .with_tail(crate::trim::TailConfig::new().with_min_length(usize::MAX));
    let config = PipelineConfig::new()
        .with_threads(2)
        .with_batch_size(10)
        .with_input(input.path().to_path_buf())
        .with_output_prefix(out_dir.path().join("output"))
        .with_filter_config(filter)
        .with_trim_config(trim);
    let outcome = PipelineExecutor::new(config).run();
    assert!(outcome.is_ok(), "Pipeline failed: {:?}", outcome.err());
    let result = outcome.unwrap();
    assert_eq!(result.qc_before.total_reads, 3);
    assert!(result.worker_stats.reads_passed >= 2);
}
#[test]
fn test_pipeline_executor_paired_end() {
    // End-to-end paired run: two proper pairs, all 32 bp, so all 4 reads
    // pass the 15 bp length filter and both R1/R2 outputs are produced.
    let r1_content = b"@read1/1
ACGTACGTACGTACGTACGTACGTACGTACGT
+
IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII
@read2/1
TGCATGCATGCATGCATGCATGCATGCATGCA
+
HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH
";
    let r2_content = b"@read1/2
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+
IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII
@read2/2
CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC
+
HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH
";
    let r1_file = create_temp_fastq(r1_content);
    let r2_file = create_temp_fastq(r2_content);
    let dir = tempdir().unwrap();
    let config = PipelineConfig::new()
        .with_threads(2)
        .with_batch_size(10)
        .with_paired_input(r1_file.path().to_path_buf(), r2_file.path().to_path_buf())
        .with_output_prefix(dir.path().join("output"))
        .with_filter_config(
            FilterConfig::new()
                .with_min_length(15)
                .without_quality_filter()
                .without_complexity_filter(),
        )
        .with_trim_config(
            TrimConfig::new()
                .with_adapter(crate::trim::AdapterConfig::disabled())
                .with_tail(crate::trim::TailConfig::new().with_min_length(usize::MAX)),
        );
    let executor = PipelineExecutor::new(config);
    let result = executor.run();
    assert!(result.is_ok(), "Pipeline failed: {:?}", result.err());
    let result = result.unwrap();
    assert_eq!(result.qc_before.total_reads, 4);
    // assert_eq! instead of assert!(a == b): prints both sides on failure.
    assert_eq!(result.output_files.len(), 2);
}
#[test]
fn test_pipeline_result_reads_filtered() {
    // 100 processed minus 80 passed leaves 20 filtered; pass rate 80%.
    let worker_stats = WorkerStats {
        reads_processed: 100,
        reads_passed: 80,
        bases_before: 10000,
        bases_after: 8000,
        ..Default::default()
    };
    let result = PipelineResult {
        qc_before: QcStats::new(QcMode::Short),
        qc_after: QcStats::new(QcMode::Short),
        worker_stats,
        output_files: Vec::new(),
    };
    assert_eq!(result.reads_filtered(), 20);
    let error = result.pass_rate() - 80.0;
    assert!(error.abs() < 0.001);
}
#[test]
fn test_pipeline_qc_only_mode() {
    // No output prefix configured: QC stats are gathered, no files written.
    let input = create_temp_fastq(SAMPLE_SE_FASTQ);
    let filter = FilterConfig::new()
        .with_min_length(15)
        .without_quality_filter()
        .without_complexity_filter();
    let trim = TrimConfig::new()
        .with_adapter(crate::trim::AdapterConfig::disabled())
        .with_tail(crate::trim::TailConfig::new().with_min_length(usize::MAX));
    let config = PipelineConfig::new()
        .with_threads(2)
        .with_batch_size(10)
        .with_input(input.path().to_path_buf())
        .with_filter_config(filter)
        .with_trim_config(trim);
    let outcome = PipelineExecutor::new(config).run();
    assert!(outcome.is_ok());
    let result = outcome.unwrap();
    assert!(result.output_files.is_empty());
    assert_eq!(result.qc_before.total_reads, 3);
}
#[test]
fn test_pipeline_executor_single_threaded_single_end() {
    // Same single-end scenario, forced onto the single-threaded code path.
    let input = create_temp_fastq(SAMPLE_SE_FASTQ);
    let out_dir = tempdir().unwrap();
    let filter = FilterConfig::new()
        .with_min_length(15)
        .without_quality_filter()
        .without_complexity_filter();
    let trim = TrimConfig::new()
        .with_adapter(crate::trim::AdapterConfig::disabled())
        .with_tail(crate::trim::TailConfig::new().with_min_length(usize::MAX));
    let config = PipelineConfig::new()
        .with_threads(1)
        .with_batch_size(10)
        .with_input(input.path().to_path_buf())
        .with_output_prefix(out_dir.path().join("output"))
        .with_filter_config(filter)
        .with_trim_config(trim);
    let outcome = PipelineExecutor::new(config).run();
    assert!(
        outcome.is_ok(),
        "Single-threaded pipeline failed: {:?}",
        outcome.err()
    );
    let result = outcome.unwrap();
    assert_eq!(result.qc_before.total_reads, 3);
    assert!(result.worker_stats.reads_passed >= 2);
}
#[test]
fn test_pipeline_executor_single_threaded_paired_end() {
    // Same paired-end scenario as the multi-threaded test, but with
    // threads=1 to exercise the single-threaded code path.
    let r1_content = b"@read1/1
ACGTACGTACGTACGTACGTACGTACGTACGT
+
IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII
@read2/1
TGCATGCATGCATGCATGCATGCATGCATGCA
+
HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH
";
    let r2_content = b"@read1/2
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+
IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII
@read2/2
CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC
+
HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH
";
    let r1_file = create_temp_fastq(r1_content);
    let r2_file = create_temp_fastq(r2_content);
    let dir = tempdir().unwrap();
    let config = PipelineConfig::new()
        .with_threads(1)
        .with_batch_size(10)
        .with_paired_input(r1_file.path().to_path_buf(), r2_file.path().to_path_buf())
        .with_output_prefix(dir.path().join("output"))
        .with_filter_config(
            FilterConfig::new()
                .with_min_length(15)
                .without_quality_filter()
                .without_complexity_filter(),
        )
        .with_trim_config(
            TrimConfig::new()
                .with_adapter(crate::trim::AdapterConfig::disabled())
                .with_tail(crate::trim::TailConfig::new().with_min_length(usize::MAX)),
        );
    let executor = PipelineExecutor::new(config);
    let result = executor.run();
    assert!(
        result.is_ok(),
        "Single-threaded paired pipeline failed: {:?}",
        result.err()
    );
    let result = result.unwrap();
    assert_eq!(result.qc_before.total_reads, 4);
    // assert_eq! instead of assert!(a == b): prints both sides on failure.
    assert_eq!(result.output_files.len(), 2);
}
}