// noodles_gff/async/io/reader.rs

1mod line;
2
3use futures::{stream, Stream, TryStreamExt};
4use tokio::io::{self, AsyncBufRead, AsyncBufReadExt};
5
6use crate::{directive_buf::key, feature::RecordBuf, Line, LineBuf};
7
/// An async GFF reader.
///
/// This is a thin wrapper that owns an underlying reader of type `R`. The
/// wrapped reader can be borrowed or recovered through the accessor methods
/// defined on this type.
#[derive(Debug)]
pub struct Reader<R> {
    /// The wrapped underlying reader.
    inner: R,
}
12
13impl<R> Reader<R> {
14    /// Returns a reference to the underlying reader.
15    ///
16    /// # Examples
17    ///
18    /// ```
19    /// use noodles_gff as gff;
20    /// use tokio::io;
21    /// let reader = gff::r#async::io::Reader::new(io::empty());
22    /// let _inner = reader.get_ref();
23    /// ```
24    pub fn get_ref(&self) -> &R {
25        &self.inner
26    }
27
28    /// Returns a mutable reference to the underlying reader.
29    ///
30    /// # Examples
31    ///
32    /// ```
33    /// use noodles_gff as gff;
34    /// use tokio::io;
35    /// let mut reader = gff::r#async::io::Reader::new(io::empty());
36    /// let _inner = reader.get_mut();
37    /// ```
38    pub fn get_mut(&mut self) -> &mut R {
39        &mut self.inner
40    }
41
42    /// Unwraps and returns the underlying reader.
43    ///
44    /// # Examples
45    ///
46    /// ```
47    /// use noodles_gff as gff;
48    /// use tokio::io;
49    /// let reader = gff::r#async::io::Reader::new(io::empty());
50    /// let _inner = reader.into_inner();
51    /// ```
52    pub fn into_inner(self) -> R {
53        self.inner
54    }
55}
56
57impl<R> Reader<R>
58where
59    R: AsyncBufRead + Unpin,
60{
61    /// Creates an async GFF reader.
62    ///
63    /// # Examples
64    ///
65    /// ```
66    /// use noodles_gff as gff;
67    /// use tokio::io;
68    /// let reader = gff::r#async::io::Reader::new(io::empty());
69    /// ```
70    pub fn new(inner: R) -> Self {
71        Self { inner }
72    }
73
74    /// Reads a lazy line.
75    ///
76    /// # Examples
77    ///
78    /// ```
79    /// # #[tokio::main]
80    /// # async fn main() -> tokio::io::Result<()> {
81    /// use noodles_gff as gff;
82    ///
83    /// let data = b"##gff-version 3\n";
84    /// let mut reader = gff::r#async::io::Reader::new(&data[..]);
85    ///
86    /// let mut line = gff::Line::default();
87    ///
88    /// reader.read_line(&mut line).await?;
89    /// assert_eq!(line.kind(), gff::line::Kind::Directive);
90    ///
91    /// assert_eq!(reader.read_line(&mut line).await?, 0);
92    /// # Ok(())
93    /// # }
94    /// ```
95    pub async fn read_line(&mut self, line: &mut Line) -> io::Result<usize> {
96        line::read_line(&mut self.inner, line).await
97    }
98
99    /// Returns a stream over lines.
100    ///
101    /// When using this, the caller is responsible to stop reading at either EOF or when the
102    /// `FASTA` directive is read, whichever comes first.
103    ///
104    /// # Examples
105    ///
106    /// ```
107    /// # #[tokio::main]
108    /// # async fn main() -> tokio::io::Result<()> {
109    /// use futures::TryStreamExt;
110    /// use noodles_gff::{self as gff, directive_buf::key};
111    /// use tokio::io;
112    ///
113    /// let mut reader = gff::r#async::io::Reader::new(io::empty());
114    /// let mut lines = reader.lines();
115    ///
116    /// while let Some(line) = lines.try_next().await? {
117    ///     if let Some(key::FASTA) = line.as_directive().map(|directive| directive.key().as_ref()) {
118    ///         break;
119    ///     }
120    ///
121    ///     // ...
122    /// }
123    /// # Ok(())
124    /// # }
125    /// ```
126    pub fn lines(&mut self) -> impl Stream<Item = io::Result<Line>> + '_ {
127        Box::pin(stream::try_unfold(
128            (self, Line::default()),
129            |(reader, mut line)| async {
130                reader.read_line(&mut line).await.map(|n| match n {
131                    0 => None,
132                    _ => Some((line.clone(), (reader, line))),
133                })
134            },
135        ))
136    }
137
138    /// Returns a stream over line buffers.
139    ///
140    /// # Examples
141    ///
142    /// ```
143    /// # #[tokio::main]
144    /// # async fn main() -> tokio::io::Result<()> {
145    /// use futures::TryStreamExt;
146    /// use noodles_gff::{self as gff, LineBuf};
147    ///
148    /// let data = b"##gff-version 3\n";
149    /// let mut reader = gff::r#async::io::Reader::new(&data[..]);
150    /// let mut lines = reader.line_bufs();
151    ///
152    /// let line = lines.try_next().await?;
153    /// assert!(matches!(line, Some(LineBuf::Directive(_))));
154    ///
155    /// assert!(lines.try_next().await?.is_none());
156    /// # Ok(())
157    /// # }
158    /// ```
159    pub fn line_bufs(&mut self) -> impl Stream<Item = io::Result<LineBuf>> + '_ {
160        use crate::line::Kind;
161
162        Box::pin(stream::try_unfold(
163            (self, Line::default()),
164            |(reader, mut line)| async {
165                reader.read_line(&mut line).await.and_then(|n| match n {
166                    0 => Ok(None),
167                    _ => match line.kind() {
168                        Kind::Directive => {
169                            let directive = line
170                                .as_directive()
171                                .map(|d| LineBuf::Directive(d.into()))
172                                .unwrap(); // SAFETY: `line` is a directive.
173
174                            Ok(Some((directive, (reader, line))))
175                        }
176                        Kind::Comment => Ok(Some((
177                            LineBuf::Comment(line.as_ref().into()),
178                            (reader, line),
179                        ))),
180                        Kind::Record => {
181                            let record = line
182                                .as_record()
183                                .unwrap() // SAFETY: `line` is a record.
184                                .and_then(|record| {
185                                    RecordBuf::try_from_feature_record(&record).map(LineBuf::Record)
186                                })?;
187
188                            Ok(Some((record, (reader, line))))
189                        }
190                    },
191                })
192            },
193        ))
194    }
195
196    /// Returns a stream over records.
197    ///
198    /// # Examples
199    ///
200    /// ```
201    /// # #[tokio::main]
202    /// # async fn main() -> tokio::io::Result<()> {
203    /// use futures::TryStreamExt;
204    /// use noodles_gff as gff;
205    ///
206    /// let data = b"##gff-version 3\n";
207    /// let mut reader = gff::r#async::io::Reader::new(&data[..]);
208    /// let mut records = reader.record_bufs();
209    ///
210    /// assert!(records.try_next().await?.is_none());
211    /// # Ok(())
212    /// # }
213    /// ```
214    pub fn record_bufs(&mut self) -> impl Stream<Item = io::Result<RecordBuf>> + '_ {
215        Box::pin(stream::try_unfold(self.line_bufs(), |mut lines| async {
216            loop {
217                match lines.try_next().await? {
218                    None => return Ok(None),
219                    Some(LineBuf::Directive(directive)) if directive.key() == key::FASTA => {
220                        return Ok(None)
221                    }
222                    Some(LineBuf::Record(record)) => return Ok(Some((record, lines))),
223                    _ => {}
224                }
225            }
226        }))
227    }
228}
229
230async fn read_line<R>(reader: &mut R, buf: &mut Vec<u8>) -> io::Result<usize>
231where
232    R: AsyncBufRead + Unpin,
233{
234    const LINE_FEED: u8 = b'\n';
235    const CARRIAGE_RETURN: u8 = b'\r';
236
237    match reader.read_until(LINE_FEED, buf).await? {
238        0 => Ok(0),
239        n => {
240            if buf.ends_with(&[LINE_FEED]) {
241                buf.pop();
242
243                if buf.ends_with(&[CARRIAGE_RETURN]) {
244                    buf.pop();
245                }
246            }
247
248            Ok(n)
249        }
250    }
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that `read_line` strips `\n` and `\r\n` line endings and
    /// leaves input without a trailing line feed unchanged.
    #[tokio::test]
    async fn test_read_line() -> io::Result<()> {
        // (input, expected buffer contents after the read)
        const CASES: [(&[u8], &[u8]); 3] = [
            (&b"noodles\n"[..], &b"noodles"[..]),
            (&b"noodles\r\n"[..], &b"noodles"[..]),
            (&b"noodles"[..], &b"noodles"[..]),
        ];

        let mut buf = Vec::new();

        for &(data, expected) in &CASES {
            buf.clear();

            let mut src = data;
            read_line(&mut src, &mut buf).await?;

            assert_eq!(buf, expected);
        }

        Ok(())
    }
}