1use std::io;
4use std::io::Read;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use bytes::Buf;
9use futures_util::{AsyncRead, Future, FutureExt};
10
11use crate::{AsyncSkip, Skip};
12
13pub fn sanitize<R, F, Fut>(input: R, fun: F) -> Fut::Output
19where
20 F: FnOnce(AsyncInputAdapter<R>) -> Fut,
21 Fut: Future,
22{
23 let future = fun(AsyncInputAdapter(input));
25
26 future.now_or_never().unwrap_or_else(|| unreachable!())
28}
29
30pub fn buf_async_reader<B: Buf + Unpin>(input: B) -> impl AsyncRead + Unpin {
32 AsyncInputAdapter(input.reader())
33}
34
/// Wrapper that exposes a synchronous input `T` through async polling traits.
///
/// The wrapped value is driven directly inside each poll call, so the trait impls on this type
/// complete immediately rather than returning `Poll::Pending`.
#[derive(Debug)]
pub struct AsyncInputAdapter<T>(T);
39
40impl<T: Read + Unpin> AsyncRead for AsyncInputAdapter<T> {
45 fn poll_read(mut self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
46 self.0.read(buf).into()
47 }
48
49 fn poll_read_vectored(
50 mut self: Pin<&mut Self>,
51 _cx: &mut Context<'_>,
52 bufs: &mut [io::IoSliceMut<'_>],
53 ) -> Poll<io::Result<usize>> {
54 self.0.read_vectored(bufs).into()
55 }
56}
57
58impl<T: Skip + Unpin> AsyncSkip for AsyncInputAdapter<T> {
59 fn poll_skip(mut self: Pin<&mut Self>, _cx: &mut Context<'_>, amount: u64) -> Poll<io::Result<()>> {
60 self.0.skip(amount).into()
61 }
62
63 fn poll_stream_position(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
64 self.0.stream_position().into()
65 }
66
67 fn poll_stream_len(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
68 self.0.stream_len().into()
69 }
70}
71
72impl<T: Unpin> Unpin for AsyncInputAdapter<T> {}