async_ioutils/
bridge.rs

1use std::io::{Read, Write};
2use tokio::io::{AsyncRead, ReadBuf, AsyncWrite};
3
4pub struct ReaderBridge<R: AsyncRead + Unpin> {
5    inner: R,
6}
7
8impl<R: AsyncRead + Unpin> ReaderBridge<R> {
9    pub fn new(inner: R) -> Self {
10        Self { inner }
11    }
12}
13
14impl<R: AsyncRead + Unpin> Read for ReaderBridge<R> {
15    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
16        tokio::runtime::Handle::current()
17        .block_on(tokio::io::AsyncReadExt::read(&mut self.inner, buf))
18    }
19}
20
21pub struct WriterBridge<W: AsyncWrite + Unpin> {
22    inner: W,
23}
24
25impl<W: AsyncWrite + Unpin> WriterBridge<W> {
26    pub fn new(inner: W) -> Self {
27        Self { inner }
28    }
29}
30
31impl<W: AsyncWrite + Unpin> Write for WriterBridge<W> {
32    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
33        tokio::runtime::Handle::current()
34        .block_on(tokio::io::AsyncWriteExt::write(&mut self.inner, buf))
35    }
36
37    fn flush(&mut self) -> std::io::Result<()> {
38        tokio::runtime::Handle::current()
39        .block_on(tokio::io::AsyncWriteExt::flush(&mut self.inner))
40    }
41}
42
/// Adapter that owns a synchronous [`Read`] implementation so it can be
/// exposed through an asynchronous reader interface.
pub struct AsyncReadBridge<R>
where
    R: Read + Unpin,
{
    /// The wrapped synchronous reader.
    inner: R,
}

impl<R> AsyncReadBridge<R>
where
    R: Read + Unpin,
{
    /// Creates a bridge that takes ownership of `inner`.
    pub fn new(inner: R) -> Self {
        AsyncReadBridge { inner }
    }
}
52
53impl<R: Read + Unpin> AsyncRead for AsyncReadBridge<R> {
54    fn poll_read(
55        mut self: std::pin::Pin<&mut Self>,
56        _: &mut std::task::Context<'_>,
57        buf: &mut ReadBuf<'_>,
58    ) -> std::task::Poll<std::io::Result<()>> {
59        let mut buf_unfilled = buf.initialize_unfilled();
60
61        let result = std::pin::Pin::new(&mut self.inner).read(&mut buf_unfilled).and_then(|total_read| {
62            buf.advance(total_read);
63            Ok(())
64        });
65
66        if let Err(err) = &result {
67            if err.kind() == std::io::ErrorKind::WouldBlock {
68                return std::task::Poll::Pending;
69            }
70        }
71        
72        return std::task::Poll::Ready(result);
73    }
74}
75
/// Adapter that owns a synchronous [`Write`] implementation so it can be
/// exposed through an asynchronous writer interface.
pub struct AsyncWriteBridge<W>
where
    W: Write + Unpin,
{
    /// The wrapped synchronous writer.
    inner: W,
}

impl<W> AsyncWriteBridge<W>
where
    W: Write + Unpin,
{
    /// Creates a bridge that takes ownership of `inner`.
    pub fn new(inner: W) -> Self {
        AsyncWriteBridge { inner }
    }
}
85
86impl<W: Write + Unpin> AsyncWrite for AsyncWriteBridge<W> {
87    fn poll_write(
88        mut self: std::pin::Pin<&mut Self>,
89        _: &mut std::task::Context<'_>,
90        buf: &[u8],
91    ) -> std::task::Poll<std::io::Result<usize>> {
92        let result = std::pin::Pin::new(&mut self.inner).write(buf);
93
94        if let Err(err) = &result {
95            if err.kind() == std::io::ErrorKind::WouldBlock {
96                return std::task::Poll::Pending;
97            }
98        }
99
100        return std::task::Poll::Ready(result);
101    }
102
103    fn poll_flush(
104        mut self: std::pin::Pin<&mut Self>,
105        _: &mut std::task::Context<'_>,
106    ) -> std::task::Poll<std::io::Result<()>> {
107        let result = std::pin::Pin::new(&mut self.inner).flush();
108
109        if let Err(err) = &result {
110            if err.kind() == std::io::ErrorKind::WouldBlock {
111                return std::task::Poll::Pending;
112            }
113        }
114
115        return std::task::Poll::Ready(result);
116    }
117
118    fn poll_shutdown(
119        self: std::pin::Pin<&mut Self>,
120        cx: &mut std::task::Context<'_>,
121    ) -> std::task::Poll<std::io::Result<()>> {
122        self.poll_flush(cx)
123    }
124}