completion/io/read/
read_to_string.rs

1use std::future::Future;
2use std::io::{Error, ErrorKind, Result};
3use std::marker::PhantomPinned;
4use std::mem;
5use std::pin::Pin;
6use std::str;
7use std::task::{Context, Poll};
8
9use aliasable::boxed::AliasableBox;
10use completion_core::CompletionFuture;
11use completion_io::{AsyncRead, AsyncReadWith};
12use futures_core::ready;
13use pin_project_lite::pin_project;
14
15use super::{extend_lifetime_mut, AsyncReadExt, ReadToEnd};
16
17pin_project! {
18    /// Future for [`AsyncReadExt::read_to_string`].
19    #[allow(clippy::box_vec)]
20    pub struct ReadToString<'a, T>
21    where
22        T: AsyncRead,
23        T: ?Sized,
24    {
25        reader: Option<&'a mut T>,
26
27        #[pin]
28        inner: Option<ReadToEnd<'a, T>>,
29
30        // The vector the above future reads to. It has to be boxed as the future also holds a
31        // reference to it and Rust doesn't support shared locals.
32        buf: AliasableBox<Vec<u8>>,
33
34        // We want to support `buf` being stored inline in the future.
35        #[pin]
36        _pinned: PhantomPinned,
37
38        // The initial length of the above buffer.
39        initial_len: usize,
40
41        // The string that was passing into `read_to_end`. This is kept empty throughout the
42        // duration of the operation, so we only have to do a UTF-8 check at the end.
43        s: &'a mut String,
44    }
45}
46
47impl<'a, T: AsyncRead + ?Sized + 'a> ReadToString<'a, T> {
48    pub(super) fn new(reader: &'a mut T, buf: &'a mut String) -> Self {
49        let len = buf.len();
50        let buf_vec = AliasableBox::from_unique(Box::new(mem::take(buf).into_bytes()));
51        Self {
52            reader: Some(reader),
53            inner: None,
54            initial_len: len,
55            buf: buf_vec,
56            _pinned: PhantomPinned,
57            s: buf,
58        }
59    }
60}
61
62impl<'a, T: AsyncRead + ?Sized + 'a> CompletionFuture for ReadToString<'a, T> {
63    type Output = Result<usize>;
64
65    unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66        let mut this = self.project();
67
68        let inner = if let Some(inner) = this.inner.as_mut().as_pin_mut() {
69            inner
70        } else {
71            let buf = extend_lifetime_mut(&mut **this.buf);
72
73            let fut = this
74                .reader
75                .take()
76                .expect("polled after completion")
77                .read_to_end(buf);
78            this.inner.set(Some(fut));
79            this.inner.as_mut().as_pin_mut().unwrap()
80        };
81
82        let res = ready!(inner.poll(cx));
83        this.inner.set(None);
84
85        // The future is gone now, so we can safely create a mutable reference without aliasing.
86        let buf = &mut **this.buf;
87        let initial_len = *this.initial_len;
88
89        let res = res.and_then(|bytes| {
90            str::from_utf8(&buf[initial_len..])
91                .map(|_| bytes)
92                .map_err(|e| Error::new(ErrorKind::InvalidData, e))
93        });
94
95        if res.is_err() {
96            buf.set_len(initial_len);
97        }
98
99        **this.s = String::from_utf8_unchecked(mem::take(buf));
100
101        Poll::Ready(res)
102    }
103    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
104        let mut this = self.project();
105
106        if let Some(inner) = this.inner.as_mut().as_pin_mut() {
107            ready!(inner.poll_cancel(cx));
108            this.inner.set(None);
109
110            // Reset the string to its initial state.
111
112            // The future is gone now, so we can safely create a mutable reference without aliasing.
113            let buf = &mut **this.buf;
114            buf.set_len(*this.initial_len);
115            **this.s = String::from_utf8_unchecked(mem::take(buf));
116        }
117        Poll::Ready(())
118    }
119}
120impl<'a, T: AsyncRead + ?Sized + 'a> Future for ReadToString<'a, T>
121where
122    <T as AsyncReadWith<'a>>::ReadFuture: Future<Output = Result<()>>,
123{
124    type Output = Result<usize>;
125
126    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
127        unsafe { CompletionFuture::poll(self, cx) }
128    }
129}
130
131#[cfg(test)]
132mod tests {
133    use super::*;
134
135    use crate::future::{block_on, CompletionFutureExt};
136
137    use super::super::test_utils::YieldingReader;
138
139    #[test]
140    fn success() {
141        let mut reader = YieldingReader::new(vec![Ok(" "), Ok("World"), Ok("!")]);
142
143        let mut s = "Hello".to_owned();
144        assert_eq!(block_on(reader.read_to_string(&mut s)).unwrap(), 7);
145        assert_eq!(s, "Hello World!");
146    }
147
148    #[test]
149    fn error() {
150        let mut reader = YieldingReader::new(vec![
151            Ok(" "),
152            Err(Error::from(ErrorKind::Interrupted)),
153            Ok("World"),
154            Err(Error::new(ErrorKind::Other, "Some error")),
155            Ok("!"),
156        ]);
157
158        let mut s = "Hello".to_owned();
159        assert_eq!(
160            block_on(reader.read_to_string(&mut s))
161                .unwrap_err()
162                .to_string(),
163            "Some error"
164        );
165        assert_eq!(s, "Hello");
166    }
167
168    #[test]
169    fn invalid_utf8() {
170        let mut reader = YieldingReader::new(vec![Ok(" World".as_bytes()), Ok(&[0xC0])]);
171
172        let mut s = "Hello".to_owned();
173        assert_eq!(
174            block_on(reader.read_to_string(&mut s)).unwrap_err().kind(),
175            ErrorKind::InvalidData,
176        );
177        assert_eq!(s, "Hello");
178    }
179
180    #[test]
181    fn cancellation_doesnt_change_string() {
182        let mut reader =
183            YieldingReader::new(vec![Ok(&[0, 1, 2])]).after_cancellation(vec![&[0, 1, 2]]);
184
185        let mut s = "Hello".to_owned();
186        assert!(block_on(reader.read_to_string(&mut s).now_or_never()).is_none());
187        assert_eq!(s, "Hello");
188    }
189}