variable_len_reader/asynchronous/
reader.rs1use core::future::Future;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4use pin_project_lite::pin_project;
5use crate::asynchronous::AsyncVariableReadable;
6use crate::util::read_buf::*;
7
/// A reader future whose progress can be discarded so it can be polled again from scratch.
///
/// The implementations in this module clear whatever partially filled buffer the
/// future holds; they never touch the underlying reader.
pub trait ReaderFuture {
    /// Drops any partially read state held by this future.
    ///
    /// Takes `Pin<&mut Self>` because the futures in this module are `!Unpin`.
    fn reset(self: Pin<&mut Self>);
}
11
// Declares `$wrapper<'a, R>`, a `!Unpin` future struct whose only field is a pinned
// `$wrapped<'a, R>`, and implements `ReaderFuture` for it by delegating to the field.
// The `Future` impl is left to the caller.
//
// Invocation forms:
//   read_wrap_future!(Wrapper, Wrapped);
//   read_wrap_future!(f cfg(feature = "x"), Wrapper, Wrapped);   // gated variant
// The `@` arm is the shared implementation both public arms forward to.
macro_rules! read_wrap_future {
    (@$wrapper: ident, $wrapped: ident $(, $gate: meta)?) => {
        $(
        #[$gate]
        )?
        $crate::pin_project_lite::pin_project! {
            $(
            // Surfaces the gate in rendered docs when built with `--cfg docsrs`.
            #[cfg_attr(docsrs, doc($gate))]
            )?
            #[derive(Debug)]
            #[project(!Unpin)]
            #[must_use = "futures do nothing unless you `.await` or poll them"]
            pub struct $wrapper<'a, R: ?Sized> {
                #[pin]
                inner: $wrapped<'a, R>,
            }
        }
        $(
        #[$gate]
        )?
        impl<'a, R: ?Sized> ReaderFuture for $wrapper<'a, R> {
            fn reset(self: Pin<&mut Self>) {
                // Resetting the wrapper is exactly resetting the wrapped future.
                self.project().inner.reset();
            }
        }
    };
    (f $gate: meta, $wrapper: ident, $wrapped: ident) => {
        read_wrap_future!(@$wrapper, $wrapped, $gate);
    };
    ($wrapper: ident, $wrapped: ident) => {
        read_wrap_future!(@$wrapper, $wrapped);
    };
}
// Emits a trait method `$method(&mut self)` that builds the wrapper future `$wrapper`
// from the future returned by `self.$ctor()`.
//
// Invocation forms:
//   read_wrap_func!(Wrapper, method, ctor);
//   read_wrap_func!(f cfg(feature = "x"), Wrapper, method, ctor);   // gated variant
// The `@` arm is the shared implementation both public arms forward to.
macro_rules! read_wrap_func {
    (@$wrapper: ident, $method: ident, $ctor: ident $(, $gate: meta)?) => {
        $(
        #[$gate]
        #[cfg_attr(docsrs, doc($gate))]
        )?
        #[inline]
        fn $method(&mut self) -> $wrapper<Self> where Self: Unpin {
            $wrapper { inner: self.$ctor() }
        }
    };
    (f $gate: meta, $wrapper: ident, $method: ident, $ctor: ident) => {
        read_wrap_func!(@$wrapper, $method, $ctor, $gate);
    };
    ($wrapper: ident, $method: ident, $ctor: ident) => {
        read_wrap_func!(@$wrapper, $method, $ctor);
    };
}
64
// Declares a feature-gated wrapper future `$wrapper` around `$wrapped` (via
// `read_wrap_future!`) whose output is the wrapped future's value cast to `$target`.
//
// NOTE(review): the conversion is a plain `as` cast, so a value that does not fit
// `$target` is presumably truncated silently — confirm this is intended.
#[allow(unused_macros)]
macro_rules! read_size_ap_future {
    (f $gate: meta, $target: ty, $wrapper: ident, $wrapped: ident) => {
        read_wrap_future!(f $gate, $wrapper, $wrapped);
        #[$gate]
        impl<'a, R: AsyncVariableReader + Unpin + ?Sized> Future for $wrapper<'a, R> {
            type Output = ::core::result::Result<$target, R::Error>;

            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                let wrapped = self.project().inner;
                wrapped.poll(cx).map_ok(|value| value as $target)
            }
        }
    };
}
81
82
83pin_project! {
84 #[derive(Debug)]
85 #[project(!Unpin)]
86 #[must_use = "futures do nothing unless you `.await` or poll them"]
87 pub struct ReadSingle<'a, R: ?Sized> {
88 #[pin]
89 reader: &'a mut R,
90 buf: Option<u8>,
91 }
92}
93impl<'a, R: ?Sized> ReaderFuture for ReadSingle<'a, R> {
94 fn reset(self: Pin<&mut Self>) {
95 let me = self.project();
96 *me.buf = None;
97 }
98}
99impl<'a, R: AsyncVariableReadable + Unpin + ?Sized> Future for ReadSingle<'a, R> {
100 type Output = Result<u8, R::Error>;
101
102 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
103 let mut me = self.project();
104 R::poll_read_single(Pin::new(&mut *me.reader), cx, me.buf)
105 }
106}
107
108pin_project! {
109 #[derive(Debug)]
110 #[project(!Unpin)]
111 #[must_use = "futures do nothing unless you `.await` or poll them"]
112 pub struct ReadMore<'a, R: ?Sized> {
113 #[pin]
114 reader: &'a mut R,
115 buf: ReadBuf<'a>,
116 }
117}
118impl<'a, R: ?Sized> ReaderFuture for ReadMore<'a, R> {
119 fn reset(self: Pin<&mut Self>) {
120 let me = self.project();
121 me.buf.reset();
122 }
123}
124impl<'a, R: AsyncVariableReadable + Unpin + ?Sized> Future for ReadMore<'a, R> {
125 type Output = Result<(), R::Error>;
126
127 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
128 let mut me = self.project();
129 R::poll_read_more(Pin::new(&mut *me.reader), cx, me.buf)
130 }
131}
132
#[cfg(feature = "bytes")]
pin_project! {
    /// Future that reads from `reader` into a caller-supplied `bytes::BufMut`.
    ///
    /// Built by `AsyncVariableReader::read_more_buf`; resolves to `()` or to the
    /// reader's error.
    #[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
    #[derive(Debug)]
    #[project(!Unpin)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct ReadMoreBuf<'a, R: ?Sized, B> where B: bytes::BufMut {
        // Source being read from; borrowed for the lifetime of the future.
        #[pin]
        reader: &'a mut R,
        // Destination buffer owned by the caller.
        #[pin]
        buf: &'a mut B,
    }
}
#[cfg(feature = "bytes")]
impl<'a, R: AsyncVariableReadable + Unpin + ?Sized, B: bytes::BufMut> Future for ReadMoreBuf<'a, R, B> {
    type Output = Result<(), R::Error>;

    /// Forwards to `R::poll_read_more_buf` with the caller's buffer.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        // Both fields are pinned references; each is reborrowed afresh for this poll.
        R::poll_read_more_buf(Pin::new(&mut *this.reader), cx, &mut *this.buf)
    }
}
155
156
157read_wrap_future!(ReadBool, ReadSingle);
158impl<'a, R: AsyncVariableReader + Unpin + ?Sized> Future for ReadBool<'a, R> {
159 type Output = Result<bool, R::Error>;
160
161 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
162 self.project().inner.poll(cx).map(|r| r.and_then(|b| match b {
163 0 => Ok(false),
164 1 => Ok(true),
165 b => Err(R::read_bool_error("ReadBool", b)),
166 }))
167 }
168}
169
// The files below are textually included into this module. They presumably define
// the remaining future types (e.g. `ReadUsizeVarintAp`) and the `define_*_func!`
// macros invoked inside `AsyncVariableReader` — their contents are not visible
// from this file, TODO confirm.
include!("read_bools.rs");

include!("read_raw.rs");
include!("read_raw_size.rs");

include!("read_varint.rs");
include!("read_varint_size.rs");
include!("read_varint_long.rs");
include!("read_varint_long_size.rs");

include!("read_signed_varint.rs");
include!("read_signed_varint_size.rs");
include!("read_signed_varint_long.rs");
include!("read_signed_varint_long_size.rs");

include!("read_float_varint.rs");
include!("read_float_varint_long.rs");
187
#[cfg(feature = "async_u8_vec")]
pin_project! {
    /// Future that reads a length-prefixed byte vector.
    ///
    /// The length comes from the wrapped `ReadUsizeVarintAp` future; the payload is
    /// then read into an owned buffer of exactly that length.
    #[cfg_attr(docsrs, doc(cfg(feature = "async_u8_vec")))]
    #[derive(Debug)]
    #[project(!Unpin)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct ReadU8Vec<'a, R: ?Sized> {
        // Reads the length prefix; the underlying reader is also reached through it.
        #[pin]
        inner: ReadUsizeVarintAp<'a, R>,
        // Payload buffer; `None` until the length prefix has been read.
        buf: Option<OwnedReadBuf<alloc::vec::Vec<u8>>>,
    }
}
#[cfg(feature = "async_u8_vec")]
impl<'a, R: ?Sized> ReaderFuture for ReadU8Vec<'a, R> {
    /// Drops the payload buffer and resets the length-prefix future, so the next
    /// poll starts with reading a new length.
    fn reset(self: Pin<&mut Self>) {
        let this = self.project();
        *this.buf = None;
        this.inner.reset();
    }
}
#[cfg(feature = "async_u8_vec")]
impl<'a, R: AsyncVariableReader + Unpin + ?Sized> Future for ReadU8Vec<'a, R> {
    type Output = Result<alloc::vec::Vec<u8>, R::Error>;

    /// Drives the read in two phases:
    ///
    /// 1. While no payload buffer exists, poll the length-prefix future and allocate
    ///    a zeroed buffer of the decoded length.
    /// 2. Poll `R::poll_read_more` into that buffer, recording the progress so a
    ///    `Pending` result resumes where it stopped.
    ///
    /// On success the filled buffer is moved out of the future (no copy), leaving
    /// `buf` as `None`. As with any future, polling again after completion is not
    /// meaningful unless `reset` is called first.
    ///
    /// NOTE(review): `size` is taken from the stream as-is, so the allocation size is
    /// controlled by the peer — consider bounding it at a higher level.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut me = self.project();
        let buf = match me.buf.as_mut() {
            None => {
                let size = core::task::ready!(me.inner.as_mut().poll(cx))?;
                me.buf.insert(OwnedReadBuf::new(alloc::vec![0; size]))
            },
            Some(b) => b,
        };
        // Borrowed view over the owned buffer; its position is copied back below.
        let mut ref_buf = buf.into();
        // The reader lives three wrapper layers below `ReadUsizeVarintAp`; this
        // projection chain depends on that type's private layout.
        let res = R::poll_read_more(
            Pin::new(&mut me.inner.project().inner.project().inner.project().reader),
            cx,
            &mut ref_buf,
        );
        // Persist progress before looking at `res`, so a `Pending` poll keeps it.
        let position = ref_buf.position();
        buf.set_position(position);
        core::task::ready!(res)?;
        // Hand the buffer over by value instead of cloning its whole contents.
        let filled = me.buf.take().expect("payload buffer is initialised earlier in this poll");
        Poll::Ready(Ok(filled.into_inner()))
    }
}
229
230read_wrap_future!(f cfg(feature = "async_string"), ReadString, ReadU8Vec);
231#[cfg(feature = "async_string")]
232impl<'a, R: AsyncVariableReader + Unpin + ?Sized> Future for ReadString<'a, R> {
233 type Output = Result<alloc::string::String, R::Error>;
234
235 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
236 self.project().inner.poll(cx).map(|r| r.and_then(|v| {
237 match alloc::string::String::from_utf8(v) {
238 Ok(s) => Ok(s),
239 Err(e) => Err(R::read_string_error("ReadString", e)),
240 }
241 }))
242 }
243}
244
245pub trait AsyncVariableReader: AsyncVariableReadable {
246 #[inline]
247 fn read_single(&mut self) -> ReadSingle<Self> where Self: Unpin {
248 ReadSingle { reader: self, buf: None }
249 }
250
251 #[inline]
252 fn read_more<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadMore<'a, Self> where Self: Unpin {
253 ReadMore { reader: self, buf: ReadBuf::new(buf) }
254 }
255
256 #[cfg(feature = "bytes")]
258 #[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
259 #[inline]
260 fn read_more_buf<'a, B: bytes::BufMut>(&'a mut self, buf: &'a mut B) -> ReadMoreBuf<'a, Self, B> where Self: Unpin {
261 ReadMoreBuf { reader: self, buf }
262 }
263
264
265 fn read_bool_error(feature_name: &'static str, byte: u8) -> Self::Error;
266
267 read_wrap_func!(ReadBool, read_bool, read_single);
268
269 define_read_bools_func!();
270
271 define_read_raw_func!();
272 define_read_raw_size_func!();
273
274 define_read_varint_func!();
275 define_read_varint_size_func!();
276 define_read_varint_long_func!();
277 define_read_varint_long_size_func!();
278
279 define_read_signed_varint_func!();
280 define_read_signed_varint_size_func!();
281 define_read_signed_varint_long_func!();
282 define_read_signed_varint_long_size_func!();
283
284 define_read_float_varint_func!();
285 define_read_float_varint_long_func!();
286
287 #[allow(deprecated)]
288 #[cfg(feature = "async_u8_vec")]
307 #[cfg_attr(docsrs, doc(cfg(feature = "async_u8_vec")))]
308 #[inline]
309 #[deprecated(since = "3.0.0", note = "see docs for details")]
310 fn read_u8_vec(&mut self) -> ReadU8Vec<Self> where Self: Unpin {
311 ReadU8Vec { inner: self.read_usize_varint_ap(), buf: None }
312 }
313
314 #[allow(deprecated)]
315 #[cfg(feature = "async_u8_vec")]
319 #[cfg_attr(docsrs, doc(cfg(feature = "async_u8_vec")))]
320 #[inline]
321 #[must_use = "futures do nothing unless you `.await` or poll them"]
322 #[deprecated(since = "3.2.0", note = "use [AsyncReaderHelper::help_read_u8_vec] instead")]
323 fn read_u8_vec_boxed(&mut self) -> Pin<alloc::boxed::Box<dyn Future<Output = Result<alloc::vec::Vec<u8>, Self::Error>> + Send + '_>> where Self: Unpin + Send {
324 alloc::boxed::Box::pin(async move {
325 let length = self.read_usize_varint().await?;
326 let mut bytes = alloc::vec![0; length];
327 self.read_more(&mut bytes).await?;
328 Ok(bytes)
329 })
330 }
331
332 #[cfg(feature = "async_string")]
333 #[cfg_attr(docsrs, doc(cfg(feature = "async_string")))]
334 fn read_string_error(future_name: &'static str, error: alloc::string::FromUtf8Error) -> Self::Error;
335
336 #[allow(deprecated)]
337 #[cfg(feature = "async_string")]
350 #[cfg_attr(docsrs, doc(cfg(feature = "async_string")))]
351 #[inline]
352 #[deprecated(since = "3.0.0", note = "see docs for details")]
353 fn read_string(&mut self) -> ReadString<Self> where Self: Unpin {
354 ReadString { inner: self.read_u8_vec() }
355 }
356
357 #[allow(deprecated)]
358 #[cfg(feature = "async_string")]
362 #[cfg_attr(docsrs, doc(cfg(feature = "async_string")))]
363 #[inline]
364 #[must_use = "futures do nothing unless you `.await` or poll them"]
365 #[deprecated(since = "3.2.0", note = "use [AsyncReaderHelper::help_read_string] instead")]
366 fn read_string_boxed(&mut self) -> Pin<alloc::boxed::Box<dyn Future<Output = Result<alloc::string::String, Self::Error>> + Send + '_>> where Self: Unpin + Send {
367 alloc::boxed::Box::pin(async move {
368 match alloc::string::String::from_utf8(self.read_u8_vec_boxed().await?) {
369 Ok(s) => Ok(s),
370 Err(e) => Err(Self::read_string_error("ReadString", e)),
371 }
372 })
373 }
374}