#![doc = include_str!("../README.md")]
extern crate nafcodec;
extern crate pyo3;
use std::fs::File;
use std::io::BufReader;
use std::ops::DerefMut;
use pyo3::exceptions::PyFileNotFoundError;
use pyo3::exceptions::PyIsADirectoryError;
use pyo3::exceptions::PyOSError;
use pyo3::exceptions::PyUnicodeError;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::PyString;
fn convert_error(_py: Python, error: nafcodec::error::Error, path: Option<&str>) -> PyErr {
use nafcodec::error::Error;
match error {
Error::Utf8(_utf8_error) => PyUnicodeError::new_err("failed to decode UTF-8 data"),
Error::Nom(nom_error) => {
PyValueError::new_err(format!("parser failed: {:?}", nom_error.code))
}
Error::Io(io_error) => {
let desc = io_error.to_string();
if let Some(p) = path.map(str::to_string) {
match io_error.raw_os_error() {
Some(2) => PyFileNotFoundError::new_err((p,)),
#[cfg(target_os = "windows")]
Some(3) => PyFileNotFoundError::new_err((p,)),
#[cfg(not(target_os = "windows"))]
Some(21) => PyIsADirectoryError::new_err((p,)),
Some(code) => PyOSError::new_err((code, desc, p)),
None => PyOSError::new_err((desc,)),
}
} else {
match io_error.raw_os_error() {
Some(2) => PyFileNotFoundError::new_err((desc,)),
#[cfg(target_os = "windows")]
Some(3) => PyFileNotFoundError::new_err((desc,)),
#[cfg(not(target_os = "windows"))]
Some(21) => PyIsADirectoryError::new_err((desc,)),
Some(code) => PyOSError::new_err((code, desc)),
None => PyOSError::new_err((desc,)),
}
}
}
}
}
#[pyclass(module = "nafcodec.lib")]
#[derive(Clone, Debug)]
pub struct Record {
#[pyo3(get, set)]
id: Option<Py<PyString>>,
#[pyo3(get, set)]
comment: Option<Py<PyString>>,
#[pyo3(get, set)]
sequence: Option<Py<PyString>>,
#[pyo3(get, set)]
quality: Option<Py<PyString>>,
#[pyo3(get, set)]
length: Option<u64>,
}
impl pyo3::conversion::IntoPy<Record> for nafcodec::data::Record {
fn into_py(self, py: Python<'_>) -> Record {
let id = self.id.map(|x| PyString::new(py, &x).into());
let sequence = self.sequence.map(|x| PyString::new(py, &x).into());
let comment = self.comment.map(|x| PyString::new(py, &x).into());
let quality = self.quality.map(|x| PyString::new(py, &x).into());
let length = self.length;
Record {
id,
sequence,
comment,
quality,
length,
}
}
}
#[pyclass(module = "nafcodec.lib")]
pub struct Decoder {
decoder: nafcodec::Decoder<'static, BufReader<File>>,
}
#[pymethods]
impl Decoder {
#[new]
fn __init__(path: &PyAny) -> PyResult<PyClassInitializer<Self>> {
let py = path.py();
let fspath = py
.import("os")?
.call_method1(pyo3::intern!(py, "fspath"), (path,))?
.downcast::<PyString>()?;
let fspath_str = fspath.to_str()?;
let decoder = nafcodec::Decoder::from_path(fspath_str)
.map_err(|e| convert_error(py, e, Some(fspath_str)))?;
Ok(Decoder { decoder }.into())
}
fn __iter__(slf: PyRefMut<'_, Self>) -> PyResult<PyRefMut<'_, Self>> {
Ok(slf)
}
fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<Record>> {
let result = slf.deref_mut().decoder.next().transpose();
let py = slf.py();
match result {
Ok(None) => Ok(None),
Ok(Some(record)) => Ok(Some(record.into_py(py))),
Err(e) => Err(convert_error(py, e, None)),
}
}
}
#[pymodule]
#[pyo3(name = "lib")]
pub fn init(_py: Python, m: &PyModule) -> PyResult<()> {
m.add("__package__", "nafcodec")?;
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
m.add("__author__", env!("CARGO_PKG_AUTHORS").replace(':', "\n"))?;
m.add_class::<Decoder>()?;
m.add_class::<Record>()?;
Ok(())
}