futures_util/io/
read_to_end.rs1use futures_core::future::Future;
2use futures_core::ready;
3use futures_core::task::{Context, Poll};
4use futures_io::AsyncRead;
5use std::io;
6use std::iter;
7use std::pin::Pin;
8use std::vec::Vec;
9
/// Future that drains an asynchronous reader into a caller-supplied `Vec<u8>`.
///
/// Polling it resolves to the number of bytes appended to the buffer, i.e. the
/// growth relative to the buffer's length when the future was created.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct ReadToEnd<'a, R>
where
    R: ?Sized,
{
    /// Source that is polled for data.
    reader: &'a mut R,
    /// Destination buffer; bytes that are read get appended to it.
    buf: &'a mut Vec<u8>,
    /// Length of `buf` at construction time, used to compute the byte count
    /// reported on completion.
    start_len: usize,
}
18
19impl<R: ?Sized + Unpin> Unpin for ReadToEnd<'_, R> {}
20
21impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToEnd<'a, R> {
22 pub(super) fn new(reader: &'a mut R, buf: &'a mut Vec<u8>) -> Self {
23 let start_len = buf.len();
24 Self { reader, buf, start_len }
25 }
26}
27
/// Drop guard that cuts a buffer back to its confirmed length.
///
/// While reading, `buf` is temporarily extended with zeroed scratch space past
/// `len`. When the guard goes out of scope (normal return, early return, or
/// panic) the scratch tail is discarded so callers only ever observe bytes
/// that were actually accounted for in `len`.
struct Guard<'a> {
    /// Buffer being filled; may be longer than `len` while the guard is alive.
    buf: &'a mut Vec<u8>,
    /// Number of leading bytes of `buf` that must survive the guard.
    len: usize,
}

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        // The buffer is only ever shrunk here, and every byte in it is
        // initialised, so the safe `truncate` does everything `set_len` did.
        // Unlike `set_len`, a `len` beyond the current length is a harmless
        // no-op rather than undefined behaviour. Capacity is left untouched.
        self.buf.truncate(self.len);
    }
}
40
41pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
51 mut rd: Pin<&mut R>,
52 cx: &mut Context<'_>,
53 buf: &mut Vec<u8>,
54 start_len: usize,
55) -> Poll<io::Result<usize>> {
56 let mut g = Guard { len: buf.len(), buf };
57 loop {
58 if g.len == g.buf.len() {
59 g.buf.reserve(32);
60 let spare_capacity = g.buf.capacity() - g.buf.len();
61 g.buf.extend(iter::repeat(0).take(spare_capacity));
63 }
64
65 let buf = &mut g.buf[g.len..];
66 match ready!(rd.as_mut().poll_read(cx, buf)) {
67 Ok(0) => return Poll::Ready(Ok(g.len - start_len)),
68 Ok(n) => {
69 assert!(n <= buf.len());
73 g.len += n;
74 }
75 Err(e) => return Poll::Ready(Err(e)),
76 }
77 }
78}
79
80impl<A> Future for ReadToEnd<'_, A>
81where
82 A: AsyncRead + ?Sized + Unpin,
83{
84 type Output = io::Result<usize>;
85
86 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
87 let this = &mut *self;
88 read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf, this.start_len)
89 }
90}