Skip to main content

vibeio_hyper/
lib.rs

1//! A compatibility layer for using [`hyper`] with the `vibeio` async runtime.
2//!
3//! This crate provides the necessary adapters to run `hyper`-based HTTP servers
4//! and clients on top of `vibeio` instead of `tokio`. It implements the traits
5//! required by `hyper`'s runtime abstraction:
6//!
7//! - [`VibeioExecutor`]: An executor that spawns tasks on the `vibeio` runtime
8//!   using `vibeio::spawn`.
9//! - [`VibeioTimer`]: A timer that uses `vibeio::time::sleep` for delay operations.
10//! - [`VibeioIo`]: A wrapper type that adapts `vibeio`'s I/O types to implement
11//!   `hyper`'s `Read` and `Write` traits, and also implements `tokio`'s
12//!   `AsyncRead` and `AsyncWrite` traits for compatibility.
13//!
14//! # Overview
15//!
16//! This crate enables `hyper` to work with `vibeio` by implementing `hyper`'s
17//! runtime traits (`Executor`, `Timer`, `Read`, `Write`, `Sleep`) in terms of
18//! `vibeio` primitives.
19//!
20//! ## Executor
21//!
22//! The [`VibeioExecutor`] type implements `hyper::rt::Executor` by spawning
23//! futures onto the `vibeio` runtime via `vibeio::spawn`.
24//!
25//! ## Timer
26//!
27//! The [`VibeioTimer`] type implements `hyper::rt::Timer` by converting sleep
28//! requests into `vibeio::time::sleep` futures wrapped in a compatible type.
29//!
30//! ## I/O Adapters
31//!
32//! The [`VibeioIo<T>`] wrapper adapts any type that implements `tokio::io::AsyncRead`
33//! and `tokio::io::AsyncWrite` to work with `hyper`'s I/O traits. It also
34//! implements the reverse conversion, allowing `hyper`'s I/O types to be used
35//! with `tokio`-style async functions.
36//!
37//! # Implementation notes
38//!
39//! - The `VibeioIo` wrapper uses `Pin<Box<T>>` internally to support the
40//!   trait implementations required by `hyper` and `tokio`.
41//! - The `VibeioSleep` type (internal) implements both `hyper::rt::Sleep`
42//!   and `std::future::Future` to bridge the two runtimes' sleep abstractions.
43//! - Timer handles are properly cancelled when `VibeioSleep` is dropped to
44//!   avoid resource leaks.
45
46use std::{
47    ops::{Deref, DerefMut},
48    pin::Pin,
49    task::{Context, Poll},
50};
51
52use hyper::rt::{Executor, Sleep, Timer};
53use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
54
/// Zero-sized handle that lets `hyper` schedule futures on the `vibeio` runtime.
///
/// The `hyper::rt::Executor` implementation for this type forwards every
/// future it receives to `vibeio::spawn`. The type carries no state, so it is
/// free to copy and can be created with `Default::default()`.
#[derive(Clone, Copy, Debug, Default)]
pub struct VibeioExecutor;
61
62impl<Fut> Executor<Fut> for VibeioExecutor
63where
64    Fut: std::future::Future + 'static,
65    Fut::Output: 'static,
66{
67    #[inline]
68    fn execute(&self, fut: Fut) {
69        vibeio::spawn(fut);
70    }
71}
72
/// Zero-sized handle that provides `hyper` with timers driven by `vibeio`.
///
/// The `hyper::rt::Timer` implementation for this type builds its sleep
/// futures from `vibeio::time::sleep` and `vibeio::time::sleep_until`.
#[derive(Clone, Copy, Debug, Default)]
pub struct VibeioTimer;
79
80impl Timer for VibeioTimer {
81    #[inline]
82    fn sleep(&self, duration: std::time::Duration) -> Pin<Box<dyn Sleep>> {
83        Box::pin(VibeioSleep {
84            inner: Box::pin(vibeio::time::sleep(duration)),
85        })
86    }
87
88    #[inline]
89    fn sleep_until(&self, deadline: std::time::Instant) -> Pin<Box<dyn Sleep>> {
90        Box::pin(VibeioSleep {
91            inner: Box::pin(vibeio::time::sleep_until(deadline)),
92        })
93    }
94
95    #[inline]
96    fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: std::time::Instant) {
97        if let Some(mut sleep) = sleep.as_mut().downcast_mut_pin::<VibeioSleep>() {
98            sleep.reset(new_deadline);
99        }
100    }
101}
102
103/// A sleep future that wraps `vibeio::time::Sleep` and implements `hyper::rt::Sleep`.
104struct VibeioSleep {
105    inner: Pin<Box<vibeio::time::Sleep>>,
106}
107
108impl std::future::Future for VibeioSleep {
109    type Output = ();
110
111    #[inline]
112    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
113        self.inner.as_mut().poll(cx)
114    }
115}
116
117impl Sleep for VibeioSleep {}
118
119unsafe impl Send for VibeioSleep {}
120unsafe impl Sync for VibeioSleep {}
121
122impl VibeioSleep {
123    #[inline]
124    fn reset(&mut self, new_deadline: std::time::Instant) {
125        self.inner.reset(new_deadline);
126    }
127}
128
/// Bridges `tokio`-style and `hyper`-style I/O traits over a single wrapped value.
///
/// The trait implementations in this file go in both directions:
///
/// - if `T` implements `tokio::io::AsyncRead` / `tokio::io::AsyncWrite`, the
///   wrapper implements `hyper::rt::Read` / `hyper::rt::Write`;
/// - if `T` implements `hyper::rt::Read` / `hyper::rt::Write`, the wrapper
///   implements `tokio::io::AsyncRead` / `tokio::io::AsyncWrite`.
///
/// The wrapped value is boxed and pinned, which is what allows the poll
/// methods to obtain a `Pin<&mut T>` from the wrapper.
#[derive(Debug)]
pub struct VibeioIo<T> {
    /// The wrapped I/O object, heap-allocated and pinned.
    inner: Pin<Box<T>>,
}
143
144impl<T> VibeioIo<T> {
145    /// Creates a new `VibeioIo` wrapper around the given I/O type.
146    #[inline]
147    pub fn new(inner: T) -> Self {
148        Self {
149            inner: Box::pin(inner),
150        }
151    }
152}
153
154impl<T> Deref for VibeioIo<T> {
155    type Target = Pin<Box<T>>;
156
157    #[inline]
158    fn deref(&self) -> &Self::Target {
159        &self.inner
160    }
161}
162
163impl<T> DerefMut for VibeioIo<T> {
164    #[inline]
165    fn deref_mut(&mut self) -> &mut Self::Target {
166        &mut self.inner
167    }
168}
169
170// Implement hyper::rt::Read for types that implement tokio::io::AsyncRead
171impl<T> hyper::rt::Read for VibeioIo<T>
172where
173    T: AsyncRead,
174{
175    #[inline]
176    fn poll_read(
177        mut self: Pin<&mut Self>,
178        cx: &mut Context<'_>,
179        mut buf: hyper::rt::ReadBufCursor<'_>,
180    ) -> Poll<Result<(), std::io::Error>> {
181        let n = {
182            let mut tbuf = unsafe { ReadBuf::uninit(buf.as_mut()) };
183            match self.inner.as_mut().poll_read(cx, &mut tbuf) {
184                Poll::Ready(Ok(_)) => tbuf.filled().len(),
185                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
186                Poll::Pending => return Poll::Pending,
187            }
188        };
189
190        unsafe { buf.advance(n) };
191        Poll::Ready(Ok(()))
192    }
193}
194
195// Implement hyper::rt::Write for types that implement tokio::io::AsyncWrite
196impl<T> hyper::rt::Write for VibeioIo<T>
197where
198    T: AsyncWrite,
199{
200    #[inline]
201    fn poll_write(
202        mut self: Pin<&mut Self>,
203        cx: &mut Context<'_>,
204        buf: &[u8],
205    ) -> Poll<Result<usize, std::io::Error>> {
206        self.inner.as_mut().poll_write(cx, buf)
207    }
208
209    #[inline]
210    fn poll_flush(
211        mut self: Pin<&mut Self>,
212        cx: &mut Context<'_>,
213    ) -> Poll<Result<(), std::io::Error>> {
214        self.inner.as_mut().poll_flush(cx)
215    }
216
217    #[inline]
218    fn poll_shutdown(
219        mut self: Pin<&mut Self>,
220        cx: &mut Context<'_>,
221    ) -> Poll<Result<(), std::io::Error>> {
222        self.inner.as_mut().poll_shutdown(cx)
223    }
224
225    #[inline]
226    fn is_write_vectored(&self) -> bool {
227        self.inner.is_write_vectored()
228    }
229
230    #[inline]
231    fn poll_write_vectored(
232        mut self: Pin<&mut Self>,
233        cx: &mut Context<'_>,
234        bufs: &[std::io::IoSlice<'_>],
235    ) -> Poll<Result<usize, std::io::Error>> {
236        self.inner.as_mut().poll_write_vectored(cx, bufs)
237    }
238}
239
240// Implement tokio::io::AsyncRead for types that implement hyper::rt::Read
241impl<T> AsyncRead for VibeioIo<T>
242where
243    T: hyper::rt::Read,
244{
245    #[inline]
246    fn poll_read(
247        mut self: Pin<&mut Self>,
248        cx: &mut Context<'_>,
249        tbuf: &mut ReadBuf<'_>,
250    ) -> Poll<std::io::Result<()>> {
251        let filled = tbuf.filled().len();
252        let sub_filled = {
253            let mut buf = unsafe { hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()) };
254            match self.inner.as_mut().poll_read(cx, buf.unfilled()) {
255                Poll::Ready(Ok(_)) => buf.filled().len(),
256                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
257                Poll::Pending => return Poll::Pending,
258            }
259        };
260
261        unsafe {
262            tbuf.assume_init(sub_filled);
263            tbuf.set_filled(filled + sub_filled);
264        };
265        Poll::Ready(Ok(()))
266    }
267}
268
269// Implement tokio::io::AsyncWrite for types that implement hyper::rt::Write
270impl<T> AsyncWrite for VibeioIo<T>
271where
272    T: hyper::rt::Write,
273{
274    #[inline]
275    fn poll_write(
276        mut self: Pin<&mut Self>,
277        cx: &mut Context<'_>,
278        buf: &[u8],
279    ) -> Poll<Result<usize, std::io::Error>> {
280        self.inner.as_mut().poll_write(cx, buf)
281    }
282
283    #[inline]
284    fn poll_flush(
285        mut self: Pin<&mut Self>,
286        cx: &mut Context<'_>,
287    ) -> Poll<Result<(), std::io::Error>> {
288        self.inner.as_mut().poll_flush(cx)
289    }
290
291    #[inline]
292    fn poll_shutdown(
293        mut self: Pin<&mut Self>,
294        cx: &mut Context<'_>,
295    ) -> Poll<Result<(), std::io::Error>> {
296        self.inner.as_mut().poll_shutdown(cx)
297    }
298
299    #[inline]
300    fn is_write_vectored(&self) -> bool {
301        self.inner.is_write_vectored()
302    }
303
304    #[inline]
305    fn poll_write_vectored(
306        mut self: Pin<&mut Self>,
307        cx: &mut Context<'_>,
308        bufs: &[std::io::IoSlice<'_>],
309    ) -> Poll<Result<usize, std::io::Error>> {
310        self.inner.as_mut().poll_write_vectored(cx, bufs)
311    }
312}