use std;
use std::cmp::Ordering;
use std::process;
use std::str;
use rand::prelude::*;
use rust_htslib::bam;
use rust_htslib::bam::record::{Cigar, CigarString};
use rust_htslib::bam::Read as BamRead;
use rust_htslib::bam::Record;
use rust_htslib::errors::Result as HtslibResult;
use aux_as;
use mapping_parameters::ReadFormat;
use nix::sys::stat;
use nix::unistd;
use tempdir::TempDir;
use tempfile;
use bam_generator::complete_processes;
use bam_generator::*;
use genome_exclusion::*;
use mapping_index_maintenance::MappingIndex;
use std::ffi;
use std::slice;
fn bam_header_target_name(header: &bam::HeaderView, tid: usize) -> &[u8] {
let names = unsafe {
slice::from_raw_parts(header.inner().target_name, header.target_count() as usize)
};
return unsafe { ffi::CStr::from_ptr(names[tid]).to_bytes() };
}
pub struct ReadSortedShardedBamReader<'a, T>
where
T: GenomeExclusion,
{
shard_bam_readers: Vec<bam::Reader>,
previous_read_records: Option<Vec<bam::Record>>,
next_record_to_return: Option<bam::Record>,
winning_index: Option<usize>,
tid_offsets: Vec<i32>,
genome_exclusion: &'a T,
}
impl<'a, T> ReadSortedShardedBamReader<'a, T>
where
T: GenomeExclusion,
{
fn read_a_record_set(&mut self) -> Option<Vec<Record>> {
let mut current_alignments: Vec<Record> = Vec::with_capacity(self.shard_bam_readers.len());
let mut current_qname: Option<String> = None;
let mut some_unfinished = false;
let mut some_finished = false;
let mut record;
let mut current_alignment;
for (i, reader) in self.shard_bam_readers.iter_mut().enumerate() {
loop {
{
current_alignment = bam::Record::new();
match reader.read(&mut current_alignment) {
None => {
debug!("BAM reader #{} appears to be finished", i);
some_finished = true;
break;
}
Some(Ok(())) => {}
Some(e) => {
panic!("EFailure to read from a shard BAM file: {:?}", e)
}
}
}
{
record = current_alignment.clone();
if !record.is_paired() {
error!(
"This code can only handle paired-end \
input (at the moment), sorry. Found \
record {:?}",
record
);
process::exit(1);
}
if !record.is_secondary() && !record.is_supplementary() {
some_unfinished = true;
match current_qname.clone() {
None => {
current_qname =
Some(str::from_utf8(record.qname()).unwrap().to_string())
}
Some(prev) => {
if prev != *str::from_utf8(record.qname()).unwrap() {
error!(
"BAM files do not appear to be \
properly sorted by read name. \
Expected read name {:?} from a \
previous reader but found {:?} \
in the current.",
prev,
str::from_utf8(record.qname()).unwrap().to_string()
);
process::exit(1);
}
}
}
current_alignments.push(current_alignment);
break;
}
}
}
}
if some_unfinished && some_finished {
error!("Unexpectedly one BAM file input finished while another had further reads");
process::exit(1);
}
debug!("Read records {:?}", current_alignments);
match some_unfinished {
true => Some(current_alignments),
false => None,
}
}
fn clone_record_into(from: &Record, to: &mut Record) {
to.set(
from.qname(),
Some(&CigarString(
from.cigar().iter().copied().collect::<Vec<Cigar>>(),
)),
from.seq().as_bytes().as_slice(),
from.qual(),
);
to.set_pos(from.pos());
to.set_bin(from.bin());
to.set_mapq(from.mapq());
to.set_flags(from.flags());
to.set_mtid(from.mtid());
to.set_mpos(from.mpos());
to.set_insert_size(from.insert_size());
to.set_tid(from.tid());
match &to.aux("NM".as_bytes()) {
Ok(_) => to
.remove_aux(b"NM")
.expect("Failed to remove previous NM aux value when cloning"),
Err(_) => {}
};
match &from.aux("NM".as_bytes()) {
Ok(value) => {
if let rust_htslib::bam::record::Aux::U8(v) = value {
to.push_aux("NM".as_bytes(), rust_htslib::bam::record::Aux::U8(*v))
.expect("Failed to push new AUX NM tag during record clone");
} else {
panic!("Unexpected data type of NM aux tag")
}
}
Err(e) => {
if from.tid() >= 0 && from.cigar_len() != 0 {
panic!(
"record {:?} with name {} had no NM tag: {}",
from,
str::from_utf8(from.qname()).unwrap(),
e,
);
}
}
}
debug!(
"from cigar {:?}, to cigar {:?}",
from.cigar().to_string(),
to.cigar().to_string()
)
}
}
impl<'a, T> ReadSortedShardedBamReader<'a, T>
where
T: GenomeExclusion,
{
fn read(&mut self, to_return: &mut bam::Record) -> HtslibResult<bool> {
if self.next_record_to_return.is_some() {
{
let record = self.next_record_to_return.as_ref().unwrap();
ReadSortedShardedBamReader::<T>::clone_record_into(record, to_return);
}
self.next_record_to_return = None;
let tid_now = to_return.tid();
to_return.set_tid(tid_now + self.tid_offsets[self.winning_index.unwrap()]);
Ok(true)
} else {
if self.previous_read_records.is_none() {
self.previous_read_records = self.read_a_record_set();
if self.previous_read_records.is_none() {
return Ok(false);
}
}
let second_read_alignments = self
.read_a_record_set()
.expect("Unexpectedly was able to read a first read set, but not a second. Hmm.");
debug!("Previous records {:?}", self.previous_read_records);
debug!("Second read records {:?}", second_read_alignments);
let mut max_score: Option<i64> = None;
let mut winning_indices: Vec<usize> = vec![];
match self.previous_read_records {
None => unreachable!(),
Some(ref previous_records) => {
for (i, aln1) in previous_records.iter().enumerate() {
let tid = aln1.tid();
if tid < 0
|| !self.genome_exclusion.is_excluded(bam_header_target_name(
self.shard_bam_readers[i].header(),
tid as usize,
))
{
let mut score: i64 = 0;
if !aln1.is_unmapped() {
score += aux_as(aln1);
}
if !second_read_alignments[i].is_unmapped() {
score += aux_as(&second_read_alignments[i]);
}
if max_score.is_none() || score > max_score.unwrap() {
max_score = Some(score);
winning_indices = vec![i]
} else if score == max_score.unwrap() {
winning_indices.push(i)
}
}
}
}
};
let winning_index = match winning_indices.len().cmp(&1) {
Ordering::Greater => *winning_indices.choose(&mut thread_rng()).unwrap(),
Ordering::Equal => winning_indices[0],
Ordering::Less => {
error!(
"CoverM cannot currently deal with reads that only map to excluded genomes"
);
process::exit(1);
}
};
debug!(
"Choosing winning index {} from winner pool {:?}",
winning_index, winning_indices
);
self.winning_index = Some(winning_index);
self.next_record_to_return = Some(second_read_alignments[winning_index].clone());
match self.previous_read_records {
None => unreachable!(),
Some(ref prev) => {
debug!("previous tid {}", &prev[winning_index].tid());
ReadSortedShardedBamReader::<T>::clone_record_into(
&prev[winning_index],
to_return,
)
}
};
let tid_now = to_return.tid();
to_return.set_tid(tid_now + self.tid_offsets[winning_index]);
debug!(
"Reindexed TID is {}, tid_offsets are {:?} and tid_now is {}",
to_return.tid(),
self.tid_offsets,
tid_now
);
self.previous_read_records = None;
Ok(true)
}
}
}
pub struct ShardedBamReaderGenerator<'a, T>
where
T: GenomeExclusion,
{
pub stoit_name: String,
pub read_sorted_bam_readers: Vec<bam::Reader>,
pub sort_threads: u16,
pub genome_exclusion: &'a T,
}
impl<'a, T> NamedBamReaderGenerator<ShardedBamReader> for ShardedBamReaderGenerator<'a, T>
where
T: GenomeExclusion,
{
fn start(self) -> ShardedBamReader {
let mut new_header = bam::header::Header::new();
let mut tid_offsets: Vec<i32> = vec![];
let mut current_tid_offset: i32 = 0;
for reader in self.read_sorted_bam_readers.iter() {
let header = reader.header();
let names = header.target_names();
for (current_tid, name) in names.iter().enumerate() {
let length = header.target_len(current_tid as u32).unwrap_or_else(|| {
panic!("Failed to get target length for TID {}", current_tid)
});
let mut current_record = bam::header::HeaderRecord::new(b"SQ");
current_record.push_tag(b"SN", std::str::from_utf8(name).unwrap());
current_record.push_tag(b"LN", length);
new_header.push_record(¤t_record);
}
tid_offsets.push(current_tid_offset);
current_tid_offset += header.target_count() as i32;
}
let tmp_dir = TempDir::new("coverm_fifo")
.expect("Unable to create samtools sort temporary directory");
let sort_input_fifo_path = tmp_dir.path().join("sort_input.pipe");
let sort_output_fifo_path = tmp_dir.path().join("sort_output.pipe");
let sort_temp_fifo_path = tmp_dir.path().join("sort_temp.pipe");
let sort_log_file = tempfile::Builder::new()
.prefix("coverm-shard-sort-log")
.tempfile()
.expect("Failed to create samtools sort log tempfile");
debug!("Creating FIFOs ..");
unistd::mkfifo(&sort_input_fifo_path, stat::Mode::S_IRWXU)
.unwrap_or_else(|_| panic!("Error creating named pipe {:?}", sort_input_fifo_path));
unistd::mkfifo(&sort_output_fifo_path, stat::Mode::S_IRWXU)
.unwrap_or_else(|_| panic!("Error creating named pipe {:?}", sort_output_fifo_path));
debug!("Instantiating ReadSortedShardedBamReader ..");
let mut demux = ReadSortedShardedBamReader {
shard_bam_readers: self.read_sorted_bam_readers,
tid_offsets,
previous_read_records: None,
next_record_to_return: None,
winning_index: None,
genome_exclusion: self.genome_exclusion,
};
let sort_output_fifo_path2 = sort_output_fifo_path.clone();
let sorted_reader_join_handle: std::thread::JoinHandle<bam::Reader> =
std::thread::spawn(move || {
debug!("Starting to open sorted read BAM ..");
let reader = bam::Reader::from_path(sort_output_fifo_path2)
.expect("Unable to open reader from samtools sort output FIFO");
debug!("Finished opening reader to samtools sort output FIFO..");
reader
});
let sort_command_string = format!(
"set -e -o pipefail; \
samtools sort -l0 {} -o {} -T {} -@ {}",
sort_input_fifo_path
.to_str()
.expect("Failed to convert sort tempfile input path to str"),
sort_output_fifo_path
.to_str()
.expect("Failed to convert sort tempfile output path to str"),
sort_temp_fifo_path
.to_str()
.expect("Failed to convert sort tempfile tempfile path to str"),
&self.sort_threads,
);
debug!("Running cmd_string: {}", sort_command_string);
let mut cmd = std::process::Command::new("bash");
cmd.arg("-c").arg(&sort_command_string);
let sort_child = cmd.spawn().expect("Unable to execute bash");
{
debug!(
"Opening BAM writer to {}",
&sort_input_fifo_path.to_str().unwrap()
);
let mut writer = bam::Writer::from_path(
&sort_input_fifo_path,
&new_header,
rust_htslib::bam::Format::Bam,
)
.expect("Failed to open BAM to write to samtools sort process");
writer
.set_compression_level(bam::CompressionLevel::Uncompressed)
.expect("Failure to set BAM writer compression level - programming bug?");
debug!("Writing records to samtools sort input FIFO..");
let mut record = bam::Record::new();
while demux
.read(&mut record)
.expect("Failed to read demux BAM stream")
{
debug!(
"Writing tid {} for qname {}",
record.tid(),
str::from_utf8(record.qname()).unwrap()
);
writer
.write(&record)
.expect("Failed to write BAM record to samtools sort input fifo");
}
debug!("Finished writing records to samtools sort input FIFO.");
}
let reader = sorted_reader_join_handle
.join()
.expect("sorted reader thread failed");
ShardedBamReader {
stoit_name: self.stoit_name,
bam_reader: reader,
tempdir: tmp_dir,
sort_process: sort_child,
sort_command_string,
sort_log_file_description: "samtools sort".to_string(),
sort_log_file,
num_detected_primary_alignments: 0,
}
}
}
pub struct ShardedBamReader {
stoit_name: String,
bam_reader: bam::Reader,
tempdir: TempDir,
sort_process: std::process::Child,
sort_command_string: String,
sort_log_file_description: String,
sort_log_file: tempfile::NamedTempFile,
num_detected_primary_alignments: u64,
}
impl NamedBamReader for ShardedBamReader {
fn name(&self) -> &str {
&(self.stoit_name)
}
fn read(&mut self, record: &mut bam::record::Record) -> Option<HtslibResult<()>> {
let res = self.bam_reader.read(record);
if res == Some(Ok(())) && !record.is_secondary() && !record.is_supplementary() {
self.num_detected_primary_alignments += 1;
}
res
}
fn header(&self) -> &bam::HeaderView {
self.bam_reader.header()
}
fn finish(self) {
complete_processes(
vec![self.sort_process],
vec![self.sort_command_string],
vec![self.sort_log_file_description],
vec![self.sort_log_file],
Some(self.tempdir),
);
}
fn set_threads(&mut self, n_threads: usize) {
if n_threads > 1 {
self.bam_reader.set_threads(n_threads - 1).unwrap();
}
}
fn num_detected_primary_alignments(&self) -> u64 {
self.num_detected_primary_alignments
}
}
pub fn generate_sharded_bam_reader_from_bam_files<'a, T>(
bam_paths: Vec<&str>,
sort_threads: u16,
genome_exclusion: &'a T,
) -> Vec<ShardedBamReaderGenerator<'a, T>>
where
T: GenomeExclusion,
{
let bam_readers = bam_paths
.iter()
.map(|f| {
debug!("Opening BAM {} ..", f);
bam::Reader::from_path(f).unwrap_or_else(|_| panic!("Unable to open bam file {}", f))
})
.collect();
let stoit_name = bam_paths
.iter()
.map(|f| {
std::path::Path::new(f)
.file_stem()
.unwrap()
.to_str()
.expect("failure to convert bam file name to stoit name - UTF8 error maybe?")
.to_string()
})
.fold(None, {
|acc, s| match acc {
None => Some(s),
Some(prev) => Some(format!("{}|{}", prev, s)),
}
})
.unwrap();
debug!("Opened all input BAM files");
let gen = ShardedBamReaderGenerator {
stoit_name,
read_sorted_bam_readers: bam_readers,
sort_threads,
genome_exclusion,
};
vec![gen]
}
#[allow(clippy::too_many_arguments)]
pub fn generate_named_sharded_bam_readers_from_reads(
mapping_program: MappingProgram,
index: &dyn MappingIndex,
read1_path: &str,
read2_path: Option<&str>,
read_format: ReadFormat,
threads: u16,
cached_bam_file: Option<&str>,
_discard_unmapped: bool,
mapping_options: Option<&str>,
) -> bam::Reader {
let tmp_dir = TempDir::new("coverm_fifo").expect("Unable to create temporary directory");
let fifo_path = tmp_dir.path().join("foo.pipe");
unistd::mkfifo(&fifo_path, stat::Mode::S_IRWXU)
.unwrap_or_else(|_| panic!("Error creating named pipe {:?}", fifo_path));
let mapping_log = tempfile::Builder::new()
.prefix("coverm-shard-mapping-log")
.tempfile()
.unwrap_or_else(|_| panic!("Failed to create {:?} log tempfile", mapping_program));
let samtools2_log = tempfile::Builder::new()
.prefix("coverm-shard-samtools2-log")
.tempfile()
.expect("Failed to create second samtools log tempfile");
let samtools_view_cache_log = tempfile::Builder::new()
.prefix("coverm-shard-samtools-view-log")
.tempfile()
.expect("Failed to create cache samtools view log tempfile");
let cached_bam_file_args = match cached_bam_file {
Some(path) => {
format!(
"|tee {:?} |samtools view {} -@ {} -b -o '{}' 2>{}",
fifo_path,
"",
threads - 1,
path,
samtools_view_cache_log
.path()
.to_str()
.expect("Failed to convert tempfile path to str")
)
}
None => format!("> {:?}", fifo_path),
};
let mapping_command = build_mapping_command(
mapping_program,
read_format,
threads,
read1_path,
index,
read2_path,
mapping_options,
);
let bwa_sort_prefix = tempfile::Builder::new()
.prefix("coverm-make-samtools-sort")
.tempfile_in(tmp_dir.path())
.expect("Failed to create tempfile as samtools sort prefix");
let cmd_string = format!(
"set -e -o pipefail; \
{} 2>{}\
| samtools sort -n -T '{}' -l0 -@ {} 2>{} \
{}",
mapping_command,
mapping_log
.path()
.to_str()
.expect("Failed to convert tempfile path to str"),
bwa_sort_prefix
.path()
.to_str()
.expect("Failed to convert bwa_sort_prefix tempfile to str"),
threads - 1,
samtools2_log
.path()
.to_str()
.expect("Failed to convert tempfile path to str"),
cached_bam_file_args
);
debug!("Queuing cmd_string: {}", cmd_string);
let mut cmd = std::process::Command::new("bash");
cmd.arg("-c")
.arg(&cmd_string)
.stderr(std::process::Stdio::piped());
let mut log_descriptions = vec![
format!("{:?}", mapping_program),
"samtools sort".to_string(),
];
let mut log_files = vec![mapping_log, samtools2_log];
if cached_bam_file.is_some() {
log_descriptions.push("samtools view for cache".to_string());
log_files.push(samtools_view_cache_log);
}
debug!("Starting mapping processes");
let pre_processes = vec![cmd];
let command_strings = vec![format!("bash -c \"{}\"", cmd_string)];
let mut processes = vec![];
for (i, mut preprocess) in pre_processes.into_iter().enumerate() {
debug!("Running mapping command: {}", command_strings[i]);
processes.push(preprocess.spawn().expect("Unable to execute bash"));
}
match bam::Reader::from_path(&fifo_path) {
Ok(reader) => reader,
Err(upstream_error) => {
error!(
"Failed to correctly find or parse BAM file at {:?}: {}",
fifo_path, upstream_error
);
complete_processes(
processes,
command_strings,
log_descriptions,
log_files,
Some(tmp_dir),
);
panic!("Failure to find or parse BAM file, cannot continue");
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_shard_hello_world() {
let gen = ShardedBamReaderGenerator {
stoit_name: "stoiter".to_string(),
read_sorted_bam_readers: vec![
bam::Reader::from_path("tests/data/2seqs.fastaVbad_read.bam").unwrap(),
bam::Reader::from_path("tests/data/7seqs.fnaVbad_read.bam").unwrap(),
],
sort_threads: 1,
genome_exclusion: &NoExclusionGenomeFilter {},
};
let mut reader = gen.start();
assert_eq!("stoiter".to_string(), reader.stoit_name);
let mut r = bam::Record::new();
reader.bam_reader.read(&mut r).expect("").expect("");
println!("{}", str::from_utf8(r.qname()).unwrap());
println!("{}", r.tid());
}
}