embedded_io_adapters/
tokio_1.rs1#![allow(clippy::incompatible_msrv)]
7
8use core::future::poll_fn;
9use core::pin::Pin;
10use core::task::Poll;
11
12use tokio::io::AsyncBufReadExt;
13
14#[derive(Clone)]
16pub struct FromTokio<T: ?Sized> {
17 inner: T,
18}
19
20impl<T> FromTokio<T> {
21 pub fn new(inner: T) -> Self {
23 Self { inner }
24 }
25
26 pub fn into_inner(self) -> T {
28 self.inner
29 }
30}
31
32impl<T: ?Sized> FromTokio<T> {
33 pub fn inner(&self) -> &T {
35 &self.inner
36 }
37
38 pub fn inner_mut(&mut self) -> &mut T {
40 &mut self.inner
41 }
42}
43
44impl<T: ?Sized> embedded_io::ErrorType for FromTokio<T> {
45 type Error = std::io::Error;
46}
47
48impl<T: tokio::io::AsyncRead + Unpin + ?Sized> embedded_io_async::Read for FromTokio<T> {
49 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
50 if buf.is_empty() {
55 return Ok(0);
56 }
57
58 poll_fn(|cx| {
59 let mut buf = tokio::io::ReadBuf::new(buf);
60 match Pin::new(&mut self.inner).poll_read(cx, &mut buf) {
61 Poll::Ready(r) => match r {
62 Ok(()) => Poll::Ready(Ok(buf.filled().len())),
63 Err(e) => Poll::Ready(Err(e)),
64 },
65 Poll::Pending => Poll::Pending,
66 }
67 })
68 .await
69 }
70}
71
72impl<T: tokio::io::AsyncBufRead + Unpin + ?Sized> embedded_io_async::BufRead for FromTokio<T> {
73 async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
74 self.inner.fill_buf().await
75 }
76
77 fn consume(&mut self, amt: usize) {
78 Pin::new(&mut self.inner).consume(amt);
79 }
80}
81
82impl<T: tokio::io::AsyncWrite + Unpin + ?Sized> embedded_io_async::Write for FromTokio<T> {
83 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
84 match poll_fn(|cx| Pin::new(&mut self.inner).poll_write(cx, buf)).await {
85 Ok(0) if !buf.is_empty() => Err(std::io::ErrorKind::WriteZero.into()),
86 Ok(n) => Ok(n),
87 Err(e) => Err(e),
88 }
89 }
90
91 async fn flush(&mut self) -> Result<(), Self::Error> {
92 poll_fn(|cx| Pin::new(&mut self.inner).poll_flush(cx)).await
93 }
94}
95
96impl<T: tokio::io::AsyncSeek + Unpin + ?Sized> embedded_io_async::Seek for FromTokio<T> {
97 async fn seek(&mut self, pos: embedded_io::SeekFrom) -> Result<u64, Self::Error> {
98 poll_fn(|cx| Pin::new(&mut self.inner).poll_complete(cx)).await?;
101 Pin::new(&mut self.inner).start_seek(pos.into())?;
102 poll_fn(|cx| Pin::new(&mut self.inner).poll_complete(cx)).await
103 }
104}
105
106