use futures::compat::{Compat as Compat03As01, Compat01As03};
use futures::future::BoxFuture;
use futures::future::FutureObj;
use futures::prelude::*;
use futures::task::SpawnError;
use hyper::client::connect as hyper_connect;
use hyper_tls::HttpsConnector;
use native_tls::TlsConnector;
use runtime::net::TcpStream;
use runtime_raw::Runtime;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use super::{Body, HttpClient, Request, Response};
#[derive(Debug)]
pub struct HyperClient {
client: Arc<hyper::Client<HttpsConnector<RuntimeTcpConnector>, hyper::Body>>,
}
impl HyperClient {
pub(crate) fn new() -> Self {
let tcp_connector = RuntimeTcpConnector::new();
let tls_connector = TlsConnector::new().unwrap();
let https = HttpsConnector::from((tcp_connector, tls_connector));
let client = hyper::Client::builder()
.executor(Compat03As01::new(TokioSpawner))
.build::<_, hyper::Body>(https);
Self {
client: Arc::new(client),
}
}
}
impl Clone for HyperClient {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
}
}
}
impl HttpClient for HyperClient {
type Error = hyper::error::Error;
fn send(&self, req: Request) -> BoxFuture<'static, Result<Response, Self::Error>> {
let client = self.client.clone();
Box::pin(async move {
let (parts, body) = req.into_parts();
let byte_stream = Compat03As01::new(ChunkStream { reader: body });
let body = hyper::Body::wrap_stream(byte_stream);
let req = hyper::Request::from_parts(parts, body);
let res = Compat01As03::new(client.request(req)).await?;
let (parts, body) = res.into_parts();
let body_stream = Compat01As03::new(body)
.map(|chunk| chunk.map(|chunk| chunk.to_vec()))
.map_err(|_| io::ErrorKind::InvalidData.into());
let body_reader = body_stream.into_async_read();
let body = Body::from_reader(Box::new(body_reader));
let res = http::Response::from_parts(parts, body);
Ok(res)
})
}
}
struct ChunkStream<R: AsyncRead> {
reader: R,
}
impl<R: AsyncRead + Unpin> futures::Stream for ChunkStream<R> {
type Item = Result<hyper::Chunk, Box<dyn std::error::Error + Send + Sync + 'static>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut buf = vec![];
let read = futures::ready!(Pin::new(&mut self.reader).poll_read(cx, &mut buf))?;
if read == 0 {
return Poll::Ready(None);
} else {
buf.shrink_to_fit();
let chunk = hyper::Chunk::from(buf);
Poll::Ready(Some(Ok(chunk)))
}
}
}
pub(crate) struct RuntimeTcpConnector {
_priv: (),
}
impl RuntimeTcpConnector {
pub(crate) fn new() -> Self {
Self { _priv: () }
}
}
impl hyper_connect::Connect for RuntimeTcpConnector {
type Transport = Compat03As01<TcpStream>;
type Error = io::Error;
type Future = Compat03As01<
Pin<
Box<
dyn Future<
Output = Result<(Self::Transport, hyper_connect::Connected), Self::Error>,
> + Send,
>,
>,
>;
fn connect(&self, dest: hyper_connect::Destination) -> Self::Future {
Compat03As01::new(Box::pin(async move {
let port = match dest.port() {
Some(port) => port,
None if dest.scheme() == "https" => 443,
None => 80,
};
let tcp_stream = TcpStream::connect((dest.host(), port)).await?;
Ok((
Compat03As01::new(tcp_stream),
hyper_connect::Connected::new(),
))
}))
}
}
#[derive(Debug)]
struct TokioSpawner;
impl futures::task::Spawn for &TokioSpawner {
fn spawn_obj(&mut self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
runtime_tokio::Tokio.spawn_boxed(future.boxed())
}
}