1use std::io;
3use std::sync::Arc;
4
5use futures::{Async as Async01, Future as Future01, Poll as Poll01, Stream as Stream01};
6use futures::task::{self as task01, Task as Task01};
7
8use futures_core::{Async as Async02, Future as Future02, Never, Stream as Stream02};
9use futures_core::task::{Context, LocalMap, Wake, Waker};
10use futures_core::executor::{Executor as Executor02, SpawnError};
11use futures_io::{AsyncRead as AsyncRead02, AsyncWrite as AsyncWrite02};
12use tokio_io::{AsyncRead as AsyncReadTk, AsyncWrite as AsyncWriteTk};
13
/// Adapter that exposes a futures 0.2 `Future` through the futures 0.1
/// `Future` trait.
///
/// Each call to the 0.1 `poll` builds a new 0.2 task `Context` (with a fresh
/// `LocalMap`) around `exec` and forwards to the wrapped future.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Future02As01<E, F> {
    /// Executor handed to the 0.2 `Context` on every poll.
    exec: E,
    /// The wrapped futures 0.2 future.
    v02: F,
}
21
/// Adapter that exposes a futures 0.2 `Future` whose error type is `Never`
/// as a futures 0.1 `Future` whose error type is `()`.
///
/// The wrapped future can never actually produce an error, so the `()` error
/// of the resulting 0.1 future is never observed.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Future02NeverAs01Unit<E, F> {
    /// Executor handed to the 0.2 `Context` on every poll.
    exec: E,
    /// The wrapped futures 0.2 future.
    v02: F,
}
29
/// Adapter that exposes a futures 0.2 `Stream` through the futures 0.1
/// `Stream` trait.
///
/// Each call to the 0.1 `poll` builds a new 0.2 task `Context` (with a fresh
/// `LocalMap`) around `exec` and forwards to the wrapped stream's `poll_next`.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Stream02As01<E, S> {
    /// Executor handed to the 0.2 `Context` on every poll.
    exec: E,
    /// The wrapped futures 0.2 stream.
    v02: S,
}
37
/// Adapter that exposes a futures 0.2 `AsyncRead` / `AsyncWrite` value through
/// `std::io::Read` / `std::io::Write` and the tokio-io `AsyncRead` /
/// `AsyncWrite` traits.
///
/// A `Pending` result from the wrapped value is reported to `std::io` callers
/// as an `io::ErrorKind::WouldBlock` error.
#[derive(Debug)]
pub struct AsyncIo02AsTokio<E, S> {
    /// Executor handed to the 0.2 `Context` on every I/O call.
    exec: E,
    /// The wrapped futures 0.2 I/O object.
    v02: S,
}
44
45#[allow(missing_debug_implementations)]
47pub struct BoxedExecutor02(pub(crate) Box<Executor02 + Send>);
48
49impl Executor02 for BoxedExecutor02 {
50 fn spawn(&mut self, f: Box<Future02<Item=(), Error=Never> + Send>) -> Result<(), SpawnError> {
51 (&mut *self.0).spawn(f)
52 }
53}
54
55pub trait FutureInto01: Future02 {
59 fn into_01_compat<E>(self, exec: E) -> Future02As01<E, Self>
64 where
65 Self: Sized,
66 E: Executor02;
67
68 fn into_01_compat_never_unit<E>(self, exec: E) -> Future02NeverAs01Unit<E, Self>
73 where
74 Self: Future02<Error=Never> + Sized,
75 E: Executor02;
76}
77
78pub trait StreamInto01: Stream02 {
82 fn into_01_compat<E>(self, exec: E) -> Stream02As01<E, Self>
87 where
88 Self: Sized,
89 E: Executor02;
90}
91
92pub trait AsyncIoIntoTokio {
96 fn into_tokio_compat<E>(self, exec: E) -> AsyncIo02AsTokio<E, Self>
101 where
102 Self: AsyncRead02 + AsyncWrite02 + Sized,
103 E: Executor02;
104}
105
106impl<F> FutureInto01 for F
107where
108 F: Future02,
109{
110 fn into_01_compat<E>(self, exec: E) -> Future02As01<E, Self>
111 where
112 Self: Sized,
113 E: Executor02,
114 {
115 Future02As01 {
116 exec,
117 v02: self,
118 }
119 }
120
121 fn into_01_compat_never_unit<E>(self, exec: E) -> Future02NeverAs01Unit<E, Self>
122 where
123 Self: Sized,
124 E: Executor02,
125 {
126 Future02NeverAs01Unit {
127 exec,
128 v02: self,
129 }
130 }
131}
132
133impl<E, F> Future01 for Future02As01<E, F>
134where
135 F: Future02,
136 E: Executor02,
137{
138 type Item = F::Item;
139 type Error = F::Error;
140
141 fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
142 let mut locals = LocalMap::new();
143 let waker = current_as_waker();
144 let mut cx = Context::new(&mut locals, &waker, &mut self.exec);
145
146 match self.v02.poll(&mut cx) {
147 Ok(Async02::Ready(val)) => Ok(Async01::Ready(val)),
148 Ok(Async02::Pending) => Ok(Async01::NotReady),
149 Err(err) => Err(err),
150 }
151 }
152}
153
154impl<E, F> Future01 for Future02NeverAs01Unit<E, F>
155where
156 F: Future02<Error=Never>,
157 E: Executor02,
158{
159 type Item = F::Item;
160 type Error = ();
161
162 fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
163 let mut locals = LocalMap::new();
164 let waker = current_as_waker();
165 let mut cx = Context::new(&mut locals, &waker, &mut self.exec);
166
167 match self.v02.poll(&mut cx) {
168 Ok(Async02::Ready(val)) => Ok(Async01::Ready(val)),
169 Ok(Async02::Pending) => Ok(Async01::NotReady),
170 Err(never) => match never {}
171 }
172 }
173}
174
175impl<S> StreamInto01 for S
176where
177 S: Stream02,
178{
179 fn into_01_compat<E>(self, exec: E) -> Stream02As01<E, Self>
180 where
181 Self: Sized,
182 E: Executor02,
183 {
184 Stream02As01 {
185 exec,
186 v02: self,
187 }
188 }
189}
190
191impl<E, S> Stream01 for Stream02As01<E, S>
192where
193 S: Stream02,
194 E: Executor02,
195{
196 type Item = S::Item;
197 type Error = S::Error;
198
199 fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
200 let mut locals = LocalMap::new();
201 let waker = current_as_waker();
202 let mut cx = Context::new(&mut locals, &waker, &mut self.exec);
203
204 match self.v02.poll_next(&mut cx) {
205 Ok(Async02::Ready(val)) => Ok(Async01::Ready(val)),
206 Ok(Async02::Pending) => Ok(Async01::NotReady),
207 Err(err) => Err(err),
208 }
209 }
210}
211
212impl<I> AsyncIoIntoTokio for I {
213 fn into_tokio_compat<E>(self, exec: E) -> AsyncIo02AsTokio<E, Self>
214 where
215 Self: AsyncRead02 + AsyncWrite02 + Sized,
216 E: Executor02,
217 {
218 AsyncIo02AsTokio {
219 exec,
220 v02: self,
221 }
222 }
223}
224
225impl<E: Executor02, I: AsyncRead02> io::Read for AsyncIo02AsTokio<E, I> {
226 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
227 let mut locals = LocalMap::new();
228 let waker = current_as_waker();
229 let mut cx = Context::new(&mut locals, &waker, &mut self.exec);
230
231 match self.v02.poll_read(&mut cx, buf) {
232 Ok(Async02::Ready(val)) => Ok(val),
233 Ok(Async02::Pending) => Err(would_block()),
234 Err(err) => Err(err),
235 }
236 }
237}
238
239impl<E: Executor02, I: AsyncWrite02> io::Write for AsyncIo02AsTokio<E, I> {
240 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
241 let mut locals = LocalMap::new();
242 let waker = current_as_waker();
243 let mut cx = Context::new(&mut locals, &waker, &mut self.exec);
244
245 match self.v02.poll_write(&mut cx, buf) {
246 Ok(Async02::Ready(val)) => Ok(val),
247 Ok(Async02::Pending) => Err(would_block()),
248 Err(err) => Err(err),
249 }
250 }
251
252 fn flush(&mut self) -> io::Result<()> {
253 let mut locals = LocalMap::new();
254 let waker = current_as_waker();
255 let mut cx = Context::new(&mut locals, &waker, &mut self.exec);
256
257 match self.v02.poll_flush(&mut cx) {
258 Ok(Async02::Ready(val)) => Ok(val),
259 Ok(Async02::Pending) => Err(would_block()),
260 Err(err) => Err(err),
261 }
262 }
263}
264
/// Builds the `io::Error` used to signal "not ready yet" to `std::io`
/// callers: an error whose kind is `io::ErrorKind::WouldBlock`.
fn would_block() -> io::Error {
    io::ErrorKind::WouldBlock.into()
}
268
269impl<E: Executor02, I: AsyncRead02> AsyncReadTk for AsyncIo02AsTokio<E, I> {
270 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
271 let init = self.v02.initializer();
272 if init.should_initialize() {
273 init.initialize(buf);
274 true
275 } else {
276 false
277 }
278 }
279}
280
281impl<E: Executor02, I: AsyncWrite02> AsyncWriteTk for AsyncIo02AsTokio<E, I> {
282 fn shutdown(&mut self) -> Poll01<(), io::Error> {
283 let mut locals = LocalMap::new();
284 let waker = current_as_waker();
285 let mut cx = Context::new(&mut locals, &waker, &mut self.exec);
286
287 match self.v02.poll_close(&mut cx) {
288 Ok(Async02::Ready(val)) => Ok(Async01::Ready(val)),
289 Ok(Async02::Pending) => Ok(Async01::NotReady),
290 Err(err) => Err(err),
291 }
292 }
293}
294
295fn current_as_waker() -> Waker {
299 Waker::from(Arc::new(Current(task01::current())))
300}
301
302struct Current(Task01);
303
304impl Wake for Current {
305 fn wake(arc_self: &Arc<Self>) {
306 arc_self.0.notify();
307 }
308}