1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#![doc = include_str!("../README.md")]

extern crate nafcodec;
extern crate pyo3;

use std::fs::File;
use std::io::BufReader;
use std::ops::DerefMut;

use pyo3::exceptions::PyFileNotFoundError;
use pyo3::exceptions::PyIsADirectoryError;
use pyo3::exceptions::PyOSError;

use pyo3::exceptions::PyUnicodeError;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::PyString;

/// Convert a `nafcodec::error::Error` into a Python exception.
fn convert_error(_py: Python, error: nafcodec::error::Error, path: Option<&str>) -> PyErr {
    use nafcodec::error::Error;

    match error {
        Error::Utf8(_utf8_error) => PyUnicodeError::new_err("failed to decode UTF-8 data"),
        Error::Nom(nom_error) => {
            PyValueError::new_err(format!("parser failed: {:?}", nom_error.code))
        }
        Error::Io(io_error) => {
            let desc = io_error.to_string();
            if let Some(p) = path.map(str::to_string) {
                match io_error.raw_os_error() {
                    Some(2) => PyFileNotFoundError::new_err((p,)),
                    #[cfg(target_os = "windows")]
                    Some(3) => PyFileNotFoundError::new_err((p,)),
                    #[cfg(not(target_os = "windows"))]
                    Some(21) => PyIsADirectoryError::new_err((p,)),
                    Some(code) => PyOSError::new_err((code, desc, p)),
                    None => PyOSError::new_err((desc,)),
                }
            } else {
                match io_error.raw_os_error() {
                    Some(2) => PyFileNotFoundError::new_err((desc,)),
                    #[cfg(target_os = "windows")]
                    Some(3) => PyFileNotFoundError::new_err((desc,)),
                    #[cfg(not(target_os = "windows"))]
                    Some(21) => PyIsADirectoryError::new_err((desc,)),
                    Some(code) => PyOSError::new_err((code, desc)),
                    None => PyOSError::new_err((desc,)),
                }
            }
        }
    }
}

/// A single sequence record stored in a Nucleotide Archive Format file.
///
/// Every field is optional and is exposed to Python as a readable and
/// writable attribute.
#[pyclass(module = "nafcodec.lib")]
#[derive(Clone, Debug)]
pub struct Record {
    /// `str` or `None`: The record identifier.
    #[pyo3(get, set)]
    id: Option<Py<PyString>>,
    /// `str` or `None`: The record comment.
    #[pyo3(get, set)]
    comment: Option<Py<PyString>>,
    /// `str` or `None`: The record sequence.
    #[pyo3(get, set)]
    sequence: Option<Py<PyString>>,
    /// `str` or `None`: The record quality.
    #[pyo3(get, set)]
    quality: Option<Py<PyString>>,
    /// `int` or `None`: The record sequence length.
    #[pyo3(get, set)]
    length: Option<u64>,
}

/// Conversion from the decoded `nafcodec` record into the Python-facing [`Record`].
impl pyo3::conversion::IntoPy<Record> for nafcodec::data::Record {
    /// Build a [`Record`] by turning every text field that is present into a
    /// Python string; the length is carried over unchanged.
    fn into_py(self, py: Python<'_>) -> Record {
        Record {
            id: self.id.map(|text| PyString::new(py, &text).into()),
            sequence: self.sequence.map(|text| PyString::new(py, &text).into()),
            comment: self.comment.map(|text| PyString::new(py, &text).into()),
            quality: self.quality.map(|text| PyString::new(py, &text).into()),
            length: self.length,
        }
    }
}

/// A streaming decoder to read a Nucleotide Archive Format file.
#[pyclass(module = "nafcodec.lib")]
pub struct Decoder {
    // Wrapped `nafcodec` decoder, reading from a buffered file handle.
    decoder: nafcodec::Decoder<'static, BufReader<File>>,
}

#[pymethods]
impl Decoder {
    // Constructor: open the file designated by `path` for decoding.
    //
    // `path` is first passed through Python's `os.fspath`, and the result must
    // be a `str`. Failures to open the file are reported through
    // `convert_error`, with the resolved path attached.
    #[new]
    fn __init__(path: &PyAny) -> PyResult<PyClassInitializer<Self>> {
        let py = path.py();
        let os = py.import("os")?;
        let resolved = os.call_method1(pyo3::intern!(py, "fspath"), (path,))?;
        let location = resolved.downcast::<PyString>()?.to_str()?;
        match nafcodec::Decoder::from_path(location) {
            Ok(decoder) => Ok(Decoder { decoder }.into()),
            Err(err) => Err(convert_error(py, err, Some(location))),
        }
    }

    // The decoder is its own iterator.
    fn __iter__(slf: PyRefMut<'_, Self>) -> PyResult<PyRefMut<'_, Self>> {
        Ok(slf)
    }

    // Decode the next record. Returns `None` once the underlying decoder is
    // exhausted; decoding errors are converted without any path information.
    fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<Record>> {
        // Pull the item first so the mutable borrow ends before `py` is taken.
        let item = slf.deref_mut().decoder.next();
        let py = slf.py();
        match item {
            None => Ok(None),
            Some(Ok(raw)) => Ok(Some(raw.into_py(py))),
            Some(Err(err)) => Err(convert_error(py, err, None)),
        }
    }
}

/// An encoder/decoder for Nucleotide Archive Format files.
#[pymodule]
#[pyo3(name = "lib")]
pub fn init(_py: Python, m: &PyModule) -> PyResult<()> {
    // The colon separators of `CARGO_PKG_AUTHORS` are replaced with newlines.
    let authors = env!("CARGO_PKG_AUTHORS").replace(':', "\n");

    // Module metadata, taken from the crate manifest at compile time.
    m.add("__package__", "nafcodec")?;
    m.add("__version__", env!("CARGO_PKG_VERSION"))?;
    m.add("__author__", authors)?;

    // Classes exported by the module.
    m.add_class::<Decoder>()?;
    m.add_class::<Record>()
}