use crate::bam_io::{RawBamWriter, create_raw_bam_writer};
use crate::commands::command::Command;
use crate::commands::common::{
CompressionOptions, QueueMemoryOptions, SchedulerOptions, ThreadingOptions,
};
use crate::fastq::FastqSegment;
use crate::fastq::FastqSet;
use crate::fastq::ReadSetIterator;
use crate::grouper::FastqTemplate;
use crate::logging::OperationTimer;
use crate::progress::ProgressTracker;
use crate::sam::SamTag;
use crate::unified_pipeline::{FastqPipelineConfig, MemoryEstimate, run_fastq_pipeline};
use crate::validation::validate_file_exists;
use anyhow::{Result, bail, ensure};
use bstr::{BString, ByteSlice};
use clap::Parser;
use fgumi_raw_bam::UnmappedSamBuilder;
use fgumi_raw_bam::fields::flags;
use log::{debug, info};
use noodles_bgzf::io::MultithreadedReader;
#[cfg(test)]
use crate::bam_io::create_bam_reader;
use fgumi_simd_fastq::SimdFastqReader;
use noodles::sam::header::Header;
use noodles::sam::header::record::value::Map;
use noodles::sam::header::record::value::map::ReadGroup;
use noodles::sam::header::record::value::map::builder::Builder;
use noodles::sam::header::record::value::map::header::group_order;
use noodles::sam::header::record::value::map::header::sort_order;
use noodles::sam::header::record::value::map::read_group::tag as rg_tag;
use noodles::sam::header::record::value::{
Map as HeaderRecordMap,
map::{Header as HeaderRecord, Tag as HeaderTag},
};
use read_structure::{ReadStructure, SegmentType};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::str::FromStr;
const BUFFER_SIZE: usize = 1024 * 1024;
const QUALITY_DETECTION_SAMPLE_SIZE: usize = 400;
struct ExtractedBatch {
data: Vec<u8>,
num_records: u64,
}
impl MemoryEstimate for ExtractedBatch {
fn estimate_heap_size(&self) -> usize {
self.data.capacity()
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
enum CompressionFormat {
Bgzf,
Gzip,
Plain,
}
fn detect_compression_format(path: &Path) -> Result<CompressionFormat> {
let mut file = File::open(path)?;
let mut header = [0u8; 18];
use std::io::Read;
let bytes_read = file.read(&mut header)?;
if bytes_read < 2 {
return Ok(CompressionFormat::Plain);
}
if header[0] != 0x1f || header[1] != 0x8b {
return Ok(CompressionFormat::Plain);
}
if bytes_read >= 18 {
if header[2] == 0x08 && (header[3] & 0x04) != 0 {
let xlen = u16::from_le_bytes([header[10], header[11]]) as usize;
if xlen >= 6 {
if header[12] == b'B' && header[13] == b'C' {
return Ok(CompressionFormat::Bgzf);
}
}
}
}
Ok(CompressionFormat::Gzip)
}
fn open_fastq_reader(
path: &Path,
threads: usize,
async_reader: bool,
) -> Result<Box<dyn BufRead + Send>> {
use flate2::read::MultiGzDecoder;
let format = detect_compression_format(path)?;
let open_reader = |path: &Path| -> Result<Box<dyn std::io::Read + Send>> {
let file = File::open(path)?;
if async_reader {
crate::os_hints::advise_sequential(&file);
log::info!(
"async FASTQ reader enabled: spawning fgumi-prefetch thread for {}",
path.display()
);
Ok(Box::new(crate::prefetch_reader::PrefetchReader::from_file(file)))
} else {
Ok(Box::new(file))
}
};
match format {
CompressionFormat::Bgzf if threads > 1 => {
info!("Detected BGZF-compressed FASTQ, using {threads} decompression threads");
let reader = open_reader(path)?;
let worker_count = std::num::NonZero::new(threads).expect("threads > 1 checked above");
let reader = MultithreadedReader::with_worker_count(worker_count, reader);
Ok(Box::new(BufReader::with_capacity(BUFFER_SIZE, reader)))
}
CompressionFormat::Bgzf | CompressionFormat::Gzip => {
debug!("Detected {format:?}-compressed FASTQ, using single-threaded decompression");
let reader = open_reader(path)?;
Ok(Box::new(BufReader::with_capacity(
BUFFER_SIZE,
MultiGzDecoder::new(BufReader::with_capacity(BUFFER_SIZE, reader)),
)))
}
CompressionFormat::Plain => {
debug!("Detected uncompressed FASTQ");
let reader = open_reader(path)?;
Ok(Box::new(BufReader::with_capacity(BUFFER_SIZE, reader)))
}
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
enum QualityEncoding {
Standard, Illumina, }
impl QualityEncoding {
fn to_standard_numeric(self, quals: &[u8]) -> Vec<u8> {
match self {
QualityEncoding::Standard => quals.iter().map(|&q| q.saturating_sub(33)).collect(),
QualityEncoding::Illumina => quals.iter().map(|&q| q.saturating_sub(64)).collect(),
}
}
fn detect(records: &[Vec<u8>]) -> Result<Self> {
if records.is_empty() {
bail!("Cannot detect quality encoding: no records provided");
}
let mut min_qual = u8::MAX;
let mut max_qual = u8::MIN;
let mut total_bases = 0;
for qual in records {
if qual.is_empty() {
continue; }
for &q in qual {
min_qual = min_qual.min(q);
max_qual = max_qual.max(q);
total_bases += 1;
}
}
if total_bases == 0 {
return Ok(QualityEncoding::Standard);
}
if min_qual < 33 || max_qual > 126 {
bail!(
"Invalid quality scores detected: range [{min_qual}, {max_qual}]. \
Quality scores must be in the printable ASCII range (33-126)"
);
}
if min_qual < 59 {
Ok(QualityEncoding::Standard)
} else if min_qual >= 64 {
if max_qual >= 75 {
Ok(QualityEncoding::Illumina)
} else {
Ok(QualityEncoding::Standard)
}
} else {
Ok(QualityEncoding::Standard)
}
}
}
#[derive(Parser, Debug)]
#[command(
name = "extract",
author,
version,
about = "\x1b[38;5;30m[UMI EXTRACTION]\x1b[0m \x1b[36mExtract UMIs from FASTQ and create unmapped BAM\x1b[0m",
long_about = r#"
Generates an unmapped BAM file from FASTQ files with UMI extraction.
Takes in one or more FASTQ files (optionally gzipped), each representing a different sequencing
read (e.g. R1, R2, I1 or I2) and can use a set of read structures to allocate bases in those
reads to template reads, sample indices, unique molecular indices, or to designate bases to be
skipped over.
Only template bases will be retained as read bases (stored in the `SEQ` field) as specified by
the read structure.
## Read Structures
Read structures are made up of `<number><operator>` pairs much like the CIGAR string in BAM files.
Five kinds of operators are recognized:
1. `T` identifies a template read
2. `B` identifies a sample barcode read
3. `M` identifies a unique molecular index read
4. `C` identifies a cell barcode read
5. `S` identifies a set of bases that should be skipped or ignored
The last `<number><operator>` pair may be specified using a `+` sign instead of number to denote
"all remaining bases". This is useful if, e.g., FASTQs have been trimmed and contain reads of
varying length.
For example, to convert a paired-end run with an index read and where the first 5 bases of R1 are
a UMI and the second five bases are monotemplate:
fgumi extract --input r1.fq r2.fq i1.fq --read-structures 5M5S+T +T +B
Alternatively, if reads are fixed length:
fgumi extract --input r1.fq r2.fq i1.fq --read-structures 5M5S65T 75T 8B
## UMI Extraction
A read structure should be provided for each read of a template. For paired end reads, two read
structures should be specified. The tags to store the molecular indices will be associated with
the molecular index segment(s) in the read structure based on the order specified. If only one
molecular index tag is given, then the molecular indices will be concatenated and stored in that
tag. In the resulting BAM file each end of a pair will contain the same molecular index tags and
values.
UMIs may be extracted from the read sequences, the read names, or both. If
`--extract-umis-from-read-names` is specified, any UMIs present in the read names are extracted;
read names are expected to be `:`-separated and the UMI is taken from the **last** field. At
least 8 fields must be present — the standard Illumina shape
`@<instrument>:<run>:<flowcell>:<lane>:<tile>:<x>:<y>:<UMI>`. Names with 9+ fields (e.g.
produced by demultiplexers that fold the sample index into the colon-separated portion) are
also handled, with the UMI still coming from the last field. Any `+` characters in the
extracted UMI are normalized to `-`. If UMI segments are present in the read structures those
will also be extracted. If UMIs are present in both, the final UMIs are constructed by first
taking the UMIs from the read names, then adding a hyphen, then the UMIs extracted from the
reads.
"#
)]
#[command(verbatim_doc_comment)]
#[allow(clippy::struct_excessive_bools)]
pub struct Extract {
#[arg(long, short = 'i', required = true, num_args = 1..)]
inputs: Vec<PathBuf>,
#[arg(long, short = 'o', required = true)]
output: PathBuf,
#[arg(long, short = 'r', num_args = 0..)]
read_structures: Vec<ReadStructure>,
#[arg(long, short = 'q', conflicts_with = "extract_umis_from_read_names")]
store_umi_quals: bool,
#[arg(long, short = 'C')]
store_cell_quals: bool,
#[arg(long, short = 'Q')]
store_sample_barcode_qualities: bool,
#[arg(long, short = 'n')]
#[allow(clippy::struct_field_names)]
extract_umis_from_read_names: bool,
#[arg(long, short = 'a')]
annotate_read_names: bool,
#[arg(long, short = 's')]
single_tag: Option<SamTag>,
#[arg(long)]
clipping_attribute: Option<SamTag>,
#[arg(long, default_value = "A")]
read_group_id: String,
#[arg(long, required = true)]
sample: String,
#[arg(long, required = true)]
library: String,
#[arg(long, short = 'b')]
barcode: Option<String>,
#[arg(long, default_value = "illumina")]
platform: String,
#[arg(long)]
platform_unit: Option<String>,
#[arg(long)]
platform_model: Option<String>,
#[arg(long)]
sequencing_center: Option<String>,
#[arg(long)]
predicted_insert_size: Option<u32>,
#[arg(long)]
description: Option<String>,
#[arg(long, num_args = 0..)]
comment: Vec<String>,
#[arg(long)]
run_date: Option<String>,
#[command(flatten)]
pub threading: ThreadingOptions,
#[command(flatten)]
pub compression: CompressionOptions,
#[command(flatten)]
pub scheduler_opts: SchedulerOptions,
#[command(flatten)]
pub queue_memory: QueueMemoryOptions,
#[arg(long = "async-reader", default_value_t = false, hide = true)]
pub async_reader: bool,
}
impl Extract {
fn get_read_structures(&self) -> Result<Vec<ReadStructure>> {
if self.read_structures.is_empty() && (1..=2).contains(&self.inputs.len()) {
Ok(vec![ReadStructure::from_str("+T")?; self.inputs.len()])
} else {
Ok(self.read_structures.clone())
}
}
fn validate(&self) -> Result<()> {
let read_structures = self.get_read_structures()?;
ensure!(
self.inputs.len() == read_structures.len(),
"input and read-structure must be supplied the same number of times."
);
let template_count: usize = read_structures
.iter()
.map(|rs| rs.segments_by_type(SegmentType::Template).count())
.sum();
ensure!(
(1..=2).contains(&template_count),
"read structures must contain 1-2 template reads total."
);
ensure!(
!self.extract_umis_from_read_names || !self.store_umi_quals,
"Cannot store UMI qualities (--store-umi-quals) when also extracting UMIs from read names (--extract-umis-from-read-names)."
);
for input in &self.inputs {
validate_file_exists(input, "Input FASTQ")?;
}
for (i, rs) in read_structures.iter().enumerate() {
ensure!(!rs.segments().is_empty(), "Read structure {} is empty", i + 1);
}
const RESERVED_OUTPUT_TAGS: &[SamTag] =
&[SamTag::RX, SamTag::QX, SamTag::CB, SamTag::CY, SamTag::BC, SamTag::QT, SamTag::RG];
if let Some(tag) = self.single_tag {
ensure!(
!RESERVED_OUTPUT_TAGS.contains(&tag),
"Single tag cannot collide with tags emitted by extract \
(RX, QX, CB, CY, BC, QT, RG): {tag}"
);
}
Ok(())
}
fn add_to_read_group(
rg: Builder<ReadGroup>,
tag: noodles::sam::header::record::value::map::tag::Other<rg_tag::Standard>,
value: Option<&String>,
) -> Builder<ReadGroup> {
if let Some(v) = value { rg.insert(tag, v.clone()) } else { rg }
}
fn create_header(&self, command_line: &str) -> Result<Header> {
let mut header = Header::builder();
let HeaderTag::Other(so_tag) = HeaderTag::from([b'S', b'O']) else { unreachable!() };
let HeaderTag::Other(go_tag) = HeaderTag::from([b'G', b'O']) else { unreachable!() };
let map = HeaderRecordMap::<HeaderRecord>::builder()
.insert(so_tag, sort_order::UNSORTED)
.insert(go_tag, group_order::QUERY)
.build()?;
header = header.set_header(map);
for comment in &self.comment {
header = header.add_comment(comment.clone());
}
let mut rg = Map::<ReadGroup>::builder();
rg = Self::add_to_read_group(rg, rg_tag::SAMPLE, Some(&self.sample.clone()));
rg = Self::add_to_read_group(rg, rg_tag::LIBRARY, Some(&self.library.clone()));
rg = Self::add_to_read_group(rg, rg_tag::BARCODE, self.barcode.as_ref());
rg = Self::add_to_read_group(rg, rg_tag::PLATFORM, Some(&self.platform));
rg = Self::add_to_read_group(rg, rg_tag::PLATFORM_UNIT, self.platform_unit.as_ref());
rg = Self::add_to_read_group(rg, rg_tag::PLATFORM_MODEL, self.platform_model.as_ref());
rg =
Self::add_to_read_group(rg, rg_tag::SEQUENCING_CENTER, self.sequencing_center.as_ref());
rg = Self::add_to_read_group(
rg,
rg_tag::PREDICTED_MEDIAN_INSERT_SIZE,
self.predicted_insert_size.map(|i| i.to_string()).as_ref(),
);
rg = Self::add_to_read_group(rg, rg_tag::DESCRIPTION, self.description.as_ref());
rg = Self::add_to_read_group(rg, rg_tag::PRODUCED_AT, self.run_date.as_ref());
header = header.add_read_group(self.read_group_id.clone(), rg.build()?);
header = crate::commands::common::add_pg_to_builder(header, command_line)?;
Ok(header.build())
}
fn join_bytes_with_separator<'a>(
segments: impl Iterator<Item = &'a [u8]>,
separator: u8,
) -> BString {
let segments: Vec<&[u8]> = segments.collect();
if segments.is_empty() {
return BString::default();
}
let total_len: usize = segments.iter().map(|s| s.len()).sum();
let capacity = total_len + segments.len().saturating_sub(1); let mut result = Vec::with_capacity(capacity);
for (i, seg) in segments.iter().enumerate() {
if i > 0 {
result.push(separator);
}
result.extend_from_slice(seg);
}
BString::from(result)
}
fn extract_read_name_and_umi(
header: &Vec<u8>,
extract_umis: bool,
) -> (Vec<u8>, Option<Vec<u8>>) {
let name_bytes = if header.starts_with(b"@") { &header[1..] } else { header.as_slice() };
let name_part = name_bytes.find_byte(b' ').map_or(name_bytes, |pos| &name_bytes[..pos]);
if !extract_umis {
return (name_part.to_vec(), None);
}
let parts: Vec<&[u8]> = name_part.split(|&b| b == b':').collect();
if parts.len() >= 8 {
if let Some(last) = parts.last() {
if !last.is_empty() {
let mut umi = last.to_vec();
for byte in &mut umi {
if *byte == b'+' {
*byte = b'-';
}
}
return (name_part.to_vec(), Some(umi));
}
}
}
(name_part.to_vec(), None)
}
fn validate_read_names_match(read_sets: &[FastqSet]) -> Result<()> {
if read_sets.is_empty() {
return Ok(());
}
let first_header = &read_sets[0].header;
let first_name = if first_header.starts_with(b"@") {
&first_header[1..]
} else {
first_header.as_slice()
};
let first_name_part = strip_read_suffix_extract(first_name);
for (i, read_set) in read_sets.iter().enumerate().skip(1) {
let header = &read_set.header;
let name = if header.starts_with(b"@") { &header[1..] } else { header.as_slice() };
let name_part = strip_read_suffix_extract(name);
if name_part != first_name_part {
bail!(
"Read names do not match across FASTQs: '{}' vs '{}' (FASTQ index 0 vs {})",
String::from_utf8_lossy(first_name_part),
String::from_utf8_lossy(name_part),
i
);
}
}
Ok(())
}
#[allow(clippy::too_many_lines)]
fn make_raw_records(
&self,
read_set: &FastqSet,
encoding: QualityEncoding,
builder: &mut UnmappedSamBuilder,
writer: &mut RawBamWriter,
) -> Result<u64> {
let templates: Vec<&FastqSegment> = read_set.template_segments().collect();
let read_name = String::from_utf8_lossy(&read_set.header);
ensure!(!templates.is_empty(), "No template segments found for read: {read_name}");
let cell_barcode_bs = Self::join_bytes_with_separator(
read_set.cell_barcode_segments().map(|s| s.seq.as_slice()),
b'-',
);
let cell_quals_bs = Self::join_bytes_with_separator(
read_set.cell_barcode_segments().map(|s| s.quals.as_slice()),
b' ',
);
let sample_barcode_bs = Self::join_bytes_with_separator(
read_set.sample_barcode_segments().map(|s| s.seq.as_slice()),
b'-',
);
let sample_quals_bs = Self::join_bytes_with_separator(
read_set.sample_barcode_segments().map(|s| s.quals.as_slice()),
b' ',
);
let umi_bs = Self::join_bytes_with_separator(
read_set.molecular_barcode_segments().map(|s| s.seq.as_slice()),
b'-',
);
let umi_qual_bs = Self::join_bytes_with_separator(
read_set.molecular_barcode_segments().map(|s| s.quals.as_slice()),
b' ',
);
let (read_name_bytes, umi_from_name) =
Self::extract_read_name_and_umi(&read_set.header, self.extract_umis_from_read_names);
let final_umi_bs: BString = match (umi_bs.is_empty(), &umi_from_name) {
(true, Some(from_name)) => BString::from(from_name.as_slice()),
(true, None) => BString::default(),
(false, Some(from_name)) => {
let mut combined = Vec::with_capacity(from_name.len() + 1 + umi_bs.len());
combined.extend_from_slice(from_name);
combined.push(b'-');
combined.extend_from_slice(umi_bs.as_bytes());
BString::from(combined)
}
(false, None) => umi_bs,
};
let num_templates = templates.len();
let umi_tag: &[u8; 2] = &SamTag::RX;
let cell_tag: &[u8; 2] = &SamTag::CB;
for (index, template) in templates.iter().enumerate() {
let mut flag = flags::UNMAPPED;
if num_templates == 2 {
flag |= flags::PAIRED | flags::MATE_UNMAPPED;
if index == 0 {
flag |= flags::FIRST_SEGMENT;
} else {
flag |= flags::LAST_SEGMENT;
}
}
let annotated_name: Option<Vec<u8>> = if self.annotate_read_names
&& !final_umi_bs.is_empty()
{
let mut name = Vec::with_capacity(read_name_bytes.len() + 1 + final_umi_bs.len());
name.extend_from_slice(&read_name_bytes);
name.push(b'+');
name.extend_from_slice(final_umi_bs.as_bytes());
Some(name)
} else {
None
};
let final_read_name: &[u8] = annotated_name.as_deref().unwrap_or(&read_name_bytes);
if template.seq.is_empty() {
builder.build_record(final_read_name, flag, b"N", &[2u8]);
} else {
let numeric_quals = encoding.to_standard_numeric(&template.quals);
builder.build_record(final_read_name, flag, &template.seq, &numeric_quals);
}
builder.append_string_tag(&SamTag::RG, self.read_group_id.as_bytes());
if !cell_barcode_bs.is_empty() {
builder.append_string_tag(cell_tag, cell_barcode_bs.as_bytes());
}
if !cell_quals_bs.is_empty() && self.store_cell_quals {
builder.append_string_tag(&SamTag::CY, cell_quals_bs.as_bytes());
}
if !sample_barcode_bs.is_empty() {
builder.append_string_tag(&SamTag::BC, sample_barcode_bs.as_bytes());
}
if self.store_sample_barcode_qualities && !sample_quals_bs.is_empty() {
builder.append_string_tag(&SamTag::QT, sample_quals_bs.as_bytes());
}
if !final_umi_bs.is_empty() {
builder.append_string_tag(umi_tag, final_umi_bs.as_bytes());
if let Some(single_tag) = self.single_tag {
builder.append_string_tag(&single_tag, final_umi_bs.as_bytes());
}
if umi_from_name.is_none() && !umi_qual_bs.is_empty() && self.store_umi_quals {
builder.append_string_tag(&SamTag::QX, umi_qual_bs.as_bytes());
}
}
writer.write_raw_record(builder.as_bytes())?;
builder.clear();
}
Ok(num_templates as u64)
}
fn process_singlethreaded(
&self,
fq_iterators: &mut [ReadSetIterator],
writer: &mut RawBamWriter,
encoding: QualityEncoding,
) -> Result<u64> {
let progress = ProgressTracker::new("Processed records").with_interval(1_000_000);
let mut read_pair_count: u64 = 0;
let mut builder = UnmappedSamBuilder::new();
loop {
let mut next_read_sets = Vec::with_capacity(fq_iterators.len());
let mut saw_eof = false;
for iter in fq_iterators.iter_mut() {
match iter.next() {
Some(Ok(rec)) if !saw_eof => next_read_sets.push(rec),
Some(Ok(_)) => bail!("FASTQ sources out of sync: some ended before others"),
Some(Err(e)) => return Err(e),
None => saw_eof = true,
}
}
if saw_eof {
ensure!(
next_read_sets.is_empty(),
"FASTQ sources out of sync: some ended before others"
);
break;
}
Self::validate_read_names_match(&next_read_sets)?;
let read_set = FastqSet::combine_readsets(next_read_sets);
let num_records = self.make_raw_records(&read_set, encoding, &mut builder, writer)?;
read_pair_count += num_records;
progress.log_if_needed(num_records);
}
progress.log_final();
Ok(read_pair_count)
}
fn process_with_pipeline(
&self,
header: &Header,
output: Box<dyn std::io::Write + Send>,
encoding: QualityEncoding,
read_structures: &[ReadStructure],
) -> Result<u64> {
let all_bgzf = self.inputs.iter().all(|p| {
detect_compression_format(p).map(|f| f == CompressionFormat::Bgzf).unwrap_or(false)
});
let num_threads = self.threading.threads.unwrap_or(1);
let mut config =
FastqPipelineConfig::new(num_threads, all_bgzf, self.compression.compression_level)
.with_stats(self.scheduler_opts.collect_stats())
.with_scheduler_strategy(self.scheduler_opts.strategy())
.with_deadlock_timeout(self.scheduler_opts.deadlock_timeout_secs())
.with_deadlock_recovery(self.scheduler_opts.deadlock_recover_enabled())
.with_async_reader(self.async_reader);
let queue_memory_limit_bytes = self.queue_memory.calculate_memory_limit(num_threads)?;
config.queue_memory_limit = queue_memory_limit_bytes;
self.queue_memory.log_memory_config(num_threads, queue_memory_limit_bytes);
let decomp_threads = self.threading.num_threads().max(1);
let decompressed_readers: Option<Vec<Box<dyn BufRead + Send>>> = if all_bgzf {
None
} else {
Some(
self.inputs
.iter()
.map(|p| open_fastq_reader(p, decomp_threads, self.async_reader))
.collect::<Result<Vec<_>>>()?,
)
};
let read_structures = read_structures.to_vec();
let extract_config = ExtractConfig {
read_group_id: self.read_group_id.clone(),
store_umi_quals: self.store_umi_quals,
store_cell_quals: self.store_cell_quals,
single_tag: self.single_tag,
annotate_read_names: self.annotate_read_names,
extract_umis_from_read_names: self.extract_umis_from_read_names,
store_sample_barcode_qualities: self.store_sample_barcode_qualities,
};
let records_written = run_fastq_pipeline(
config,
&self.inputs,
decompressed_readers,
header,
output,
move |template: FastqTemplate| -> std::io::Result<ExtractedBatch> {
if template.records.len() != read_structures.len() {
log::warn!(
"Template has {} records but expected {} (read_structures.len())",
template.records.len(),
read_structures.len()
);
}
if template.records.len() >= 2 {
let base_name = strip_read_suffix_extract(template.records[0].name());
for (i, record) in template.records.iter().enumerate().skip(1) {
let other_base = strip_read_suffix_extract(record.name());
if base_name != other_base {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"FASTQ files out of sync: R1 has '{}', R{} has '{}'",
String::from_utf8_lossy(base_name),
i + 1,
String::from_utf8_lossy(other_base),
),
));
}
}
}
let mut fastq_sets: Vec<FastqSet> = Vec::with_capacity(template.records.len());
for (record, rs) in template.records.iter().zip(read_structures.iter()) {
let fastq_set = FastqSet::from_record_with_structure(
record.name(),
record.sequence(),
record.quality(),
rs,
&[], )
.map_err(std::io::Error::other)?;
fastq_sets.push(fastq_set);
}
let combined = FastqSet::combine_readsets(fastq_sets);
let result = make_raw_records_static(&combined, encoding, &extract_config)
.map_err(|e| {
std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string())
})?;
if result.num_records as usize != template.records.len() {
log::warn!(
"Template with {} FASTQ records produced {} BAM records",
template.records.len(),
result.num_records
);
}
Ok(result)
},
|batch: ExtractedBatch, _header: &Header, buffer: &mut Vec<u8>| {
buffer.extend_from_slice(&batch.data);
Ok(batch.num_records)
},
)?;
info!("Wrote {records_written} records via 7-step pipeline");
Ok(records_written)
}
}
#[derive(Clone)]
#[allow(clippy::struct_excessive_bools)]
struct ExtractConfig {
read_group_id: String,
store_umi_quals: bool,
store_cell_quals: bool,
single_tag: Option<SamTag>,
annotate_read_names: bool,
extract_umis_from_read_names: bool,
store_sample_barcode_qualities: bool,
}
fn make_raw_records_static(
read_set: &FastqSet,
encoding: QualityEncoding,
cfg: &ExtractConfig,
) -> Result<ExtractedBatch> {
let templates: Vec<&FastqSegment> = read_set.template_segments().collect();
let read_name = String::from_utf8_lossy(&read_set.header);
ensure!(!templates.is_empty(), "No template segments found for read: {read_name}");
let cell_barcode_bs = Extract::join_bytes_with_separator(
read_set.cell_barcode_segments().map(|s| s.seq.as_slice()),
b'-',
);
let cell_quals_bs = Extract::join_bytes_with_separator(
read_set.cell_barcode_segments().map(|s| s.quals.as_slice()),
b' ',
);
let sample_barcode_bs = Extract::join_bytes_with_separator(
read_set.sample_barcode_segments().map(|s| s.seq.as_slice()),
b'-',
);
let sample_quals_bs = Extract::join_bytes_with_separator(
read_set.sample_barcode_segments().map(|s| s.quals.as_slice()),
b' ',
);
let umi_bs = Extract::join_bytes_with_separator(
read_set.molecular_barcode_segments().map(|s| s.seq.as_slice()),
b'-',
);
let umi_qual_bs = Extract::join_bytes_with_separator(
read_set.molecular_barcode_segments().map(|s| s.quals.as_slice()),
b' ',
);
let (read_name_bytes, umi_from_name) =
Extract::extract_read_name_and_umi(&read_set.header, cfg.extract_umis_from_read_names);
let final_umi_bs: BString = match (umi_bs.is_empty(), &umi_from_name) {
(true, Some(from_name)) => BString::from(from_name.as_slice()),
(true, None) => BString::default(),
(false, Some(from_name)) => {
let mut combined = Vec::with_capacity(from_name.len() + 1 + umi_bs.len());
combined.extend_from_slice(from_name);
combined.push(b'-');
combined.extend_from_slice(umi_bs.as_bytes());
BString::from(combined)
}
(false, None) => umi_bs,
};
let num_templates = templates.len();
let mut builder = UnmappedSamBuilder::new();
let mut data = Vec::new();
for (index, template) in templates.iter().enumerate() {
let mut flag = flags::UNMAPPED;
if num_templates == 2 {
flag |= flags::PAIRED | flags::MATE_UNMAPPED;
if index == 0 {
flag |= flags::FIRST_SEGMENT;
} else {
flag |= flags::LAST_SEGMENT;
}
}
let annotated_name: Option<Vec<u8>> = if cfg.annotate_read_names && !final_umi_bs.is_empty()
{
let mut name = Vec::with_capacity(read_name_bytes.len() + 1 + final_umi_bs.len());
name.extend_from_slice(&read_name_bytes);
name.push(b'+');
name.extend_from_slice(final_umi_bs.as_bytes());
Some(name)
} else {
None
};
let final_read_name: &[u8] = annotated_name.as_deref().unwrap_or(&read_name_bytes);
if template.seq.is_empty() {
builder.build_record(final_read_name, flag, b"N", &[2u8]);
} else {
let numeric_quals = encoding.to_standard_numeric(&template.quals);
builder.build_record(final_read_name, flag, &template.seq, &numeric_quals);
}
builder.append_string_tag(&SamTag::RG, cfg.read_group_id.as_bytes());
if !cell_barcode_bs.is_empty() {
builder.append_string_tag(&SamTag::CB, cell_barcode_bs.as_bytes());
}
if !cell_quals_bs.is_empty() && cfg.store_cell_quals {
builder.append_string_tag(&SamTag::CY, cell_quals_bs.as_bytes());
}
if !sample_barcode_bs.is_empty() {
builder.append_string_tag(&SamTag::BC, sample_barcode_bs.as_bytes());
}
if cfg.store_sample_barcode_qualities && !sample_quals_bs.is_empty() {
builder.append_string_tag(&SamTag::QT, sample_quals_bs.as_bytes());
}
if !final_umi_bs.is_empty() {
builder.append_string_tag(&SamTag::RX, final_umi_bs.as_bytes());
if let Some(st) = cfg.single_tag {
builder.append_string_tag(&st, final_umi_bs.as_bytes());
}
if umi_from_name.is_none() && !umi_qual_bs.is_empty() && cfg.store_umi_quals {
builder.append_string_tag(&SamTag::QX, umi_qual_bs.as_bytes());
}
}
builder.write_with_block_size(&mut data);
builder.clear();
}
Ok(ExtractedBatch { data, num_records: num_templates as u64 })
}
impl Command for Extract {
fn execute(&self, command_line: &str) -> Result<()> {
self.validate()?;
let timer = OperationTimer::new("Extracting UMIs");
let read_structures = self.get_read_structures()?;
let mut sample_quals = Vec::new();
let mut temp_reader = SimdFastqReader::with_capacity(
open_fastq_reader(&self.inputs[0], 1, self.async_reader)?,
BUFFER_SIZE,
);
for _i in 0..QUALITY_DETECTION_SAMPLE_SIZE {
match temp_reader.next() {
Some(Ok(rec)) => sample_quals.push(rec.quality),
Some(Err(e)) => return Err(e.into()),
None => break,
}
}
let encoding = QualityEncoding::detect(&sample_quals)?;
let header = self.create_header(command_line)?;
let records_written = if self.threading.is_parallel() {
let output: Box<dyn std::io::Write + Send> =
Box::new(std::io::BufWriter::new(File::create(&self.output)?));
self.process_with_pipeline(&header, output, encoding, &read_structures)?
} else {
let decomp_threads = self.threading.num_threads().max(1);
let fq_readers: Vec<Box<dyn BufRead + Send>> = self
.inputs
.iter()
.map(|p| open_fastq_reader(p, decomp_threads, self.async_reader))
.collect::<Result<Vec<_>>>()?;
let fq_sources: Vec<SimdFastqReader<Box<dyn BufRead + Send>>> = fq_readers
.into_iter()
.map(|fq| SimdFastqReader::with_capacity(fq, BUFFER_SIZE))
.collect();
let fq_iterators: Vec<ReadSetIterator> = fq_sources
.into_iter()
.zip(read_structures.iter())
.map(|(source, rs)| ReadSetIterator::new(rs.clone(), source, Vec::new()))
.collect();
let writer_threads = self.threading.num_threads();
let mut writer = create_raw_bam_writer(
&self.output,
&header,
writer_threads,
self.compression.compression_level,
)?;
let mut fq_iterators = fq_iterators;
let count = self.process_singlethreaded(&mut fq_iterators, &mut writer, encoding)?;
writer.finish()?;
count
};
timer.log_completion(records_written);
Ok(())
}
}
fn strip_read_suffix_extract(name: &[u8]) -> &[u8] {
let name = if let Some(space_pos) = name.iter().position(|&b| b == b' ') {
&name[..space_pos]
} else {
name
};
if name.len() >= 2 {
let last = name[name.len() - 1];
let sep = name[name.len() - 2];
if (last == b'1' || last == b'2')
&& (sep == b'/' || sep == b'.' || sep == b'_' || sep == b':')
{
return &name[..name.len() - 2];
}
}
name
}
#[cfg(test)]
mod tests {
use super::*;
use bstr::BString;
use noodles::sam::alignment::RecordBuf;
use noodles::sam::alignment::record::data::field::Tag;
use noodles::sam::alignment::record_buf::data::field::Value as RecordBufValue;
use rstest::rstest;
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
fn create_fastq(dir: &TempDir, name: &str, records: &[(&str, &str, &str)]) -> PathBuf {
let path = dir.path().join(name);
let mut file = File::create(&path).expect("failed to create file");
for (name, seq, qual) in records {
writeln!(file, "@{name}").expect("failed to write line");
writeln!(file, "{seq}").expect("failed to write line");
writeln!(file, "+").expect("failed to write line");
writeln!(file, "{qual}").expect("failed to write line");
}
path
}
fn read_bam_records(path: &PathBuf) -> Vec<RecordBuf> {
let (mut reader, header) = create_bam_reader(path, 1).expect("failed to create BAM reader");
let mut records = Vec::new();
for result in reader.record_bufs(&header) {
records.push(result.expect("failed to read BAM record"));
}
records
}
fn get_tag_string(record: &RecordBuf, tag_name: &str) -> Option<String> {
let tag_bytes = tag_name.as_bytes();
let tag = Tag::from([tag_bytes[0], tag_bytes[1]]);
record.data().get(&tag).and_then(|value| match value {
RecordBufValue::String(s) => Some(String::from_utf8_lossy(s.as_ref()).to_string()),
_ => None,
})
}
#[test]
fn test_single_fastq_no_read_structure() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(
&tmp,
"r1.fq",
&[("q1", "AAAAAAAAAA", "=========="), ("q2", "CCCCCCCCCC", "##########")],
);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "foo".to_string(),
library: "bar".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 2);
let q1 = &recs[0];
assert_eq!(q1.name().map(|n| n.as_bytes()), Some(b"q1".as_slice()));
assert!(!q1.flags().is_segmented());
assert_eq!(q1.sequence().as_ref(), b"AAAAAAAAAA");
assert_eq!(q1.quality_scores().as_ref(), &[28; 10]);
let q2 = &recs[1];
assert_eq!(q2.name().map(|n| n.as_bytes()), Some(b"q2".as_slice()));
assert!(!q2.flags().is_segmented());
assert_eq!(q2.sequence().as_ref(), b"CCCCCCCCCC");
assert_eq!(q2.quality_scores().as_ref(), &[2; 10]);
}
#[test]
fn test_paired_end_no_read_structures() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "AAAAAAAAAA", "==========")]);
let r2 = create_fastq(&tmp, "r2.fq", &[("q1", "CCCCCCCCCC", "##########")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1, r2],
output: output.clone(),
read_structures: vec![],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "pip".to_string(),
library: "pop".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 2);
let r1 = &recs[0];
assert_eq!(r1.name().map(|n| n.as_bytes()), Some(b"q1".as_slice()));
assert!(r1.flags().is_segmented());
assert!(r1.flags().is_first_segment());
assert!(!r1.flags().is_last_segment());
assert_eq!(r1.sequence().as_ref(), b"AAAAAAAAAA");
assert_eq!(r1.quality_scores().as_ref(), &[28; 10]);
let r2 = &recs[1];
assert_eq!(r2.name().map(|n| n.as_bytes()), Some(b"q1".as_slice()));
assert!(r2.flags().is_segmented());
assert!(!r2.flags().is_first_segment());
assert!(r2.flags().is_last_segment());
assert_eq!(r2.sequence().as_ref(), b"CCCCCCCCCC");
assert_eq!(r2.quality_scores().as_ref(), &[2; 10]);
}
#[test]
fn test_paired_end_ignore_umi_qual_tag_when_no_umis() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "AAAAAAAAAA", "==========")]);
let r2 = create_fastq(&tmp, "r2.fq", &[("q1", "CCCCCCCCCC", "##########")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1, r2],
output: output.clone(),
read_structures: vec![],
store_umi_quals: true,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "pip".to_string(),
library: "pop".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 2);
assert!(get_tag_string(&recs[0], "RX").is_none());
assert!(get_tag_string(&recs[0], "QX").is_none());
}
#[test]
fn test_paired_end_with_inline_umi() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "ACGTAAAAAA", "IIII======")]);
let r2 = create_fastq(&tmp, "r2.fq", &[("q1", "CCCCCCCCCC", "##########")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1, r2],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("4M+T").expect("valid read structure"),
ReadStructure::from_str("+T").expect("valid read structure"),
],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 2);
let r1 = &recs[0];
assert_eq!(r1.name().map(|n| n.as_bytes()), Some(b"q1".as_slice()));
assert_eq!(r1.sequence().as_ref(), b"AAAAAA");
assert_eq!(r1.quality_scores().as_ref(), &[28; 6]);
assert_eq!(get_tag_string(r1, "RX"), Some("ACGT".to_string()));
let r2 = &recs[1];
assert_eq!(r2.name().map(|n| n.as_bytes()), Some(b"q1".as_slice()));
assert_eq!(r2.sequence().as_ref(), b"CCCCCCCCCC");
assert_eq!(r2.quality_scores().as_ref(), &[2; 10]);
assert_eq!(get_tag_string(r2, "RX"), Some("ACGT".to_string()));
}
#[test]
fn test_paired_end_with_inline_umi_keep_qualities() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "ACGTAAAAAA", "IIII======")]);
let r2 = create_fastq(&tmp, "r2.fq", &[("q1", "CCCCCCCCCC", "##########")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1, r2],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("4M+T").expect("valid read structure"),
ReadStructure::from_str("+T").expect("valid read structure"),
],
store_umi_quals: true,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 2);
assert_eq!(get_tag_string(&recs[0], "RX"), Some("ACGT".to_string()));
assert_eq!(get_tag_string(&recs[0], "QX"), Some("IIII".to_string()));
assert_eq!(get_tag_string(&recs[1], "RX"), Some("ACGT".to_string()));
assert_eq!(get_tag_string(&recs[1], "QX"), Some("IIII".to_string()));
}
#[test]
fn test_complex_read_structures_multiple_segments() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "AAACCCTTTAAAAA", "==============")]);
let r2 = create_fastq(&tmp, "r2.fq", &[("q1", "GGGTTTAAACCCCC", "##############")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1, r2],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("3B3M3B5T").expect("valid read structure"),
ReadStructure::from_str("3B3M3B5T").expect("valid read structure"),
],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 2);
let r1 = &recs[0];
assert_eq!(r1.sequence().as_ref(), b"AAAAA");
assert_eq!(r1.quality_scores().as_ref(), &[28; 5]);
assert_eq!(get_tag_string(r1, "RX"), Some("CCC-TTT".to_string()));
assert_eq!(get_tag_string(r1, "BC"), Some("AAA-TTT-GGG-AAA".to_string()));
let r2 = &recs[1];
assert_eq!(r2.sequence().as_ref(), b"CCCCC");
assert_eq!(r2.quality_scores().as_ref(), &[2; 5]);
assert_eq!(get_tag_string(r2, "RX"), Some("CCC-TTT".to_string()));
assert_eq!(get_tag_string(r2, "BC"), Some("AAA-TTT-GGG-AAA".to_string()));
}
#[test]
fn test_complex_read_structures_with_umi_qualities() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "AAACCCTTTAAAAA", "===III========")]);
let r2 = create_fastq(&tmp, "r2.fq", &[("q1", "GGGTTTAAACCCCC", "###JJJ########")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1, r2],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("3B3M3B5T").expect("valid read structure"),
ReadStructure::from_str("3B3M3B5T").expect("valid read structure"),
],
store_umi_quals: true,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 2);
assert_eq!(get_tag_string(&recs[0], "RX"), Some("CCC-TTT".to_string()));
assert_eq!(get_tag_string(&recs[0], "QX"), Some("III JJJ".to_string()));
assert_eq!(get_tag_string(&recs[1], "RX"), Some("CCC-TTT".to_string()));
assert_eq!(get_tag_string(&recs[1], "QX"), Some("III JJJ".to_string()));
}
#[test]
fn test_four_fastqs() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "AAAAAAAAAA", "==========")]);
let r2 = create_fastq(&tmp, "r2.fq", &[("q1", "CCCCCCCCCC", "##########")]);
let r3 = create_fastq(&tmp, "r3.fq", &[("q1", "ACGT", "????")]);
let r4 = create_fastq(&tmp, "r4.fq", &[("q1", "GAGA", "????")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1, r2, r3, r4],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("+T").expect("valid read structure"),
ReadStructure::from_str("+T").expect("valid read structure"),
ReadStructure::from_str("4B").expect("valid read structure"),
ReadStructure::from_str("4M").expect("valid read structure"),
],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 2);
let r1 = &recs[0];
assert_eq!(r1.sequence().as_ref(), b"AAAAAAAAAA");
assert_eq!(r1.quality_scores().as_ref(), &[28; 10]);
assert_eq!(get_tag_string(r1, "RX"), Some("GAGA".to_string()));
assert_eq!(get_tag_string(r1, "BC"), Some("ACGT".to_string()));
let r2 = &recs[1];
assert_eq!(r2.sequence().as_ref(), b"CCCCCCCCCC");
assert_eq!(r2.quality_scores().as_ref(), &[2; 10]);
assert_eq!(get_tag_string(r2, "RX"), Some("GAGA".to_string()));
assert_eq!(get_tag_string(r2, "BC"), Some("ACGT".to_string()));
}
#[test]
fn test_four_fastqs_with_umi_qualities() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "AAAAAAAAAA", "==========")]);
let r2 = create_fastq(&tmp, "r2.fq", &[("q1", "CCCCCCCCCC", "##########")]);
let r3 = create_fastq(&tmp, "r3.fq", &[("q1", "ACGT", "????")]);
let r4 = create_fastq(&tmp, "r4.fq", &[("q1", "GAGA", "????")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1, r2, r3, r4],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("+T").expect("valid read structure"),
ReadStructure::from_str("+T").expect("valid read structure"),
ReadStructure::from_str("4B").expect("valid read structure"),
ReadStructure::from_str("4M").expect("valid read structure"),
],
store_umi_quals: true,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 2);
assert_eq!(get_tag_string(&recs[0], "RX"), Some("GAGA".to_string()));
assert_eq!(get_tag_string(&recs[0], "QX"), Some("????".to_string()));
assert_eq!(get_tag_string(&recs[1], "RX"), Some("GAGA".to_string()));
assert_eq!(get_tag_string(&recs[1], "QX"), Some("????".to_string()));
}
#[test]
fn test_header_metadata() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "AAAAAAAAAA", "==========")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![ReadStructure::from_str("10T").expect("valid read structure")],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "MyRG".to_string(),
sample: "foo".to_string(),
library: "bar".to_string(),
barcode: Some("TATA-GAGA".to_string()),
platform: "Illumina".to_string(),
platform_unit: Some("pee-eww".to_string()),
platform_model: Some("hiseq2500".to_string()),
sequencing_center: Some("nowhere".to_string()),
predicted_insert_size: Some(300),
description: Some("Some reads!".to_string()),
comment: vec!["hello world".to_string(), "comment two".to_string()],
run_date: Some("2024-01-01".to_string()),
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let (_reader, header) = create_bam_reader(&output, 1).expect("failed to create BAM reader");
let comments: Vec<String> =
header.comments().iter().map(std::string::ToString::to_string).collect();
assert_eq!(comments, vec!["hello world", "comment two"]);
let rg_id = BString::from("MyRG");
let rg = header.read_groups().get(&rg_id);
assert!(rg.is_some(), "Read group MyRG should exist");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 1);
assert_eq!(get_tag_string(&recs[0], "RG"), Some("MyRG".to_string()));
}
#[test]
fn test_zero_length_reads() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(
&tmp,
"r1.fq",
&[("q1", "AAAAAAAAAA", "=========="), ("q2", "", ""), ("q3", "GGGGGG", "IIIIII")],
);
let r2 = create_fastq(
&tmp,
"r2.fq",
&[("q1", "TTTTTTTTTT", "~~~~~~~~~~"), ("q2", "", ""), ("q3", "", "")],
);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1, r2],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("+T").expect("valid read structure"), ReadStructure::from_str("+T").expect("valid read structure"),
],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let records = read_bam_records(&output);
assert_eq!(records.len(), 6, "Should have 6 records (3 pairs)");
let q2_r1 = &records[2]; let q2_r2 = &records[3];
assert_eq!(q2_r1.sequence().as_ref(), b"N", "Zero-length R1 should become 'N'");
assert_eq!(q2_r1.quality_scores().as_ref(), &[2u8], "Zero-length R1 should have Q2");
assert_eq!(q2_r2.sequence().as_ref(), b"N", "Zero-length R2 should become 'N'");
assert_eq!(q2_r2.quality_scores().as_ref(), &[2u8], "Zero-length R2 should have Q2");
let q3_r1 = &records[4]; let q3_r2 = &records[5];
assert_eq!(q3_r1.sequence().as_ref(), b"GGGGGG", "Non-zero R1 should retain sequence");
assert_eq!(q3_r2.sequence().as_ref(), b"N", "Zero-length R2 should become 'N'");
assert_eq!(q3_r2.quality_scores().as_ref(), &[2u8], "Zero-length R2 should have Q2");
}
#[test]
fn test_zero_length_reads_with_fixed_structure_errors() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "", "")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("10T").expect("valid read structure"), ],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
let err = extract
.execute("test")
.expect_err("should error for zero-length read with fixed structure");
assert!(err.to_string().contains("had too few bases to demux"));
}
#[test]
fn test_extract_sample_barcode_qualities() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(
&tmp,
"r1.fq",
&[
("q1:2:3:4:5:6:7", "AAACCCAAAA", "ABCDEFGHIJ"),
("q2:2:3:4:5:6:7", "TAANNNAAAA", "BCDEFGHIJK"),
("q3:2:3:4:5:6:7", "GAACCCTCGA", "CDEFGHIJKL"),
],
);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1.clone()],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("3B3M3B+T").expect("valid read structure"),
],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: true,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 3);
assert_eq!(get_tag_string(&recs[0], "BC"), Some("AAA-AAA".to_string()));
assert_eq!(get_tag_string(&recs[1], "BC"), Some("TAA-AAA".to_string()));
assert_eq!(get_tag_string(&recs[2], "BC"), Some("GAA-TCG".to_string()));
assert_eq!(get_tag_string(&recs[0], "QT"), Some("ABC GHI".to_string()));
assert_eq!(get_tag_string(&recs[1], "QT"), Some("BCD HIJ".to_string()));
assert_eq!(get_tag_string(&recs[2], "QT"), Some("CDE IJK".to_string()));
let output2 = tmp.path().join("output2.bam");
let extract2 = Extract {
inputs: vec![r1],
output: output2.clone(),
read_structures: vec![
ReadStructure::from_str("3B3M3B+T").expect("valid read structure"),
],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract2.execute("test").expect("execute should succeed");
let recs2 = read_bam_records(&output2);
assert_eq!(recs2.len(), 3);
assert!(get_tag_string(&recs2[0], "QT").is_none());
assert!(get_tag_string(&recs2[1], "QT").is_none());
assert!(get_tag_string(&recs2[2], "QT").is_none());
}
#[test]
fn test_extract_umis_from_read_names() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(
&tmp,
"r1.fq",
&[
("q1:2:3:4:5:6:7:ACGT", "AAAAAAAAAA", "=========="),
("q2:2:3:4:5:6:7:TTGA", "TAAAAAAAAA", "=========="),
("q3:2:3:4:5:6:7", "TAAAAAAAAA", "=========="),
],
);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: true,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 3);
assert_eq!(get_tag_string(&recs[0], "RX"), Some("ACGT".to_string()));
assert_eq!(get_tag_string(&recs[1], "RX"), Some("TTGA".to_string()));
assert!(get_tag_string(&recs[2], "RX").is_none());
}
#[test]
fn test_extract_umis_from_read_names_and_sequences() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(
&tmp,
"r1.fq",
&[
("q1:2:3:4:5:6:7:ACGT+CGTA", "GGNCCGAAAAAAA", "============="),
("q2:2:3:4:5:6:7:TTGA+TAAT", "TANAACAAAAAAA", "============="),
],
);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("2M1S2M+T").expect("valid read structure"),
],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: true,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 2);
assert_eq!(get_tag_string(&recs[0], "RX"), Some("ACGT-CGTA-GG-CC".to_string()));
assert_eq!(get_tag_string(&recs[1], "RX"), Some("TTGA-TAAT-TA-AA".to_string()));
}
#[test]
fn test_extract_read_name_and_umi_nine_fields_takes_last() {
let header =
b"@LH00620:304:23LLHJLT4:7:1101:2578:1070:CAATCTATAA+rTTACATAGTT:TCNGCG".to_vec();
let (name, umi) = Extract::extract_read_name_and_umi(&header, true);
assert_eq!(
name,
b"LH00620:304:23LLHJLT4:7:1101:2578:1070:CAATCTATAA+rTTACATAGTT:TCNGCG".to_vec()
);
assert_eq!(umi, Some(b"TCNGCG".to_vec()));
}
#[test]
fn test_extract_read_name_and_umi_eight_fields_takes_last() {
let header = b"@q1:2:3:4:5:6:7:ACGT".to_vec();
let (name, umi) = Extract::extract_read_name_and_umi(&header, true);
assert_eq!(name, b"q1:2:3:4:5:6:7:ACGT".to_vec());
assert_eq!(umi, Some(b"ACGT".to_vec()));
}
#[test]
fn test_extract_read_name_and_umi_seven_fields_returns_none() {
let header = b"@q1:2:3:4:5:6:7".to_vec();
let (name, umi) = Extract::extract_read_name_and_umi(&header, true);
assert_eq!(name, b"q1:2:3:4:5:6:7".to_vec());
assert_eq!(umi, None);
}
#[test]
fn test_extract_read_name_and_umi_normalizes_plus_to_hyphen() {
let header = b"@q1:2:3:4:5:6:7:ACGT+CGTA".to_vec();
let (_name, umi) = Extract::extract_read_name_and_umi(&header, true);
assert_eq!(umi, Some(b"ACGT-CGTA".to_vec()));
}
#[test]
fn test_extract_read_name_and_umi_normalizes_plus_in_nine_fields() {
let header = b"@A:B:C:D:E:F:G:CAATCTATAA+TTACATAGTT:ACGT+CGTA".to_vec();
let (_name, umi) = Extract::extract_read_name_and_umi(&header, true);
assert_eq!(umi, Some(b"ACGT-CGTA".to_vec()));
}
#[test]
fn test_extract_read_name_and_umi_strips_space_comment() {
let header = b"@q1:2:3:4:5:6:7:ACGT 1:N:0:NNNN".to_vec();
let (name, umi) = Extract::extract_read_name_and_umi(&header, true);
assert_eq!(name, b"q1:2:3:4:5:6:7:ACGT".to_vec());
assert_eq!(umi, Some(b"ACGT".to_vec()));
}
#[test]
fn test_extract_read_name_and_umi_extract_false_returns_none() {
let header = b"@q1:2:3:4:5:6:7:ACGT".to_vec();
let (name, umi) = Extract::extract_read_name_and_umi(&header, false);
assert_eq!(name, b"q1:2:3:4:5:6:7:ACGT".to_vec());
assert_eq!(umi, None);
}
#[test]
fn test_extract_umis_from_read_names_nine_fields_via_execute() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(
&tmp,
"r1.fq",
&[
(
"LH00620:304:23LLHJLT4:7:1101:2578:1070:CAATCTATAA+rTTACATAGTT:TCNGCG",
"AAAAAAAAAA",
"==========",
),
(
"LH00620:304:23LLHJLT4:7:1101:3565:1070:CAATCTATAA+rTTACATAGTT:TTNCCT",
"TAAAAAAAAA",
"==========",
),
],
);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: true,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 2);
assert_eq!(get_tag_string(&recs[0], "RX"), Some("TCNGCG".to_string()));
assert_eq!(get_tag_string(&recs[1], "RX"), Some("TTNCCT".to_string()));
}
#[test]
fn test_extract_cell_barcodes() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(
&tmp,
"r1.fq",
&[
("q1:2:3:4:5:6:7:ACGT+CGTA", "GGNCCGAAAAAAA", "============="),
("q2:2:3:4:5:6:7:TTGA+TAAT", "TANAACAAAAAAA", "============="),
],
);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("2C1S2C+T").expect("valid read structure"),
],
store_umi_quals: false,
store_cell_quals: true,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 2);
assert_eq!(get_tag_string(&recs[0], "CB"), Some("GG-CC".to_string()));
assert_eq!(get_tag_string(&recs[0], "CY"), Some("== ==".to_string()));
assert_eq!(get_tag_string(&recs[1], "CB"), Some("TA-AA".to_string()));
assert_eq!(get_tag_string(&recs[1], "CY"), Some("== ==".to_string()));
}
#[test]
#[should_panic(expected = "Read names do not match")]
fn test_fail_mismatched_read_names() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "AAAAAAAAAA", "==========")]);
let r2 = create_fastq(&tmp, "r2.fq", &[("x1", "CCCCCCCCCC", "##########")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1, r2],
output,
read_structures: vec![],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
}
#[test]
#[should_panic(expected = "out of sync")]
fn test_fail_mismatched_read_counts() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(
&tmp,
"r1.fq",
&[("q1", "AAAAAAAAAA", "=========="), ("q2", "TTTTTTTTTT", "??????????")],
);
let r2 = create_fastq(&tmp, "r2.fq", &[("q1", "CCCCCCCCCC", "##########")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1, r2],
output,
read_structures: vec![],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
}
#[test]
#[should_panic(expected = "must be supplied the same number of times")]
fn test_fail_mismatched_inputs_and_read_structures() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "AAAAAAAAAA", "==========")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output,
read_structures: vec![
ReadStructure::from_str("+T").expect("valid read structure"),
ReadStructure::from_str("+T").expect("valid read structure"),
],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
}
#[test]
fn test_annotate_read_names_with_umis() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "ACGTAAAAAA", "IIII======")]);
let r2 = create_fastq(&tmp, "r2.fq", &[("q1", "CCCCCCCCCC", "##########")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1, r2],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("4M+T").expect("valid read structure"),
ReadStructure::from_str("+T").expect("valid read structure"),
],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: true, single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 2);
let r1 = &recs[0];
assert_eq!(r1.name().map(|n| n.as_bytes()), Some(b"q1+ACGT".as_slice()));
assert_eq!(get_tag_string(r1, "RX"), Some("ACGT".to_string()));
let r2 = &recs[1];
assert_eq!(r2.name().map(|n| n.as_bytes()), Some(b"q1+ACGT".as_slice()));
assert_eq!(get_tag_string(r2, "RX"), Some("ACGT".to_string()));
}
#[test]
fn test_annotate_read_names_no_umis() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "AAAAAAAAAA", "==========")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: true, single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 1);
let r1 = &recs[0];
assert_eq!(r1.name().map(|n| n.as_bytes()), Some(b"q1".as_slice()));
}
#[test]
fn test_single_tag() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "AAACCCTTTAAAAA", "==============")]);
let r2 = create_fastq(&tmp, "r2.fq", &[("q1", "GGGTTTAAACCCCC", "##############")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1, r2],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("3B3M3B5T").expect("valid read structure"),
ReadStructure::from_str("3B3M3B5T").expect("valid read structure"),
],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: Some("ZU".parse().expect("valid tag")), clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 2);
let r1 = &recs[0];
assert_eq!(get_tag_string(r1, "RX"), Some("CCC-TTT".to_string()));
assert_eq!(get_tag_string(r1, "ZU"), Some("CCC-TTT".to_string()));
let r2 = &recs[1];
assert_eq!(get_tag_string(r2, "RX"), Some("CCC-TTT".to_string()));
assert_eq!(get_tag_string(r2, "ZU"), Some("CCC-TTT".to_string()));
}
#[test]
#[should_panic(expected = "Single tag cannot collide with tags emitted by extract")]
fn test_single_tag_same_as_umi_tag() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "ACGTAAAAAA", "IIII======")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output,
read_structures: vec![ReadStructure::from_str("4M+T").expect("valid read structure")],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: Some(SamTag::RX), clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
}
#[test]
fn test_combined_annotate_read_names_and_single_tag() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "ACGTAAAAAA", "IIII======")]);
let r2 = create_fastq(&tmp, "r2.fq", &[("q1", "CCCCCCCCCC", "##########")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1, r2],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("4M+T").expect("valid read structure"),
ReadStructure::from_str("+T").expect("valid read structure"),
],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: true, single_tag: Some("ZU".parse().expect("valid tag")),
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 2);
let r1 = &recs[0];
assert_eq!(r1.name().map(|n| n.as_bytes()), Some(b"q1+ACGT".as_slice()));
assert_eq!(get_tag_string(r1, "RX"), Some("ACGT".to_string()));
assert_eq!(get_tag_string(r1, "ZU"), Some("ACGT".to_string()));
let r2 = &recs[1];
assert_eq!(r2.name().map(|n| n.as_bytes()), Some(b"q1+ACGT".as_slice()));
assert_eq!(get_tag_string(r2, "RX"), Some("ACGT".to_string()));
assert_eq!(get_tag_string(r2, "ZU"), Some("ACGT".to_string()));
}
#[test]
fn test_fail_read_too_short_for_structure() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "AAAAAAAAAA", "==========")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output,
read_structures: vec![ReadStructure::from_str("8M8S+T").expect("valid read structure")],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
let err =
extract.execute("test").expect_err("should error for read too short for structure");
assert!(err.to_string().contains("had too few bases to demux"));
}
#[test]
fn test_variable_length_reads_with_plus() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(
&tmp,
"r1.fq",
&[
("q1", "AAAATTTTCCCCGGGGAAAAA", "====================="), ("q2", "AAAATTTTCCCCGGGGAAAAATTTT", "========================="), ("q3", "AAAATTTTCCCCGGGG", "================"), ],
);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("4M4B4S+T").expect("valid read structure"),
],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let recs = read_bam_records(&output);
assert_eq!(recs.len(), 3);
assert_eq!(recs[0].sequence().len(), 9); assert_eq!(recs[1].sequence().len(), 13); assert_eq!(recs[2].sequence().len(), 4);
assert_eq!(get_tag_string(&recs[0], "RX"), Some("AAAA".to_string()));
assert_eq!(get_tag_string(&recs[1], "RX"), Some("AAAA".to_string()));
assert_eq!(get_tag_string(&recs[2], "RX"), Some("AAAA".to_string()));
}
#[test]
fn test_quality_encoding_detection_phred33() {
let records = vec![
vec![33u8, 40, 50, 60, 70, 80], vec![35u8, 45, 55, 65, 75, 85], vec![40u8, 50, 60, 70, 80, 90], ];
let encoding =
QualityEncoding::detect(&records).expect("quality encoding detection should succeed");
assert_eq!(encoding, QualityEncoding::Standard);
}
#[test]
fn test_quality_encoding_detection_phred64() {
let records = vec![
vec![64u8, 70, 80, 90, 100], vec![65u8, 75, 85, 95, 105], vec![70u8, 80, 90, 100, 110], ];
let encoding =
QualityEncoding::detect(&records).expect("quality encoding detection should succeed");
assert_eq!(encoding, QualityEncoding::Illumina);
}
#[test]
fn test_quality_encoding_detection_high_quality_phred33() {
let records = vec![
vec![64u8, 65, 66, 67, 68], vec![64u8, 65, 66, 67, 69], ];
let encoding =
QualityEncoding::detect(&records).expect("quality encoding detection should succeed");
assert_eq!(encoding, QualityEncoding::Standard);
}
#[test]
fn test_quality_encoding_detection_empty_reads() {
let records = vec![vec![], vec![], vec![]];
let encoding =
QualityEncoding::detect(&records).expect("quality encoding detection should succeed");
assert_eq!(encoding, QualityEncoding::Standard);
}
#[test]
fn test_quality_encoding_detection_mixed_empty_and_valid() {
let records = vec![
vec![], vec![40u8, 50, 60], vec![], vec![45u8, 55, 65], ];
let encoding =
QualityEncoding::detect(&records).expect("quality encoding detection should succeed");
assert_eq!(encoding, QualityEncoding::Standard);
}
#[test]
fn test_quality_encoding_detection_empty_input() {
let records: Vec<Vec<u8>> = vec![];
let result = QualityEncoding::detect(&records);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("no records provided"));
}
#[test]
fn test_quality_encoding_detection_invalid_low() {
let records = vec![vec![20u8, 30, 40, 50]];
let result = QualityEncoding::detect(&records);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("Invalid quality scores"));
assert!(err_msg.contains("20"));
}
#[test]
fn test_quality_encoding_detection_invalid_high() {
let records = vec![vec![50u8, 60, 70, 127]];
let result = QualityEncoding::detect(&records);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("Invalid quality scores"));
assert!(err_msg.contains("127"));
}
#[test]
fn test_quality_encoding_detection_ambiguous_range() {
let records = vec![
vec![59u8, 60, 61, 62, 63], vec![60u8, 61, 62], ];
let encoding =
QualityEncoding::detect(&records).expect("quality encoding detection should succeed");
assert_eq!(encoding, QualityEncoding::Standard);
}
#[test]
fn test_quality_encoding_to_standard_numeric_phred33() {
let encoding = QualityEncoding::Standard;
let quals = vec![33u8, 43, 53, 63, 73]; let numeric = encoding.to_standard_numeric(&quals);
assert_eq!(numeric, vec![0u8, 10, 20, 30, 40]); }
#[test]
fn test_quality_encoding_to_standard_numeric_phred64() {
let encoding = QualityEncoding::Illumina;
let quals = vec![64u8, 74, 84, 94, 104]; let numeric = encoding.to_standard_numeric(&quals);
assert_eq!(numeric, vec![0u8, 10, 20, 30, 40]); }
#[test]
fn test_quality_encoding_to_standard_numeric_empty() {
let encoding = QualityEncoding::Standard;
let quals: Vec<u8> = vec![];
let numeric = encoding.to_standard_numeric(&quals);
assert_eq!(numeric, Vec::<u8>::new());
}
#[test]
fn test_zero_length_reads_integration_with_quality_detection() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(
&tmp,
"r1.fq",
&[
("q1", "ACGTACGT", "IIIIIIII"), ("q2", "", ""), ("q3", "TTTTTTTT", "########"), ],
);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![ReadStructure::from_str("+T").expect("valid read structure")],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let records = read_bam_records(&output);
assert_eq!(records.len(), 3);
assert_eq!(records[0].sequence().as_ref(), b"ACGTACGT");
assert_eq!(records[1].sequence().as_ref(), b"N");
assert_eq!(records[1].quality_scores().as_ref(), &[2u8]);
assert_eq!(records[2].sequence().as_ref(), b"TTTTTTTT");
}
#[test]
fn test_phred64_fastq_end_to_end() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(
&tmp,
"r1.fq",
&[
("q1", "ACGTACGT", "@@DDHHLL"), ("q2", "GGGGGGGG", "PPPPPPPP"), ],
);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![ReadStructure::from_str("+T").expect("valid read structure")],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let records = read_bam_records(&output);
assert_eq!(records.len(), 2);
assert_eq!(records[0].quality_scores().as_ref(), &[0u8, 0, 4, 4, 8, 8, 12, 12]);
assert_eq!(records[1].quality_scores().as_ref(), &[16u8; 8]);
}
#[test]
fn test_paired_end_with_different_read_structures() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "AAAATTTTCCCCGGGG", "IIIIIIIIIIIIIIII")]);
let r2 = create_fastq(&tmp, "r2.fq", &[("q1", "TTTTGGGG", "IIIIIIII")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1, r2],
output: output.clone(),
read_structures: vec![
ReadStructure::from_str("4M4S+T").expect("valid read structure"), ReadStructure::from_str("4M+T").expect("valid read structure"), ],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let records = read_bam_records(&output);
assert_eq!(records.len(), 2);
assert_eq!(records[0].sequence().as_ref(), b"CCCCGGGG");
assert_eq!(records[1].sequence().as_ref(), b"GGGG");
let rx_tag = noodles::sam::alignment::record::data::field::Tag::from(SamTag::RX);
let r1_umi = records[0].data().get(&rx_tag).expect("expected tag not found");
let r2_umi = records[1].data().get(&rx_tag).expect("expected tag not found");
if let noodles::sam::alignment::record_buf::data::field::Value::String(s) = r1_umi {
assert_eq!(<bstr::BString as AsRef<[u8]>>::as_ref(s), b"AAAA-TTTT");
}
if let noodles::sam::alignment::record_buf::data::field::Value::String(s) = r2_umi {
assert_eq!(<bstr::BString as AsRef<[u8]>>::as_ref(s), b"AAAA-TTTT");
}
}
#[test]
fn test_multithreaded_extraction() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(
&tmp,
"r1.fq",
&[
("q1", "AAAAACGTACGT", "IIIIIIIIIIII"),
("q2", "TTTTTTTTTTTT", "IIIIIIIIIIII"),
("q3", "CCCCGGGGAAAA", "IIIIIIIIIIII"),
],
);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![ReadStructure::from_str("5M+T").expect("valid read structure")],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::new(4), compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let records = read_bam_records(&output);
assert_eq!(records.len(), 3);
assert_eq!(records[0].sequence().as_ref(), b"CGTACGT");
assert_eq!(records[1].sequence().as_ref(), b"TTTTTTT");
assert_eq!(records[2].sequence().as_ref(), b"GGGAAAA");
}
#[test]
fn test_multithreaded_extraction_preserves_order() {
let tmp = TempDir::new().expect("failed to create temp dir");
let reads: Vec<(&str, &str, &str)> = (0..100)
.map(|i| {
let name: &'static str = Box::leak(format!("read_{i:03}").into_boxed_str());
let seq: &'static str =
Box::leak(format!("AAAAA{}", "ACGT".repeat(10)).into_boxed_str());
let qual: &'static str = Box::leak("I".repeat(45).into_boxed_str());
(name, seq, qual)
})
.collect();
let r1 = create_fastq(&tmp, "r1.fq", &reads);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![ReadStructure::from_str("5M+T").expect("valid read structure")],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::new(8), compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let records = read_bam_records(&output);
assert_eq!(records.len(), 100);
for (i, record) in records.iter().enumerate() {
let expected_name = format!("read_{i:03}");
let actual_name = record.name().map(|n| n.as_bytes()).unwrap_or_default();
assert_eq!(
actual_name,
expected_name.as_bytes(),
"Output order mismatch at position {i}: expected {expected_name}, got {}",
String::from_utf8_lossy(actual_name)
);
}
}
#[test]
fn test_sample_barcode_with_quality_tags_specified() {
let tmp = TempDir::new().expect("failed to create temp dir");
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "AAAAACGTACGT", "IIIIIIIIIIII")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![ReadStructure::from_str("5B+T").expect("valid read structure")], store_umi_quals: true,
store_cell_quals: true,
store_sample_barcode_qualities: true,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: Some("AAAAA".to_string()),
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test").expect("execute should succeed");
let records = read_bam_records(&output);
assert_eq!(records.len(), 1);
assert_eq!(records[0].sequence().as_ref(), b"CGTACGT");
}
#[test]
fn test_mapping_quality_is_zero_for_unmapped_reads() -> Result<()> {
let dir = TempDir::new()?;
let fastq1 = create_fastq(&dir, "r1.fq", &[("read1", "ACGTACGTAC", "IIIIIIIIII")]);
let fastq2 = create_fastq(&dir, "r2.fq", &[("read1", "TGCATGCATG", "IIIIIIIIII")]);
let output = dir.path().join("output.bam");
let extract = Extract {
inputs: vec![fastq1, fastq2],
output: output.clone(),
read_structures: vec![],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "sample".to_string(),
library: "library".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading: ThreadingOptions::none(),
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test")?;
let records = read_bam_records(&output);
assert_eq!(records.len(), 2, "Should have 2 records for paired-end");
for record in &records {
let mapq = record.mapping_quality();
assert!(mapq.is_some(), "Mapping quality should be set (not None/255)");
let mapq_value: u8 = mapq.expect("mapping quality should be set").into();
assert_eq!(mapq_value, 0, "Mapping quality should be 0 for unmapped reads");
}
Ok(())
}
#[test]
fn test_compression_format_detection_plain() {
let tmp = TempDir::new().expect("failed to create temp dir");
let plain_path = tmp.path().join("test.fq");
let mut file = File::create(&plain_path).expect("failed to create file");
use std::io::Write;
writeln!(file, "@read1").expect("failed to write line");
writeln!(file, "ACGT").expect("failed to write line");
writeln!(file, "+").expect("failed to write line");
writeln!(file, "IIII").expect("failed to write line");
let format = detect_compression_format(&plain_path)
.expect("compression format detection should succeed");
assert_eq!(format, CompressionFormat::Plain);
}
#[test]
fn test_compression_format_detection_gzip() {
use flate2::Compression;
use flate2::write::GzEncoder;
use std::io::Write;
let tmp = TempDir::new().expect("failed to create temp dir");
let gz_path = tmp.path().join("test.fq.gz");
let file = File::create(&gz_path).expect("failed to create file");
let mut encoder = GzEncoder::new(file, Compression::default());
writeln!(encoder, "@read1").expect("failed to write line");
writeln!(encoder, "ACGT").expect("failed to write line");
writeln!(encoder, "+").expect("failed to write line");
writeln!(encoder, "IIII").expect("failed to write line");
encoder.finish().expect("failed to finish gzip encoding");
let format = detect_compression_format(&gz_path)
.expect("compression format detection should succeed");
assert_eq!(format, CompressionFormat::Gzip);
}
#[test]
fn test_compression_format_detection_bgzf() {
use noodles_bgzf::io::Writer as BgzfWriter;
use std::io::Write;
let tmp = TempDir::new().expect("failed to create temp dir");
let bgzf_path = tmp.path().join("test.fq.bgz");
let file = File::create(&bgzf_path).expect("failed to create file");
let mut writer = BgzfWriter::new(file);
writeln!(writer, "@read1").expect("failed to write line");
writeln!(writer, "ACGT").expect("failed to write line");
writeln!(writer, "+").expect("failed to write line");
writeln!(writer, "IIII").expect("failed to write line");
writer.finish().expect("failed to finish BGZF writing");
let format = detect_compression_format(&bgzf_path)
.expect("compression format detection should succeed");
assert_eq!(format, CompressionFormat::Bgzf);
}
#[rstest]
#[case::fast_path(ThreadingOptions::none())]
#[case::pipeline_1(ThreadingOptions::new(1))]
#[case::pipeline_2(ThreadingOptions::new(2))]
fn test_threading_modes(#[case] threading: ThreadingOptions) -> Result<()> {
let tmp = TempDir::new()?;
let r1 = create_fastq(
&tmp,
"r1.fq",
&[
("q1", "AAAAACGTACGT", "IIIIIIIIIIII"),
("q2", "TTTTTTTTTTTT", "IIIIIIIIIIII"),
("q3", "CCCCGGGGAAAA", "IIIIIIIIIIII"),
],
);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![ReadStructure::from_str("5M+T").expect("valid read structure")],
store_umi_quals: false,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "A".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading,
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test")?;
let records = read_bam_records(&output);
assert_eq!(records.len(), 3);
assert_eq!(records[0].sequence().as_ref(), b"CGTACGT");
assert_eq!(records[1].sequence().as_ref(), b"TTTTTTT");
assert_eq!(records[2].sequence().as_ref(), b"GGGAAAA");
Ok(())
}
#[rstest]
#[case::fast_path(ThreadingOptions::none())]
#[case::threaded(ThreadingOptions::new(2))]
fn test_threading_modes_emit_correct_tags(#[case] threading: ThreadingOptions) -> Result<()> {
let tmp = TempDir::new()?;
let r1 = create_fastq(&tmp, "r1.fq", &[("q1", "ACGTAAAAAA", "IIII======")]);
let output = tmp.path().join("output.bam");
let extract = Extract {
inputs: vec![r1],
output: output.clone(),
read_structures: vec![ReadStructure::from_str("4M+T").expect("valid read structure")],
store_umi_quals: true,
store_cell_quals: false,
store_sample_barcode_qualities: false,
extract_umis_from_read_names: false,
annotate_read_names: false,
single_tag: None,
clipping_attribute: None,
read_group_id: "RG1".to_string(),
sample: "s".to_string(),
library: "l".to_string(),
barcode: None,
platform: "illumina".to_string(),
platform_unit: None,
platform_model: None,
sequencing_center: None,
predicted_insert_size: None,
description: None,
comment: vec![],
run_date: None,
threading,
compression: CompressionOptions { compression_level: 1 },
scheduler_opts: SchedulerOptions::default(),
queue_memory: QueueMemoryOptions::default(),
async_reader: false,
};
extract.execute("test")?;
let records = read_bam_records(&output);
assert_eq!(records.len(), 1);
let record = &records[0];
let rx = get_tag_string(record, "RX");
assert_eq!(rx.as_deref(), Some("ACGT"), "RX tag should contain the extracted UMI");
let qx = get_tag_string(record, "QX");
assert!(qx.is_some(), "QX tag should be present when store_umi_quals is true");
let rg = get_tag_string(record, "RG");
assert_eq!(rg.as_deref(), Some("RG1"), "RG tag should match read_group_id");
Ok(())
}
}