fut_compat/io/
tokio.rs

1use std::io::{Error, ErrorKind, SeekFrom};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::io::{AsyncRead, AsyncBufRead, AsyncWrite, AsyncSeek};
6
7use ::tokio::io::{
8    AsyncRead as TokioAsyncRead,
9    AsyncBufRead as TokioAsyncBufRead,
10    AsyncWrite as TokioAsyncWrite,
11    AsyncSeek as TokioAsyncSeek,
12    ReadBuf,
13};
14
15
16
17/// Provides compatibility between objects implementing [`tokio`](https://docs.rs/tokio)'s async io traits and
18/// the corresponding traits defined by the [`futures`](https://docs.rs/futures) crate.
19#[cfg(feature = "tokio-rt")]
20#[cfg_attr(docsrs, doc(cfg(feature = "tokio-rt")))]
21pub struct TokioCompat<T> {
22    inner: T,
23    seek_in_progress: bool,
24}
25
26impl<T> TokioCompat<T> {
27    /// Creates a new instance by wrapping the `inner` object.
28    pub fn new(inner: T) -> Self {
29        Self {
30            inner,
31            seek_in_progress: false,
32        }
33    }
34
35    /// Get a reference to the wrapped object.
36    pub fn get_ref(&self) -> &T {
37        &self.inner
38    }
39
40    /// Get a mutable reference to the wrapped object.
41    pub fn get_mut(&mut self) -> &mut T {
42        &mut self.inner
43    }
44
45    /// Consumes the `TokioCompat` object and returns the wrapped object.
46    pub fn into_inner(self) -> T {
47        self.inner
48    }
49}
50
51impl<T> AsyncRead for TokioCompat<T>
52where
53    T: TokioAsyncRead + Unpin,
54{
55    fn poll_read(
56        self: Pin<&mut Self>,
57        cx: &mut Context<'_>,
58        buf: &mut [u8]
59    ) -> Poll<Result<usize, Error>> {
60        let inner = Pin::into_inner(self);
61
62        let inner = Pin::new(&mut inner.inner);
63
64        let mut buf = ReadBuf::new(buf);
65        let filled_len = buf.filled().len();
66
67        match TokioAsyncRead::poll_read(inner, cx, &mut buf) {
68            Poll::Pending => return Poll::Pending,
69            Poll::Ready(Ok(())) => {
70                let filled_len = buf.filled().len()-filled_len;
71
72                return Poll::Ready(Ok(filled_len));
73            }
74            Poll::Ready(Err(err)) => {
75                match err.kind() {
76                    ErrorKind::WouldBlock => return Poll::Pending,
77                    ErrorKind::Interrupted => return Poll::Ready(Err(Error::new(ErrorKind::Other, "Interrupted."))),
78                    _ => return Poll::Ready(Err(err))
79                }
80            }
81        }
82    }
83}
84
85impl<T> AsyncBufRead for TokioCompat<T>
86where
87    T: TokioAsyncBufRead + Unpin,
88{
89    fn poll_fill_buf(
90        self: Pin<&mut Self>,
91        cx: &mut Context<'_>,
92    ) -> Poll<Result<&[u8], Error>> {
93        let inner = Pin::into_inner(self);
94
95        let inner = Pin::new(&mut inner.inner);
96
97        match TokioAsyncBufRead::poll_fill_buf(inner, cx) {
98            Poll::Pending => return Poll::Pending,
99            Poll::Ready(Ok(buf)) => Poll::Ready(Ok(buf)),
100            Poll::Ready(Err(err)) => {
101                match err.kind() {
102                    ErrorKind::WouldBlock => return Poll::Pending,
103                    ErrorKind::Interrupted => return Poll::Ready(Err(Error::new(ErrorKind::Other, "Interrupted."))),
104                    _ => return Poll::Ready(Err(err))
105                }
106            }
107        }
108    }
109
110    fn consume(self: Pin<&mut Self>, amt: usize) {
111        let inner = Pin::into_inner(self);
112
113        let inner = Pin::new(&mut inner.inner);
114
115        TokioAsyncBufRead::consume(inner, amt)
116    }
117}
118
119impl<T> AsyncWrite for TokioCompat<T>
120where
121    T: TokioAsyncWrite + Unpin,
122{
123    fn poll_write(
124        self: Pin<&mut Self>,
125        cx: &mut Context<'_>,
126        buf: &[u8]
127    ) -> Poll<Result<usize, Error>> {
128        let inner = Pin::into_inner(self);
129
130        let inner = Pin::new(&mut inner.inner);
131
132        match TokioAsyncWrite::poll_write(inner, cx, buf) {
133            Poll::Pending => return Poll::Pending,
134            Poll::Ready(Ok(n)) => Poll::Ready(Ok(n)),
135            Poll::Ready(Err(err)) => {
136                match err.kind() {
137                    ErrorKind::WouldBlock => return Poll::Pending,
138                    ErrorKind::Interrupted => return Poll::Ready(Err(Error::new(ErrorKind::Other, "Interrupted."))),
139                    _ => return Poll::Ready(Err(err))
140                }
141            }
142        }
143    }
144
145    fn poll_flush(
146        self: Pin<&mut Self>,
147        cx: &mut Context<'_>
148    ) -> Poll<Result<(), Error>> {
149        let inner = Pin::into_inner(self);
150
151        let inner = Pin::new(&mut inner.inner);
152
153        match TokioAsyncWrite::poll_flush(inner, cx) {
154            Poll::Pending => return Poll::Pending,
155            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
156            Poll::Ready(Err(err)) => {
157                match err.kind() {
158                    ErrorKind::WouldBlock => return Poll::Pending,
159                    ErrorKind::Interrupted => return Poll::Ready(Err(Error::new(ErrorKind::Other, "Interrupted."))),
160                    _ => return Poll::Ready(Err(err))
161                }
162            }
163        }
164    }
165
166    fn poll_close(
167        self: Pin<&mut Self>,
168        cx: &mut Context<'_>
169    ) -> Poll<Result<(), Error>> {
170        let inner = Pin::into_inner(self);
171
172        let inner = Pin::new(&mut inner.inner);
173
174        match TokioAsyncWrite::poll_shutdown(inner, cx) {
175            Poll::Pending => return Poll::Pending,
176            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
177            Poll::Ready(Err(err)) => {
178                match err.kind() {
179                    ErrorKind::WouldBlock => return Poll::Pending,
180                    ErrorKind::Interrupted => return Poll::Ready(Err(Error::new(ErrorKind::Other, "Interrupted."))),
181                    _ => return Poll::Ready(Err(err))
182                }
183            }
184        }
185    }
186}
187
188impl<T> AsyncSeek for TokioCompat<T>
189where
190    T: TokioAsyncSeek + Unpin,
191{
192    fn poll_seek(
193        self: Pin<&mut Self>,
194        cx: &mut Context<'_>,
195        pos: SeekFrom,
196    ) -> Poll<Result<u64, Error>> {
197        let inner = Pin::into_inner(self);
198
199        if !inner.seek_in_progress {
200            if let Err(err) = Pin::new(&mut inner.inner).start_seek(pos) {
201                return Poll::Ready(Err(err));
202            }
203
204            inner.seek_in_progress = true;
205        }
206
207        match TokioAsyncSeek::poll_complete(Pin::new(&mut inner.inner), cx) {
208            Poll::Pending => return Poll::Pending,
209            Poll::Ready(result) => {
210                inner.seek_in_progress = false;
211
212                match result {
213                    Ok(pos) => return Poll::Ready(Ok(pos)),
214                    Err(err) => {
215                        match err.kind() {
216                            ErrorKind::WouldBlock => return Poll::Pending,
217                            ErrorKind::Interrupted => return Poll::Ready(Err(Error::new(ErrorKind::Other, "Interrupted."))),
218                            _ => return Poll::Ready(Err(err))
219                        }
220                    }
221                }
222            }
223        }
224    }
225}