Skip to main content

futures_util/io/
mod.rs

1//! Asynchronous I/O.
2//!
3//! This module is the asynchronous version of `std::io`. It defines four
4//! traits, [`AsyncRead`], [`AsyncWrite`], [`AsyncSeek`], and [`AsyncBufRead`],
5//! which mirror the `Read`, `Write`, `Seek`, and `BufRead` traits of the
6//! standard library. However, these traits integrate with the asynchronous
7//! task system, so that if an I/O object isn't ready for reading (or writing),
8//! the thread is not blocked, and instead the current task is queued to be
9//! woken when I/O is ready.
10//!
11//! In addition, the [`AsyncReadExt`], [`AsyncWriteExt`], [`AsyncSeekExt`], and
12//! [`AsyncBufReadExt`] extension traits offer a variety of useful combinators
13//! for operating with asynchronous I/O objects, including ways to work with
14//! them using futures, streams and sinks.
15//!
16//! This module is only available when the `std` feature of this
17//! library is activated, and it is activated by default.
18
19#[cfg(feature = "io-compat")]
20#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
21use crate::compat::Compat;
22use crate::future::assert_future;
23use crate::stream::assert_stream;
24use std::{pin::Pin, string::String, vec::Vec};
25
26// Re-export some types from `std::io` so that users don't have to deal
27// with conflicts when `use`ing `futures::io` and `std::io`.
28#[doc(no_inline)]
29pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
30
31pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
32
// Buffer capacity used by `BufReader` and `BufWriter`; cf. the standard library:
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
const DEFAULT_BUF_SIZE: usize = 1024 * 8;
36
37mod allow_std;
38pub use self::allow_std::AllowStdIo;
39
40mod buf_reader;
41pub use self::buf_reader::{BufReader, SeeKRelative};
42
43mod buf_writer;
44pub use self::buf_writer::BufWriter;
45
46mod line_writer;
47pub use self::line_writer::LineWriter;
48
49mod chain;
50pub use self::chain::Chain;
51
52mod close;
53pub use self::close::Close;
54
55mod copy;
56pub use self::copy::{copy, Copy};
57
58mod copy_buf;
59pub use self::copy_buf::{copy_buf, CopyBuf};
60
61mod copy_buf_abortable;
62pub use self::copy_buf_abortable::{copy_buf_abortable, CopyBufAbortable};
63
64mod cursor;
65pub use self::cursor::Cursor;
66
67mod empty;
68pub use self::empty::{empty, Empty};
69
70mod fill_buf;
71pub use self::fill_buf::FillBuf;
72
73mod flush;
74pub use self::flush::Flush;
75
76#[cfg(feature = "sink")]
77#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
78mod into_sink;
79#[cfg(feature = "sink")]
80#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
81pub use self::into_sink::IntoSink;
82
83mod lines;
84pub use self::lines::Lines;
85
86mod read;
87pub use self::read::Read;
88
89mod read_vectored;
90pub use self::read_vectored::ReadVectored;
91
92mod read_exact;
93pub use self::read_exact::ReadExact;
94
95mod read_line;
96pub use self::read_line::ReadLine;
97
98mod read_to_end;
99pub use self::read_to_end::ReadToEnd;
100
101mod read_to_string;
102pub use self::read_to_string::ReadToString;
103
104mod read_until;
105pub use self::read_until::ReadUntil;
106
107mod repeat;
108pub use self::repeat::{repeat, Repeat};
109
110mod seek;
111pub use self::seek::Seek;
112
113mod sink;
114pub use self::sink::{sink, Sink};
115
116mod split;
117pub use self::split::{ReadHalf, ReuniteError, WriteHalf};
118
119mod take;
120pub use self::take::Take;
121
122mod window;
123pub use self::window::Window;
124
125mod write;
126pub use self::write::Write;
127
128mod write_vectored;
129pub use self::write_vectored::WriteVectored;
130
131mod write_all;
132pub use self::write_all::WriteAll;
133
134#[cfg(feature = "write-all-vectored")]
135mod write_all_vectored;
136#[cfg(feature = "write-all-vectored")]
137pub use self::write_all_vectored::WriteAllVectored;
138
139/// An extension trait which adds utility methods to `AsyncRead` types.
140pub trait AsyncReadExt: AsyncRead {
141    /// Creates an adaptor which will chain this stream with another.
142    ///
143    /// The returned `AsyncRead` instance will first read all bytes from this object
144    /// until EOF is encountered. Afterwards the output is equivalent to the
145    /// output of `next`.
146    ///
147    /// # Examples
148    ///
149    /// ```
150    /// # futures::executor::block_on(async {
151    /// use futures::io::{AsyncReadExt, Cursor};
152    ///
153    /// let reader1 = Cursor::new([1, 2, 3, 4]);
154    /// let reader2 = Cursor::new([5, 6, 7, 8]);
155    ///
156    /// let mut reader = reader1.chain(reader2);
157    /// let mut buffer = Vec::new();
158    ///
159    /// // read the value into a Vec.
160    /// reader.read_to_end(&mut buffer).await?;
161    /// assert_eq!(buffer, [1, 2, 3, 4, 5, 6, 7, 8]);
162    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
163    /// ```
164    fn chain<R>(self, next: R) -> Chain<Self, R>
165    where
166        Self: Sized,
167        R: AsyncRead,
168    {
169        assert_read(Chain::new(self, next))
170    }
171
172    /// Tries to read some bytes directly into the given `buf` in asynchronous
173    /// manner, returning a future type.
174    ///
175    /// The returned future will resolve to the number of bytes read once the read
176    /// operation is completed.
177    ///
178    /// # Examples
179    ///
180    /// ```
181    /// # futures::executor::block_on(async {
182    /// use futures::io::{AsyncReadExt, Cursor};
183    ///
184    /// let mut reader = Cursor::new([1, 2, 3, 4]);
185    /// let mut output = [0u8; 5];
186    ///
187    /// let bytes = reader.read(&mut output[..]).await?;
188    ///
189    /// // This is only guaranteed to be 4 because `&[u8]` is a synchronous
190    /// // reader. In a real system you could get anywhere from 1 to
191    /// // `output.len()` bytes in a single read.
192    /// assert_eq!(bytes, 4);
193    /// assert_eq!(output, [1, 2, 3, 4, 0]);
194    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
195    /// ```
196    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
197    where
198        Self: Unpin,
199    {
200        assert_future::<Result<usize>, _>(Read::new(self, buf))
201    }
202
203    /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored
204    /// IO operations.
205    ///
206    /// The returned future will resolve to the number of bytes read once the read
207    /// operation is completed.
208    fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self>
209    where
210        Self: Unpin,
211    {
212        assert_future::<Result<usize>, _>(ReadVectored::new(self, bufs))
213    }
214
215    /// Creates a future which will read exactly enough bytes to fill `buf`,
216    /// returning an error if end of file (EOF) is hit sooner.
217    ///
218    /// The returned future will resolve once the read operation is completed.
219    ///
220    /// In the case of an error the buffer and the object will be discarded, with
221    /// the error yielded.
222    ///
223    /// # Examples
224    ///
225    /// ```
226    /// # futures::executor::block_on(async {
227    /// use futures::io::{AsyncReadExt, Cursor};
228    ///
229    /// let mut reader = Cursor::new([1, 2, 3, 4]);
230    /// let mut output = [0u8; 4];
231    ///
232    /// reader.read_exact(&mut output).await?;
233    ///
234    /// assert_eq!(output, [1, 2, 3, 4]);
235    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
236    /// ```
237    ///
238    /// ## EOF is hit before `buf` is filled
239    ///
240    /// ```
241    /// # futures::executor::block_on(async {
242    /// use futures::io::{self, AsyncReadExt, Cursor};
243    ///
244    /// let mut reader = Cursor::new([1, 2, 3, 4]);
245    /// let mut output = [0u8; 5];
246    ///
247    /// let result = reader.read_exact(&mut output).await;
248    ///
249    /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
250    /// # });
251    /// ```
252    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
253    where
254        Self: Unpin,
255    {
256        assert_future::<Result<()>, _>(ReadExact::new(self, buf))
257    }
258
259    /// Creates a future which will read all the bytes from this `AsyncRead`.
260    ///
261    /// On success the total number of bytes read is returned.
262    ///
263    /// # Examples
264    ///
265    /// ```
266    /// # futures::executor::block_on(async {
267    /// use futures::io::{AsyncReadExt, Cursor};
268    ///
269    /// let mut reader = Cursor::new([1, 2, 3, 4]);
270    /// let mut output = Vec::with_capacity(4);
271    ///
272    /// let bytes = reader.read_to_end(&mut output).await?;
273    ///
274    /// assert_eq!(bytes, 4);
275    /// assert_eq!(output, vec![1, 2, 3, 4]);
276    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
277    /// ```
278    fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>
279    where
280        Self: Unpin,
281    {
282        assert_future::<Result<usize>, _>(ReadToEnd::new(self, buf))
283    }
284
285    /// Creates a future which will read all the bytes from this `AsyncRead`.
286    ///
287    /// On success the total number of bytes read is returned.
288    ///
289    /// # Examples
290    ///
291    /// ```
292    /// # futures::executor::block_on(async {
293    /// use futures::io::{AsyncReadExt, Cursor};
294    ///
295    /// let mut reader = Cursor::new(&b"1234"[..]);
296    /// let mut buffer = String::with_capacity(4);
297    ///
298    /// let bytes = reader.read_to_string(&mut buffer).await?;
299    ///
300    /// assert_eq!(bytes, 4);
301    /// assert_eq!(buffer, String::from("1234"));
302    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
303    /// ```
304    fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToString<'a, Self>
305    where
306        Self: Unpin,
307    {
308        assert_future::<Result<usize>, _>(ReadToString::new(self, buf))
309    }
310
311    /// Helper method for splitting this read/write object into two halves.
312    ///
313    /// The two halves returned implement the `AsyncRead` and `AsyncWrite`
314    /// traits, respectively.
315    ///
316    /// # Examples
317    ///
318    /// ```
319    /// # futures::executor::block_on(async {
320    /// use futures::io::{self, AsyncReadExt, Cursor};
321    ///
322    /// // Note that for `Cursor` the read and write halves share a single
323    /// // seek position. This may or may not be true for other types that
324    /// // implement both `AsyncRead` and `AsyncWrite`.
325    ///
326    /// let reader = Cursor::new([1, 2, 3, 4]);
327    /// let mut buffer = Cursor::new(vec![0, 0, 0, 0, 5, 6, 7, 8]);
328    /// let mut writer = Cursor::new(vec![0u8; 5]);
329    ///
330    /// {
331    ///     let (buffer_reader, mut buffer_writer) = (&mut buffer).split();
332    ///     io::copy(reader, &mut buffer_writer).await?;
333    ///     io::copy(buffer_reader, &mut writer).await?;
334    /// }
335    ///
336    /// assert_eq!(buffer.into_inner(), [1, 2, 3, 4, 5, 6, 7, 8]);
337    /// assert_eq!(writer.into_inner(), [5, 6, 7, 8, 0]);
338    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
339    /// ```
340    fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
341    where
342        Self: AsyncWrite + Sized,
343    {
344        let (r, w) = split::split(self);
345        (assert_read(r), assert_write(w))
346    }
347
348    /// Creates an AsyncRead adapter which will read at most `limit` bytes
349    /// from the underlying reader.
350    ///
351    /// # Examples
352    ///
353    /// ```
354    /// # futures::executor::block_on(async {
355    /// use futures::io::{AsyncReadExt, Cursor};
356    ///
357    /// let reader = Cursor::new(&b"12345678"[..]);
358    /// let mut buffer = [0; 5];
359    ///
360    /// let mut take = reader.take(4);
361    /// let n = take.read(&mut buffer).await?;
362    ///
363    /// assert_eq!(n, 4);
364    /// assert_eq!(&buffer, b"1234\0");
365    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
366    /// ```
367    fn take(self, limit: u64) -> Take<Self>
368    where
369        Self: Sized,
370    {
371        assert_read(Take::new(self, limit))
372    }
373
374    /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be
375    /// used as a futures 0.1 / tokio-io 0.1 `AsyncRead`. If the wrapped type
376    /// implements [`AsyncWrite`] as well, the result will also implement the
377    /// futures 0.1 / tokio 0.1 `AsyncWrite` trait.
378    ///
379    /// Requires the `io-compat` feature to enable.
380    #[cfg(feature = "io-compat")]
381    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
382    fn compat(self) -> Compat<Self>
383    where
384        Self: Sized + Unpin,
385    {
386        Compat::new(self)
387    }
388}
389
390impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
391
392/// An extension trait which adds utility methods to `AsyncWrite` types.
393pub trait AsyncWriteExt: AsyncWrite {
394    /// Creates a future which will entirely flush this `AsyncWrite`.
395    ///
396    /// # Examples
397    ///
398    /// ```
399    /// # futures::executor::block_on(async {
400    /// use futures::io::{AllowStdIo, AsyncWriteExt};
401    /// use std::io::{BufWriter, Cursor};
402    ///
403    /// let mut output = vec![0u8; 5];
404    ///
405    /// {
406    ///     let writer = Cursor::new(&mut output);
407    ///     let mut buffered = AllowStdIo::new(BufWriter::new(writer));
408    ///     buffered.write_all(&[1, 2]).await?;
409    ///     buffered.write_all(&[3, 4]).await?;
410    ///     buffered.flush().await?;
411    /// }
412    ///
413    /// assert_eq!(output, [1, 2, 3, 4, 0]);
414    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
415    /// ```
416    fn flush(&mut self) -> Flush<'_, Self>
417    where
418        Self: Unpin,
419    {
420        assert_future::<Result<()>, _>(Flush::new(self))
421    }
422
423    /// Creates a future which will entirely close this `AsyncWrite`.
424    fn close(&mut self) -> Close<'_, Self>
425    where
426        Self: Unpin,
427    {
428        assert_future::<Result<()>, _>(Close::new(self))
429    }
430
431    /// Creates a future which will write bytes from `buf` into the object.
432    ///
433    /// The returned future will resolve to the number of bytes written once the write
434    /// operation is completed.
435    fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self>
436    where
437        Self: Unpin,
438    {
439        assert_future::<Result<usize>, _>(Write::new(self, buf))
440    }
441
442    /// Creates a future which will write bytes from `bufs` into the object using vectored
443    /// IO operations.
444    ///
445    /// The returned future will resolve to the number of bytes written once the write
446    /// operation is completed.
447    fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self>
448    where
449        Self: Unpin,
450    {
451        assert_future::<Result<usize>, _>(WriteVectored::new(self, bufs))
452    }
453
454    /// Write data into this object.
455    ///
456    /// Creates a future that will write the entire contents of the buffer `buf` into
457    /// this `AsyncWrite`.
458    ///
459    /// The returned future will not complete until all the data has been written.
460    ///
461    /// # Examples
462    ///
463    /// ```
464    /// # futures::executor::block_on(async {
465    /// use futures::io::{AsyncWriteExt, Cursor};
466    ///
467    /// let mut writer = Cursor::new(vec![0u8; 5]);
468    ///
469    /// writer.write_all(&[1, 2, 3, 4]).await?;
470    ///
471    /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
472    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
473    /// ```
474    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
475    where
476        Self: Unpin,
477    {
478        assert_future::<Result<()>, _>(WriteAll::new(self, buf))
479    }
480
481    /// Attempts to write multiple buffers into this writer.
482    ///
483    /// Creates a future that will write the entire contents of `bufs` into this
484    /// `AsyncWrite` using [vectored writes].
485    ///
486    /// The returned future will not complete until all the data has been
487    /// written.
488    ///
489    /// [vectored writes]: std::io::Write::write_vectored
490    ///
491    /// # Notes
492    ///
493    /// Unlike `io::Write::write_vectored`, this takes a *mutable* reference to
494    /// a slice of `IoSlice`s, not an immutable one. That's because we need to
495    /// modify the slice to keep track of the bytes already written.
496    ///
497    /// Once this futures returns, the contents of `bufs` are unspecified, as
498    /// this depends on how many calls to `write_vectored` were necessary. It is
499    /// best to understand this function as taking ownership of `bufs` and to
500    /// not use `bufs` afterwards. The underlying buffers, to which the
501    /// `IoSlice`s point (but not the `IoSlice`s themselves), are unchanged and
502    /// can be reused.
503    ///
504    /// # Examples
505    ///
506    /// ```
507    /// # futures::executor::block_on(async {
508    /// use futures::io::AsyncWriteExt;
509    /// use futures_util::io::Cursor;
510    /// use std::io::IoSlice;
511    ///
512    /// let mut writer = Cursor::new(Vec::new());
513    /// let bufs = &mut [
514    ///     IoSlice::new(&[1]),
515    ///     IoSlice::new(&[2, 3]),
516    ///     IoSlice::new(&[4, 5, 6]),
517    /// ];
518    ///
519    /// writer.write_all_vectored(bufs).await?;
520    /// // Note: the contents of `bufs` is now unspecified, see the Notes section.
521    ///
522    /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4, 5, 6]);
523    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
524    /// ```
525    #[cfg(feature = "write-all-vectored")]
526    fn write_all_vectored<'a>(
527        &'a mut self,
528        bufs: &'a mut [IoSlice<'a>],
529    ) -> WriteAllVectored<'a, Self>
530    where
531        Self: Unpin,
532    {
533        assert_future::<Result<()>, _>(WriteAllVectored::new(self, bufs))
534    }
535
536    /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be
537    /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
538    /// Requires the `io-compat` feature to enable.
539    #[cfg(feature = "io-compat")]
540    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
541    fn compat_write(self) -> Compat<Self>
542    where
543        Self: Sized + Unpin,
544    {
545        Compat::new(self)
546    }
547
548    /// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`.
549    ///
550    /// This adapter produces a sink that will write each value passed to it
551    /// into the underlying writer.
552    ///
553    /// Note that this function consumes the given writer, returning a wrapped
554    /// version.
555    ///
556    /// # Examples
557    ///
558    /// ```
559    /// # futures::executor::block_on(async {
560    /// use futures::io::AsyncWriteExt;
561    /// use futures::stream::{self, StreamExt};
562    ///
563    /// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]);
564    ///
565    /// let mut writer = vec![];
566    ///
567    /// stream.forward((&mut writer).into_sink()).await?;
568    ///
569    /// assert_eq!(writer, vec![1, 2, 3, 4, 5, 6]);
570    /// # Ok::<(), Box<dyn std::error::Error>>(())
571    /// # })?;
572    /// # Ok::<(), Box<dyn std::error::Error>>(())
573    /// ```
574    #[cfg(feature = "sink")]
575    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
576    fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
577    where
578        Self: Sized,
579    {
580        crate::sink::assert_sink::<Item, Error, _>(IntoSink::new(self))
581    }
582}
583
584impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
585
586/// An extension trait which adds utility methods to `AsyncSeek` types.
587pub trait AsyncSeekExt: AsyncSeek {
588    /// Creates a future which will seek an IO object, and then yield the
589    /// new position in the object and the object itself.
590    ///
591    /// In the case of an error the buffer and the object will be discarded, with
592    /// the error yielded.
593    fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self>
594    where
595        Self: Unpin,
596    {
597        assert_future::<Result<u64>, _>(Seek::new(self, pos))
598    }
599
600    /// Creates a future which will return the current seek position from the
601    /// start of the stream.
602    ///
603    /// This is equivalent to `self.seek(SeekFrom::Current(0))`.
604    fn stream_position(&mut self) -> Seek<'_, Self>
605    where
606        Self: Unpin,
607    {
608        self.seek(SeekFrom::Current(0))
609    }
610}
611
612impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
613
614/// An extension trait which adds utility methods to `AsyncBufRead` types.
615pub trait AsyncBufReadExt: AsyncBufRead {
616    /// Creates a future which will wait for a non-empty buffer to be available from this I/O
617    /// object or EOF to be reached.
618    ///
619    /// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf).
620    ///
621    /// ```rust
622    /// # futures::executor::block_on(async {
623    /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
624    ///
625    /// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read();
626    ///
627    /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
628    /// stream.consume_unpin(2);
629    ///
630    /// assert_eq!(stream.fill_buf().await?, vec![3]);
631    /// stream.consume_unpin(1);
632    ///
633    /// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]);
634    /// stream.consume_unpin(3);
635    ///
636    /// assert_eq!(stream.fill_buf().await?, vec![]);
637    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
638    /// ```
639    fn fill_buf(&mut self) -> FillBuf<'_, Self>
640    where
641        Self: Unpin,
642    {
643        assert_future::<Result<&[u8]>, _>(FillBuf::new(self))
644    }
645
646    /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types.
647    ///
648    /// ```rust
649    /// # futures::executor::block_on(async {
650    /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
651    ///
652    /// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read();
653    ///
654    /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
655    /// stream.consume_unpin(2);
656    ///
657    /// assert_eq!(stream.fill_buf().await?, vec![3]);
658    /// stream.consume_unpin(1);
659    ///
660    /// assert_eq!(stream.fill_buf().await?, vec![]);
661    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
662    /// ```
663    fn consume_unpin(&mut self, amt: usize)
664    where
665        Self: Unpin,
666    {
667        Pin::new(self).consume(amt)
668    }
669
670    /// Creates a future which will read all the bytes associated with this I/O
671    /// object into `buf` until the delimiter `byte` or EOF is reached.
672    /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
673    ///
674    /// This function will read bytes from the underlying stream until the
675    /// delimiter or EOF is found. Once found, all bytes up to, and including,
676    /// the delimiter (if found) will be appended to `buf`.
677    ///
678    /// The returned future will resolve to the number of bytes read once the read
679    /// operation is completed.
680    ///
681    /// In the case of an error the buffer and the object will be discarded, with
682    /// the error yielded.
683    ///
684    /// # Examples
685    ///
686    /// ```
687    /// # futures::executor::block_on(async {
688    /// use futures::io::{AsyncBufReadExt, Cursor};
689    ///
690    /// let mut cursor = Cursor::new(b"lorem-ipsum");
691    /// let mut buf = vec![];
692    ///
693    /// // cursor is at 'l'
694    /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
695    /// assert_eq!(num_bytes, 6);
696    /// assert_eq!(buf, b"lorem-");
697    /// buf.clear();
698    ///
699    /// // cursor is at 'i'
700    /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
701    /// assert_eq!(num_bytes, 5);
702    /// assert_eq!(buf, b"ipsum");
703    /// buf.clear();
704    ///
705    /// // cursor is at EOF
706    /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
707    /// assert_eq!(num_bytes, 0);
708    /// assert_eq!(buf, b"");
709    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
710    /// ```
711    fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
712    where
713        Self: Unpin,
714    {
715        assert_future::<Result<usize>, _>(ReadUntil::new(self, byte, buf))
716    }
717
718    /// Creates a future which will read all the bytes associated with this I/O
719    /// object into `buf` until a newline (the 0xA byte) or EOF is reached,
720    /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
721    ///
722    /// This function will read bytes from the underlying stream until the
723    /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
724    /// up to, and including, the delimiter (if found) will be appended to
725    /// `buf`.
726    ///
727    /// The returned future will resolve to the number of bytes read once the read
728    /// operation is completed.
729    ///
730    /// In the case of an error the buffer and the object will be discarded, with
731    /// the error yielded.
732    ///
733    /// # Errors
734    ///
735    /// This function has the same error semantics as [`read_until`] and will
736    /// also return an error if the read bytes are not valid UTF-8. If an I/O
737    /// error is encountered then `buf` may contain some bytes already read in
738    /// the event that all data read so far was valid UTF-8.
739    ///
740    /// [`read_until`]: AsyncBufReadExt::read_until
741    ///
742    /// # Examples
743    ///
744    /// ```
745    /// # futures::executor::block_on(async {
746    /// use futures::io::{AsyncBufReadExt, Cursor};
747    ///
748    /// let mut cursor = Cursor::new(b"foo\nbar");
749    /// let mut buf = String::new();
750    ///
751    /// // cursor is at 'f'
752    /// let num_bytes = cursor.read_line(&mut buf).await?;
753    /// assert_eq!(num_bytes, 4);
754    /// assert_eq!(buf, "foo\n");
755    /// buf.clear();
756    ///
757    /// // cursor is at 'b'
758    /// let num_bytes = cursor.read_line(&mut buf).await?;
759    /// assert_eq!(num_bytes, 3);
760    /// assert_eq!(buf, "bar");
761    /// buf.clear();
762    ///
763    /// // cursor is at EOF
764    /// let num_bytes = cursor.read_line(&mut buf).await?;
765    /// assert_eq!(num_bytes, 0);
766    /// assert_eq!(buf, "");
767    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
768    /// ```
769    fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
770    where
771        Self: Unpin,
772    {
773        assert_future::<Result<usize>, _>(ReadLine::new(self, buf))
774    }
775
776    /// Returns a stream over the lines of this reader.
777    /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
778    ///
779    /// The stream returned from this function will yield instances of
780    /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
781    /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
782    ///
783    /// [`io::Result`]: std::io::Result
784    /// [`String`]: String
785    ///
786    /// # Errors
787    ///
788    /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
789    ///
790    /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
791    ///
792    /// # Examples
793    ///
794    /// ```
795    /// # futures::executor::block_on(async {
796    /// use futures::io::{AsyncBufReadExt, Cursor};
797    /// use futures::stream::StreamExt;
798    ///
799    /// let cursor = Cursor::new(b"lorem\nipsum\xc2\r\ndolor");
800    ///
801    /// let mut lines_stream = cursor.lines().map(|l| l.unwrap_or(String::from("invalid UTF_8")));
802    /// assert_eq!(lines_stream.next().await, Some(String::from("lorem")));
803    /// assert_eq!(lines_stream.next().await, Some(String::from("invalid UTF_8")));
804    /// assert_eq!(lines_stream.next().await, Some(String::from("dolor")));
805    /// assert_eq!(lines_stream.next().await, None);
806    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
807    /// ```
808    fn lines(self) -> Lines<Self>
809    where
810        Self: Sized,
811    {
812        assert_stream::<Result<String>, _>(Lines::new(self))
813    }
814}
815
816impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
817
818// Just a helper function to ensure the reader we're returning all have the
819// right implementations.
820pub(crate) fn assert_read<R>(reader: R) -> R
821where
822    R: AsyncRead,
823{
824    reader
825}
826// Just a helper function to ensure the writer we're returning all have the
827// right implementations.
828pub(crate) fn assert_write<W>(writer: W) -> W
829where
830    W: AsyncWrite,
831{
832    writer
833}