use crate::cli::CompressionExt;
use crate::source::RecordSource;
use needletail::errors::ParseErrorKind::EmptyFile;
use niffler::compression;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use thiserror::Error;
/// Errors produced while reading FASTA/FASTQ input or writing subsampled output.
///
/// Variants with a field named `source` (or a `#[from]` field) expose it through
/// `std::error::Error::source` via `thiserror`, so error-chain reporters can show the cause.
#[derive(Error, Debug)]
pub enum FastxError {
/// needletail could not create a FASTA/FASTQ parser for the (decompressed) input stream.
#[error("Read error")]
ReadError {
source: needletail::errors::ParseError,
},
/// needletail failed to parse an individual record while iterating the input.
#[error("Failed to parse record")]
ParseError {
source: needletail::errors::ParseError,
},
/// `File::create` failed for the requested output path.
#[error("Output file could not be created")]
CreateError { source: std::io::Error },
/// Any niffler failure. Despite the name, this wraps errors both from opening/decoding
/// input files and from building a compressed output writer. Displays as the wrapped
/// niffler error (`transparent`).
#[error(transparent)]
CompressOutputError(#[from] niffler::Error),
/// `filter_reads_into` wrote fewer records than the caller said it expected to keep.
#[error("Some expected indices were not in the input file")]
IndicesNotFound,
/// Writing a kept record to the output sink failed.
#[error("Could not write to output file")]
WriteError { source: anyhow::Error },
/// Not raised in this file; presumably used by alignment (SAM/BAM) sources elsewhere
/// in the crate. NOTE(review): confirm.
#[error("Alignment read error: {source}")]
AlignmentReadError { source: std::io::Error },
/// Not raised in this file; per its message, signals that mapped reads were found and
/// the user should run `rasusa aln` instead.
#[error("Error: Mapped read detected, please use `rasusa aln` for aligned data")]
MappedReadDetected,
}
/// A FASTA/FASTQ input file, identified by its path.
///
/// Only the path is stored. Each `RecordSource` method on this type reopens the file
/// through niffler, so compressed input is read transparently.
#[derive(Debug, PartialEq)]
pub struct Fastx {
/// Location of the input file on disk.
path: PathBuf,
}
impl Fastx {
pub fn from_path(path: &Path) -> Self {
Fastx {
path: path.to_path_buf(),
}
}
}
impl RecordSource for Fastx {
fn read_lengths(&self) -> Result<Vec<u32>, FastxError> {
let mut read_lengths: Vec<u32> = vec![];
let reader = match niffler::send::from_path(&self.path) {
Ok((rdr, _)) => rdr,
Err(source) => match source {
niffler::error::Error::FileTooShort => return Ok(read_lengths),
_ => return Err(FastxError::CompressOutputError(source)),
},
};
let mut reader = match needletail::parse_fastx_reader(reader) {
Ok(rdr) => rdr,
Err(e) if e.kind == EmptyFile => return Ok(read_lengths),
Err(source) => return Err(FastxError::ReadError { source }),
};
while let Some(record) = reader.next() {
match record {
Ok(rec) => read_lengths.push(rec.num_bases() as u32),
Err(err) => return Err(FastxError::ParseError { source: err }),
}
}
Ok(read_lengths)
}
fn filter_reads_into(
&self,
reads_to_keep: &[bool],
nb_reads_keep: usize,
write_to: &mut dyn Write,
_output_format: Option<noodles_util::alignment::io::Format>,
is_fasta: bool,
) -> Result<usize, FastxError> {
let mut total_len = 0;
let (reader, _) = niffler::send::from_path(&self.path)?;
let mut reader = needletail::parse_fastx_reader(reader)
.map_err(|source| FastxError::ReadError { source })?;
let mut read_idx: usize = 0;
let mut nb_reads_written = 0;
while let Some(record) = reader.next() {
match record {
Err(source) => return Err(FastxError::ParseError { source }),
Ok(rec) if reads_to_keep[read_idx] => {
total_len += rec.num_bases();
if is_fasta {
write_to
.write_all(b">")
.map_err(|err| FastxError::WriteError {
source: anyhow::Error::from(err),
})?;
write_to
.write_all(rec.id())
.map_err(|err| FastxError::WriteError {
source: anyhow::Error::from(err),
})?;
write_to
.write_all(b"\n")
.map_err(|err| FastxError::WriteError {
source: anyhow::Error::from(err),
})?;
write_to
.write_all(&rec.seq())
.map_err(|err| FastxError::WriteError {
source: anyhow::Error::from(err),
})?;
write_to
.write_all(b"\n")
.map_err(|err| FastxError::WriteError {
source: anyhow::Error::from(err),
})?;
} else {
rec.write(write_to, None)
.map_err(|err| FastxError::WriteError {
source: anyhow::Error::from(err),
})?;
}
nb_reads_written += 1;
if nb_reads_keep == nb_reads_written {
break;
}
}
Ok(_) => (),
}
read_idx += 1;
}
if nb_reads_written == nb_reads_keep {
Ok(total_len)
} else {
Err(FastxError::IndicesNotFound)
}
}
}
/// Creates the file at `path` and returns a buffered, optionally compressing writer for it.
///
/// * `path` - output location. The file is created (or truncated) before anything else.
/// * `compression_lvl` - explicit compression level. When `None`, a per-format default
///   is used: gzip 6, bzip2 9, lzma 6, zstd 3, and 0 for anything else.
/// * `compression_fmt` - explicit compression format. When `None`, it is derived from
///   `path` by `niffler::Format::from_path` (presumably from the file extension).
///
/// # Errors
///
/// * [`FastxError::CreateError`] if the file cannot be created.
/// * [`FastxError::CompressOutputError`] if niffler cannot build the compressing writer.
pub fn create_output_writer(
    path: &Path,
    compression_lvl: Option<niffler::compression::Level>,
    compression_fmt: Option<niffler::compression::Format>,
) -> Result<Box<dyn Write>, FastxError> {
    let created = match File::create(path) {
        Ok(handle) => handle,
        Err(source) => return Err(FastxError::CreateError { source }),
    };
    let buffered = Box::new(BufWriter::new(created));

    let format = match compression_fmt {
        Some(explicit) => explicit,
        None => niffler::Format::from_path(path),
    };

    let level = match compression_lvl {
        Some(explicit) => explicit,
        None => match format {
            compression::Format::Gzip | compression::Format::Lzma => compression::Level::Six,
            compression::Format::Bzip => compression::Level::Nine,
            compression::Format::Zstd => compression::Level::Three,
            _ => compression::Level::Zero,
        },
    };

    niffler::get_writer(buffered, format, level).map_err(FastxError::CompressOutputError)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use std::path::Path;
    use tempfile::Builder;

    #[test]
    fn fastx_from_fasta() {
        let path = Path::new("data/my.fa");
        let actual = Fastx::from_path(path);
        let expected = Fastx {
            path: path.to_path_buf(),
        };
        assert_eq!(actual, expected)
    }

    #[test]
    fn create_invalid_output_file_raises_error() {
        let path = Path::new("invalid/out/path.fq");
        // `Box<dyn Write>` is not `Debug`, so `unwrap_err` is unavailable; go via `err()`.
        let actual = create_output_writer(path, Some(niffler::Level::Eight), None).err();
        // Match the variant itself. Comparing `type_id()`s of two `FastxError` values
        // would always succeed, whatever the variant.
        assert!(
            matches!(actual, Some(FastxError::CreateError { .. })),
            "expected CreateError, got {actual:?}"
        );
    }

    #[test]
    fn create_valid_output_file_and_can_write_to_it() {
        let file = Builder::new().suffix(".fastq").tempfile().unwrap();
        let mut writer =
            create_output_writer(file.path(), Some(niffler::Level::Eight), None).unwrap();
        let actual = writer.write_all(b"foo\nbar");
        assert!(actual.is_ok())
    }

    #[test]
    fn create_valid_compressed_output_file_and_can_write_to_it() {
        let file = Builder::new().suffix(".fastq.gz").tempfile().unwrap();
        let mut writer =
            create_output_writer(file.path(), Some(niffler::Level::Four), None).unwrap();
        let actual = writer.write_all(b"foo\nbar");
        assert!(actual.is_ok())
    }

    #[test]
    fn get_read_lengths_for_empty_fasta_returns_empty_vector() {
        let text = "";
        let mut file = Builder::new().suffix(".fa").tempfile().unwrap();
        file.write_all(text.as_bytes()).unwrap();
        let fastx = Fastx::from_path(file.path());
        let actual = fastx.read_lengths().unwrap();
        let expected: Vec<u32> = Vec::new();
        assert_eq!(actual, expected)
    }

    #[test]
    fn get_read_lengths_for_fasta() {
        let text = ">read1\nACGT\n>read2\nG";
        let mut file = Builder::new().suffix(".fa").tempfile().unwrap();
        file.write_all(text.as_bytes()).unwrap();
        let fastx = Fastx::from_path(file.path());
        let actual = fastx.read_lengths().unwrap();
        let expected: Vec<u32> = vec![4, 1];
        assert_eq!(actual, expected)
    }

    #[test]
    fn get_read_lengths_for_fastq() {
        let text = "@read1\nACGT\n+\n!!!!\n@read2\nG\n+\n!";
        let mut file = Builder::new().suffix(".fq").tempfile().unwrap();
        file.write_all(text.as_bytes()).unwrap();
        let fastx = Fastx::from_path(file.path());
        let actual = fastx.read_lengths().unwrap();
        let expected: Vec<u32> = vec![4, 1];
        assert_eq!(actual, expected)
    }

    #[test]
    fn filter_reads_empty_indices_no_output() {
        let text = "@read1\nACGT\n+\n!!!!";
        let mut input = Builder::new().suffix(".fastq").tempfile().unwrap();
        input.write_all(text.as_bytes()).unwrap();
        let fastx = Fastx::from_path(input.path());
        let reads_to_keep: Vec<bool> = vec![false];
        let output = Builder::new().suffix(".fastq").tempfile().unwrap();
        {
            // Scope the writer so it is dropped (and flushed) before the output is read.
            let mut out_fh = create_output_writer(output.path(), None, None).unwrap();
            let filter_result =
                fastx.filter_reads_into(&reads_to_keep, 0, &mut out_fh, None, false);
            assert!(filter_result.is_ok());
        }
        let actual = std::fs::read_to_string(output).unwrap();
        let expected = String::new();
        assert_eq!(actual, expected)
    }

    #[test]
    fn filter_fastq_reads_one_index_matches_only_read() {
        let text = "@read1\nACGT\n+\n!!!!\n";
        let mut input = Builder::new().suffix(".fastq").tempfile().unwrap();
        input.write_all(text.as_bytes()).unwrap();
        let fastx = Fastx::from_path(input.path());
        let reads_to_keep: Vec<bool> = vec![true];
        let output = Builder::new().suffix(".fastq").tempfile().unwrap();
        {
            let mut out_fh = create_output_writer(output.path(), None, None).unwrap();
            let filter_result =
                fastx.filter_reads_into(&reads_to_keep, 1, &mut out_fh, None, false);
            assert!(filter_result.is_ok());
        }
        let actual = std::fs::read_to_string(output).unwrap();
        let expected = text;
        assert_eq!(actual, expected)
    }

    #[test]
    fn filter_fasta_reads_one_index_matches_only_read() {
        let text = ">read1\nACGT\n";
        let mut input = Builder::new().suffix(".fa").tempfile().unwrap();
        input.write_all(text.as_bytes()).unwrap();
        let fastx = Fastx::from_path(input.path());
        let reads_to_keep: Vec<bool> = vec![true];
        let output = Builder::new().suffix(".fa").tempfile().unwrap();
        {
            let mut out_fh = create_output_writer(output.path(), None, None).unwrap();
            let filter_result = fastx.filter_reads_into(&reads_to_keep, 1, &mut out_fh, None, true);
            assert!(filter_result.is_ok());
        }
        let actual = std::fs::read_to_string(output).unwrap();
        let expected = text;
        assert_eq!(actual, expected)
    }

    #[test]
    fn filter_fastq_reads_as_fasta_drops_qualities() {
        let text = "@read1\nACGT\n+\n!!!!\n@read2\nCC\n+\n$$\n";
        let mut input = Builder::new().suffix(".fastq").tempfile().unwrap();
        input.write_all(text.as_bytes()).unwrap();
        let fastx = Fastx::from_path(input.path());
        let reads_to_keep: Vec<bool> = vec![false, true];
        let output = Builder::new().suffix(".fa").tempfile().unwrap();
        {
            let mut out_fh = create_output_writer(output.path(), None, None).unwrap();
            let filter_result = fastx.filter_reads_into(&reads_to_keep, 1, &mut out_fh, None, true);
            assert_eq!(filter_result.ok(), Some(2));
        }
        let actual = std::fs::read_to_string(output).unwrap();
        let expected = ">read2\nCC\n";
        assert_eq!(actual, expected)
    }

    #[test]
    fn filter_fastq_reads_one_index_matches_one_of_two_reads() {
        let text = "@read1\nACGT\n+\n!!!!\n@read2\nCCCC\n+\n$$$$\n";
        let mut input = Builder::new().suffix(".fastq").tempfile().unwrap();
        input.write_all(text.as_bytes()).unwrap();
        let fastx = Fastx::from_path(input.path());
        let reads_to_keep: Vec<bool> = vec![false, true];
        let output = Builder::new().suffix(".fastq").tempfile().unwrap();
        {
            let mut out_fh = create_output_writer(output.path(), None, None).unwrap();
            let filter_result =
                fastx.filter_reads_into(&reads_to_keep, 1, &mut out_fh, None, false);
            assert!(filter_result.is_ok());
        }
        let actual = std::fs::read_to_string(output).unwrap();
        let expected = "@read2\nCCCC\n+\n$$$$\n";
        assert_eq!(actual, expected)
    }

    #[test]
    fn filter_fastq_reads_two_indices_matches_first_and_last_reads() {
        let text = "@read1\nACGT\n+\n!!!!\n@read2\nCCCC\n+\n$$$$\n@read3\nA\n+\n$\n";
        let mut input = Builder::new().suffix(".fastq").tempfile().unwrap();
        input.write_all(text.as_bytes()).unwrap();
        let fastx = Fastx::from_path(input.path());
        let reads_to_keep: Vec<bool> = vec![true, false, true];
        let output = Builder::new().suffix(".fastq").tempfile().unwrap();
        {
            let mut out_fh = create_output_writer(output.path(), None, None).unwrap();
            let filter_result =
                fastx.filter_reads_into(&reads_to_keep, 2, &mut out_fh, None, false);
            assert!(filter_result.is_ok());
        }
        let actual = std::fs::read_to_string(output).unwrap();
        let expected = "@read1\nACGT\n+\n!!!!\n@read3\nA\n+\n$\n";
        assert_eq!(actual, expected)
    }

    #[test]
    fn filter_fasta_reads_one_index_out_of_range() {
        let text = ">read1 length=4\nACGT\n>read2\nCCCC\n";
        let mut input = Builder::new().suffix(".fa").tempfile().unwrap();
        input.write_all(text.as_bytes()).unwrap();
        let fastx = Fastx::from_path(input.path());
        let reads_to_keep: Vec<bool> = vec![true, false, true];
        let output = Builder::new().suffix(".fa").tempfile().unwrap();
        {
            let mut out_fh =
                create_output_writer(output.path(), Some(niffler::Level::Four), None).unwrap();
            let filter_result = fastx.filter_reads_into(&reads_to_keep, 2, &mut out_fh, None, true);
            assert!(filter_result.is_err());
        }
        let actual = std::fs::read_to_string(output).unwrap();
        let expected = ">read1 length=4\nACGT\n";
        assert_eq!(actual, expected)
    }

    #[test]
    fn filter_fastq_reads_one_index_out_of_range() {
        let text = "@read1 length=4\nACGT\n+\n!!!!\n@read2\nC\n+\n^\n";
        let mut input = Builder::new().suffix(".fq").tempfile().unwrap();
        input.write_all(text.as_bytes()).unwrap();
        let fastx = Fastx::from_path(input.path());
        let reads_to_keep: Vec<bool> = vec![true, false, true];
        let output = Builder::new().suffix(".fq").tempfile().unwrap();
        {
            let mut out_fh =
                create_output_writer(output.path(), Some(niffler::Level::Four), None).unwrap();
            let filter_result =
                fastx.filter_reads_into(&reads_to_keep, 2, &mut out_fh, None, false);
            assert!(filter_result.is_err());
        }
        let actual = std::fs::read_to_string(output).unwrap();
        let expected = "@read1 length=4\nACGT\n+\n!!!!\n";
        assert_eq!(actual, expected)
    }
}