use anyhow::Result;
use bio::io::fastq;
use bio::io::fastq::{FastqRead, Record};
use bio::stats::probs::LogProb;
use derive_new::new;
use ordered_float::NotNaN;
use rgsl::randist::gaussian::ugaussian_P;
use rocksdb::DB;
use std::io;
use std::io::Write;
use std::mem;
use std::process::{Command, Stdio};
use std::str;
use tempfile::tempdir;
use uuid::Uuid;
use super::calc_consensus::{CalcNonOverlappingConsensus, CalcOverlappingConsensus};
/// Maximum median hamming distance in the candidate overlap region for a pair
/// of reads to be considered overlapping at a given insert size.
const HAMMING_THRESHOLD: f64 = 10.0;
fn parse_cluster(record: csv::StringRecord) -> Result<Vec<usize>> {
let seqids = &record[2];
Ok(csv::ReaderBuilder::new()
.delimiter(b',')
.has_headers(false)
.from_reader(seqids.as_bytes())
.deserialize()
.next()
.unwrap()?)
}
/// Median hamming distance between the overlapping portions of paired reads,
/// assuming the given insert size.
///
/// Pairs are skipped when a read is longer than the insert size (which would
/// imply read-through) or when the insert size leaves no overlap at all.
/// Returns `None` when no pair contributes a distance.
fn median_hamming_distance(
    insert_size: usize,
    f_recs: &[fastq::Record],
    r_recs: &[fastq::Record],
) -> Option<f64> {
    let distances = f_recs.iter().zip(r_recs).filter_map(|(f_rec, r_rec)| {
        // Was bitwise `|`; use short-circuiting `||` on bools so the second
        // length check is only evaluated when needed.
        if (insert_size < f_rec.seq().len()) || (insert_size < r_rec.seq().len()) {
            return None;
        }
        // Reads too short to overlap at this insert size.
        if insert_size >= (f_rec.seq().len() + r_rec.seq().len()) {
            return None;
        }
        let overlap = (f_rec.seq().len() + r_rec.seq().len()) - insert_size;
        let suffix_start_idx: usize = f_rec.seq().len() - overlap;
        // Compare the forward read's suffix against the reverse-complemented
        // reverse read's prefix.
        Some(bio::alignment::distance::hamming(
            &f_rec.seq()[suffix_start_idx..],
            &bio::alphabets::dna::revcomp(r_rec.seq())[..overlap],
        ))
    });
    stats::median(distances)
}
/// Probability mass of a discretized gaussian insert-size distribution at
/// `value`, i.e. P(value - 0.5 < X <= value + 0.5) for X ~ N(mean, sd),
/// returned in log space.
fn isize_pmf(value: f64, mean: f64, sd: f64) -> LogProb {
    // Difference of the standard-normal CDF at the two half-integer bounds.
    let upper = ugaussian_P((value + 0.5 - mean) / sd);
    let lower = ugaussian_P((value - 0.5 - mean) / sd);
    LogProb((upper - lower).ln())
}
/// Temporary on-disk store mapping read indices to serialized FASTQ record
/// pairs, backed by RocksDB, so the full input need not be held in memory.
#[derive(Debug)]
struct FastqStorage {
    db: DB,
}
impl FastqStorage {
pub fn new() -> Result<Self> {
let storage_dir = tempdir()?.path().join("db");
Ok(FastqStorage {
db: DB::open_default(storage_dir)?,
})
}
#[allow(clippy::wrong_self_convention)]
fn as_key(i: u64) -> [u8; 8] {
unsafe { mem::transmute::<u64, [u8; 8]>(i) }
}
pub fn put(&mut self, i: usize, f_rec: &fastq::Record, r_rec: &fastq::Record) -> Result<()> {
Ok(self.db.put(
&Self::as_key(i as u64),
serde_json::to_string(&(f_rec, r_rec))?.as_bytes(),
)?)
}
pub fn get(&self, i: usize) -> Result<(fastq::Record, fastq::Record)> {
Ok(serde_json::from_str(
str::from_utf8(&self.db.get(&Self::as_key(i as u64))?.unwrap()).unwrap(),
)?)
}
}
/// Consensus of a read pair merged into a single overlapping record, together
/// with the likelihood of the overlapping configuration.
pub struct OverlappingConsensus {
    record: Record,
    likelihood: LogProb,
}
/// Separate forward/reverse consensus records for a pair treated as
/// non-overlapping, together with the likelihood of that configuration.
pub struct NonOverlappingConsensus {
    f_record: Record,
    r_record: Record,
    likelihood: LogProb,
}
/// Driver trait for UMI-guided consensus calling on paired-end FASTQ input.
///
/// The provided `call_consensus_reads` clusters reads by UMI with an external
/// `starcode` process, re-clusters each UMI cluster by read sequence, and
/// hands every final cluster to the implementor-specific `write_records`.
pub trait CallConsensusReads<'a, R: io::Read + io::BufRead + 'a, W: io::Write + 'a> {
    /// Cluster read pairs by UMI and sequence, then emit consensus reads.
    ///
    /// # Panics
    /// Panics if the two FASTQ files have unequal lengths, or if the
    /// `starcode` binary is not available on `PATH`.
    fn call_consensus_reads(&'a mut self) -> Result<()> {
        let spinner_style = indicatif::ProgressStyle::default_spinner()
            .tick_chars("⠁⠂⠄⡀⢀⠠⠐⠈ ")
            .template("{prefix:.bold.dim} {spinner} {wide_msg}");
        // Step 1: feed all UMIs to starcode for clustering.
        let mut umi_cluster = Command::new("starcode")
            .arg("--dist")
            .arg(format!("{}", self.umi_dist()))
            .arg("--seq-id")
            .arg("-s")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .expect("Error in starcode call. Starcode might not be installed.");
        let mut f_rec = fastq::Record::new();
        let mut r_rec = fastq::Record::new();
        // Reads are kept out of memory in a temporary RocksDB store keyed by
        // their 0-based input index.
        let mut read_storage = FastqStorage::new()?;
        let mut i = 0;
        let pb = indicatif::ProgressBar::new_spinner();
        pb.set_style(spinner_style.clone());
        pb.set_prefix("[1/2] Clustering input reads by UMI using starcode.");
        loop {
            pb.set_message(&format!(" Processed {:>10} reads", i));
            pb.inc(1);
            self.fq1_reader().read(&mut f_rec)?;
            self.fq2_reader().read(&mut r_rec)?;
            // Both readers must run dry at the same time; otherwise the input
            // files are inconsistent.
            match (f_rec.is_empty(), r_rec.is_empty()) {
                (true, true) => break,
                (false, false) => (),
                (true, false) => {
                    let error_message = format!("Given FASTQ files have unequal lengths. Forward file returned record {} as empty, reverse record is not: id:'{}' seq:'{:?}'.", i, r_rec.id(), str::from_utf8(r_rec.seq()));
                    panic!("{}", error_message);
                }
                (false, true) => {
                    let error_message = format!("Given FASTQ files have unequal lengths. Reverse file returned record {} as empty, forward record is not: id:'{}' seq:'{:?}'.", i, f_rec.id(), str::from_utf8(f_rec.seq()));
                    panic!("{}", error_message);
                }
            }
            // The UMI is the first `umi_len()` bases of either the reverse or
            // forward read.
            let umi = if self.reverse_umi() {
                r_rec.seq()[..self.umi_len()].to_owned()
            } else {
                f_rec.seq()[..self.umi_len()].to_owned()
            };
            umi_cluster.stdin.as_mut().unwrap().write_all(&umi)?;
            umi_cluster.stdin.as_mut().unwrap().write_all(b"\n")?;
            // Strip the UMI so it does not take part in consensus calling.
            if self.reverse_umi() {
                r_rec = self.strip_umi_from_record(&r_rec)
            } else {
                f_rec = self.strip_umi_from_record(&f_rec)
            }
            read_storage.put(i, &f_rec, &r_rec)?;
            i += 1;
        }
        umi_cluster.stdin.as_mut().unwrap().flush()?;
        // Close stdin so starcode sees EOF and starts emitting clusters.
        drop(umi_cluster.stdin.take());
        pb.finish_with_message(&format!("Done. Analyzed {} reads.", i));
        let mut j = 0;
        let pb = indicatif::ProgressBar::new_spinner();
        pb.set_style(spinner_style);
        // Fixed copy-paste: this spinner covers the second of the two steps,
        // not a repeat of the UMI-clustering step.
        pb.set_prefix("[2/2] Calling consensus reads from clusters.");
        // Step 2: for every UMI cluster, cluster its reads by full sequence
        // and call a consensus per sequence cluster.
        for record in csv::ReaderBuilder::new()
            .delimiter(b'\t')
            .has_headers(false)
            .from_reader(umi_cluster.stdout.as_mut().unwrap())
            .records()
        {
            pb.inc(1);
            pb.set_message(&format!("Processed {:>10} cluster", j));
            let seqids = parse_cluster(record?)?;
            let mut seq_cluster = Command::new("starcode")
                .arg("--dist")
                .arg(format!("{}", self.seq_dist()))
                .arg("--seq-id")
                .arg("-s")
                .stdin(Stdio::piped())
                .stdout(Stdio::piped())
                .stderr(Stdio::piped())
                .spawn()?;
            for &seqid in &seqids {
                // starcode ids are 1-based; storage keys are 0-based.
                let (f_rec, r_rec) = read_storage.get(seqid - 1).unwrap();
                seq_cluster
                    .stdin
                    .as_mut()
                    .unwrap()
                    .write_all(&[f_rec.seq(), r_rec.seq()].concat())?;
                seq_cluster.stdin.as_mut().unwrap().write_all(b"\n")?;
            }
            seq_cluster.stdin.as_mut().unwrap().flush()?;
            drop(seq_cluster.stdin.take());
            for record in csv::ReaderBuilder::new()
                .delimiter(b'\t')
                .has_headers(false)
                .from_reader(seq_cluster.stdout.as_mut().unwrap())
                .records()
            {
                let inner_seqids = parse_cluster(record?)?;
                let mut f_recs = Vec::new();
                let mut r_recs = Vec::new();
                let mut outer_seqids = Vec::new();
                for inner_seqid in inner_seqids {
                    // Inner ids index into the outer cluster's id list.
                    let seqid = seqids[inner_seqid - 1];
                    let (f_rec, r_rec) = read_storage.get(seqid - 1)?;
                    f_recs.push(f_rec);
                    r_recs.push(r_rec);
                    outer_seqids.push(seqid);
                }
                self.write_records(f_recs, r_recs, outer_seqids)?;
            }
            // Reap the per-cluster starcode child and surface failures.
            match seq_cluster
                .wait()
                .expect("process did not even start")
                .code()
            {
                Some(0) => (),
                Some(s) => eprintln!("Starcode failed with error code {}", s),
                None => eprintln!("Starcode was terminated by signal"),
            }
            j += 1;
        }
        // Reap the UMI-clustering child as well; previously it was never
        // `wait()`ed on, leaving a zombie process until program exit.
        match umi_cluster
            .wait()
            .expect("process did not even start")
            .code()
        {
            Some(0) => (),
            Some(s) => eprintln!("Starcode failed with error code {}", s),
            None => eprintln!("Starcode was terminated by signal"),
        }
        pb.finish_with_message(&format!("Done. Processed {} cluster.", j));
        Ok(())
    }

    /// Remove the leading UMI (first `umi_len()` bases/qualities) from a
    /// record, keeping its id and description.
    fn strip_umi_from_record(&mut self, record: &Record) -> Record {
        let rec_seq = &record.seq()[self.umi_len()..];
        let rec_qual = &record.qual()[self.umi_len()..];
        Record::with_attrs(record.id(), record.desc(), rec_seq, rec_qual)
    }

    /// Write (consensus) records for one final cluster of read pairs.
    fn write_records(
        &mut self,
        f_recs: Vec<Record>,
        r_recs: Vec<Record>,
        outer_seqids: Vec<usize>,
    ) -> Result<()>;
    /// Reader for the forward FASTQ file.
    fn fq1_reader(&mut self) -> &mut fastq::Reader<R>;
    /// Reader for the reverse FASTQ file.
    fn fq2_reader(&mut self) -> &mut fastq::Reader<R>;
    /// Length of the UMI prefix on the UMI-carrying read.
    fn umi_len(&self) -> usize;
    /// Maximum edit distance for sequence clustering.
    fn seq_dist(&self) -> usize;
    /// Maximum edit distance for UMI clustering.
    fn umi_dist(&self) -> usize;
    /// Whether the UMI sits on the reverse read instead of the forward one.
    fn reverse_umi(&self) -> bool;
}
/// Consensus caller that never merges pairs: each cluster yields one forward
/// and one reverse consensus record (or passes single pairs through).
///
/// The constructor is generated by `derive_new` and takes all fields in
/// declaration order.
#[allow(clippy::too_many_arguments)]
#[derive(new)]
pub struct CallNonOverlappingConsensusRead<'a, R: io::Read, W: io::Write> {
    fq1_reader: &'a mut fastq::Reader<R>,
    fq2_reader: &'a mut fastq::Reader<R>,
    fq1_writer: &'a mut fastq::Writer<W>,
    fq2_writer: &'a mut fastq::Writer<W>,
    umi_len: usize,
    seq_dist: usize,
    umi_dist: usize,
    reverse_umi: bool,
    // Include all member read names in the consensus read name if set.
    verbose_read_names: bool,
}
impl<'a, R: io::Read + io::BufRead, W: io::Write> CallConsensusReads<'a, R, W>
    for CallNonOverlappingConsensusRead<'a, R, W>
{
    /// Write one record pair per cluster: the consensus of the cluster when
    /// it holds several read pairs, otherwise the single pair unchanged.
    fn write_records(
        &mut self,
        f_recs: Vec<Record>,
        r_recs: Vec<Record>,
        outer_seqids: Vec<usize>,
    ) -> Result<()> {
        if f_recs.len() > 1 {
            // Both mates of a cluster share one freshly generated UUID.
            let uuid = &Uuid::new_v4().to_hyphenated().to_string();
            let f_consensus = CalcNonOverlappingConsensus::new(
                &f_recs,
                &outer_seqids,
                uuid,
                self.verbose_read_names,
            )
            .calc_consensus()
            .0;
            self.fq1_writer.write_record(&f_consensus)?;
            let r_consensus = CalcNonOverlappingConsensus::new(
                &r_recs,
                &outer_seqids,
                uuid,
                self.verbose_read_names,
            )
            .calc_consensus()
            .0;
            self.fq2_writer.write_record(&r_consensus)?;
        } else {
            // Singleton cluster: pass the pair through untouched.
            self.fq1_writer.write_record(&f_recs[0])?;
            self.fq2_writer.write_record(&r_recs[0])?;
        }
        Ok(())
    }

    /// Reader for the forward FASTQ file.
    fn fq1_reader(&mut self) -> &mut fastq::Reader<R> {
        self.fq1_reader
    }

    /// Reader for the reverse FASTQ file.
    fn fq2_reader(&mut self) -> &mut fastq::Reader<R> {
        self.fq2_reader
    }

    fn umi_len(&self) -> usize {
        self.umi_len
    }

    fn seq_dist(&self) -> usize {
        self.seq_dist
    }

    fn umi_dist(&self) -> usize {
        self.umi_dist
    }

    fn reverse_umi(&self) -> bool {
        self.reverse_umi
    }
}
/// Consensus caller that may merge overlapping pairs into a single record
/// (written to `fq3_writer`) based on an insert-size model with the given
/// mean (`insert_size`) and standard deviation (`std_dev`).
///
/// The constructor is generated by `derive_new` and takes all fields in
/// declaration order.
#[allow(clippy::too_many_arguments)]
#[derive(new)]
pub struct CallOverlappingConsensusRead<'a, R: io::Read, W: io::Write> {
    fq1_reader: &'a mut fastq::Reader<R>,
    fq2_reader: &'a mut fastq::Reader<R>,
    fq1_writer: &'a mut fastq::Writer<W>,
    fq2_writer: &'a mut fastq::Writer<W>,
    // Receives merged (overlapping) consensus records.
    fq3_writer: &'a mut fastq::Writer<W>,
    umi_len: usize,
    seq_dist: usize,
    umi_dist: usize,
    // Mean of the insert-size distribution.
    insert_size: usize,
    // Standard deviation of the insert-size distribution.
    std_dev: usize,
    reverse_umi: bool,
    // Include all member read names in the consensus read name if set.
    verbose_read_names: bool,
}
impl<'a, R: io::Read, W: io::Write> CallOverlappingConsensusRead<'a, R, W> {
    /// Most probable insert size compatible with the given read lengths:
    /// clamped to the mean when the reads cannot span it, and to
    /// mean + 2*sd when the combined length exceeds the plausible range.
    fn isize_highest_probability(&mut self, f_seq_len: usize, r_seq_len: usize) -> f64 {
        // Fixed: the first branch compared `f_seq_len + f_seq_len` against
        // the mean insert size, ignoring the reverse read length.
        if f_seq_len + r_seq_len < self.insert_size {
            self.insert_size as f64
        } else if f_seq_len + r_seq_len > self.insert_size + 2 * self.std_dev {
            (self.insert_size + 2 * self.std_dev) as f64
        } else {
            (f_seq_len + r_seq_len) as f64
        }
    }

    /// Best overlapping consensus over candidate insert sizes in
    /// mean ± 2*sd whose median hamming distance in the overlap region stays
    /// below `HAMMING_THRESHOLD`.
    ///
    /// NOTE(review): the final `unwrap` panics if no candidate insert size
    /// passes the filter, and `insert_size - 2 * std_dev` underflows when the
    /// standard deviation exceeds half the mean — confirm callers rule both
    /// out before relying on this path.
    fn maximum_likelihood_overlapping_consensus(
        &mut self,
        f_recs: &[Record],
        r_recs: &[Record],
        outer_seqids: &[usize],
        uuid: &str,
    ) -> OverlappingConsensus {
        let insert_sizes = ((self.insert_size - 2 * self.std_dev)
            ..(self.insert_size + 2 * self.std_dev))
            .filter_map(|insert_size| {
                median_hamming_distance(insert_size, f_recs, r_recs)
                    .filter(|&median_distance| median_distance < HAMMING_THRESHOLD)
                    .map(|_| insert_size)
            });
        insert_sizes
            .map(|insert_size| {
                let overlap = (f_recs[0].seq().len() + r_recs[0].seq().len()) - insert_size;
                let (consensus_record, lh_isize) = CalcOverlappingConsensus::new(
                    f_recs,
                    r_recs,
                    overlap,
                    outer_seqids,
                    uuid,
                    self.verbose_read_names,
                )
                .calc_consensus();
                // Combine the sequence likelihood with the insert-size prior.
                let likelihood = lh_isize
                    + isize_pmf(
                        insert_size as f64,
                        self.insert_size as f64,
                        self.std_dev as f64,
                    );
                OverlappingConsensus {
                    record: consensus_record,
                    likelihood,
                }
            })
            .max_by_key(|consensus| NotNaN::new(*consensus.likelihood).unwrap())
            .unwrap()
    }

    /// Consensus under the assumption that the reads do not overlap:
    /// independent forward/reverse consensus records plus the prior of the
    /// most probable compatible insert size.
    fn maximum_likelihood_nonoverlapping_consensus(
        &mut self,
        f_recs: &[Record],
        r_recs: &[Record],
        outer_seqids: &[usize],
        uuid: &str,
    ) -> NonOverlappingConsensus {
        let (f_consensus_rec, f_lh) =
            CalcNonOverlappingConsensus::new(f_recs, outer_seqids, uuid, self.verbose_read_names)
                .calc_consensus();
        let (r_consensus_rec, r_lh) =
            CalcNonOverlappingConsensus::new(r_recs, outer_seqids, uuid, self.verbose_read_names)
                .calc_consensus();
        let overall_lh_isize = f_lh + r_lh;
        let likeliest_isize =
            self.isize_highest_probability(f_recs[0].seq().len(), r_recs[0].seq().len());
        let overall_lh = overall_lh_isize
            + isize_pmf(
                likeliest_isize,
                self.insert_size as f64,
                self.std_dev as f64,
            );
        NonOverlappingConsensus {
            f_record: f_consensus_rec,
            r_record: r_consensus_rec,
            likelihood: overall_lh,
        }
    }
}
impl<'a, R: io::Read + io::BufRead, W: io::Write> CallConsensusReads<'a, R, W>
    for CallOverlappingConsensusRead<'a, R, W>
{
    /// Compute both the overlapping and non-overlapping consensus for a
    /// cluster and write whichever explains the data better: a single merged
    /// record to fq3, or a forward/reverse pair to fq1/fq2.
    fn write_records(
        &mut self,
        f_recs: Vec<Record>,
        r_recs: Vec<Record>,
        outer_seqids: Vec<usize>,
    ) -> Result<()> {
        let uuid = &Uuid::new_v4().to_hyphenated().to_string();
        let ol_consensus =
            self.maximum_likelihood_overlapping_consensus(&f_recs, &r_recs, &outer_seqids, uuid);
        let non_ol_consensus =
            self.maximum_likelihood_nonoverlapping_consensus(&f_recs, &r_recs, &outer_seqids, uuid);
        // `if`/`else` instead of matching on a bool (clippy::match_bool).
        if ol_consensus.likelihood > non_ol_consensus.likelihood {
            self.fq3_writer.write_record(&ol_consensus.record)?;
        } else {
            self.fq1_writer.write_record(&non_ol_consensus.f_record)?;
            self.fq2_writer.write_record(&non_ol_consensus.r_record)?;
        }
        Ok(())
    }

    /// Reader for the forward FASTQ file.
    fn fq1_reader(&mut self) -> &mut fastq::Reader<R> {
        self.fq1_reader
    }

    /// Reader for the reverse FASTQ file.
    fn fq2_reader(&mut self) -> &mut fastq::Reader<R> {
        self.fq2_reader
    }

    fn umi_len(&self) -> usize {
        self.umi_len
    }

    fn seq_dist(&self) -> usize {
        self.seq_dist
    }

    fn umi_dist(&self) -> usize {
        self.umi_dist
    }

    fn reverse_umi(&self) -> bool {
        self.reverse_umi
    }
}