use std::{
path::{Path, PathBuf},
sync::mpsc::channel,
};
use colored::Colorize;
use indicatif::ProgressBar;
use rayon::prelude::*;
use crate::{
helper::{
types::{SeqReadFmt, SummaryMode},
utils::set_spinner,
},
stats::fastq::{FastqMappedRead, FastqSummary, FastqSummaryMin},
writer::read::{ReadPosSummaryWriter, ReadSummaryWriter},
};
/// Summarizes one or more FASTQ read files and writes the results to `output`.
///
/// Build it with `GenomicReadSummary::new` and run it with
/// `GenomicReadSummary::summarize`; one summary record is produced per input file.
pub struct GenomicReadSummary<'a> {
// Input read files; each path is summarized independently (in parallel).
// NOTE(review): borrowed mutably, but nothing in this module mutates it.
pub inputs: &'a mut [PathBuf],
// Read-file format forwarded to the per-file summarizers.
pub input_fmt: &'a SeqReadFmt,
// Selects the level of detail: `Minimal`, `Default`, or `Complete`.
pub mode: &'a SummaryMode,
// Output location handed to the writers (logged as "Dir" after summarizing).
pub output: &'a Path,
// Optional prefix handed to the writers; presumably used in output file names — confirm in writer.
pub prefix: Option<&'a str>,
}
impl<'a> GenomicReadSummary<'a> {
pub fn new(
inputs: &'a mut [PathBuf],
input_fmt: &'a SeqReadFmt,
mode: &'a SummaryMode,
output: &'a Path,
prefix: Option<&'a str>,
) -> Self {
Self {
inputs,
input_fmt,
mode,
output,
prefix,
}
}
pub fn summarize(&self) {
let spin = set_spinner();
spin.set_message("Calculating summary of fastq files");
match self.mode {
SummaryMode::Minimal => {
let mut records = self.par_summarize_minimal();
self.write_record_min(&spin, &mut records);
}
SummaryMode::Default => {
let mut records = self.par_summarize_default();
self.write_record_default(&spin, &mut records);
}
SummaryMode::Complete => {
let all_records = self.par_summarize_complete();
let (mut records, read_records): (Vec<FastqSummary>, Vec<FastqMappedRead>) =
all_records.into_iter().unzip();
self.write_record_complete(&spin, &mut records, &read_records);
}
}
spin.finish_with_message("Finished processing fastq files\n");
self.print_output_info();
}
fn par_summarize_default(&self) -> Vec<FastqSummary> {
let (sender, receiver) = channel();
self.inputs.par_iter().for_each_with(sender, |s, p| {
let record = self.summarize_default(p);
s.send(record)
.expect("Failed parallel processing fastq files");
});
receiver.iter().collect()
}
fn summarize_default(&self, path: &Path) -> FastqSummary {
let mut summary = FastqSummary::new(path);
summary.summarize(self.input_fmt);
summary
}
fn par_summarize_complete(&self) -> Vec<(FastqSummary, FastqMappedRead)> {
let (sender, receiver) = channel();
self.inputs.par_iter().for_each_with(sender, |s, p| {
let record = self.summarize_complete(p);
s.send(record)
.expect("Failed parallel processing fastq files");
});
receiver.iter().collect()
}
fn summarize_complete(&self, path: &Path) -> (FastqSummary, FastqMappedRead) {
let mut summary = FastqSummary::new(path);
let mapped_records = summary.summarize_map(self.input_fmt);
(summary, mapped_records)
}
fn par_summarize_minimal(&self) -> Vec<FastqSummaryMin> {
let (sender, receiver) = channel();
self.inputs.par_iter().for_each_with(sender, |s, p| {
let summary = self.summarize_minimal(p, self.input_fmt);
s.send(summary)
.expect("Failed parallel processing fastq files");
});
receiver.iter().collect()
}
fn summarize_minimal(&self, p: &Path, input_fmt: &SeqReadFmt) -> FastqSummaryMin {
let mut summary = FastqSummaryMin::new(p);
summary.summarize(input_fmt);
summary
}
fn write_record_min(&self, spin: &ProgressBar, records: &mut [FastqSummaryMin]) {
let writer = ReadSummaryWriter::new(self.output, self.prefix);
spin.set_message("Writing records\n");
writer
.write_read_count_only(records)
.expect("Failed writing to file");
}
fn write_record_default(&self, spin: &ProgressBar, records: &mut [FastqSummary]) {
records.sort_by(|a, b| a.path.cmp(&b.path));
spin.set_message("Writing records\n");
let writer = ReadSummaryWriter::new(self.output, self.prefix);
writer.write(records).expect("Failed writing to file");
}
fn write_record_complete(
&self,
spin: &ProgressBar,
records: &mut [FastqSummary],
read_records: &[FastqMappedRead],
) {
records.sort_by(|a, b| a.path.cmp(&b.path));
let writer = ReadSummaryWriter::new(self.output, self.prefix);
writer.write(records).expect("Failed writing to file");
spin.set_message("Writing records\n");
let pos_writer = ReadPosSummaryWriter::new(self.output, self.prefix);
pos_writer
.write(read_records)
.expect("Failed writing to file");
}
fn print_output_info(&self) {
log::info!("{}", "Output".yellow());
log::info!("{:18}: {}", "Dir", self.output.display());
}
}
#[cfg(test)]
mod test {
    use std::path::PathBuf;

    use tempdir::TempDir;

    use crate::core::read::summarize::GenomicReadSummary;
    use crate::helper::types::{SeqReadFmt, SummaryMode};
    use crate::stats::fastq::{FastqMappedRead, FastqSummary};

    /// The two fixture FASTQ files shared by every test in this module.
    fn fixture_files() -> Vec<PathBuf> {
        vec![
            PathBuf::from("tests/files/raw/read_1.fastq"),
            PathBuf::from("tests/files/raw/read_2.fastq"),
        ]
    }

    /// Runs the end-to-end `summarize` pipeline in `mode` over the fixtures
    /// and checks that the output directory is still present afterwards.
    fn run_summarize(mode: &SummaryMode) {
        let mut files = fixture_files();
        let output = TempDir::new("tempt").unwrap();
        let handler =
            GenomicReadSummary::new(&mut files, &SeqReadFmt::Auto, mode, output.path(), None);
        handler.summarize();
        assert!(output.path().exists());
    }

    #[test]
    fn test_summarize() {
        run_summarize(&SummaryMode::Default);
    }

    #[test]
    fn test_summarize_minimal() {
        run_summarize(&SummaryMode::Minimal);
    }

    #[test]
    fn test_summarize_complete() {
        run_summarize(&SummaryMode::Complete);
    }

    /// Checks the mapped per-read data produced by the complete summarizer.
    /// The test was previously named `test_read_count_only` and built a
    /// `Minimal` handler, although it exercises the complete path.
    #[test]
    fn test_par_summarize_complete() {
        let mut files = fixture_files();
        let output = TempDir::new("tempt").unwrap();
        let handler = GenomicReadSummary::new(
            &mut files,
            &SeqReadFmt::Auto,
            &SummaryMode::Complete,
            output.path(),
            None,
        );
        let records = handler.par_summarize_complete();
        let (_, pos): (Vec<FastqSummary>, Vec<FastqMappedRead>) = records.into_iter().unzip();
        assert_eq!(pos.len(), 2);
        pos.iter().for_each(|p| {
            assert_eq!(p.reads.len(), 36);
            assert_eq!(p.qscores.len(), 36);
        });
    }
}