hyper_fahrenheit/
lib.rs

//! Have you ever wondered if you could run [hyper](https://docs.rs/hyper) on
//! [fahrenheit](https://docs.rs/fahrenheit/)?
//! I bet you haven't, but yes, you can (but please don't).
//!
//! ## Example:
//! ```
//! use fahrenheit;
//! use hyper::{Client, Uri};
//! use hyper_fahrenheit::{Connector, FahrenheitExecutor};
//!
//! fahrenheit::run(async move {
//!   let client: Client<Connector, hyper::Body> = Client::builder()
//!       .executor(FahrenheitExecutor)
//!       .build(Connector);
//!   let res = client
//!       .get(Uri::from_static("http://httpbin.org/ip"))
//!       .await
//!       .unwrap();
//!   println!("status: {}", res.status());
//!   let buf = hyper::body::to_bytes(res).await.unwrap();
//!   println!("body: {:?}", buf);
//! });
//! ```
24
25use futures_io::{AsyncRead, AsyncWrite};
26use futures_util::future::BoxFuture;
27use hyper::rt::Executor;
28use hyper::{
29    client::connect::{Connected, Connection},
30    service::Service,
31    Uri,
32};
33use std::io::Error;
34use std::net::ToSocketAddrs;
35use std::pin::Pin;
36use std::task::{Context, Poll};
37
38/// Wraps fahrenheit's TcpStream for hyper's pleasure.
39pub struct AsyncTcpStream(fahrenheit::AsyncTcpStream);
40
41impl AsyncTcpStream {
42    pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<AsyncTcpStream, std::io::Error> {
43        Ok(AsyncTcpStream(fahrenheit::AsyncTcpStream::connect(addr)?))
44    }
45}
46
47// Hyper needs this.
48impl tokio::io::AsyncRead for AsyncTcpStream {
49    fn poll_read(
50        self: Pin<&mut Self>,
51        ctx: &mut Context,
52        buf: &mut [u8],
53    ) -> Poll<Result<usize, Error>> {
54        let this = Pin::into_inner(self);
55        AsyncRead::poll_read(Pin::new(&mut this.0), ctx, buf)
56    }
57}
58
59impl tokio::io::AsyncWrite for AsyncTcpStream {
60    fn poll_write(
61        self: Pin<&mut Self>,
62        ctx: &mut Context,
63        buf: &[u8],
64    ) -> Poll<Result<usize, Error>> {
65        let this = Pin::into_inner(self);
66        AsyncWrite::poll_write(Pin::new(&mut this.0), ctx, buf)
67    }
68
69    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
70        let this = Pin::into_inner(self);
71        AsyncWrite::poll_flush(Pin::new(&mut this.0), cx)
72    }
73    fn poll_shutdown(
74        self: Pin<&mut Self>,
75        _cx: &mut Context<'_>,
76    ) -> Poll<Result<(), std::io::Error>> {
77        Poll::Ready(Ok(()))
78    }
79}
80
81#[derive(Clone, Copy, Debug, Default)]
82pub struct Connector;
83
84impl Service<Uri> for Connector {
85    type Response = AsyncTcpStream;
86    type Error = std::io::Error;
87    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
88
89    fn call(&mut self, req: Uri) -> Self::Future {
90        let fut = async move {
91            let addr = format!("{}:{}", req.host().unwrap(), req.port_u16().unwrap_or(80));
92            AsyncTcpStream::connect(addr)
93        };
94
95        Box::pin(fut)
96    }
97
98    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
99        Poll::Ready(Ok(()))
100    }
101}
102
103impl Connection for AsyncTcpStream {
104    fn connected(&self) -> Connected {
105        Connected::new()
106    }
107}
108
109// Wraps fahrenheit as hyper's Executor.
110pub struct FahrenheitExecutor;
111
112impl<Fut> Executor<Fut> for FahrenheitExecutor
113where
114    Fut: Send + std::future::Future<Output = ()> + 'static,
115{
116    fn execute(&self, fut: Fut) {
117        fahrenheit::spawn(fut);
118    }
119}