async_std/io/buf_read/
mod.rs

1mod lines;
2mod read_line;
3mod read_until;
4mod split;
5
6pub use lines::Lines;
7pub use split::Split;
8
9use read_line::ReadLineFuture;
10use read_until::ReadUntilFuture;
11
12use std::mem;
13use std::pin::Pin;
14
15use crate::io;
16use crate::task::{Context, Poll};
17
18extension_trait! {
19    use std::ops::{Deref, DerefMut};
20
21    #[doc = r#"
22        Allows reading from a buffered byte stream.
23
24        This trait is a re-export of [`futures::io::AsyncBufRead`] and is an async version of
25        [`std::io::BufRead`].
26
27        The [provided methods] do not really exist in the trait itself, but they become
28        available when [`BufReadExt`] from the [prelude] is imported:
29
30        ```
31        # #[allow(unused_imports)]
32        use async_std::prelude::*;
33        ```
34
35        [`std::io::BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html
36        [`futures::io::AsyncBufRead`]:
37        https://docs.rs/futures/0.3/futures/io/trait.AsyncBufRead.html
38        [provided methods]: #provided-methods
39        [`BufReadExt`]: ../io/prelude/trait.BufReadExt.html
40        [prelude]: ../prelude/index.html
41    "#]
    pub trait BufRead {
        // Required method. It only exposes the buffered bytes; the caller reports how many
        // of them it actually used through `consume`.
        #[doc = r#"
            Returns the contents of the internal buffer, filling it with more data from the
            inner reader if it is empty.

            This function is a lower-level call. It needs to be paired with the [`consume`]
            method to function properly. When calling this method, none of the contents will be
            "read" in the sense that later calling `read` may return the same contents. As
            such, [`consume`] must be called with the number of bytes that are consumed from
            this buffer to ensure that the bytes are never returned twice.

            [`consume`]: #tymethod.consume

            An empty buffer returned indicates that the stream has reached EOF.
        "#]
        // TODO: write a proper doctest that pairs this method with `consume`.
        fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>;

        // Required method. Counterpart of `poll_fill_buf`: `amt` is the number of bytes the
        // caller has finished with.
        #[doc = r#"
            Tells this buffer that `amt` bytes have been consumed from the buffer, so they
            should no longer be returned in calls to `read`.
        "#]
        fn consume(self: Pin<&mut Self>, amt: usize);
    }
66
67    #[doc = r#"
68        Extension methods for [`BufRead`].
69
70        [`BufRead`]: ../trait.BufRead.html
71    "#]
72    pub trait BufReadExt: futures_io::AsyncBufRead {
73        #[doc = r#"
74            Reads all bytes into `buf` until the delimiter `byte` or EOF is reached.
75
76            This function will read bytes from the underlying stream until the delimiter or EOF
77            is found. Once found, all bytes up to, and including, the delimiter (if found) will
78            be appended to `buf`.
79
80            If successful, this function will return the total number of bytes read.
81
82            # Examples
83
84            ```no_run
85            # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
86            #
87            use async_std::fs::File;
88            use async_std::io::BufReader;
89            use async_std::prelude::*;
90
91            let mut file = BufReader::new(File::open("a.txt").await?);
92
93            let mut buf = Vec::with_capacity(1024);
94            let n = file.read_until(b'\n', &mut buf).await?;
95            #
96            # Ok(()) }) }
97            ```
98
            Multiple successful calls to `read_until` append all bytes up to and including the
            delimiter to `buf`:
101            ```
102            # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
103            #
104            use async_std::io::BufReader;
105            use async_std::prelude::*;
106
107            let from: &[u8] = b"append\nexample\n";
108            let mut reader = BufReader::new(from);
109            let mut buf = vec![];
110
111            let mut size = reader.read_until(b'\n', &mut buf).await?;
112            assert_eq!(size, 7);
113            assert_eq!(buf, b"append\n");
114
115            size += reader.read_until(b'\n', &mut buf).await?;
116            assert_eq!(size, from.len());
117
118            assert_eq!(buf, from);
119            #
120            # Ok(()) }) }
121            ```
122        "#]
123        fn read_until<'a>(
124            &'a mut self,
125            byte: u8,
126            buf: &'a mut Vec<u8>,
127        ) -> impl Future<Output = usize> + 'a [ReadUntilFuture<'a, Self>]
128        where
129            Self: Unpin,
130        {
131            ReadUntilFuture {
132                reader: self,
133                byte,
134                buf,
135                read: 0,
136            }
137        }
138
139        #[doc = r#"
140            Reads all bytes and appends them into `buf` until a newline (the 0xA byte) is
141            reached.
142
143            This function will read bytes from the underlying stream until the newline
144            delimiter (the 0xA byte) or EOF is found. Once found, all bytes up to, and
145            including, the delimiter (if found) will be appended to `buf`.
146
147            If successful, this function will return the total number of bytes read.
148
149            If this function returns `Ok(0)`, the stream has reached EOF.
150
151            # Errors
152
153            This function has the same error semantics as [`read_until`] and will also return
154            an error if the read bytes are not valid UTF-8. If an I/O error is encountered then
155            `buf` may contain some bytes already read in the event that all data read so far
156            was valid UTF-8.
157
158            [`read_until`]: #method.read_until
159
160            # Examples
161
162            ```no_run
163            # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
164            #
165            use async_std::fs::File;
166            use async_std::io::BufReader;
167            use async_std::prelude::*;
168
169            let mut file = BufReader::new(File::open("a.txt").await?);
170
171            let mut buf = String::new();
172            file.read_line(&mut buf).await?;
173            #
174            # Ok(()) }) }
175            ```
176        "#]
177        fn read_line<'a>(
178            &'a mut self,
179            buf: &'a mut String,
180        ) -> impl Future<Output = io::Result<usize>> + 'a [ReadLineFuture<'a, Self>]
181        where
182            Self: Unpin,
183        {
184            ReadLineFuture {
185                reader: self,
186                bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
187                buf,
188                read: 0,
189            }
190        }
191
192        #[doc = r#"
193            Returns a stream over the lines of this byte stream.
194
195            The stream returned from this function will yield instances of
196            [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline byte
197            (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
198
199            [`io::Result`]: type.Result.html
200            [`String`]: https://doc.rust-lang.org/std/string/struct.String.html
201
202            # Examples
203
204            ```no_run
205            # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
206            #
207            use async_std::fs::File;
208            use async_std::io::BufReader;
209            use async_std::prelude::*;
210
211            let file = File::open("a.txt").await?;
212            let mut lines = BufReader::new(file).lines();
213            let mut count = 0;
214
215            while let Some(line) = lines.next().await {
216                line?;
217                count += 1;
218            }
219            #
220            # Ok(()) }) }
221            ```
222        "#]
223        fn lines(self) -> Lines<Self>
224        where
225            Self: Unpin + Sized,
226        {
227            Lines {
228                reader: self,
229                buf: String::new(),
230                bytes: Vec::new(),
231                read: 0,
232            }
233        }
234
235        #[doc = r#"
236            Returns a stream over the contents of this reader split on the byte `byte`.
237
238            The stream returned from this function will return instances of
239            [`io::Result`]`<`[`Vec<u8>`]`>`. Each vector returned will *not* have
240            the delimiter byte at the end.
241
242            This function will yield errors whenever [`read_until`] would have
243            also yielded an error.
244
245            [`io::Result`]: type.Result.html
246            [`Vec<u8>`]: ../vec/struct.Vec.html
247            [`read_until`]: #method.read_until
248
249            # Examples
250
            [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In
            this example, we use [`Cursor`] to iterate over all hyphen-delimited
            segments in a byte slice.
254
255            [`Cursor`]: struct.Cursor.html
256
257            ```
258            # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
259            #
260            use async_std::prelude::*;
261            use async_std::io;
262
263            let cursor = io::Cursor::new(b"lorem-ipsum-dolor");
264
265            let mut split_iter = cursor.split(b'-').map(|l| l.unwrap());
266            assert_eq!(split_iter.next().await, Some(b"lorem".to_vec()));
267            assert_eq!(split_iter.next().await, Some(b"ipsum".to_vec()));
268            assert_eq!(split_iter.next().await, Some(b"dolor".to_vec()));
269            assert_eq!(split_iter.next().await, None);
270            #
271            # Ok(()) }) }
272            ```
273        "#]
274        fn split(self, byte: u8) -> Split<Self>
275        where
276            Self: Sized,
277        {
278            Split {
279                reader: self,
280                buf: Vec::new(),
281                delim: byte,
282                read: 0,
283            }
284        }
285    }
286
    // Doc-only stub for `Box<T>`: as the panic messages state, this impl exists only so that
    // it appears in the rendered docs; its methods are not expected to be called.
    // NOTE(review): the working impl presumably comes from `futures_io` -- confirm.
    impl<T: BufRead + Unpin + ?Sized> BufRead for Box<T> {
        fn poll_fill_buf(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<io::Result<&[u8]>> {
            unreachable!("this impl only appears in the rendered docs")
        }

        fn consume(self: Pin<&mut Self>, amt: usize) {
            unreachable!("this impl only appears in the rendered docs")
        }
    }
299
    // Doc-only stub for `&mut T`: as the panic messages state, this impl exists only so that
    // it appears in the rendered docs; its methods are not expected to be called.
    // NOTE(review): the working impl presumably comes from `futures_io` -- confirm.
    impl<T: BufRead + Unpin + ?Sized> BufRead for &mut T {
        fn poll_fill_buf(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<io::Result<&[u8]>> {
            unreachable!("this impl only appears in the rendered docs")
        }

        fn consume(self: Pin<&mut Self>, amt: usize) {
            unreachable!("this impl only appears in the rendered docs")
        }
    }
312
    // Doc-only stub for pinned pointers whose target implements `BufRead`: as the panic
    // messages state, this impl exists only so that it appears in the rendered docs; its
    // methods are not expected to be called.
    // NOTE(review): the working impl presumably comes from `futures_io` -- confirm.
    impl<P> BufRead for Pin<P>
    where
        P: DerefMut + Unpin,
        <P as Deref>::Target: BufRead,
    {
        fn poll_fill_buf(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<io::Result<&[u8]>> {
            unreachable!("this impl only appears in the rendered docs")
        }

        fn consume(self: Pin<&mut Self>, amt: usize) {
            unreachable!("this impl only appears in the rendered docs")
        }
    }
329
330    impl BufRead for &[u8] {
331        fn poll_fill_buf(
332            self: Pin<&mut Self>,
333            cx: &mut Context<'_>,
334        ) -> Poll<io::Result<&[u8]>> {
335            unreachable!()
336        }
337
338        fn consume(self: Pin<&mut Self>, amt: usize) {
339            unreachable!("this impl only appears in the rendered docs")
340        }
341    }
342}
343
344pub fn read_until_internal<R: BufReadExt + ?Sized>(
345    mut reader: Pin<&mut R>,
346    cx: &mut Context<'_>,
347    byte: u8,
348    buf: &mut Vec<u8>,
349    read: &mut usize,
350) -> Poll<io::Result<usize>> {
351    loop {
352        let (done, used) = {
353            let available = futures_core::ready!(reader.as_mut().poll_fill_buf(cx))?;
354            if let Some(i) = memchr::memchr(byte, available) {
355                buf.extend_from_slice(&available[..=i]);
356                (true, i + 1)
357            } else {
358                buf.extend_from_slice(available);
359                (false, available.len())
360            }
361        };
362        reader.as_mut().consume(used);
363        *read += used;
364        if done || used == 0 {
365            return Poll::Ready(Ok(mem::replace(read, 0)));
366        }
367    }
368}