embedded_io_adapters/
tokio_1.rs

1//! Adapters to/from `tokio::io` traits.
2
3// MSRV is 1.60 if you don't enable async, 1.80 if you do.
4// Cargo.toml has 1.60, which makes Clippy complain that `poll_fn` was introduced
5// in 1.64. So, just silence it for this file.
6#![allow(clippy::incompatible_msrv)]
7
8use core::future::poll_fn;
9use core::pin::Pin;
10use core::task::Poll;
11
12use tokio::io::AsyncBufReadExt;
13
/// Adapter from `tokio::io` traits.
///
/// Wraps a single value of type `T`. The trait impls in this module expose the
/// wrapped value's `tokio::io` capabilities (`AsyncRead`, `AsyncBufRead`,
/// `AsyncWrite`, `AsyncSeek`) through the corresponding `embedded_io_async`
/// traits.
#[derive(Clone)]
pub struct FromTokio<T>
where
    T: ?Sized,
{
    /// The wrapped object; kept as the last (and only) field so `T` may be unsized.
    inner: T,
}
19
20impl<T> FromTokio<T> {
21    /// Create a new adapter.
22    pub fn new(inner: T) -> Self {
23        Self { inner }
24    }
25
26    /// Consume the adapter, returning the inner object.
27    pub fn into_inner(self) -> T {
28        self.inner
29    }
30}
31
32impl<T: ?Sized> FromTokio<T> {
33    /// Borrow the inner object.
34    pub fn inner(&self) -> &T {
35        &self.inner
36    }
37
38    /// Mutably borrow the inner object.
39    pub fn inner_mut(&mut self) -> &mut T {
40        &mut self.inner
41    }
42}
43
44impl<T: ?Sized> embedded_io::ErrorType for FromTokio<T> {
45    type Error = std::io::Error;
46}
47
48impl<T: tokio::io::AsyncRead + Unpin + ?Sized> embedded_io_async::Read for FromTokio<T> {
49    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
50        // The current tokio implementation (https://github.com/tokio-rs/tokio/blob/tokio-1.33.0/tokio/src/io/poll_evented.rs#L165)
51        // does not consider the case of buf.is_empty() as a special case,
52        // which can cause Poll::Pending to be returned at the end of the stream when called with an empty buffer.
53        // This poll will, however, never become ready, as no more bytes will be received.
54        if buf.is_empty() {
55            return Ok(0);
56        }
57
58        poll_fn(|cx| {
59            let mut buf = tokio::io::ReadBuf::new(buf);
60            match Pin::new(&mut self.inner).poll_read(cx, &mut buf) {
61                Poll::Ready(r) => match r {
62                    Ok(()) => Poll::Ready(Ok(buf.filled().len())),
63                    Err(e) => Poll::Ready(Err(e)),
64                },
65                Poll::Pending => Poll::Pending,
66            }
67        })
68        .await
69    }
70}
71
72impl<T: tokio::io::AsyncBufRead + Unpin + ?Sized> embedded_io_async::BufRead for FromTokio<T> {
73    async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
74        self.inner.fill_buf().await
75    }
76
77    fn consume(&mut self, amt: usize) {
78        Pin::new(&mut self.inner).consume(amt);
79    }
80}
81
82impl<T: tokio::io::AsyncWrite + Unpin + ?Sized> embedded_io_async::Write for FromTokio<T> {
83    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
84        match poll_fn(|cx| Pin::new(&mut self.inner).poll_write(cx, buf)).await {
85            Ok(0) if !buf.is_empty() => Err(std::io::ErrorKind::WriteZero.into()),
86            Ok(n) => Ok(n),
87            Err(e) => Err(e),
88        }
89    }
90
91    async fn flush(&mut self) -> Result<(), Self::Error> {
92        poll_fn(|cx| Pin::new(&mut self.inner).poll_flush(cx)).await
93    }
94}
95
96impl<T: tokio::io::AsyncSeek + Unpin + ?Sized> embedded_io_async::Seek for FromTokio<T> {
97    async fn seek(&mut self, pos: embedded_io::SeekFrom) -> Result<u64, Self::Error> {
98        // Note: `start_seek` can return an error if there is another seek in progress.
99        // Therefor it is recommended to call `poll_complete` before any call to `start_seek`.
100        poll_fn(|cx| Pin::new(&mut self.inner).poll_complete(cx)).await?;
101        Pin::new(&mut self.inner).start_seek(pos.into())?;
102        poll_fn(|cx| Pin::new(&mut self.inner).poll_complete(cx)).await
103    }
104}
105
106// TODO: ToTokio.
107// It's a bit tricky because tokio::io is "stateless", while we're "stateful" (we
108// return futures that borrow Self and get polled for the duration of the operation.)
// It can probably be done by storing the futures in Self, with unsafe Pin hacks because
// we're a self-referential struct.