use std::error::Error;
use futures::executor::block_on_stream;
use crate::bam::BamFile;
use crate::coverage::CoverageConfig;
use crate::read_stream::ReadStream;
use crate::track_generic::GenericMutableTrack;
impl<'a> GenericMutableTrack<'a> {
pub fn coverage_from_bam(
mut config: CoverageConfig,
mut track1: GenericMutableTrack,
track2_arg: Option<GenericMutableTrack>,
filenames_treatment: &Vec<&str>,
filenames_control: &Vec<&str>,
fraglen_treatment: &Vec<usize>,
fraglen_control: &Vec<usize>,
) -> Result<(), Box<dyn Error>> {
let mut n_treatment = 0;
let mut n_control = 0;
for (i, filename) in filenames_treatment.iter().enumerate() {
let mut err_opt = None;
let fraglen = fraglen_treatment[i];
log!(config.logger, "Reading treatment tags from `{}`", filename);
let mut bam = BamFile::open(filename, None)?;
let treatment = Box::pin(bam.reader.read_simple_stream(
!config.paired_as_single_end,
config.paired_end_strand_specific,
));
let treatment = ReadStream::filter_paired_end(
treatment,
Some(&config.logger),
config.filter_paired_end,
);
let treatment = ReadStream::filter_single_end(
treatment,
Some(&config.logger),
config.filter_single_end,
);
let treatment = ReadStream::paired_as_single_end(
treatment,
Some(&config.logger),
config.paired_as_single_end,
);
let treatment = ReadStream::filter_read_length(
treatment,
Some(&config.logger),
&config.filter_read_lengths,
);
let treatment = ReadStream::filter_duplicates(
treatment,
Some(&config.logger),
config.filter_duplicates,
);
let treatment =
ReadStream::filter_mapq(treatment, Some(&config.logger), config.filter_mapq);
let treatment =
ReadStream::filter_strand(treatment, Some(&config.logger), config.filter_strand);
let treatment =
ReadStream::shift_reads(treatment, Some(&config.logger), &config.shift_reads);
let treatment_iter = block_on_stream(treatment).map_while(|item| match item {
Ok(read) => Some(read),
Err(err) => {
err_opt = Some(err);
None
}
});
n_treatment += track1.add_reads(treatment_iter, fraglen, &config.binning_method);
if let Some(err) = err_opt {
return Err(Box::new(err));
}
}
if config.normalize_track == "rpkm" {
log!(config.logger, "Normalizing treatment track (rpkm)");
let c = 1_000_000.0 / (n_treatment as f64 * config.bin_size as f64);
track1.map(|_name, _i, x| c * x)?;
config.pseudocounts[0] *= c;
}
if config.normalize_track == "cpm" {
log!(config.logger, "Normalizing treatment track (cpm)");
let c = 1_000_000.0 / n_treatment as f64;
track1.map(|_name, _i, x| c * x)?;
config.pseudocounts[0] *= c;
}
if let Some(mut track2) = track2_arg {
for (i, filename) in filenames_control.iter().enumerate() {
let mut err_opt = None;
let fraglen = fraglen_control[i];
log!(config.logger, "Reading control tags from `{}`", filename);
let mut bam = BamFile::open(filename, None)?;
let control = Box::pin(bam.reader.read_simple_stream(
!config.paired_as_single_end,
config.paired_end_strand_specific,
));
let control = ReadStream::filter_paired_end(
control,
Some(&config.logger),
config.filter_paired_end,
);
let control = ReadStream::filter_single_end(
control,
Some(&config.logger),
config.filter_single_end,
);
let control = ReadStream::paired_as_single_end(
control,
Some(&config.logger),
config.paired_as_single_end,
);
let control = ReadStream::filter_read_length(
control,
Some(&config.logger),
&config.filter_read_lengths,
);
let control = ReadStream::filter_duplicates(
control,
Some(&config.logger),
config.filter_duplicates,
);
let control =
ReadStream::filter_mapq(control, Some(&config.logger), config.filter_mapq);
let control =
ReadStream::filter_strand(control, Some(&config.logger), config.filter_strand);
let control =
ReadStream::shift_reads(control, Some(&config.logger), &config.shift_reads);
let control_iter = block_on_stream(control).map_while(|item| match item {
Ok(read) => Some(read),
Err(err) => {
err_opt = Some(err);
None
}
});
n_control += track2.add_reads(control_iter, fraglen, &config.binning_method);
if let Some(err) = err_opt {
return Err(Box::new(err));
}
}
if config.normalize_track == "rpkm" {
log!(config.logger, "Normalizing control track (rpkm)");
let c = 1_000_000.0 / (n_control as f64 * config.bin_size as f64);
track2.map(|_name, _i, x| c * x)?;
config.pseudocounts[1] *= c;
}
if config.normalize_track == "cpm" {
log!(config.logger, "Normalizing control track (cpm)");
let c = 1_000_000.0 / n_control as f64;
track2.map(|_name, _i, x| c * x)?;
config.pseudocounts[1] *= c;
}
if config.smoothen_control {
track2.smoothen(config.smoothen_min, config.smoothen_sizes.clone())?;
}
log!(config.logger, "Combining treatment and control tracks...");
track1.normalize(
track2.track.as_track(),
config.pseudocounts[0],
config.pseudocounts[1],
config.log_scale,
)?;
} else {
if config.pseudocounts[0] != 0.0 {
log!(
config.logger,
"Adding pseudocount `{}`",
config.pseudocounts[0]
);
track1.map(|_name, _i, x| x + config.pseudocounts[0])?;
}
if config.log_scale {
log!(config.logger, "Log-transforming data");
track1.map(|_name, _i, x| x.ln())?;
}
}
if config.remove_filtered_chroms {
if !config.filter_chroms.is_empty() {
log!(
config.logger,
"Removing chromosomes `{}`",
config.filter_chroms.join(", ")
);
track1.track.filter_genome(&|name: &str, _length: usize| {
!config.filter_chroms.contains(&name.to_string())
});
}
} else {
if !config.filter_chroms.is_empty() {
log!(
config.logger,
"Removing all reads from `{}`",
config.filter_chroms.join(", ")
);
for chr in &config.filter_chroms {
if let Ok(mut s) = track1.track.get_sequence_mut(chr) {
for i in 0..s.n_bins() {
s.set_bin(i, 0.0);
}
}
}
}
}
Ok(())
}
}