completion/io/read/
read_to_string.rs1use std::future::Future;
2use std::io::{Error, ErrorKind, Result};
3use std::marker::PhantomPinned;
4use std::mem;
5use std::pin::Pin;
6use std::str;
7use std::task::{Context, Poll};
8
9use aliasable::boxed::AliasableBox;
10use completion_core::CompletionFuture;
11use completion_io::{AsyncRead, AsyncReadWith};
12use futures_core::ready;
13use pin_project_lite::pin_project;
14
15use super::{extend_lifetime_mut, AsyncReadExt, ReadToEnd};
16
17pin_project! {
18 #[allow(clippy::box_vec)]
20 pub struct ReadToString<'a, T>
21 where
22 T: AsyncRead,
23 T: ?Sized,
24 {
25 reader: Option<&'a mut T>,
26
27 #[pin]
28 inner: Option<ReadToEnd<'a, T>>,
29
30 buf: AliasableBox<Vec<u8>>,
33
34 #[pin]
36 _pinned: PhantomPinned,
37
38 initial_len: usize,
40
41 s: &'a mut String,
44 }
45}
46
47impl<'a, T: AsyncRead + ?Sized + 'a> ReadToString<'a, T> {
48 pub(super) fn new(reader: &'a mut T, buf: &'a mut String) -> Self {
49 let len = buf.len();
50 let buf_vec = AliasableBox::from_unique(Box::new(mem::take(buf).into_bytes()));
51 Self {
52 reader: Some(reader),
53 inner: None,
54 initial_len: len,
55 buf: buf_vec,
56 _pinned: PhantomPinned,
57 s: buf,
58 }
59 }
60}
61
62impl<'a, T: AsyncRead + ?Sized + 'a> CompletionFuture for ReadToString<'a, T> {
63 type Output = Result<usize>;
64
65 unsafe fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66 let mut this = self.project();
67
68 let inner = if let Some(inner) = this.inner.as_mut().as_pin_mut() {
69 inner
70 } else {
71 let buf = extend_lifetime_mut(&mut **this.buf);
72
73 let fut = this
74 .reader
75 .take()
76 .expect("polled after completion")
77 .read_to_end(buf);
78 this.inner.set(Some(fut));
79 this.inner.as_mut().as_pin_mut().unwrap()
80 };
81
82 let res = ready!(inner.poll(cx));
83 this.inner.set(None);
84
85 let buf = &mut **this.buf;
87 let initial_len = *this.initial_len;
88
89 let res = res.and_then(|bytes| {
90 str::from_utf8(&buf[initial_len..])
91 .map(|_| bytes)
92 .map_err(|e| Error::new(ErrorKind::InvalidData, e))
93 });
94
95 if res.is_err() {
96 buf.set_len(initial_len);
97 }
98
99 **this.s = String::from_utf8_unchecked(mem::take(buf));
100
101 Poll::Ready(res)
102 }
103 unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
104 let mut this = self.project();
105
106 if let Some(inner) = this.inner.as_mut().as_pin_mut() {
107 ready!(inner.poll_cancel(cx));
108 this.inner.set(None);
109
110 let buf = &mut **this.buf;
114 buf.set_len(*this.initial_len);
115 **this.s = String::from_utf8_unchecked(mem::take(buf));
116 }
117 Poll::Ready(())
118 }
119}
120impl<'a, T: AsyncRead + ?Sized + 'a> Future for ReadToString<'a, T>
121where
122 <T as AsyncReadWith<'a>>::ReadFuture: Future<Output = Result<()>>,
123{
124 type Output = Result<usize>;
125
126 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
127 unsafe { CompletionFuture::poll(self, cx) }
128 }
129}
130
#[cfg(test)]
mod tests {
    use super::*;

    use crate::future::{block_on, CompletionFutureExt};

    use super::super::test_utils::YieldingReader;

    /// All chunks are appended to the existing string and the byte count is reported.
    #[test]
    fn success() {
        let mut source = YieldingReader::new(vec![Ok(" "), Ok("World"), Ok("!")]);
        let mut text = "Hello".to_owned();

        let outcome = block_on(source.read_to_string(&mut text));

        assert_eq!(outcome.unwrap(), 7);
        assert_eq!(text, "Hello World!");
    }

    /// A non-`Interrupted` I/O error is surfaced and the string is rolled back.
    #[test]
    fn error() {
        let chunks = vec![
            Ok(" "),
            Err(Error::from(ErrorKind::Interrupted)),
            Ok("World"),
            Err(Error::new(ErrorKind::Other, "Some error")),
            Ok("!"),
        ];
        let mut source = YieldingReader::new(chunks);
        let mut text = "Hello".to_owned();

        let failure = block_on(source.read_to_string(&mut text)).unwrap_err();

        assert_eq!(failure.to_string(), "Some error");
        assert_eq!(text, "Hello");
    }

    /// Bytes that are not valid UTF-8 yield `InvalidData` and leave the string unchanged.
    #[test]
    fn invalid_utf8() {
        let mut source = YieldingReader::new(vec![Ok(" World".as_bytes()), Ok(&[0xC0])]);
        let mut text = "Hello".to_owned();

        let failure = block_on(source.read_to_string(&mut text)).unwrap_err();

        assert_eq!(failure.kind(), ErrorKind::InvalidData,);
        assert_eq!(text, "Hello");
    }

    /// Cancelling a read part-way through must not leak partial data into the string.
    #[test]
    fn cancellation_doesnt_change_string() {
        let mut source =
            YieldingReader::new(vec![Ok(&[0, 1, 2])]).after_cancellation(vec![&[0, 1, 2]]);
        let mut text = "Hello".to_owned();

        let outcome = block_on(source.read_to_string(&mut text).now_or_never());

        assert!(outcome.is_none());
        assert_eq!(text, "Hello");
    }
}