1use std::io::{Error, ErrorKind, SeekFrom};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::io::{AsyncRead, AsyncBufRead, AsyncWrite, AsyncSeek};
6
7use ::tokio::io::{
8 AsyncRead as TokioAsyncRead,
9 AsyncBufRead as TokioAsyncBufRead,
10 AsyncWrite as TokioAsyncWrite,
11 AsyncSeek as TokioAsyncSeek,
12 ReadBuf,
13};
14
15
16
17#[cfg(feature = "tokio-rt")]
20#[cfg_attr(docsrs, doc(cfg(feature = "tokio-rt")))]
21pub struct TokioCompat<T> {
22 inner: T,
23 seek_in_progress: bool,
24}
25
26impl<T> TokioCompat<T> {
27 pub fn new(inner: T) -> Self {
29 Self {
30 inner,
31 seek_in_progress: false,
32 }
33 }
34
35 pub fn get_ref(&self) -> &T {
37 &self.inner
38 }
39
40 pub fn get_mut(&mut self) -> &mut T {
42 &mut self.inner
43 }
44
45 pub fn into_inner(self) -> T {
47 self.inner
48 }
49}
50
51impl<T> AsyncRead for TokioCompat<T>
52where
53 T: TokioAsyncRead + Unpin,
54{
55 fn poll_read(
56 self: Pin<&mut Self>,
57 cx: &mut Context<'_>,
58 buf: &mut [u8]
59 ) -> Poll<Result<usize, Error>> {
60 let inner = Pin::into_inner(self);
61
62 let inner = Pin::new(&mut inner.inner);
63
64 let mut buf = ReadBuf::new(buf);
65 let filled_len = buf.filled().len();
66
67 match TokioAsyncRead::poll_read(inner, cx, &mut buf) {
68 Poll::Pending => return Poll::Pending,
69 Poll::Ready(Ok(())) => {
70 let filled_len = buf.filled().len()-filled_len;
71
72 return Poll::Ready(Ok(filled_len));
73 }
74 Poll::Ready(Err(err)) => {
75 match err.kind() {
76 ErrorKind::WouldBlock => return Poll::Pending,
77 ErrorKind::Interrupted => return Poll::Ready(Err(Error::new(ErrorKind::Other, "Interrupted."))),
78 _ => return Poll::Ready(Err(err))
79 }
80 }
81 }
82 }
83}
84
85impl<T> AsyncBufRead for TokioCompat<T>
86where
87 T: TokioAsyncBufRead + Unpin,
88{
89 fn poll_fill_buf(
90 self: Pin<&mut Self>,
91 cx: &mut Context<'_>,
92 ) -> Poll<Result<&[u8], Error>> {
93 let inner = Pin::into_inner(self);
94
95 let inner = Pin::new(&mut inner.inner);
96
97 match TokioAsyncBufRead::poll_fill_buf(inner, cx) {
98 Poll::Pending => return Poll::Pending,
99 Poll::Ready(Ok(buf)) => Poll::Ready(Ok(buf)),
100 Poll::Ready(Err(err)) => {
101 match err.kind() {
102 ErrorKind::WouldBlock => return Poll::Pending,
103 ErrorKind::Interrupted => return Poll::Ready(Err(Error::new(ErrorKind::Other, "Interrupted."))),
104 _ => return Poll::Ready(Err(err))
105 }
106 }
107 }
108 }
109
110 fn consume(self: Pin<&mut Self>, amt: usize) {
111 let inner = Pin::into_inner(self);
112
113 let inner = Pin::new(&mut inner.inner);
114
115 TokioAsyncBufRead::consume(inner, amt)
116 }
117}
118
119impl<T> AsyncWrite for TokioCompat<T>
120where
121 T: TokioAsyncWrite + Unpin,
122{
123 fn poll_write(
124 self: Pin<&mut Self>,
125 cx: &mut Context<'_>,
126 buf: &[u8]
127 ) -> Poll<Result<usize, Error>> {
128 let inner = Pin::into_inner(self);
129
130 let inner = Pin::new(&mut inner.inner);
131
132 match TokioAsyncWrite::poll_write(inner, cx, buf) {
133 Poll::Pending => return Poll::Pending,
134 Poll::Ready(Ok(n)) => Poll::Ready(Ok(n)),
135 Poll::Ready(Err(err)) => {
136 match err.kind() {
137 ErrorKind::WouldBlock => return Poll::Pending,
138 ErrorKind::Interrupted => return Poll::Ready(Err(Error::new(ErrorKind::Other, "Interrupted."))),
139 _ => return Poll::Ready(Err(err))
140 }
141 }
142 }
143 }
144
145 fn poll_flush(
146 self: Pin<&mut Self>,
147 cx: &mut Context<'_>
148 ) -> Poll<Result<(), Error>> {
149 let inner = Pin::into_inner(self);
150
151 let inner = Pin::new(&mut inner.inner);
152
153 match TokioAsyncWrite::poll_flush(inner, cx) {
154 Poll::Pending => return Poll::Pending,
155 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
156 Poll::Ready(Err(err)) => {
157 match err.kind() {
158 ErrorKind::WouldBlock => return Poll::Pending,
159 ErrorKind::Interrupted => return Poll::Ready(Err(Error::new(ErrorKind::Other, "Interrupted."))),
160 _ => return Poll::Ready(Err(err))
161 }
162 }
163 }
164 }
165
166 fn poll_close(
167 self: Pin<&mut Self>,
168 cx: &mut Context<'_>
169 ) -> Poll<Result<(), Error>> {
170 let inner = Pin::into_inner(self);
171
172 let inner = Pin::new(&mut inner.inner);
173
174 match TokioAsyncWrite::poll_shutdown(inner, cx) {
175 Poll::Pending => return Poll::Pending,
176 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
177 Poll::Ready(Err(err)) => {
178 match err.kind() {
179 ErrorKind::WouldBlock => return Poll::Pending,
180 ErrorKind::Interrupted => return Poll::Ready(Err(Error::new(ErrorKind::Other, "Interrupted."))),
181 _ => return Poll::Ready(Err(err))
182 }
183 }
184 }
185 }
186}
187
188impl<T> AsyncSeek for TokioCompat<T>
189where
190 T: TokioAsyncSeek + Unpin,
191{
192 fn poll_seek(
193 self: Pin<&mut Self>,
194 cx: &mut Context<'_>,
195 pos: SeekFrom,
196 ) -> Poll<Result<u64, Error>> {
197 let inner = Pin::into_inner(self);
198
199 if !inner.seek_in_progress {
200 if let Err(err) = Pin::new(&mut inner.inner).start_seek(pos) {
201 return Poll::Ready(Err(err));
202 }
203
204 inner.seek_in_progress = true;
205 }
206
207 match TokioAsyncSeek::poll_complete(Pin::new(&mut inner.inner), cx) {
208 Poll::Pending => return Poll::Pending,
209 Poll::Ready(result) => {
210 inner.seek_in_progress = false;
211
212 match result {
213 Ok(pos) => return Poll::Ready(Ok(pos)),
214 Err(err) => {
215 match err.kind() {
216 ErrorKind::WouldBlock => return Poll::Pending,
217 ErrorKind::Interrupted => return Poll::Ready(Err(Error::new(ErrorKind::Other, "Interrupted."))),
218 _ => return Poll::Ready(Err(err))
219 }
220 }
221 }
222 }
223 }
224 }
225}