socks_hub/
tokiort.rs

1#![allow(dead_code)]
2//! Various runtimes for hyper
3use std::{
4    future::Future,
5    pin::Pin,
6    task::{Context, Poll},
7    time::{Duration, Instant},
8};
9
10use hyper::rt::{Sleep, Timer};
11use pin_project_lite::pin_project;
12
/// An executor that runs futures on the Tokio runtime.
///
/// This is a zero-sized handle: it carries no state of its own, so cloning
/// or default-constructing it is free.
#[derive(Clone, Debug, Default)]
pub struct TokioExecutor;
16
17impl<F> hyper::rt::Executor<F> for TokioExecutor
18where
19    F: std::future::Future + Send + 'static,
20    F::Output: Send + 'static,
21{
22    fn execute(&self, fut: F) {
23        tokio::task::spawn(fut);
24    }
25}
26
/// A [`Timer`] backed by the Tokio runtime's time driver.
///
/// Stateless unit struct; every method simply forwards to `tokio::time`.
#[derive(Debug, Clone)]
pub struct TokioTimer;
31
32impl Timer for TokioTimer {
33    fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> {
34        Box::pin(TokioSleep {
35            inner: tokio::time::sleep(duration),
36        })
37    }
38
39    fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
40        Box::pin(TokioSleep {
41            inner: tokio::time::sleep_until(deadline.into()),
42        })
43    }
44
45    fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
46        if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<TokioSleep>() {
47            sleep.reset(new_deadline)
48        }
49    }
50}
51
// Wrapper around `tokio::time::Sleep` for which this file implements `Future`
// and hyper's `Sleep` trait, so that `TokioTimer` can hand it out as a
// `Pin<Box<dyn Sleep>>`. The inner timer is marked `#[pin]` so it can be
// polled and reset through a pinned projection.
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
pin_project! {
    pub(crate) struct TokioSleep {
        // The underlying Tokio timer future; structurally pinned.
        #[pin]
        pub(crate) inner: tokio::time::Sleep,
    }
}
60
61impl Future for TokioSleep {
62    type Output = ();
63
64    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
65        self.project().inner.poll(cx)
66    }
67}
68
// Empty impl: no trait methods are overridden here. It marks `TokioSleep` as a
// hyper `Sleep` so it can be boxed as `dyn Sleep`.
impl Sleep for TokioSleep {}
70
71impl TokioSleep {
72    pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
73        self.project().inner.as_mut().reset(deadline.into());
74    }
75}
76
pin_project! {
    /// Adapter around an I/O value `T`.
    ///
    /// Elsewhere in this file it implements hyper's `Read`/`Write` when `T`
    /// implements Tokio's `AsyncRead`/`AsyncWrite`, and Tokio's traits when
    /// `T` implements hyper's, so the same wrapper bridges both directions.
    #[derive(Debug)]
    pub struct TokioIo<T> {
        // The wrapped I/O value; structurally pinned so the trait impls can
        // forward `Pin<&mut Self>` to it.
        #[pin]
        inner: T,
    }
}
84
85impl<T> TokioIo<T> {
86    pub fn new(inner: T) -> Self {
87        Self { inner }
88    }
89
90    pub fn inner(self) -> T {
91        self.inner
92    }
93}
94
95impl<T> hyper::rt::Read for TokioIo<T>
96where
97    T: tokio::io::AsyncRead,
98{
99    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, mut buf: hyper::rt::ReadBufCursor<'_>) -> Poll<Result<(), std::io::Error>> {
100        let n = unsafe {
101            let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
102            match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) {
103                Poll::Ready(Ok(())) => tbuf.filled().len(),
104                other => return other,
105            }
106        };
107
108        unsafe {
109            buf.advance(n);
110        }
111        Poll::Ready(Ok(()))
112    }
113}
114
115impl<T> hyper::rt::Write for TokioIo<T>
116where
117    T: tokio::io::AsyncWrite,
118{
119    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, std::io::Error>> {
120        tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
121    }
122
123    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
124        tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
125    }
126
127    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
128        tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
129    }
130
131    fn is_write_vectored(&self) -> bool {
132        tokio::io::AsyncWrite::is_write_vectored(&self.inner)
133    }
134
135    fn poll_write_vectored(
136        self: Pin<&mut Self>,
137        cx: &mut Context<'_>,
138        bufs: &[std::io::IoSlice<'_>],
139    ) -> Poll<Result<usize, std::io::Error>> {
140        tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
141    }
142}
143
144impl<T> tokio::io::AsyncRead for TokioIo<T>
145where
146    T: hyper::rt::Read,
147{
148    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, tbuf: &mut tokio::io::ReadBuf<'_>) -> Poll<Result<(), std::io::Error>> {
149        //let init = tbuf.initialized().len();
150        let filled = tbuf.filled().len();
151        let sub_filled = unsafe {
152            let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut());
153
154            match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) {
155                Poll::Ready(Ok(())) => buf.filled().len(),
156                other => return other,
157            }
158        };
159
160        let n_filled = filled + sub_filled;
161        // At least sub_filled bytes had to have been initialized.
162        let n_init = sub_filled;
163        unsafe {
164            tbuf.assume_init(n_init);
165            tbuf.set_filled(n_filled);
166        }
167
168        Poll::Ready(Ok(()))
169    }
170}
171
172impl<T> tokio::io::AsyncWrite for TokioIo<T>
173where
174    T: hyper::rt::Write,
175{
176    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, std::io::Error>> {
177        hyper::rt::Write::poll_write(self.project().inner, cx, buf)
178    }
179
180    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
181        hyper::rt::Write::poll_flush(self.project().inner, cx)
182    }
183
184    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
185        hyper::rt::Write::poll_shutdown(self.project().inner, cx)
186    }
187
188    fn is_write_vectored(&self) -> bool {
189        hyper::rt::Write::is_write_vectored(&self.inner)
190    }
191
192    fn poll_write_vectored(
193        self: Pin<&mut Self>,
194        cx: &mut Context<'_>,
195        bufs: &[std::io::IoSlice<'_>],
196    ) -> Poll<Result<usize, std::io::Error>> {
197        hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
198    }
199}