variable_len_reader/asynchronous/
reader.rs

1use core::future::Future;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4use pin_project_lite::pin_project;
5use crate::asynchronous::AsyncVariableReadable;
6use crate::util::read_buf::*;
7
/// Implemented by the reader futures in this module that can be rewound.
///
/// `reset` clears whatever progress state the future keeps between polls
/// (scratch buffers, partially decoded values) so that the same future
/// object can be polled again from its initial state.
pub trait ReaderFuture {
    /// Discards the progress recorded by earlier polls of this future.
    fn reset(self: Pin<&mut Self>);
}
11
/// Declares a pinned wrapper future `$wrapper<'a, R>` whose only field is a
/// pinned `inner: $wrapped<'a, R>`, and implements [`ReaderFuture`] for it by
/// forwarding `reset` to that inner future.
///
/// Accepted forms:
/// * `read_wrap_future!(Wrapper, Wrapped)`
/// * `read_wrap_future!(f cfg(...), Wrapper, Wrapped)` - additionally puts the
///   given attribute on both generated items and, when `docsrs` is set, a
///   matching `doc(...)` attribute on the struct.
///
/// The arm starting with `@` is the shared implementation the two public
/// forms expand to.
macro_rules! read_wrap_future {
    (@$wrapper:ident, $wrapped:ident $(, $gate:meta)?) => {
        $(#[$gate])?
        $crate::pin_project_lite::pin_project! {
            $(#[cfg_attr(docsrs, doc($gate))])?
            #[derive(Debug)]
            #[project(!Unpin)]
            #[must_use = "futures do nothing unless you `.await` or poll them"]
            pub struct $wrapper<'a, R: ?Sized> {
                #[pin]
                inner: $wrapped<'a, R>,
            }
        }
        $(#[$gate])?
        impl<'a, R: ?Sized> ReaderFuture for $wrapper<'a, R> {
            fn reset(self: Pin<&mut Self>) {
                // The wrapper keeps no state of its own, so resetting the
                // wrapped future is all that is required.
                self.project().inner.reset();
            }
        }
    };
    (f $gate:meta, $wrapper:ident, $wrapped:ident) => {
        read_wrap_future!(@$wrapper, $wrapped, $gate);
    };
    ($wrapper:ident, $wrapped:ident) => {
        read_wrap_future!(@$wrapper, $wrapped);
    };
}
/// Generates a trait method `$method(&mut self)` that builds the wrapper
/// future `$wrapper` around the future returned by `self.$delegate()`.
///
/// Accepted forms:
/// * `read_wrap_func!(Wrapper, method, delegate)`
/// * `read_wrap_func!(f cfg(...), Wrapper, method, delegate)` - additionally
///   puts the given attribute on the method together with a `doc(...)`
///   attribute that is only active when `docsrs` is set.
///
/// The arm starting with `@` is the shared implementation the two public
/// forms expand to.
macro_rules! read_wrap_func {
    (@$wrapper:ident, $method:ident, $delegate:ident $(, $gate:meta)?) => {
        $(
        #[$gate]
        #[cfg_attr(docsrs, doc($gate))]
        )?
        #[inline]
        fn $method(&mut self) -> $wrapper<Self> where Self: Unpin {
            $wrapper { inner: self.$delegate() }
        }
    };
    (f $gate:meta, $wrapper:ident, $method:ident, $delegate:ident) => {
        read_wrap_func!(@$wrapper, $method, $delegate, $gate);
    };
    ($wrapper:ident, $method:ident, $delegate:ident) => {
        read_wrap_func!(@$wrapper, $method, $delegate);
    };
}
64
/// Defines an "AP" size future on top of an existing wide-integer future.
///
/// AP stands for all-platform: a `usize`/`isize` is obtained by converting
/// from `u128`/`i128`. CP stands for current-platform: the `usize`/`isize`
/// is read directly.
///
/// The generated future `$wrapper` wraps `$wrapped` (see `read_wrap_future!`)
/// and converts every successfully read value to `$target` with an `as` cast.
// NOTE(review): an `as` cast between integer types does not report values
// that do not fit in `$target` - confirm callers are fine with that.
#[allow(unused_macros)]
macro_rules! read_size_ap_future {
    (f $gate:meta, $target:ty, $wrapper:ident, $wrapped:ident) => {
        read_wrap_future!(f $gate, $wrapper, $wrapped);
        #[$gate]
        impl<'a, R: AsyncVariableReader + Unpin + ?Sized> Future for $wrapper<'a, R> {
            type Output = ::core::result::Result<$target, R::Error>;

            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                let polled = self.project().inner.poll(cx);
                polled.map_ok(|wide| wide as $target)
            }
        }
    };
}
81
82
83pin_project! {
84    #[derive(Debug)]
85    #[project(!Unpin)]
86    #[must_use = "futures do nothing unless you `.await` or poll them"]
87    pub struct ReadSingle<'a, R: ?Sized> {
88        #[pin]
89        reader: &'a mut R,
90        buf: Option<u8>,
91    }
92}
93impl<'a, R: ?Sized> ReaderFuture for ReadSingle<'a, R> {
94    fn reset(self: Pin<&mut Self>) {
95        let me = self.project();
96        *me.buf = None;
97    }
98}
99impl<'a, R: AsyncVariableReadable + Unpin + ?Sized> Future for ReadSingle<'a, R> {
100    type Output = Result<u8, R::Error>;
101
102    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
103        let mut me = self.project();
104        R::poll_read_single(Pin::new(&mut *me.reader), cx, me.buf)
105    }
106}
107
108pin_project! {
109    #[derive(Debug)]
110    #[project(!Unpin)]
111    #[must_use = "futures do nothing unless you `.await` or poll them"]
112    pub struct ReadMore<'a, R: ?Sized> {
113        #[pin]
114        reader: &'a mut R,
115        buf: ReadBuf<'a>,
116    }
117}
118impl<'a, R: ?Sized> ReaderFuture for ReadMore<'a, R> {
119    fn reset(self: Pin<&mut Self>) {
120        let me = self.project();
121        me.buf.reset();
122    }
123}
124impl<'a, R: AsyncVariableReadable + Unpin + ?Sized> Future for ReadMore<'a, R> {
125    type Output = Result<(), R::Error>;
126
127    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
128        let mut me = self.project();
129        R::poll_read_more(Pin::new(&mut *me.reader), cx, me.buf)
130    }
131}
132
#[cfg(feature = "bytes")]
pin_project! {
    /// Future created by [`AsyncVariableReader::read_more_buf`]; it drives
    /// [`AsyncVariableReadable::poll_read_more_buf`] with the caller's
    /// [`bytes::BufMut`].
    #[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
    #[derive(Debug)]
    #[project(!Unpin)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct ReadMoreBuf<'a, R: ?Sized, B> where B: bytes::BufMut {
        // Borrowed reader the bytes are pulled from.
        #[pin]
        reader: &'a mut R,
        // Borrowed destination buffer.
        #[pin]
        buf: &'a mut B,
    }
}

#[cfg(feature = "bytes")]
impl<'a, R: AsyncVariableReadable + Unpin + ?Sized, B: bytes::BufMut> Future for ReadMoreBuf<'a, R, B> {
    type Output = Result<(), R::Error>;

    /// Delegates to `R::poll_read_more_buf` with the stored buffer.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        // `R: Unpin`, so the borrowed reader can be re-pinned for the call.
        let reader = Pin::new(&mut **this.reader);
        R::poll_read_more_buf(reader, cx, &mut *this.buf)
    }
}
155
156
157read_wrap_future!(ReadBool, ReadSingle);
158impl<'a, R: AsyncVariableReader + Unpin + ?Sized> Future for ReadBool<'a, R> {
159    type Output = Result<bool, R::Error>;
160
161    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
162        self.project().inner.poll(cx).map(|r| r.and_then(|b| match b {
163            0 => Ok(false),
164            1 => Ok(true),
165            b => Err(R::read_bool_error("ReadBool", b)),
166        }))
167    }
168}
169
170include!("read_bools.rs");
171
172include!("read_raw.rs");
173include!("read_raw_size.rs");
174
175include!("read_varint.rs");
176include!("read_varint_size.rs");
177include!("read_varint_long.rs");
178include!("read_varint_long_size.rs");
179
180include!("read_signed_varint.rs");
181include!("read_signed_varint_size.rs");
182include!("read_signed_varint_long.rs");
183include!("read_signed_varint_long_size.rs");
184
185include!("read_float_varint.rs");
186include!("read_float_varint_long.rs");
187
#[cfg(feature = "async_u8_vec")]
pin_project! {
    /// Future created by [`AsyncVariableReader::read_u8_vec`].
    ///
    /// It first reads a length through [`ReadUsizeVarintAp`], then allocates
    /// a zero-filled `Vec<u8>` of that length and fills it with
    /// [`AsyncVariableReadable::poll_read_more`]. When the buffer is full it
    /// is moved out of the future and returned without being copied.
    #[cfg_attr(docsrs, doc(cfg(feature = "async_u8_vec")))]
    #[derive(Debug)]
    #[project(!Unpin)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct ReadU8Vec<'a, R: ?Sized> {
        // Reads the length prefix; also the path to the underlying reader.
        #[pin]
        inner: ReadUsizeVarintAp<'a, R>,
        // `None` until the length is known, then the partially filled payload.
        buf: Option<OwnedReadBuf<alloc::vec::Vec<u8>>>,
    }
}

#[cfg(feature = "async_u8_vec")]
impl<'a, R: ?Sized> ReaderFuture for ReadU8Vec<'a, R> {
    /// Resets the length future and drops any partially filled payload.
    fn reset(self: Pin<&mut Self>) {
        let me = self.project();
        me.inner.reset();
        *me.buf = None;
    }
}

#[cfg(feature = "async_u8_vec")]
impl<'a, R: AsyncVariableReader + Unpin + ?Sized> Future for ReadU8Vec<'a, R> {
    type Output = Result<alloc::vec::Vec<u8>, R::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut me = self.project();
        // Phase 1: obtain the payload buffer, reading the length prefix first
        // if this is the first time we get this far.
        let buf = match me.buf.as_mut() {
            Some(existing) => existing,
            None => {
                let size = core::task::ready!(me.inner.as_mut().poll(cx))?;
                me.buf.insert(OwnedReadBuf::new(alloc::vec![0; size]))
            }
        };
        // Phase 2: fill the payload. The borrowed view tracks progress while
        // the reader is polled; its position is copied back into the owned
        // buffer so that a `Pending` result resumes where it stopped.
        let mut ref_buf = buf.into();
        let res = R::poll_read_more(Pin::new(&mut me.inner.project().inner.project().inner.project().reader), cx, &mut ref_buf);
        let position = ref_buf.position();
        buf.set_position(position);
        core::task::ready!(res)?;
        // Done: hand the buffer over by value instead of cloning it. The
        // slot is left as `None`, the state a fresh or reset future is in.
        let filled = me.buf.take().expect("payload buffer is initialised before it is filled");
        Poll::Ready(Ok(filled.into_inner()))
    }
}
229
230read_wrap_future!(f cfg(feature = "async_string"), ReadString, ReadU8Vec);
231#[cfg(feature = "async_string")]
232impl<'a, R: AsyncVariableReader + Unpin + ?Sized> Future for ReadString<'a, R> {
233    type Output = Result<alloc::string::String, R::Error>;
234
235    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
236        self.project().inner.poll(cx).map(|r| r.and_then(|v| {
237            match alloc::string::String::from_utf8(v) {
238                Ok(s) => Ok(s),
239                Err(e) => Err(R::read_string_error("ReadString", e)),
240            }
241        }))
242    }
243}
244
245pub trait AsyncVariableReader: AsyncVariableReadable {
246    #[inline]
247    fn read_single(&mut self) -> ReadSingle<Self> where Self: Unpin {
248        ReadSingle { reader: self, buf: None }
249    }
250
251    #[inline]
252    fn read_more<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadMore<'a, Self> where Self: Unpin {
253        ReadMore { reader: self, buf: ReadBuf::new(buf) }
254    }
255
256    /// You may call [bytes::BytesMut::limit] to prevent reading more data than needed.
257    #[cfg(feature = "bytes")]
258    #[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
259    #[inline]
260    fn read_more_buf<'a, B: bytes::BufMut>(&'a mut self, buf: &'a mut B) -> ReadMoreBuf<'a, Self, B> where Self: Unpin {
261        ReadMoreBuf { reader: self, buf }
262    }
263
264
265    fn read_bool_error(feature_name: &'static str, byte: u8) -> Self::Error;
266
267    read_wrap_func!(ReadBool, read_bool, read_single);
268
269    define_read_bools_func!();
270
271    define_read_raw_func!();
272    define_read_raw_size_func!();
273
274    define_read_varint_func!();
275    define_read_varint_size_func!();
276    define_read_varint_long_func!();
277    define_read_varint_long_size_func!();
278
279    define_read_signed_varint_func!();
280    define_read_signed_varint_size_func!();
281    define_read_signed_varint_long_func!();
282    define_read_signed_varint_long_size_func!();
283
284    define_read_float_varint_func!();
285    define_read_float_varint_long_func!();
286
287    #[allow(deprecated)]
288    /// Note this future is not zero-cost,
289    /// it will clone the inner vec buf when poll returns ready.
290    ///
291    /// You can use the example below instead.
292    /// ```rust,ignore
293    /// let len = reader.read_usize_varint_ap().await?;
294    /// let buf = vec![0; len];
295    /// reader.read_more(&mut buf).await?;
296    /// ```
297    /// Or you can simply call [Self::read_u8_vec_boxed] instead.
298    /// ```rust,ignore
299    /// reader.read_u8_vec_boxed().await?;
300    /// ```
301    ///
302    /// Now you can call
303    /// ```rust,ignore
304    /// AsyncReaderHelper(&mut reader).help_read_u8_vec().await?;
305    /// ```
306    #[cfg(feature = "async_u8_vec")]
307    #[cfg_attr(docsrs, doc(cfg(feature = "async_u8_vec")))]
308    #[inline]
309    #[deprecated(since = "3.0.0", note = "see docs for details")]
310    fn read_u8_vec(&mut self) -> ReadU8Vec<Self> where Self: Unpin {
311        ReadU8Vec { inner: self.read_usize_varint_ap(), buf: None }
312    }
313
314    #[allow(deprecated)]
315    /// This future is not zero-cost.
316    /// But it is more efficient than [Self::read_u8_vec]
317    /// when you need to read a large number of u8s.
318    #[cfg(feature = "async_u8_vec")]
319    #[cfg_attr(docsrs, doc(cfg(feature = "async_u8_vec")))]
320    #[inline]
321    #[must_use = "futures do nothing unless you `.await` or poll them"]
322    #[deprecated(since = "3.2.0", note = "use [AsyncReaderHelper::help_read_u8_vec] instead")]
323    fn read_u8_vec_boxed(&mut self) -> Pin<alloc::boxed::Box<dyn Future<Output = Result<alloc::vec::Vec<u8>, Self::Error>> + Send + '_>> where Self: Unpin + Send {
324        alloc::boxed::Box::pin(async move {
325            let length = self.read_usize_varint().await?;
326            let mut bytes = alloc::vec![0; length];
327            self.read_more(&mut bytes).await?;
328            Ok(bytes)
329        })
330    }
331
332    #[cfg(feature = "async_string")]
333    #[cfg_attr(docsrs, doc(cfg(feature = "async_string")))]
334    fn read_string_error(future_name: &'static str, error: alloc::string::FromUtf8Error) -> Self::Error;
335
336    #[allow(deprecated)]
337    /// This future is based on [Self::read_u8_vec],
338    /// which is not zero-cost and deprecated.
339    ///
340    /// Or you can simply call [Self::read_string_boxed] instead.
341    /// ```rust,ignore
342    /// reader.read_string_boxed().await?;
343    /// ```
344    ///
345    /// Now you can call
346    /// ```rust,ignore
347    /// AsyncReaderHelper(&mut reader).help_read_string().await?;
348    /// ```
349    #[cfg(feature = "async_string")]
350    #[cfg_attr(docsrs, doc(cfg(feature = "async_string")))]
351    #[inline]
352    #[deprecated(since = "3.0.0", note = "see docs for details")]
353    fn read_string(&mut self) -> ReadString<Self> where Self: Unpin {
354        ReadString { inner: self.read_u8_vec() }
355    }
356
357    #[allow(deprecated)]
358    /// This future is not zero-cost.
359    /// But it is more efficient than [Self::read_string]
360    /// when you need to read a long string.
361    #[cfg(feature = "async_string")]
362    #[cfg_attr(docsrs, doc(cfg(feature = "async_string")))]
363    #[inline]
364    #[must_use = "futures do nothing unless you `.await` or poll them"]
365    #[deprecated(since = "3.2.0", note = "use [AsyncReaderHelper::help_read_string] instead")]
366    fn read_string_boxed(&mut self) -> Pin<alloc::boxed::Box<dyn Future<Output = Result<alloc::string::String, Self::Error>> + Send + '_>> where Self: Unpin + Send {
367        alloc::boxed::Box::pin(async move {
368            match alloc::string::String::from_utf8(self.read_u8_vec_boxed().await?) {
369                Ok(s) => Ok(s),
370                Err(e) => Err(Self::read_string_error("ReadString", e)),
371            }
372        })
373    }
374}