use super::body::ExclusiveBody;
use super::worker::WorkerState;
use crate::error::{FaucetError, FaucetResult};
use async_trait::async_trait;
use deadpool::managed::{self, Object, Pool, RecycleError};
use http_body_util::BodyExt;
use hyper::body::Incoming;
use hyper::client::conn::http1::SendRequest;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use std::net::SocketAddr;
use tokio::net::TcpStream;
/// A single HTTP/1 client connection as stored in the pool.
struct ConnectionHandle {
/// Request-sending half returned by the hyper HTTP/1 handshake.
sender: SendRequest<Incoming>,
}
async fn create_http_client(worker_state: &WorkerState) -> FaucetResult<ConnectionHandle> {
log::debug!(target: "faucet", "Establishing TCP connection to {}", worker_state.target());
let stream = TokioIo::new(TcpStream::connect(worker_state.socket_addr()).await?);
let (sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
tokio::spawn(async move {
conn.await.expect("client conn");
});
log::debug!(target: "faucet", "Established TCP connection to {}", worker_state.target());
Ok(ConnectionHandle { sender })
}
/// Pool manager that creates and recycles connections for one worker.
struct ConnectionManager {
    /// State of the worker that new connections are opened against.
    worker_state: WorkerState,
}

impl ConnectionManager {
    /// Builds a manager bound to the given worker state.
    fn new(worker_state: WorkerState) -> Self {
        ConnectionManager { worker_state }
    }
}
#[async_trait]
impl managed::Manager for ConnectionManager {
    type Type = ConnectionHandle;
    type Error = FaucetError;

    /// Opens a brand-new connection to the managed worker.
    async fn create(&self) -> FaucetResult<Self::Type> {
        let state = &self.worker_state;
        create_http_client(state).await
    }

    /// Decides whether a pooled connection may be reused.
    ///
    /// A connection whose sender reports itself closed is rejected; any
    /// other connection is accepted as-is.
    async fn recycle(
        &self,
        conn: &mut ConnectionHandle,
        _: &managed::Metrics,
    ) -> managed::RecycleResult<FaucetError> {
        // Guard clause: the healthy path returns early.
        if !conn.sender.is_closed() {
            log::debug!(target: "faucet", "Recycling TCP connection to {}", self.worker_state.target());
            return Ok(());
        }
        Err(RecycleError::StaticMessage("Connection closed"))
    }
}
/// A connection checked out of the pool.
pub struct HttpConnection {
    /// The pooled object wrapping the underlying connection handle.
    inner: Object<ConnectionManager>,
}

impl HttpConnection {
    /// Sends `request` over this connection and returns the response.
    ///
    /// The connection is consumed and handed to the response's
    /// [`ExclusiveBody`] together with the incoming body.
    // NOTE(review): presumably this keeps the pooled connection checked out
    // until the body is dropped — confirm against `ExclusiveBody`.
    ///
    /// # Errors
    ///
    /// Returns an error if sending the request fails.
    pub async fn send_request(
        mut self,
        request: Request<Incoming>,
    ) -> FaucetResult<Response<ExclusiveBody>> {
        let response = self.inner.sender.send_request(request).await?;
        let (head, incoming) = response.into_parts();
        let exclusive = ExclusiveBody::new(incoming.map_err(Into::into), Some(self));
        Ok(Response::from_parts(head, exclusive))
    }
}
/// Collects the settings needed to construct a [`Client`].
pub(crate) struct ClientBuilder {
    /// Optional pool size; `DEFAULT_MAX_SIZE` is used when unset.
    max_size: Option<usize>,
    /// Worker the client connects to; must be present when building.
    worker_state: Option<WorkerState>,
}

/// Pool size applied when the builder carries no explicit `max_size`.
const DEFAULT_MAX_SIZE: usize = 32;

impl ClientBuilder {
    /// Builds the connection pool and wraps it in a [`Client`].
    ///
    /// # Panics
    ///
    /// Panics if no worker state was provided.
    ///
    /// # Errors
    ///
    /// Returns an error if the pool cannot be built.
    pub fn build(self) -> FaucetResult<Client> {
        let worker_state = self
            .worker_state
            .expect("Unable to create connection without worker state");
        let manager = ConnectionManager::new(worker_state.clone());
        let pool_size = self.max_size.unwrap_or(DEFAULT_MAX_SIZE);
        let pool = Pool::builder(manager).max_size(pool_size).build()?;
        Ok(Client { pool, worker_state })
    }
}
/// Pooled HTTP client bound to a single worker.
#[derive(Clone)]
pub(crate) struct Client {
    /// Pool of connections to the worker.
    pool: Pool<ConnectionManager>,
    /// State of the worker this client talks to.
    worker_state: WorkerState,
}

impl Client {
    /// Starts a [`ClientBuilder`] for the given worker, with no explicit
    /// pool size.
    pub fn builder(worker_state: WorkerState) -> ClientBuilder {
        let worker_state = Some(worker_state);
        ClientBuilder {
            worker_state,
            max_size: None,
        }
    }

    /// Returns the worker's target name.
    pub fn target(&self) -> &'static str {
        let state = &self.worker_state;
        state.target()
    }

    /// Checks a connection out of the pool.
    ///
    /// # Errors
    ///
    /// Returns an error if the pool fails to provide a connection.
    pub async fn get(&self) -> FaucetResult<HttpConnection> {
        let inner = self.pool.get().await?;
        Ok(HttpConnection { inner })
    }

    /// Reports whether the worker state says the worker is online.
    pub fn is_online(&self) -> bool {
        let state = &self.worker_state;
        state.is_online()
    }
}
pub trait ExtractSocketAddr {
fn socket_addr(&self) -> SocketAddr;
}
impl ExtractSocketAddr for Client {
#[inline(always)]
fn socket_addr(&self) -> SocketAddr {
self.worker_state.socket_addr()
}
}