embedded_io_adapters/
tokio_1.rs

1//! Adapters to/from `tokio::io` traits.
2
3use core::future::poll_fn;
4use core::pin::Pin;
5use core::task::Poll;
6
7use tokio::io::AsyncBufReadExt;
8
9/// Adapter from `tokio::io` traits.
10#[derive(Clone)]
11pub struct FromTokio<T: ?Sized> {
12    inner: T,
13}
14
15impl<T> FromTokio<T> {
16    /// Create a new adapter.
17    pub fn new(inner: T) -> Self {
18        Self { inner }
19    }
20
21    /// Consume the adapter, returning the inner object.
22    pub fn into_inner(self) -> T {
23        self.inner
24    }
25}
26
27impl<T: ?Sized> FromTokio<T> {
28    /// Borrow the inner object.
29    pub fn inner(&self) -> &T {
30        &self.inner
31    }
32
33    /// Mutably borrow the inner object.
34    pub fn inner_mut(&mut self) -> &mut T {
35        &mut self.inner
36    }
37}
38
39impl<T: ?Sized> embedded_io::ErrorType for FromTokio<T> {
40    type Error = std::io::Error;
41}
42
43impl<T: tokio::io::AsyncRead + Unpin + ?Sized> embedded_io_async::Read for FromTokio<T> {
44    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
45        // The current tokio implementation (https://github.com/tokio-rs/tokio/blob/tokio-1.33.0/tokio/src/io/poll_evented.rs#L165)
46        // does not consider the case of buf.is_empty() as a special case,
47        // which can cause Poll::Pending to be returned at the end of the stream when called with an empty buffer.
48        // This poll will, however, never become ready, as no more bytes will be received.
49        if buf.is_empty() {
50            return Ok(0);
51        }
52
53        poll_fn(|cx| {
54            let mut buf = tokio::io::ReadBuf::new(buf);
55            match Pin::new(&mut self.inner).poll_read(cx, &mut buf) {
56                Poll::Ready(r) => match r {
57                    Ok(()) => Poll::Ready(Ok(buf.filled().len())),
58                    Err(e) => Poll::Ready(Err(e)),
59                },
60                Poll::Pending => Poll::Pending,
61            }
62        })
63        .await
64    }
65}
66
67impl<T: tokio::io::AsyncBufRead + Unpin + ?Sized> embedded_io_async::BufRead for FromTokio<T> {
68    async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
69        self.inner.fill_buf().await
70    }
71
72    fn consume(&mut self, amt: usize) {
73        Pin::new(&mut self.inner).consume(amt);
74    }
75}
76
77impl<T: tokio::io::AsyncWrite + Unpin + ?Sized> embedded_io_async::Write for FromTokio<T> {
78    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
79        match poll_fn(|cx| Pin::new(&mut self.inner).poll_write(cx, buf)).await {
80            Ok(0) if !buf.is_empty() => Err(std::io::ErrorKind::WriteZero.into()),
81            Ok(n) => Ok(n),
82            Err(e) => Err(e),
83        }
84    }
85
86    async fn flush(&mut self) -> Result<(), Self::Error> {
87        poll_fn(|cx| Pin::new(&mut self.inner).poll_flush(cx)).await
88    }
89}
90
91impl<T: tokio::io::AsyncSeek + Unpin + ?Sized> embedded_io_async::Seek for FromTokio<T> {
92    async fn seek(&mut self, pos: embedded_io::SeekFrom) -> Result<u64, Self::Error> {
93        // Note: `start_seek` can return an error if there is another seek in progress.
94        // Therefor it is recommended to call `poll_complete` before any call to `start_seek`.
95        poll_fn(|cx| Pin::new(&mut self.inner).poll_complete(cx)).await?;
96        Pin::new(&mut self.inner).start_seek(pos.into())?;
97        poll_fn(|cx| Pin::new(&mut self.inner).poll_complete(cx)).await
98    }
99}
100
101// TODO: ToTokio.
102// It's a bit tricky because tokio::io is "stateless", while we're "stateful" (we
103// return futures that borrow Self and get polled for the duration of the operation.)
104// It can probably done by storing the futures in Self, with unsafe Pin hacks because
105// we're a self-referential struct