futures_compat/
futures_02.rs

1//! futures 0.2.x compatibility.
2use std::io;
3use std::sync::Arc;
4
5use futures::{Async as Async01, Future as Future01, Poll as Poll01, Stream as Stream01};
6use futures::task::{self as task01, Task as Task01};
7
8use futures_core::{Async as Async02, Future as Future02, Never, Stream as Stream02};
9use futures_core::task::{Context, LocalMap, Wake, Waker};
10use futures_core::executor::{Executor as Executor02, SpawnError};
11use futures_io::{AsyncRead as AsyncRead02, AsyncWrite as AsyncWrite02};
12use tokio_io::{AsyncRead as AsyncReadTk, AsyncWrite as AsyncWriteTk};
13
/// Wrap a `Future` from v0.2 as a `Future` from v0.1.
///
/// Each v0.1 `poll` builds a fresh v0.2 `Context` around the stored executor
/// and forwards the call to the wrapped future.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Future02As01<E, F> {
    /// Executor lent to the v0.2 `Context` that is created for every poll.
    exec: E,
    /// The wrapped v0.2 future.
    v02: F,
}
21
/// A `Future02As01` that maps errors of `Never` to `()`.
///
/// The v0.1 `Future` implementation is only provided when the wrapped v0.2
/// future has `Error = Never`; the resulting v0.1 future reports `Error = ()`.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Future02NeverAs01Unit<E, F> {
    /// Executor lent to the v0.2 `Context` that is created for every poll.
    exec: E,
    /// The wrapped v0.2 future.
    v02: F,
}
29
/// Wrap a `Stream` from v0.2 as a `Stream` from v0.1.
///
/// Each v0.1 `poll` builds a fresh v0.2 `Context` around the stored executor
/// and forwards the call to the wrapped stream's `poll_next`.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Stream02As01<E, S> {
    /// Executor lent to the v0.2 `Context` that is created for every poll.
    exec: E,
    /// The wrapped v0.2 stream.
    v02: S,
}
37
/// Wrap some `AsyncRead` or `AsyncWrite` from v0.2 as the same from tokio-io.
///
/// The wrapper also implements `std::io::Read` / `std::io::Write`; a pending
/// v0.2 operation is reported there as an `io::ErrorKind::WouldBlock` error.
#[derive(Debug)]
pub struct AsyncIo02AsTokio<E, S> {
    /// Executor lent to the v0.2 `Context` that is created for every I/O call.
    exec: E,
    /// The wrapped v0.2 I/O object.
    v02: S,
}
44
/// A newtype around a boxed v0.2 `Executor`.
///
/// It exists because the futures crate is missing an `Executor`
/// implementation for `Box<Executor>` itself.
// `Debug` cannot be derived: the boxed trait object carries no `Debug` bound.
#[allow(missing_debug_implementations)]
pub struct BoxedExecutor02(pub(crate) Box<Executor02 + Send>);
48
49impl Executor02 for BoxedExecutor02 {
50    fn spawn(&mut self, f: Box<Future02<Item=(), Error=Never> + Send>) -> Result<(), SpawnError> {
51        (&mut *self.0).spawn(f)
52    }
53}
54
/// A trait to convert any `Future` from v0.2 into a [`Future02As01`](Future02As01).
///
/// Implemented automatically for all types that implement v0.2's `Future`.
pub trait FutureInto01: Future02 {
    /// Converts this future into a `Future02As01`.
    ///
    /// An executor is required to allow this wrapped future to still access
    /// `Context::spawn` while wrapped.
    fn into_01_compat<E>(self, exec: E) -> Future02As01<E, Self>
    where
        Self: Sized,
        E: Executor02;

    /// Converts this future into a `Future02NeverAs01Unit`.
    ///
    /// Only available when this future's error type is `Never`.
    ///
    /// An executor is required to allow this wrapped future to still access
    /// `Context::spawn` while wrapped.
    fn into_01_compat_never_unit<E>(self, exec: E) -> Future02NeverAs01Unit<E, Self>
    where
        Self: Future02<Error=Never> + Sized,
        E: Executor02;
}
77
/// A trait to convert any `Stream` from v0.2 into a [`Stream02As01`](Stream02As01).
///
/// Implemented automatically for all types that implement v0.2's `Stream`.
pub trait StreamInto01: Stream02 {
    /// Converts this stream into a `Stream02As01`.
    ///
    /// An executor is required to allow this wrapped stream to still access
    /// `Context::spawn` while wrapped.
    fn into_01_compat<E>(self, exec: E) -> Stream02As01<E, Self>
    where
        Self: Sized,
        E: Executor02;
}
91
/// A trait to convert any `AsyncRead`/`AsyncWrite` from v0.2 into an [`AsyncIo02AsTokio`](AsyncIo02AsTokio).
///
/// The trait is implemented for every type, but the conversion method can
/// only be called on types that implement both v0.2's `AsyncRead` and
/// `AsyncWrite`.
pub trait AsyncIoIntoTokio {
    /// Converts this IO into an `AsyncIo02AsTokio`.
    ///
    /// An executor is required to allow this wrapped IO object to still access
    /// `Context::spawn` while wrapped.
    fn into_tokio_compat<E>(self, exec: E) -> AsyncIo02AsTokio<E, Self>
    where
        Self: AsyncRead02 + AsyncWrite02 + Sized,
        E: Executor02;
}
105
106impl<F> FutureInto01 for F
107where
108    F: Future02,
109{
110    fn into_01_compat<E>(self, exec: E) -> Future02As01<E, Self>
111    where
112        Self: Sized,
113        E: Executor02,
114    {
115        Future02As01 {
116            exec,
117            v02: self,
118        }
119    }
120
121    fn into_01_compat_never_unit<E>(self, exec: E) -> Future02NeverAs01Unit<E, Self>
122    where
123        Self: Sized,
124        E: Executor02,
125    {
126        Future02NeverAs01Unit {
127            exec,
128            v02: self,
129        }
130    }
131}
132
133impl<E, F> Future01 for Future02As01<E, F>
134where
135    F: Future02,
136    E: Executor02,
137{
138    type Item = F::Item;
139    type Error = F::Error;
140
141    fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
142        let mut locals = LocalMap::new();
143        let waker = current_as_waker();
144        let mut cx = Context::new(&mut locals, &waker, &mut self.exec);
145
146        match self.v02.poll(&mut cx) {
147            Ok(Async02::Ready(val)) => Ok(Async01::Ready(val)),
148            Ok(Async02::Pending) => Ok(Async01::NotReady),
149            Err(err) => Err(err),
150        }
151    }
152}
153
154impl<E, F> Future01 for Future02NeverAs01Unit<E, F>
155where
156    F: Future02<Error=Never>,
157    E: Executor02,
158{
159    type Item = F::Item;
160    type Error = ();
161
162    fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
163        let mut locals = LocalMap::new();
164        let waker = current_as_waker();
165        let mut cx = Context::new(&mut locals, &waker, &mut self.exec);
166
167        match self.v02.poll(&mut cx) {
168            Ok(Async02::Ready(val)) => Ok(Async01::Ready(val)),
169            Ok(Async02::Pending) => Ok(Async01::NotReady),
170            Err(never) => match never {}
171        }
172    }
173}
174
175impl<S> StreamInto01 for S
176where
177    S: Stream02,
178{
179    fn into_01_compat<E>(self, exec: E) -> Stream02As01<E, Self>
180    where
181        Self: Sized,
182        E: Executor02,
183    {
184        Stream02As01 {
185            exec,
186            v02: self,
187        }
188    }
189}
190
191impl<E, S> Stream01 for Stream02As01<E, S>
192where
193    S: Stream02,
194    E: Executor02,
195{
196    type Item = S::Item;
197    type Error = S::Error;
198
199    fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
200        let mut locals = LocalMap::new();
201        let waker = current_as_waker();
202        let mut cx = Context::new(&mut locals, &waker, &mut self.exec);
203
204        match self.v02.poll_next(&mut cx) {
205            Ok(Async02::Ready(val)) => Ok(Async01::Ready(val)),
206            Ok(Async02::Pending) => Ok(Async01::NotReady),
207            Err(err) => Err(err),
208        }
209    }
210}
211
212impl<I> AsyncIoIntoTokio for I {
213    fn into_tokio_compat<E>(self, exec: E) -> AsyncIo02AsTokio<E, Self>
214    where
215        Self: AsyncRead02 + AsyncWrite02 + Sized,
216        E: Executor02,
217    {
218        AsyncIo02AsTokio {
219            exec,
220            v02: self,
221        }
222    }
223}
224
225impl<E: Executor02, I: AsyncRead02> io::Read for AsyncIo02AsTokio<E, I> {
226    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
227        let mut locals = LocalMap::new();
228        let waker = current_as_waker();
229        let mut cx = Context::new(&mut locals, &waker, &mut self.exec);
230
231        match self.v02.poll_read(&mut cx, buf) {
232            Ok(Async02::Ready(val)) => Ok(val),
233            Ok(Async02::Pending) => Err(would_block()),
234            Err(err) => Err(err),
235        }
236    }
237}
238
239impl<E: Executor02, I: AsyncWrite02> io::Write for AsyncIo02AsTokio<E, I> {
240    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
241        let mut locals = LocalMap::new();
242        let waker = current_as_waker();
243        let mut cx = Context::new(&mut locals, &waker, &mut self.exec);
244
245        match self.v02.poll_write(&mut cx, buf) {
246            Ok(Async02::Ready(val)) => Ok(val),
247            Ok(Async02::Pending) => Err(would_block()),
248            Err(err) => Err(err),
249        }
250    }
251
252    fn flush(&mut self) -> io::Result<()> {
253        let mut locals = LocalMap::new();
254        let waker = current_as_waker();
255        let mut cx = Context::new(&mut locals, &waker, &mut self.exec);
256
257        match self.v02.poll_flush(&mut cx) {
258            Ok(Async02::Ready(val)) => Ok(val),
259            Ok(Async02::Pending) => Err(would_block()),
260            Err(err) => Err(err),
261        }
262    }
263}
264
/// Builds the `io::Error` used to report a pending v0.2 operation through
/// the `std::io` traits.
fn would_block() -> io::Error {
    let kind = io::ErrorKind::WouldBlock;
    kind.into()
}
268
269impl<E: Executor02, I: AsyncRead02> AsyncReadTk for AsyncIo02AsTokio<E, I> {
270    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
271        let init = self.v02.initializer();
272        if init.should_initialize() {
273            init.initialize(buf);
274            true
275        } else {
276            false
277        }
278    }
279}
280
281impl<E: Executor02, I: AsyncWrite02> AsyncWriteTk for AsyncIo02AsTokio<E, I> {
282    fn shutdown(&mut self) -> Poll01<(), io::Error> {
283        let mut locals = LocalMap::new();
284        let waker = current_as_waker();
285        let mut cx = Context::new(&mut locals, &waker, &mut self.exec);
286
287        match self.v02.poll_close(&mut cx) {
288            Ok(Async02::Ready(val)) => Ok(Async01::Ready(val)),
289            Ok(Async02::Pending) => Ok(Async01::NotReady),
290            Err(err) => Err(err),
291        }
292    }
293}
294
// Maybe it's possible to do all this without cloning and allocating,
// but I just wanted to get this working now. Optimizations welcome.
297
298fn current_as_waker() -> Waker {
299    Waker::from(Arc::new(Current(task01::current())))
300}
301
// Holds a v0.1 task handle so it can be notified through the v0.2 `Wake` trait.
struct Current(Task01);
303
304impl Wake for Current {
305    fn wake(arc_self: &Arc<Self>) {
306        arc_self.0.notify();
307    }
308}