1use crate::task::{self as task03, ArcWake as ArcWake03, WakerRef};
2use futures_01::{
3 task as task01, Async as Async01, Future as Future01, Poll as Poll01, Stream as Stream01,
4};
5#[cfg(feature = "sink")]
6use futures_01::{AsyncSink as AsyncSink01, Sink as Sink01, StartSend as StartSend01};
7use futures_core::{
8 future::TryFuture as TryFuture03,
9 stream::TryStream as TryStream03,
10 task::{RawWaker, RawWakerVTable},
11};
12#[cfg(feature = "sink")]
13use futures_sink::Sink as Sink03;
14#[cfg(feature = "sink")]
15use std::marker::PhantomData;
16use std::{mem, pin::Pin, sync::Arc, task::Context};
17
18#[allow(clippy::too_long_first_doc_paragraph)] #[derive(Debug, Clone, Copy)]
24#[must_use = "futures do nothing unless you `.await` or poll them"]
25pub struct Compat<T> {
26 pub(crate) inner: T,
27}
28
29#[cfg(feature = "sink")]
32#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
33#[derive(Debug)]
34#[must_use = "sinks do nothing unless polled"]
35pub struct CompatSink<T, Item> {
36 inner: T,
37 _phantom: PhantomData<fn(Item)>,
38}
39
40impl<T> Compat<T> {
41 pub fn new(inner: T) -> Self {
47 Self { inner }
48 }
49
50 pub fn get_ref(&self) -> &T {
53 &self.inner
54 }
55
56 pub fn get_mut(&mut self) -> &mut T {
59 &mut self.inner
60 }
61
62 pub fn into_inner(self) -> T {
64 self.inner
65 }
66}
67
68#[cfg(feature = "sink")]
69impl<T, Item> CompatSink<T, Item> {
70 pub fn new(inner: T) -> Self {
72 Self { inner, _phantom: PhantomData }
73 }
74
75 pub fn get_ref(&self) -> &T {
77 &self.inner
78 }
79
80 pub fn get_mut(&mut self) -> &mut T {
82 &mut self.inner
83 }
84
85 pub fn into_inner(self) -> T {
87 self.inner
88 }
89}
90
91fn poll_03_to_01<T, E>(x: task03::Poll<Result<T, E>>) -> Result<Async01<T>, E> {
92 match x? {
93 task03::Poll::Ready(t) => Ok(Async01::Ready(t)),
94 task03::Poll::Pending => Ok(Async01::NotReady),
95 }
96}
97
98impl<Fut> Future01 for Compat<Fut>
99where
100 Fut: TryFuture03 + Unpin,
101{
102 type Item = Fut::Ok;
103 type Error = Fut::Error;
104
105 fn poll(&mut self) -> Poll01<Self::Item, Self::Error> {
106 with_context(self, |inner, cx| poll_03_to_01(inner.try_poll(cx)))
107 }
108}
109
110impl<St> Stream01 for Compat<St>
111where
112 St: TryStream03 + Unpin,
113{
114 type Item = St::Ok;
115 type Error = St::Error;
116
117 fn poll(&mut self) -> Poll01<Option<Self::Item>, Self::Error> {
118 with_context(self, |inner, cx| match inner.try_poll_next(cx)? {
119 task03::Poll::Ready(None) => Ok(Async01::Ready(None)),
120 task03::Poll::Ready(Some(t)) => Ok(Async01::Ready(Some(t))),
121 task03::Poll::Pending => Ok(Async01::NotReady),
122 })
123 }
124}
125
126#[cfg(feature = "sink")]
127impl<T, Item> Sink01 for CompatSink<T, Item>
128where
129 T: Sink03<Item> + Unpin,
130{
131 type SinkItem = Item;
132 type SinkError = T::Error;
133
134 fn start_send(&mut self, item: Self::SinkItem) -> StartSend01<Self::SinkItem, Self::SinkError> {
135 with_sink_context(self, |mut inner, cx| match inner.as_mut().poll_ready(cx)? {
136 task03::Poll::Ready(()) => inner.start_send(item).map(|()| AsyncSink01::Ready),
137 task03::Poll::Pending => Ok(AsyncSink01::NotReady(item)),
138 })
139 }
140
141 fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> {
142 with_sink_context(self, |inner, cx| poll_03_to_01(inner.poll_flush(cx)))
143 }
144
145 fn close(&mut self) -> Poll01<(), Self::SinkError> {
146 with_sink_context(self, |inner, cx| poll_03_to_01(inner.poll_close(cx)))
147 }
148}
149
150#[derive(Clone)]
151struct Current(task01::Task);
152
153impl Current {
154 fn new() -> Self {
155 Self(task01::current())
156 }
157
158 fn as_waker(&self) -> WakerRef<'_> {
159 unsafe fn ptr_to_current<'a>(ptr: *const ()) -> &'a Current {
160 unsafe { &*(ptr as *const Current) }
161 }
162 fn current_to_ptr(current: &Current) -> *const () {
163 current as *const Current as *const ()
164 }
165
166 unsafe fn clone(ptr: *const ()) -> RawWaker {
167 unsafe {
171 mem::transmute::<task03::Waker, RawWaker>(task03::waker(Arc::new(
172 ptr_to_current(ptr).clone(),
173 )))
174 }
175 }
176 unsafe fn drop(_: *const ()) {}
177 unsafe fn wake(ptr: *const ()) {
178 unsafe { ptr_to_current(ptr).0.notify() }
179 }
180
181 let ptr = current_to_ptr(self);
182 let vtable = &RawWakerVTable::new(clone, wake, wake, drop);
183 WakerRef::new_unowned(std::mem::ManuallyDrop::new(unsafe {
184 task03::Waker::from_raw(RawWaker::new(ptr, vtable))
185 }))
186 }
187}
188
189impl ArcWake03 for Current {
190 fn wake_by_ref(arc_self: &Arc<Self>) {
191 arc_self.0.notify();
192 }
193}
194
195fn with_context<T, R, F>(compat: &mut Compat<T>, f: F) -> R
196where
197 T: Unpin,
198 F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
199{
200 let current = Current::new();
201 let waker = current.as_waker();
202 let mut cx = Context::from_waker(&waker);
203 f(Pin::new(&mut compat.inner), &mut cx)
204}
205
206#[cfg(feature = "sink")]
207fn with_sink_context<T, Item, R, F>(compat: &mut CompatSink<T, Item>, f: F) -> R
208where
209 T: Unpin,
210 F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> R,
211{
212 let current = Current::new();
213 let waker = current.as_waker();
214 let mut cx = Context::from_waker(&waker);
215 f(Pin::new(&mut compat.inner), &mut cx)
216}
217
218#[cfg(feature = "io-compat")]
219#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
220mod io {
221 use super::*;
222 use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
223 use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};
224
225 fn poll_03_to_io<T>(x: task03::Poll<Result<T, std::io::Error>>) -> Result<T, std::io::Error> {
226 match x {
227 task03::Poll::Ready(Ok(t)) => Ok(t),
228 task03::Poll::Pending => Err(std::io::ErrorKind::WouldBlock.into()),
229 task03::Poll::Ready(Err(e)) => Err(e),
230 }
231 }
232
233 impl<R: AsyncRead03 + Unpin> std::io::Read for Compat<R> {
234 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
235 let current = Current::new();
236 let waker = current.as_waker();
237 let mut cx = Context::from_waker(&waker);
238 poll_03_to_io(Pin::new(&mut self.inner).poll_read(&mut cx, buf))
239 }
240 }
241
242 impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {}
243
244 impl<W: AsyncWrite03 + Unpin> std::io::Write for Compat<W> {
245 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
246 let current = Current::new();
247 let waker = current.as_waker();
248 let mut cx = Context::from_waker(&waker);
249 poll_03_to_io(Pin::new(&mut self.inner).poll_write(&mut cx, buf))
250 }
251
252 fn flush(&mut self) -> std::io::Result<()> {
253 let current = Current::new();
254 let waker = current.as_waker();
255 let mut cx = Context::from_waker(&waker);
256 poll_03_to_io(Pin::new(&mut self.inner).poll_flush(&mut cx))
257 }
258 }
259
260 impl<W: AsyncWrite03 + Unpin> AsyncWrite01 for Compat<W> {
261 fn shutdown(&mut self) -> std::io::Result<Async01<()>> {
262 let current = Current::new();
263 let waker = current.as_waker();
264 let mut cx = Context::from_waker(&waker);
265 poll_03_to_01(Pin::new(&mut self.inner).poll_close(&mut cx))
266 }
267 }
268}