use std::error::Error;
use http::Request;
use http::Response;
use http_body::Body;
use http_body_util::BodyExt;
use hyper::client::conn::http1::SendRequest;
use hyper::client::{self};
use hyper_util::rt::TokioIo;
use tokio::net::TcpStream;
use tokio::task::JoinHandle;
/// HTTP/1 client that owns a single hyper connection.
///
/// The client keeps the request half of the connection (`sender`) together
/// with the handle of the Tokio task that drives the connection in the
/// background. The background task is aborted when the client is dropped.
///
/// `B` is the request body type sent over the connection.
pub struct TakoClient<B>
where
    B: Body + Send + 'static,
    B::Data: Send + 'static,
    B::Error: Into<Box<dyn Error + Send + Sync>>,
{
    /// Request half of the hyper HTTP/1 connection.
    sender: SendRequest<B>,
    /// Handle of the spawned task that polls the connection.
    conn_handle: JoinHandle<Result<(), hyper::Error>>,
}
impl<B> TakoClient<B>
where
    B: Body + Send + 'static,
    B::Data: Send + 'static,
    B::Error: Into<Box<dyn Error + Send + Sync>>,
{
    /// Connects to `host` over plain TCP and performs the HTTP/1 handshake.
    ///
    /// `port` defaults to `80` when `None`. The connection itself is driven by
    /// a spawned Tokio task whose handle is stored in the client; any error the
    /// connection ends with is logged through `tracing` and returned as the
    /// task's result.
    ///
    /// `host` may be any string slice: it is only used to build the owned
    /// `host:port` address, so no `'static` lifetime is required.
    ///
    /// # Errors
    ///
    /// Returns an error if the TCP connection cannot be established or the
    /// HTTP/1 handshake fails.
    pub async fn new(host: &str, port: Option<u16>) -> Result<Self, Box<dyn Error>> {
        let port = port.unwrap_or(80);
        let addr = format!("{host}:{port}");

        let tcp_stream = TcpStream::connect(addr).await?;
        let io = TokioIo::new(tcp_stream);
        let (sender, conn) = client::conn::http1::handshake::<_, B>(io).await?;

        // The connection future must be polled for requests to make progress,
        // so it runs on its own task for the lifetime of the client.
        let conn_handle = tokio::spawn(async move {
            let outcome = conn.await;
            if let Err(err) = &outcome {
                tracing::error!("Connection error: {}", err);
            }
            // Hand the real outcome to the join handle instead of masking a
            // failed connection as `Ok(())`.
            outcome
        });

        Ok(Self {
            sender,
            conn_handle,
        })
    }

    /// Sends `req` over the connection and buffers the whole response body.
    ///
    /// The response head (status, headers, ...) is preserved; the body is
    /// replaced by the concatenation of all data frames. Non-data frames
    /// (such as trailers) are skipped.
    ///
    /// NOTE(review): this uses hyper's connection-level API, which sends the
    /// request as given — callers are expected to provide the `Host` header
    /// themselves; confirm against call sites.
    ///
    /// # Errors
    ///
    /// Returns an error if the connection is closed or never becomes ready,
    /// if sending the request fails, or if reading a body frame fails.
    pub async fn request(&mut self, req: Request<B>) -> Result<Response<Vec<u8>>, Box<dyn Error>> {
        // The connection is driven on another task, so it may not have been
        // marked ready again after the previous response; wait for it rather
        // than letting `send_request` fail spuriously.
        self.sender.ready().await?;

        let response = self.sender.send_request(req).await?;
        let (parts, mut body) = response.into_parts();

        let mut body_bytes = Vec::new();
        while let Some(frame) = body.frame().await {
            let frame = frame?;
            if let Some(chunk) = frame.data_ref() {
                body_bytes.extend_from_slice(chunk);
            }
        }

        Ok(Response::from_parts(parts, body_bytes))
    }
}
/// Stops the background connection task when the client goes away.
impl<B> Drop for TakoClient<B>
where
    B: Body + Send + 'static,
    B::Data: Send + 'static,
    B::Error: Into<Box<dyn Error + Send + Sync>>,
{
    /// Aborts the task stored in `conn_handle` so it does not outlive the
    /// client that spawned it.
    fn drop(&mut self) {
        let driver = &self.conn_handle;
        driver.abort();
    }
}