use std::error::Error;
use futures::executor::block_on_stream;
use crate::coverage::CoverageConfig;
use crate::read_stream::ReadStream;
use crate::track_generic::GenericMutableTrack;
use crate::bam::BamFile;
impl<'a> GenericMutableTrack<'a> {
pub fn coverage_from_bam(
mut config : CoverageConfig,
mut track1 : GenericMutableTrack,
track2_arg : Option<GenericMutableTrack>,
filenames_treatment: &Vec<&str>,
filenames_control : &Vec<&str>,
fraglen_treatment : &Vec<usize>,
fraglen_control : &Vec<usize>,
) -> Result<(), Box<dyn Error>> {
let mut n_treatment = 0;
let mut n_control = 0;
for (i, filename) in filenames_treatment.iter().enumerate() {
let mut err_opt = None;
let fraglen = fraglen_treatment[i];
log!(config.logger, "Reading treatment tags from `{}`", filename);
let mut bam = BamFile::open(filename, None)?;
let treatment = Box::pin(bam.reader.read_simple_stream(!config.paired_as_single_end, config.paired_end_strand_specific));
let treatment = ReadStream::filter_paired_end (treatment, Some(&config.logger), config.filter_paired_end);
let treatment = ReadStream::filter_single_end (treatment, Some(&config.logger), config.filter_single_end);
let treatment = ReadStream::paired_as_single_end(treatment, Some(&config.logger), config.paired_as_single_end);
let treatment = ReadStream::filter_read_length (treatment, Some(&config.logger), &config.filter_read_lengths);
let treatment = ReadStream::filter_duplicates (treatment, Some(&config.logger), config.filter_duplicates);
let treatment = ReadStream::filter_mapq (treatment, Some(&config.logger), config.filter_mapq);
let treatment = ReadStream::filter_strand (treatment, Some(&config.logger), config.filter_strand);
let treatment = ReadStream::shift_reads (treatment, Some(&config.logger), &config.shift_reads);
let treatment_iter = block_on_stream(treatment).map_while(|item| {
match item {
Ok(read) => Some(read),
Err(err) => {
err_opt = Some(err);
None
}
}
});
n_treatment += track1.add_reads(treatment_iter, fraglen, &config.binning_method);
if let Some(err) = err_opt {
return Err(Box::new(err));
}
}
if config.normalize_track == "rpkm" {
log!(config.logger, "Normalizing treatment track (rpkm)");
let c = 1_000_000.0 / (n_treatment as f64 * config.bin_size as f64);
track1.map(|_name, _i, x| c * x)?;
config.pseudocounts[0] *= c;
}
if config.normalize_track == "cpm" {
log!(config.logger, "Normalizing treatment track (cpm)");
let c = 1_000_000.0 / n_treatment as f64;
track1.map(|_name, _i, x| c * x)?;
config.pseudocounts[0] *= c;
}
if let Some(mut track2) = track2_arg {
for (i, filename) in filenames_control.iter().enumerate() {
let mut err_opt = None;
let fraglen = fraglen_control[i];
log!(config.logger, "Reading control tags from `{}`", filename);
let mut bam = BamFile::open(filename, None)?;
let control = Box::pin(bam.reader.read_simple_stream(!config.paired_as_single_end, config.paired_end_strand_specific));
let control = ReadStream::filter_paired_end (control, Some(&config.logger), config.filter_paired_end);
let control = ReadStream::filter_single_end (control, Some(&config.logger), config.filter_single_end);
let control = ReadStream::paired_as_single_end(control, Some(&config.logger), config.paired_as_single_end);
let control = ReadStream::filter_read_length (control, Some(&config.logger), &config.filter_read_lengths);
let control = ReadStream::filter_duplicates (control, Some(&config.logger), config.filter_duplicates);
let control = ReadStream::filter_mapq (control, Some(&config.logger), config.filter_mapq);
let control = ReadStream::filter_strand (control, Some(&config.logger), config.filter_strand);
let control = ReadStream::shift_reads (control, Some(&config.logger), &config.shift_reads);
let control_iter = block_on_stream(control).map_while(|item| {
match item {
Ok(read) => Some(read),
Err(err) => {
err_opt = Some(err);
None
}
}
});
n_control += track2.add_reads(control_iter, fraglen, &config.binning_method);
if let Some(err) = err_opt {
return Err(Box::new(err));
}
}
if config.normalize_track == "rpkm" {
log!(config.logger, "Normalizing control track (rpkm)");
let c = 1_000_000.0 / (n_control as f64 * config.bin_size as f64);
track2.map(|_name, _i, x| c * x)?;
config.pseudocounts[1] *= c;
}
if config.normalize_track == "cpm" {
log!(config.logger, "Normalizing control track (cpm)");
let c = 1_000_000.0 / n_control as f64;
track2.map(|_name, _i, x| c * x)?;
config.pseudocounts[1] *= c;
}
if config.smoothen_control {
track2.smoothen(config.smoothen_min, config.smoothen_sizes.clone())?;
}
log!(config.logger, "Combining treatment and control tracks...");
track1.normalize(
track2.track.as_track(), config.pseudocounts[0], config.pseudocounts[1], config.log_scale)?;
} else {
if config.pseudocounts[0] != 0.0 {
log!(config.logger, "Adding pseudocount `{}`", config.pseudocounts[0]);
track1.map(|_name, _i, x| x + config.pseudocounts[0])?;
}
if config.log_scale {
log!(config.logger, "Log-transforming data");
track1.map(|_name, _i, x| x.ln())?;
}
}
if config.remove_filtered_chroms {
if !config.filter_chroms.is_empty() {
log!(config.logger, "Removing chromosomes `{}`", config.filter_chroms.join(", "));
track1.track.filter_genome(&|name : &str, _length : usize| {
!config.filter_chroms.contains(&name.to_string())
});
}
} else {
if !config.filter_chroms.is_empty() {
log!(config.logger, "Removing all reads from `{}`", config.filter_chroms.join(", "));
for chr in &config.filter_chroms {
if let Ok(mut s) = track1.track.get_sequence_mut(chr) {
for i in 0..s.n_bins() {
s.set_bin(i, 0.0);
}
}
}
}
}
Ok(())
}
}