1#![allow(dead_code)]
2use std::{
4 future::Future,
5 pin::Pin,
6 task::{Context, Poll},
7 time::{Duration, Instant},
8};
9
10use hyper::rt::{Sleep, Timer};
11use pin_project_lite::pin_project;
12
13#[derive(Clone)]
14pub struct TokioExecutor;
16
17impl<F> hyper::rt::Executor<F> for TokioExecutor
18where
19 F: std::future::Future + Send + 'static,
20 F::Output: Send + 'static,
21{
22 fn execute(&self, fut: F) {
23 tokio::task::spawn(fut);
24 }
25}
26
27#[derive(Clone, Debug)]
30pub struct TokioTimer;
31
32impl Timer for TokioTimer {
33 fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> {
34 Box::pin(TokioSleep {
35 inner: tokio::time::sleep(duration),
36 })
37 }
38
39 fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
40 Box::pin(TokioSleep {
41 inner: tokio::time::sleep_until(deadline.into()),
42 })
43 }
44
45 fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
46 if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<TokioSleep>() {
47 sleep.reset(new_deadline)
48 }
49 }
50}
51
52pin_project! {
55 pub(crate) struct TokioSleep {
56 #[pin]
57 pub(crate) inner: tokio::time::Sleep,
58 }
59}
60
61impl Future for TokioSleep {
62 type Output = ();
63
64 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
65 self.project().inner.poll(cx)
66 }
67}
68
69impl Sleep for TokioSleep {}
70
71impl TokioSleep {
72 pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
73 self.project().inner.as_mut().reset(deadline.into());
74 }
75}
76
77pin_project! {
78 #[derive(Debug)]
79 pub struct TokioIo<T> {
80 #[pin]
81 inner: T,
82 }
83}
84
85impl<T> TokioIo<T> {
86 pub fn new(inner: T) -> Self {
87 Self { inner }
88 }
89
90 pub fn inner(self) -> T {
91 self.inner
92 }
93}
94
95impl<T> hyper::rt::Read for TokioIo<T>
96where
97 T: tokio::io::AsyncRead,
98{
99 fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, mut buf: hyper::rt::ReadBufCursor<'_>) -> Poll<Result<(), std::io::Error>> {
100 let n = unsafe {
101 let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
102 match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) {
103 Poll::Ready(Ok(())) => tbuf.filled().len(),
104 other => return other,
105 }
106 };
107
108 unsafe {
109 buf.advance(n);
110 }
111 Poll::Ready(Ok(()))
112 }
113}
114
115impl<T> hyper::rt::Write for TokioIo<T>
116where
117 T: tokio::io::AsyncWrite,
118{
119 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, std::io::Error>> {
120 tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
121 }
122
123 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
124 tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
125 }
126
127 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
128 tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
129 }
130
131 fn is_write_vectored(&self) -> bool {
132 tokio::io::AsyncWrite::is_write_vectored(&self.inner)
133 }
134
135 fn poll_write_vectored(
136 self: Pin<&mut Self>,
137 cx: &mut Context<'_>,
138 bufs: &[std::io::IoSlice<'_>],
139 ) -> Poll<Result<usize, std::io::Error>> {
140 tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
141 }
142}
143
144impl<T> tokio::io::AsyncRead for TokioIo<T>
145where
146 T: hyper::rt::Read,
147{
148 fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, tbuf: &mut tokio::io::ReadBuf<'_>) -> Poll<Result<(), std::io::Error>> {
149 let filled = tbuf.filled().len();
151 let sub_filled = unsafe {
152 let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut());
153
154 match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) {
155 Poll::Ready(Ok(())) => buf.filled().len(),
156 other => return other,
157 }
158 };
159
160 let n_filled = filled + sub_filled;
161 let n_init = sub_filled;
163 unsafe {
164 tbuf.assume_init(n_init);
165 tbuf.set_filled(n_filled);
166 }
167
168 Poll::Ready(Ok(()))
169 }
170}
171
172impl<T> tokio::io::AsyncWrite for TokioIo<T>
173where
174 T: hyper::rt::Write,
175{
176 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, std::io::Error>> {
177 hyper::rt::Write::poll_write(self.project().inner, cx, buf)
178 }
179
180 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
181 hyper::rt::Write::poll_flush(self.project().inner, cx)
182 }
183
184 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
185 hyper::rt::Write::poll_shutdown(self.project().inner, cx)
186 }
187
188 fn is_write_vectored(&self) -> bool {
189 hyper::rt::Write::is_write_vectored(&self.inner)
190 }
191
192 fn poll_write_vectored(
193 self: Pin<&mut Self>,
194 cx: &mut Context<'_>,
195 bufs: &[std::io::IoSlice<'_>],
196 ) -> Poll<Result<usize, std::io::Error>> {
197 hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
198 }
199}