mediasan_common/
sync.rs

1//! Adapter utilities to run carefully designed async code in a sync context.
2
3use std::io;
4use std::io::Read;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use bytes::Buf;
9use futures_util::{AsyncRead, Future, FutureExt};
10
11use crate::{AsyncSkip, Skip};
12
13//
14// public functions
15//
16
17/// Run an async function accepting [`AsyncRead`] + [`AsyncSkip`] input in a sync context.
18pub fn sanitize<R, F, Fut>(input: R, fun: F) -> Fut::Output
19where
20    F: FnOnce(AsyncInputAdapter<R>) -> Fut,
21    Fut: Future,
22{
23    // Using AsyncInputAdapter is OK here because this is a blocking (non-async) API.
24    let future = fun(AsyncInputAdapter(input));
25
26    // `future` should never yield, as the wrapped synchronous input is the only thing `awaited` upon in the sanitizer.
27    future.now_or_never().unwrap_or_else(|| unreachable!())
28}
29
30/// Return an adapter to use [`Buf`] as [`AsyncRead`].
31pub fn buf_async_reader<B: Buf + Unpin>(input: B) -> impl AsyncRead + Unpin {
32    AsyncInputAdapter(input.reader())
33}
34
35/// An adapter for [`Read`] + [`Skip`] types implementing [`AsyncRead`] + [`AsyncSkip`].
36///
37/// The [`AsyncRead`] + [`AsyncSkip`] implementations will block on IO, so it must not be used when exposing async APIs.
38pub struct AsyncInputAdapter<T>(T);
39
40//
41// AsyncInputAdapter impls
42//
43
44impl<T: Read + Unpin> AsyncRead for AsyncInputAdapter<T> {
45    fn poll_read(mut self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
46        self.0.read(buf).into()
47    }
48
49    fn poll_read_vectored(
50        mut self: Pin<&mut Self>,
51        _cx: &mut Context<'_>,
52        bufs: &mut [io::IoSliceMut<'_>],
53    ) -> Poll<io::Result<usize>> {
54        self.0.read_vectored(bufs).into()
55    }
56}
57
58impl<T: Skip + Unpin> AsyncSkip for AsyncInputAdapter<T> {
59    fn poll_skip(mut self: Pin<&mut Self>, _cx: &mut Context<'_>, amount: u64) -> Poll<io::Result<()>> {
60        self.0.skip(amount).into()
61    }
62
63    fn poll_stream_position(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
64        self.0.stream_position().into()
65    }
66
67    fn poll_stream_len(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
68        self.0.stream_len().into()
69    }
70}
71
72impl<T: Unpin> Unpin for AsyncInputAdapter<T> {}