futures_util/compat/
compat03as01.rs

1use crate::task::{self as task03, ArcWake as ArcWake03, WakerRef};
2use futures_01::{
3    task as task01, Async as Async01, Future as Future01, Poll as Poll01, Stream as Stream01,
4};
5#[cfg(feature = "sink")]
6use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01, StartSend as StartSend01};
7use futures_core::{
8    future::TryFuture as TryFuture03,
9    stream::TryStream as TryStream03,
10    task::{RawWaker, RawWakerVTable},
11};
12#[cfg(feature = "sink")]
13use futures_sink::Sink as Sink03;
14#[cfg(feature = "sink")]
15use std::marker::PhantomData;
16use std::{mem, pin::Pin, sync::Arc, task::Context};
17
18#[allow(clippy::too_long_first_doc_paragraph)] // clippy bug, see https://github.com/rust-lang/rust-clippy/issues/13315
19/// Converts a futures 0.3 [`TryFuture`](futures_core::future::TryFuture) or
20/// [`TryStream`](futures_core::stream::TryStream) into a futures 0.1
21/// [`Future`](futures_01::future::Future) or
22/// [`Stream`](futures_01::stream::Stream).
23#[derive(Debug, Clone, Copy)]
24#[must_use = "futures do nothing unless you `.await` or poll them"]
25pub struct Compat<T> {
26    pub(crate) inner: T,
27}
28
29/// Converts a futures 0.3 [`Sink`](futures_sink::Sink) into a futures 0.1
30/// [`Sink`](futures_01::sink::Sink).
31#[cfg(feature = "sink")]
32#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
33#[derive(Debug)]
34#[must_use = "sinks do nothing unless polled"]
35pub struct CompatSink<T, Item> {
36    inner: T,
37    _phantom: PhantomData<fn(Item)>,
38}
39
40impl<T> Compat<T> {
41    /// Creates a new [`Compat`].
42    ///
43    /// For types which implement appropriate futures `0.3`
44    /// traits, the result will be a type which implements
45    /// the corresponding futures 0.1 type.
46    pub fn new(inner: T) -> Self {
47        Self { inner }
48    }
49
50    /// Get a reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
51    /// contained within.
52    pub fn get_ref(&self) -> &T {
53        &self.inner
54    }
55
56    /// Get a mutable reference to 0.3 Future, Stream, AsyncRead, or AsyncWrite object
57    /// contained within.
58    pub fn get_mut(&mut self) -> &mut T {
59        &mut self.inner
60    }
61
62    /// Returns the inner item.
63    pub fn into_inner(self) -> T {
64        self.inner
65    }
66}
67
68#[cfg(feature = "sink")]
69impl<T, Item> CompatSink<T, Item> {
70    /// Creates a new [`CompatSink`].
71    pub fn new(inner: T) -> Self {
72        Self { inner, _phantom: PhantomData }
73    }
74
75    /// Get a reference to 0.3 Sink contained within.
76    pub fn get_ref(&self) -> &T {
77        &self.inner
78    }
79
80    /// Get a mutable reference to 0.3 Sink contained within.
81    pub fn get_mut(&mut self) -> &mut T {
82        &mut self.inner
83    }
84
85    /// Returns the inner item.
86    pub fn into_inner(self) -> T {
87        self.inner
88    }
89}
90
91fn poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) -> Result<Async01<T>, E> {
92    match x? {
93        task03::Poll::Ready(t) => Ok(Async01::Ready(t)),
94        task03::Poll::Pending => Ok(Async01::NotReady),
95    }
96}
97
98impl<Fut> Future01 for Compat<Fut>
99where
100    Fut: TryFuture03 + Unpin,
101{
102    type Item = Fut::Ok;
103    type Error = Fut::Error;
104
105    fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
106        with_context(self, |inner, cx| poll_03_to_01(inner.try_poll(cx)))
107    }
108}
109
110impl<St> Stream01 for Compat<St>
111where
112    St: TryStream03 + Unpin,
113{
114    type Item = St::Ok;
115    type Error = St::Error;
116
117    fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
118        with_context(self, |inner, cx| match inner.try_poll_next(cx)? {
119            task03::Poll::Ready(None) => Ok(Async01::Ready(None)),
120            task03::Poll::Ready(Some(t)) => Ok(Async01::Ready(Some(t))),
121            task03::Poll::Pending => Ok(Async01::NotReady),
122        })
123    }
124}
125
126#[cfg(feature = "sink")]
127impl<T, Item> Sink01 for CompatSink<T, Item>
128where
129    T: Sink03<Item> + Unpin,
130{
131    type SinkItem = Item;
132    type SinkError = T::Error;
133
134    fn start_send(&mut self, item: Self::SinkItem) -> StartSend01<Self::SinkItem, Self::SinkError> {
135        with_sink_context(self, |mut inner, cx| match inner.as_mut().poll_ready(cx)? {
136            task03::Poll::Ready(()) => inner.start_send(item).map(|()| AsyncSink01::Ready),
137            task03::Poll::Pending => Ok(AsyncSink01::NotReady(item)),
138        })
139    }
140
141    fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> {
142        with_sink_context(self, |inner, cx| poll_03_to_01(inner.poll_flush(cx)))
143    }
144
145    fn close(&mut self) -> Poll01<(), Self::SinkError> {
146        with_sink_context(self, |inner, cx| poll_03_to_01(inner.poll_close(cx)))
147    }
148}
149
150#[derive(Clone)]
151struct Current(task01::Task);
152
153impl Current {
154    fn new() -> Self {
155        Self(task01::current())
156    }
157
158    fn as_waker(&self) -> WakerRef<'_> {
159        unsafe fn ptr_to_current<'a>(ptr: *const ()) -> &'a Current {
160            unsafe { &*(ptr as *const Current) }
161        }
162        fn current_to_ptr(current: &Current) -> *const () {
163            current as *const Current as *const ()
164        }
165
166        unsafe fn clone(ptr: *const ()) -> RawWaker {
167            // Lazily create the `Arc` only when the waker is actually cloned.
168            // FIXME: remove `transmute` when a `Waker` -> `RawWaker` conversion
169            // function is landed in `core`.
170            unsafe {
171                mem::transmute::<task03::Waker, RawWaker>(task03::waker(Arc::new(
172                    ptr_to_current(ptr).clone(),
173                )))
174            }
175        }
176        unsafe fn drop(_: *const ()) {}
177        unsafe fn wake(ptr: *const ()) {
178            unsafe { ptr_to_current(ptr).0.notify() }
179        }
180
181        let ptr = current_to_ptr(self);
182        let vtable = &RawWakerVTable::new(clone, wake, wake, drop);
183        WakerRef::new_unowned(std::mem::ManuallyDrop::new(unsafe {
184            task03::Waker::from_raw(RawWaker::new(ptr, vtable))
185        }))
186    }
187}
188
189impl ArcWake03 for Current {
190    fn wake_by_ref(arc_self: &Arc<Self>) {
191        arc_self.0.notify();
192    }
193}
194
195fn with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R
196where
197    T: Unpin,
198    F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
199{
200    let current = Current::new();
201    let waker = current.as_waker();
202    let mut cx = Context::from_waker(&waker);
203    f(Pin::new(&mut compat.inner), &mut cx)
204}
205
206#[cfg(feature = "sink")]
207fn with_sink_context<T, Item, R, F>(compat: &mut CompatSink<T, Item>, f: F) -> R
208where
209    T: Unpin,
210    F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
211{
212    let current = Current::new();
213    let waker = current.as_waker();
214    let mut cx = Context::from_waker(&waker);
215    f(Pin::new(&mut compat.inner), &mut cx)
216}
217
218#[cfg(feature = "io-compat")]
219#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
220mod io {
221    use super::*;
222    use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
223    use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};
224
225    fn poll_03_to_io<T>(x: task03::Poll<Result<T, std::io::Error>>) -> Result<T, std::io::Error> {
226        match x {
227            task03::Poll::Ready(Ok(t)) => Ok(t),
228            task03::Poll::Pending => Err(std::io::ErrorKind::WouldBlock.into()),
229            task03::Poll::Ready(Err(e)) => Err(e),
230        }
231    }
232
233    impl<R: AsyncRead03 + Unpin> std::io::Read for Compat<R> {
234        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
235            let current = Current::new();
236            let waker = current.as_waker();
237            let mut cx = Context::from_waker(&waker);
238            poll_03_to_io(Pin::new(&mut self.inner).poll_read(&mut cx, buf))
239        }
240    }
241
242    impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {}
243
244    impl<W: AsyncWrite03 + Unpin> std::io::Write for Compat<W> {
245        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
246            let current = Current::new();
247            let waker = current.as_waker();
248            let mut cx = Context::from_waker(&waker);
249            poll_03_to_io(Pin::new(&mut self.inner).poll_write(&mut cx, buf))
250        }
251
252        fn flush(&mut self) -> std::io::Result<()> {
253            let current = Current::new();
254            let waker = current.as_waker();
255            let mut cx = Context::from_waker(&waker);
256            poll_03_to_io(Pin::new(&mut self.inner).poll_flush(&mut cx))
257        }
258    }
259
260    impl<W: AsyncWrite03 + Unpin> AsyncWrite01 for Compat<W> {
261        fn shutdown(&mut self) -> std::io::Result<Async01<()>> {
262            let current = Current::new();
263            let waker = current.as_waker();
264            let mut cx = Context::from_waker(&waker);
265            poll_03_to_01(Pin::new(&mut self.inner).poll_close(&mut cx))
266        }
267    }
268}