noodles_gff/async/io/reader.rs
1mod line;
2
3use futures::{stream, Stream, TryStreamExt};
4use tokio::io::{self, AsyncBufRead, AsyncBufReadExt};
5
6use crate::{directive_buf::key, feature::RecordBuf, Line, LineBuf};
7
/// An async GFF reader.
///
/// This is a thin wrapper that owns an inner reader `R`. The accessors below are available for
/// any `R`; the reading methods are implemented separately and place additional bounds on `R`.
#[derive(Debug)]
pub struct Reader<R> {
    inner: R,
}

impl<R> Reader<R> {
    /// Returns a shared reference to the wrapped reader.
    ///
    /// # Examples
    ///
    /// ```
    /// use noodles_gff as gff;
    /// use tokio::io;
    /// let reader = gff::r#async::io::Reader::new(io::empty());
    /// let _inner = reader.get_ref();
    /// ```
    pub fn get_ref(&self) -> &R {
        let Self { inner } = self;
        inner
    }

    /// Returns an exclusive (mutable) reference to the wrapped reader.
    ///
    /// # Examples
    ///
    /// ```
    /// use noodles_gff as gff;
    /// use tokio::io;
    /// let mut reader = gff::r#async::io::Reader::new(io::empty());
    /// let _inner = reader.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> &mut R {
        let Self { inner } = self;
        inner
    }

    /// Consumes this reader and hands back the wrapped reader.
    ///
    /// # Examples
    ///
    /// ```
    /// use noodles_gff as gff;
    /// use tokio::io;
    /// let reader = gff::r#async::io::Reader::new(io::empty());
    /// let _inner = reader.into_inner();
    /// ```
    pub fn into_inner(self) -> R {
        let Self { inner } = self;
        inner
    }
}
56
57impl<R> Reader<R>
58where
59 R: AsyncBufRead + Unpin,
60{
61 /// Creates an async GFF reader.
62 ///
63 /// # Examples
64 ///
65 /// ```
66 /// use noodles_gff as gff;
67 /// use tokio::io;
68 /// let reader = gff::r#async::io::Reader::new(io::empty());
69 /// ```
70 pub fn new(inner: R) -> Self {
71 Self { inner }
72 }
73
74 /// Reads a lazy line.
75 ///
76 /// # Examples
77 ///
78 /// ```
79 /// # #[tokio::main]
80 /// # async fn main() -> tokio::io::Result<()> {
81 /// use noodles_gff as gff;
82 ///
83 /// let data = b"##gff-version 3\n";
84 /// let mut reader = gff::r#async::io::Reader::new(&data[..]);
85 ///
86 /// let mut line = gff::Line::default();
87 ///
88 /// reader.read_line(&mut line).await?;
89 /// assert_eq!(line.kind(), gff::line::Kind::Directive);
90 ///
91 /// assert_eq!(reader.read_line(&mut line).await?, 0);
92 /// # Ok(())
93 /// # }
94 /// ```
95 pub async fn read_line(&mut self, line: &mut Line) -> io::Result<usize> {
96 line::read_line(&mut self.inner, line).await
97 }
98
99 /// Returns a stream over lines.
100 ///
101 /// When using this, the caller is responsible to stop reading at either EOF or when the
102 /// `FASTA` directive is read, whichever comes first.
103 ///
104 /// # Examples
105 ///
106 /// ```
107 /// # #[tokio::main]
108 /// # async fn main() -> tokio::io::Result<()> {
109 /// use futures::TryStreamExt;
110 /// use noodles_gff::{self as gff, directive_buf::key};
111 /// use tokio::io;
112 ///
113 /// let mut reader = gff::r#async::io::Reader::new(io::empty());
114 /// let mut lines = reader.lines();
115 ///
116 /// while let Some(line) = lines.try_next().await? {
117 /// if let Some(key::FASTA) = line.as_directive().map(|directive| directive.key().as_ref()) {
118 /// break;
119 /// }
120 ///
121 /// // ...
122 /// }
123 /// # Ok(())
124 /// # }
125 /// ```
126 pub fn lines(&mut self) -> impl Stream<Item = io::Result<Line>> + '_ {
127 Box::pin(stream::try_unfold(
128 (self, Line::default()),
129 |(reader, mut line)| async {
130 reader.read_line(&mut line).await.map(|n| match n {
131 0 => None,
132 _ => Some((line.clone(), (reader, line))),
133 })
134 },
135 ))
136 }
137
138 /// Returns a stream over line buffers.
139 ///
140 /// # Examples
141 ///
142 /// ```
143 /// # #[tokio::main]
144 /// # async fn main() -> tokio::io::Result<()> {
145 /// use futures::TryStreamExt;
146 /// use noodles_gff::{self as gff, LineBuf};
147 ///
148 /// let data = b"##gff-version 3\n";
149 /// let mut reader = gff::r#async::io::Reader::new(&data[..]);
150 /// let mut lines = reader.line_bufs();
151 ///
152 /// let line = lines.try_next().await?;
153 /// assert!(matches!(line, Some(LineBuf::Directive(_))));
154 ///
155 /// assert!(lines.try_next().await?.is_none());
156 /// # Ok(())
157 /// # }
158 /// ```
159 pub fn line_bufs(&mut self) -> impl Stream<Item = io::Result<LineBuf>> + '_ {
160 use crate::line::Kind;
161
162 Box::pin(stream::try_unfold(
163 (self, Line::default()),
164 |(reader, mut line)| async {
165 reader.read_line(&mut line).await.and_then(|n| match n {
166 0 => Ok(None),
167 _ => match line.kind() {
168 Kind::Directive => {
169 let directive = line
170 .as_directive()
171 .map(|d| LineBuf::Directive(d.into()))
172 .unwrap(); // SAFETY: `line` is a directive.
173
174 Ok(Some((directive, (reader, line))))
175 }
176 Kind::Comment => Ok(Some((
177 LineBuf::Comment(line.as_ref().into()),
178 (reader, line),
179 ))),
180 Kind::Record => {
181 let record = line
182 .as_record()
183 .unwrap() // SAFETY: `line` is a record.
184 .and_then(|record| {
185 RecordBuf::try_from_feature_record(&record).map(LineBuf::Record)
186 })?;
187
188 Ok(Some((record, (reader, line))))
189 }
190 },
191 })
192 },
193 ))
194 }
195
196 /// Returns a stream over records.
197 ///
198 /// # Examples
199 ///
200 /// ```
201 /// # #[tokio::main]
202 /// # async fn main() -> tokio::io::Result<()> {
203 /// use futures::TryStreamExt;
204 /// use noodles_gff as gff;
205 ///
206 /// let data = b"##gff-version 3\n";
207 /// let mut reader = gff::r#async::io::Reader::new(&data[..]);
208 /// let mut records = reader.record_bufs();
209 ///
210 /// assert!(records.try_next().await?.is_none());
211 /// # Ok(())
212 /// # }
213 /// ```
214 pub fn record_bufs(&mut self) -> impl Stream<Item = io::Result<RecordBuf>> + '_ {
215 Box::pin(stream::try_unfold(self.line_bufs(), |mut lines| async {
216 loop {
217 match lines.try_next().await? {
218 None => return Ok(None),
219 Some(LineBuf::Directive(directive)) if directive.key() == key::FASTA => {
220 return Ok(None)
221 }
222 Some(LineBuf::Record(record)) => return Ok(Some((record, lines))),
223 _ => {}
224 }
225 }
226 }))
227 }
228}
229
230async fn read_line<R>(reader: &mut R, buf: &mut Vec<u8>) -> io::Result<usize>
231where
232 R: AsyncBufRead + Unpin,
233{
234 const LINE_FEED: u8 = b'\n';
235 const CARRIAGE_RETURN: u8 = b'\r';
236
237 match reader.read_until(LINE_FEED, buf).await? {
238 0 => Ok(0),
239 n => {
240 if buf.ends_with(&[LINE_FEED]) {
241 buf.pop();
242
243 if buf.ends_with(&[CARRIAGE_RETURN]) {
244 buf.pop();
245 }
246 }
247
248 Ok(n)
249 }
250 }
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that `read_line` strips `\n` and `\r\n` terminators and leaves an unterminated
    /// final line intact.
    #[tokio::test]
    async fn test_read_line() -> io::Result<()> {
        // (input, expected buffer contents after the read)
        const CASES: [(&[u8], &[u8]); 3] = [
            (b"noodles\n", b"noodles"),
            (b"noodles\r\n", b"noodles"),
            (b"noodles", b"noodles"),
        ];

        let mut buf = Vec::new();

        for (data, expected) in CASES {
            let mut src = data;

            // `read_line` appends, so start each case from an empty buffer.
            buf.clear();
            read_line(&mut src, &mut buf).await?;

            assert_eq!(buf, expected);
        }

        Ok(())
    }
}