use crate::parser::SequenceRecord;
use crate::quality::{decode_phred, PhredEncoding};
use crate::sequence::{complement, normalize};
use crate::{parse_fastx_file, parse_fastx_reader, FastxReader};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::PyTuple;
use pyo3::{create_exception, wrap_pyfunction};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::io::Cursor;
use std::path::PathBuf;
use std::sync::Mutex;
// Declares the Python exception type `NeedletailError` for the `needletail` module,
// deriving from Python's built-in `Exception`. `py_try!` wraps failures in this type.
create_exception!(needletail, NeedletailError, pyo3::exceptions::PyException);
/// Unwraps a `Result`, returning early from the enclosing function on failure.
///
/// The error's `Display` text becomes the message of a Python `NeedletailError`,
/// so this may only be used inside functions whose return type can absorb a
/// `PyErr` through `?`.
macro_rules! py_try {
    ($call:expr) => {
        $call.map_err(|e| PyErr::new::<NeedletailError, _>(e.to_string()))?
    };
}
/// Shortens `seq` to at most `max_len` characters for display purposes.
///
/// Strings that already fit are returned unchanged. Longer strings are rendered
/// as `<head>…<tail>`, where the tail is the last three characters and the head
/// fills the remaining budget (one character is reserved for the ellipsis).
///
/// All cuts are made on character boundaries, so multi-byte text (for example
/// the U+FFFD replacement characters produced by lossy UTF-8 decoding) never
/// causes a slicing panic. A `max_len` below four shrinks the tail (and drops
/// the head) instead of underflowing; the ellipsis itself is always emitted
/// when truncation happens.
fn get_seq_snippet(seq: &str, max_len: usize) -> String {
    // If there is no character at index `max_len`, the string has at most
    // `max_len` characters. This check costs O(max_len), not O(seq.len()).
    if seq.char_indices().nth(max_len).is_none() {
        return seq.to_string();
    }

    // Budget: up to 3 trailing characters, 1 ellipsis, and the rest for the head.
    let tail_len = max_len.saturating_sub(1).min(3);
    let head_len = max_len.saturating_sub(tail_len + 1);

    // Byte offset just past the first `head_len` characters.
    let head_end = seq
        .char_indices()
        .nth(head_len)
        .map_or(seq.len(), |(idx, _)| idx);

    // Byte offset where the last `tail_len` characters begin.
    let tail_start = match tail_len {
        0 => seq.len(),
        n => seq
            .char_indices()
            .rev()
            .nth(n - 1)
            .map_or(0, |(idx, _)| idx),
    };

    format!("{}…{}", &seq[..head_end], &seq[tail_start..])
}
/// Python-visible iterator over the records of a FASTA/FASTQ source.
///
/// Exposed to Python under the name `FastxReader`.
#[pyclass]
#[pyo3(name = "FastxReader")]
pub struct PyFastxReader {
// The underlying parser, boxed as a trait object and guarded by a mutex.
// NOTE(review): the mutex is presumably there to satisfy PyO3's thread-safety
// bounds for `#[pyclass]` types — confirm.
reader: Mutex<Box<dyn FastxReader>>,
}
#[pymethods]
impl PyFastxReader {
    /// Fixed textual representation of the reader.
    fn __repr__(&self) -> PyResult<String> {
        Ok(String::from("<FastxReader>"))
    }

    /// The reader is its own iterator.
    fn __iter__(slf: PyRefMut<Self>) -> PyRefMut<Self> {
        slf
    }

    /// Produces the next record, or `None` once the input is exhausted.
    ///
    /// A parse failure is raised as `NeedletailError`.
    fn __next__(slf: PyRefMut<Self>) -> PyResult<Option<Record>> {
        // The guard is held for the whole call because the parsed record
        // borrows from the underlying reader.
        let mut guard = slf.reader.lock().unwrap();
        let Some(parsed) = guard.next() else {
            return Ok(None);
        };
        let record = py_try!(parsed);
        Ok(Some(Record::from_sequence_record(&record)))
    }
}
/// A single sequence record with owned string fields.
///
/// Every field is exposed to Python as a read-only attribute.
#[pyclass]
pub struct Record {
// Full header text; `name` and `description` are derived from it.
#[pyo3(get)]
id: String,
// The sequence itself.
#[pyo3(get)]
seq: String,
// Quality string; `None` marks the record as FASTA, `Some` as FASTQ
// (see `is_fasta` / `is_fastq`).
#[pyo3(get)]
qual: Option<String>,
}
impl Record {
    /// Builds an owned `Record` from a borrowed parser record.
    ///
    /// Header, sequence and (if present) quality bytes are decoded as UTF-8,
    /// with invalid byte sequences replaced rather than rejected.
    fn from_sequence_record(rec: &SequenceRecord) -> Self {
        /// Lossily decodes raw bytes into an owned `String`.
        fn lossy(bytes: &[u8]) -> String {
            String::from_utf8_lossy(bytes).into_owned()
        }

        let id = lossy(rec.id());
        let seq = lossy(&rec.seq());
        let qual = rec.qual().map(lossy);
        Self { id, seq, qual }
    }
}
#[pymethods]
impl Record {
    /// The part of the header before the first whitespace character.
    ///
    /// Returns the whole header when it contains no whitespace.
    #[getter]
    pub fn name(&self) -> PyResult<&str> {
        let name = match self.id.find(char::is_whitespace) {
            Some(pos) => &self.id[..pos],
            None => self.id.as_str(),
        };
        Ok(name)
    }

    /// The part of the header after the first whitespace character, with
    /// leading whitespace removed, or `None` when the header has no whitespace.
    #[getter]
    pub fn description(&self) -> PyResult<Option<&str>> {
        Ok(self
            .id
            .find(char::is_whitespace)
            .map(|pos| self.id[pos..].trim_start()))
    }

    /// `True` when the record carries no quality string.
    pub fn is_fasta(&self) -> PyResult<bool> {
        Ok(self.qual.is_none())
    }

    /// `True` when the record carries a quality string.
    pub fn is_fastq(&self) -> PyResult<bool> {
        Ok(self.qual.is_some())
    }

    /// Normalizes the stored sequence in place.
    ///
    /// Delegates to the crate's `normalize` function, forwarding `iupac`; the
    /// stored sequence is replaced only when that function returns a new one.
    #[pyo3(signature = (iupac=false))]
    pub fn normalize(&mut self, iupac: bool) -> PyResult<()> {
        if let Some(normalized) = normalize(self.seq.as_bytes(), iupac) {
            self.seq = String::from_utf8_lossy(&normalized).into_owned();
        }
        Ok(())
    }

    /// Creates a record from its parts.
    ///
    /// Raises `ValueError` when `qual` is given and its length differs from
    /// the length of `seq`.
    #[new]
    #[pyo3(signature = (id, seq, qual=None))]
    fn new(id: String, seq: String, qual: Option<String>) -> PyResult<Record> {
        if let Some(qual) = &qual {
            if qual.len() != seq.len() {
                return Err(PyValueError::new_err(
                    "Sequence and quality strings must have the same length",
                ));
            }
        }
        Ok(Record { id, seq, qual })
    }

    /// Hash over the header, the sequence and, when present, the quality string.
    pub fn __hash__(&self) -> PyResult<u64> {
        let mut hasher = DefaultHasher::new();
        self.id.hash(&mut hasher);
        self.seq.hash(&mut hasher);
        if let Some(qual) = &self.qual {
            qual.hash(&mut hasher);
        }
        Ok(hasher.finish())
    }

    /// Two records are equal when header, sequence and quality all match.
    pub fn __eq__(&self, other: &Record) -> PyResult<bool> {
        Ok(self.id == other.id && self.seq == other.seq && self.qual == other.qual)
    }

    /// Length of the sequence string in bytes.
    pub fn __len__(&self) -> PyResult<usize> {
        Ok(self.seq.len())
    }

    /// Renders the record as FASTQ text when a quality string is present and
    /// as FASTA text otherwise.
    pub fn __str__(&self) -> PyResult<String> {
        // Borrow the quality string instead of cloning it just to format it.
        let text = match &self.qual {
            Some(qual) => format!("@{}\n{}\n+\n{}\n", self.id, self.seq, qual),
            None => format!(">{}\n{}\n", self.id, self.seq),
        };
        Ok(text)
    }

    /// Compact debug representation with abbreviated fields.
    ///
    /// The header is cut at its first whitespace (marked with `…`); sequence
    /// and quality are abbreviated by `get_seq_snippet` with a limit of 20.
    fn __repr__(&self) -> PyResult<String> {
        // Same split as `name`, done directly so there is no error arm to handle.
        let id_snippet = match self.id.find(char::is_whitespace) {
            Some(pos) => format!("{}…", &self.id[..pos]),
            None => self.id.clone(),
        };
        let seq_snippet = get_seq_snippet(&self.seq, 20);
        let quality_snippet = self
            .qual
            .as_deref()
            .map_or_else(|| "None".to_string(), |qual| get_seq_snippet(qual, 20));
        Ok(format!(
            "Record(id={id_snippet}, seq={seq_snippet}, qual={quality_snippet})"
        ))
    }
}
#[pyfunction]
#[pyo3(name = "parse_fastx_file")]
fn py_parse_fastx_file(path: PathBuf) -> PyResult<PyFastxReader> {
let reader = py_try!(parse_fastx_file(path));
Ok(PyFastxReader {
reader: reader.into(),
})
}
#[pyfunction]
fn parse_fastx_string(fastx_string: &str) -> PyResult<PyFastxReader> {
let reader = py_try!(parse_fastx_reader(Cursor::new(fastx_string.to_owned())));
Ok(PyFastxReader {
reader: reader.into(),
})
}
/// Returns a normalized copy of `seq`.
///
/// Delegates to the crate's `normalize` function, forwarding `iupac`. When
/// that function reports nothing to change, the input is returned as-is.
#[pyfunction]
#[pyo3(signature = (seq, iupac=false))]
pub fn normalize_seq(seq: &str, iupac: bool) -> PyResult<String> {
    let normalized = match normalize(seq.as_bytes(), iupac) {
        Some(bytes) => String::from_utf8_lossy(&bytes).into_owned(),
        None => seq.to_owned(),
    };
    Ok(normalized)
}
#[pyfunction]
pub fn reverse_complement(seq: &str) -> PyResult<String> {
let comp: Vec<u8> = seq
.as_bytes()
.iter()
.rev()
.map(|n| complement(*n))
.collect();
Ok(String::from_utf8_lossy(&comp).to_string())
}
/// Decodes a quality string into a tuple of Phred scores.
///
/// `base_64` selects the Phred+64 encoding; the default is Phred+33.
/// A string that cannot be decoded raises `ValueError`.
#[pyfunction]
#[pyo3(name = "decode_phred", signature = (qual, base_64=false))]
pub fn py_decode_phred(qual: &str, base_64: bool, py: Python<'_>) -> PyResult<Py<PyTuple>> {
    let encoding = if base_64 {
        PhredEncoding::Phred64
    } else {
        PhredEncoding::Phred33
    };
    let scores = match decode_phred(qual.as_bytes(), encoding) {
        Ok(scores) => scores,
        Err(e) => {
            return Err(PyValueError::new_err(format!(
                "Invalid Phred quality: {e}"
            )))
        }
    };
    let tuple = PyTuple::new(py, &scores)?;
    Ok(tuple.into())
}
/// Python bindings for reading FASTA/FASTQ records and working with sequences.
#[pymodule]
fn needletail(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
// Classes: `FastxReader` (the Rust `PyFastxReader`) and `Record`.
m.add_class::<PyFastxReader>()?;
m.add_class::<Record>()?;
// Module-level functions. Those declared with `#[pyo3(name = ...)]` are
// registered under that name, the rest under their Rust name.
m.add_wrapped(wrap_pyfunction!(py_parse_fastx_file))?;
m.add_wrapped(wrap_pyfunction!(parse_fastx_string))?;
m.add_wrapped(wrap_pyfunction!(normalize_seq))?;
m.add_wrapped(wrap_pyfunction!(reverse_complement))?;
m.add_wrapped(wrap_pyfunction!(py_decode_phred))?;
// Make the custom exception type available as `needletail.NeedletailError`.
m.add("NeedletailError", py.get_type::<NeedletailError>())?;
Ok(())
}