1use std::{
47 ops::{Deref, DerefMut},
48 pin::Pin,
49 task::{Context, Poll},
50};
51
52use hyper::rt::{Executor, Sleep, Timer};
53use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
54
55#[derive(Debug, Default, Clone, Copy)]
60pub struct VibeioExecutor;
61
62impl<Fut> Executor<Fut> for VibeioExecutor
63where
64 Fut: std::future::Future + 'static,
65 Fut::Output: 'static,
66{
67 #[inline]
68 fn execute(&self, fut: Fut) {
69 vibeio::spawn(fut);
70 }
71}
72
73#[derive(Debug, Default, Clone, Copy)]
78pub struct VibeioTimer;
79
80impl Timer for VibeioTimer {
81 #[inline]
82 fn sleep(&self, duration: std::time::Duration) -> Pin<Box<dyn Sleep>> {
83 Box::pin(VibeioSleep {
84 inner: Box::pin(vibeio::time::sleep(duration)),
85 })
86 }
87
88 #[inline]
89 fn sleep_until(&self, deadline: std::time::Instant) -> Pin<Box<dyn Sleep>> {
90 Box::pin(VibeioSleep {
91 inner: Box::pin(vibeio::time::sleep_until(deadline)),
92 })
93 }
94
95 #[inline]
96 fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: std::time::Instant) {
97 if let Some(mut sleep) = sleep.as_mut().downcast_mut_pin::<VibeioSleep>() {
98 sleep.reset(new_deadline);
99 }
100 }
101}
102
103struct VibeioSleep {
105 inner: Pin<Box<vibeio::time::Sleep>>,
106}
107
108impl std::future::Future for VibeioSleep {
109 type Output = ();
110
111 #[inline]
112 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
113 self.inner.as_mut().poll(cx)
114 }
115}
116
117impl Sleep for VibeioSleep {}
118
119unsafe impl Send for VibeioSleep {}
120unsafe impl Sync for VibeioSleep {}
121
122impl VibeioSleep {
123 #[inline]
124 fn reset(&mut self, new_deadline: std::time::Instant) {
125 self.inner.reset(new_deadline);
126 }
127}
128
129#[derive(Debug)]
140pub struct VibeioIo<T> {
141 inner: Pin<Box<T>>,
142}
143
144impl<T> VibeioIo<T> {
145 #[inline]
147 pub fn new(inner: T) -> Self {
148 Self {
149 inner: Box::pin(inner),
150 }
151 }
152}
153
154impl<T> Deref for VibeioIo<T> {
155 type Target = Pin<Box<T>>;
156
157 #[inline]
158 fn deref(&self) -> &Self::Target {
159 &self.inner
160 }
161}
162
163impl<T> DerefMut for VibeioIo<T> {
164 #[inline]
165 fn deref_mut(&mut self) -> &mut Self::Target {
166 &mut self.inner
167 }
168}
169
170impl<T> hyper::rt::Read for VibeioIo<T>
172where
173 T: AsyncRead,
174{
175 #[inline]
176 fn poll_read(
177 mut self: Pin<&mut Self>,
178 cx: &mut Context<'_>,
179 mut buf: hyper::rt::ReadBufCursor<'_>,
180 ) -> Poll<Result<(), std::io::Error>> {
181 let n = {
182 let mut tbuf = unsafe { ReadBuf::uninit(buf.as_mut()) };
183 match self.inner.as_mut().poll_read(cx, &mut tbuf) {
184 Poll::Ready(Ok(_)) => tbuf.filled().len(),
185 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
186 Poll::Pending => return Poll::Pending,
187 }
188 };
189
190 unsafe { buf.advance(n) };
191 Poll::Ready(Ok(()))
192 }
193}
194
195impl<T> hyper::rt::Write for VibeioIo<T>
197where
198 T: AsyncWrite,
199{
200 #[inline]
201 fn poll_write(
202 mut self: Pin<&mut Self>,
203 cx: &mut Context<'_>,
204 buf: &[u8],
205 ) -> Poll<Result<usize, std::io::Error>> {
206 self.inner.as_mut().poll_write(cx, buf)
207 }
208
209 #[inline]
210 fn poll_flush(
211 mut self: Pin<&mut Self>,
212 cx: &mut Context<'_>,
213 ) -> Poll<Result<(), std::io::Error>> {
214 self.inner.as_mut().poll_flush(cx)
215 }
216
217 #[inline]
218 fn poll_shutdown(
219 mut self: Pin<&mut Self>,
220 cx: &mut Context<'_>,
221 ) -> Poll<Result<(), std::io::Error>> {
222 self.inner.as_mut().poll_shutdown(cx)
223 }
224
225 #[inline]
226 fn is_write_vectored(&self) -> bool {
227 self.inner.is_write_vectored()
228 }
229
230 #[inline]
231 fn poll_write_vectored(
232 mut self: Pin<&mut Self>,
233 cx: &mut Context<'_>,
234 bufs: &[std::io::IoSlice<'_>],
235 ) -> Poll<Result<usize, std::io::Error>> {
236 self.inner.as_mut().poll_write_vectored(cx, bufs)
237 }
238}
239
240impl<T> AsyncRead for VibeioIo<T>
242where
243 T: hyper::rt::Read,
244{
245 #[inline]
246 fn poll_read(
247 mut self: Pin<&mut Self>,
248 cx: &mut Context<'_>,
249 tbuf: &mut ReadBuf<'_>,
250 ) -> Poll<std::io::Result<()>> {
251 let filled = tbuf.filled().len();
252 let sub_filled = {
253 let mut buf = unsafe { hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()) };
254 match self.inner.as_mut().poll_read(cx, buf.unfilled()) {
255 Poll::Ready(Ok(_)) => buf.filled().len(),
256 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
257 Poll::Pending => return Poll::Pending,
258 }
259 };
260
261 unsafe {
262 tbuf.assume_init(sub_filled);
263 tbuf.set_filled(filled + sub_filled);
264 };
265 Poll::Ready(Ok(()))
266 }
267}
268
269impl<T> AsyncWrite for VibeioIo<T>
271where
272 T: hyper::rt::Write,
273{
274 #[inline]
275 fn poll_write(
276 mut self: Pin<&mut Self>,
277 cx: &mut Context<'_>,
278 buf: &[u8],
279 ) -> Poll<Result<usize, std::io::Error>> {
280 self.inner.as_mut().poll_write(cx, buf)
281 }
282
283 #[inline]
284 fn poll_flush(
285 mut self: Pin<&mut Self>,
286 cx: &mut Context<'_>,
287 ) -> Poll<Result<(), std::io::Error>> {
288 self.inner.as_mut().poll_flush(cx)
289 }
290
291 #[inline]
292 fn poll_shutdown(
293 mut self: Pin<&mut Self>,
294 cx: &mut Context<'_>,
295 ) -> Poll<Result<(), std::io::Error>> {
296 self.inner.as_mut().poll_shutdown(cx)
297 }
298
299 #[inline]
300 fn is_write_vectored(&self) -> bool {
301 self.inner.is_write_vectored()
302 }
303
304 #[inline]
305 fn poll_write_vectored(
306 mut self: Pin<&mut Self>,
307 cx: &mut Context<'_>,
308 bufs: &[std::io::IoSlice<'_>],
309 ) -> Poll<Result<usize, std::io::Error>> {
310 self.inner.as_mut().poll_write_vectored(cx, bufs)
311 }
312}