hyper_rt/
lib.rs

1use std::future::Future;
2use std::io;
3use std::pin::Pin;
4use std::task::{ready, Context, Poll};
5use std::time::{Duration, Instant};
6
7use futures_io::{AsyncRead, AsyncWrite};
8use hyper::rt::{Executor, Sleep, Timer};
9use pin_project_lite::pin_project;
10
11/// Future executor that utilises `awak` threads.
12#[non_exhaustive]
13#[derive(Default, Debug, Clone)]
14pub struct HyperExecutor {}
15
16pin_project! {
17    /// A wrapper that implements IO traits for an inner type that
18    /// implements hyper's IO traits, or vice versa (implements hyper's IO
19    /// traits for a type that implements IO traits).
20    #[derive(Debug)]
21    pub struct HyperIo<T> {
22        #[pin]
23        inner: T,
24    }
25}
26
27/// A Timer that uses the awak runtime.
28#[non_exhaustive]
29#[derive(Default, Clone, Debug)]
30pub struct HyperTimer;
31
32// Use HyperSleep to get awak::time::Sleep to implement Unpin.
33pin_project! {
34    struct HyperSleep {
35        #[pin]
36        inner: awak::time::Delay,
37    }
38}
39
40// ===== impl HyperExecutor =====
41
42impl<Fut> Executor<Fut> for HyperExecutor
43where
44    Fut: Future + Send + 'static,
45    Fut::Output: Send + 'static,
46{
47    fn execute(&self, fut: Fut) {
48        awak::spawn(fut).detach();
49    }
50}
51
52impl HyperExecutor {
53    /// Create new executor that relies on [`awak::spawn`] to execute futures.
54    pub fn new() -> Self {
55        Self {}
56    }
57}
58
59// ==== impl HyperIo =====
60
61impl<T> HyperIo<T> {
62    /// Wrap a type implementing hyper's IO traits.
63    pub fn new(inner: T) -> Self {
64        Self { inner }
65    }
66
67    /// Borrow the inner type.
68    pub fn inner(&self) -> &T {
69        &self.inner
70    }
71
72    /// Mut borrow the inner type.
73    pub fn inner_mut(&mut self) -> &mut T {
74        &mut self.inner
75    }
76
77    /// Consume this wrapper and get the inner type.
78    pub fn into_inner(self) -> T {
79        self.inner
80    }
81}
82
83impl<T> hyper::rt::Read for HyperIo<T>
84where
85    T: AsyncRead,
86{
87    fn poll_read(
88        self: Pin<&mut Self>,
89        cx: &mut Context<'_>,
90        mut buf: hyper::rt::ReadBufCursor<'_>,
91    ) -> Poll<io::Result<()>> {
92        let tbuf = unsafe { &mut *(buf.as_mut() as *mut _ as *mut [u8]) };
93        let n = ready!(self.project().inner.poll_read(cx, tbuf))?;
94        unsafe {
95            buf.advance(n);
96        }
97        Poll::Ready(Ok(()))
98    }
99}
100
101impl<T> hyper::rt::Write for HyperIo<T>
102where
103    T: AsyncWrite,
104{
105    fn poll_write(
106        self: Pin<&mut Self>,
107        cx: &mut Context<'_>,
108        buf: &[u8],
109    ) -> Poll<io::Result<usize>> {
110        self.project().inner.poll_write(cx, buf)
111    }
112
113    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
114        self.project().inner.poll_flush(cx)
115    }
116
117    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
118        self.project().inner.poll_close(cx)
119    }
120
121    fn poll_write_vectored(
122        self: Pin<&mut Self>,
123        cx: &mut Context<'_>,
124        bufs: &[io::IoSlice<'_>],
125    ) -> Poll<io::Result<usize>> {
126        self.project().inner.poll_write_vectored(cx, bufs)
127    }
128}
129
130impl<T> AsyncRead for HyperIo<T>
131where
132    T: hyper::rt::Read,
133{
134    fn poll_read(
135        self: Pin<&mut Self>,
136        cx: &mut Context<'_>,
137        tbuf: &mut [u8],
138    ) -> Poll<io::Result<usize>> {
139        let mut buf = hyper::rt::ReadBuf::new(tbuf);
140        ready!(self.project().inner.poll_read(cx, buf.unfilled()))?;
141        Poll::Ready(Ok(buf.filled().len()))
142    }
143}
144
145impl<T> AsyncWrite for HyperIo<T>
146where
147    T: hyper::rt::Write,
148{
149    fn poll_write(
150        self: Pin<&mut Self>,
151        cx: &mut Context<'_>,
152        buf: &[u8],
153    ) -> Poll<io::Result<usize>> {
154        hyper::rt::Write::poll_write(self.project().inner, cx, buf)
155    }
156
157    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
158        hyper::rt::Write::poll_flush(self.project().inner, cx)
159    }
160
161    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
162        hyper::rt::Write::poll_shutdown(self.project().inner, cx)
163    }
164
165    fn poll_write_vectored(
166        self: Pin<&mut Self>,
167        cx: &mut Context<'_>,
168        bufs: &[io::IoSlice<'_>],
169    ) -> Poll<Result<usize, io::Error>> {
170        hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
171    }
172}
173
174// ==== impl HyperTimer =====
175
176impl Timer for HyperTimer {
177    fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> {
178        Box::pin(HyperSleep {
179            inner: awak::time::delay_for(duration),
180        })
181    }
182
183    fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
184        Box::pin(HyperSleep {
185            inner: awak::time::delay_until(deadline),
186        })
187    }
188
189    fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
190        if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<HyperSleep>() {
191            sleep.reset(new_deadline)
192        }
193    }
194}
195
196impl HyperTimer {
197    /// Create a new HyperTimer
198    pub fn new() -> Self {
199        Self {}
200    }
201}
202
203impl Future for HyperSleep {
204    type Output = ();
205
206    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
207        self.project().inner.poll(cx)
208    }
209}
210
211impl Sleep for HyperSleep {}
212
213impl HyperSleep {
214    fn reset(self: Pin<&mut Self>, deadline: Instant) {
215        self.project().inner.as_mut().reset(deadline);
216    }
217}