faucet_server/client/
pool.rs

1use super::body::ExclusiveBody;
2use super::worker::WorkerConfig;
3use crate::error::{FaucetError, FaucetResult};
4use crate::global_conn::{add_connection, remove_connection};
5use deadpool::managed::{self, Object, Pool, RecycleError};
6use http_body_util::BodyExt;
7use hyper::body::Incoming;
8use hyper::client::conn::http1::SendRequest;
9use hyper::{Request, Response};
10use hyper_util::rt::TokioIo;
11use std::net::SocketAddr;
12use tokio::net::TcpStream;
13
14struct ConnectionHandle {
15    sender: SendRequest<Incoming>,
16}
17
18async fn create_http_client(config: WorkerConfig) -> FaucetResult<ConnectionHandle> {
19    log::debug!(target: "faucet", "Establishing TCP connection to {}", config.target);
20    let stream = TokioIo::new(TcpStream::connect(config.addr).await?);
21    let (sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
22    tokio::spawn(async move {
23        match conn.await {
24            Ok(_) => (),
25            Err(err) => {
26                log::debug!(target: "faucet", "{err}");
27            }
28        }
29    });
30    log::debug!(target: "faucet", "Established TCP connection to {}", config.target);
31    Ok(ConnectionHandle { sender })
32}
33
34struct ConnectionManager {
35    config: WorkerConfig,
36}
37
38impl ConnectionManager {
39    fn new(config: WorkerConfig) -> Self {
40        Self { config }
41    }
42}
43
44impl managed::Manager for ConnectionManager {
45    type Type = ConnectionHandle;
46    type Error = FaucetError;
47
48    async fn create(&self) -> FaucetResult<Self::Type> {
49        create_http_client(self.config).await
50    }
51
52    async fn recycle(
53        &self,
54        conn: &mut ConnectionHandle,
55        _: &managed::Metrics,
56    ) -> managed::RecycleResult<FaucetError> {
57        if conn.sender.is_closed() {
58            Err(RecycleError::message("Connection closed"))
59        } else {
60            log::debug!(target: "faucet", "Recycling TCP connection to {}", self.config.target);
61            Ok(())
62        }
63    }
64}
65
66pub struct HttpConnection {
67    inner: Object<ConnectionManager>,
68}
69
70impl HttpConnection {
71    pub async fn send_request(
72        mut self,
73        request: Request<Incoming>,
74    ) -> FaucetResult<Response<ExclusiveBody>> {
75        add_connection();
76        let (parts, body) = self.inner.sender.send_request(request).await?.into_parts();
77        let body = ExclusiveBody::new(body.map_err(Into::into), Some(self));
78        Ok(Response::from_parts(parts, body))
79    }
80}
81
82impl Drop for HttpConnection {
83    fn drop(&mut self) {
84        remove_connection();
85    }
86}
87
88pub(crate) struct ClientBuilder {
89    max_size: Option<usize>,
90    config: Option<WorkerConfig>,
91}
92
93const DEFAULT_MAX_SIZE: usize = 32;
94
95impl ClientBuilder {
96    pub fn build(self) -> FaucetResult<Client> {
97        let config = self
98            .config
99            .expect("Unable to create connection without worker state");
100        let builder = Pool::builder(ConnectionManager::new(config))
101            .max_size(self.max_size.unwrap_or(DEFAULT_MAX_SIZE));
102        let pool = builder.build()?;
103        Ok(Client { pool, config })
104    }
105}
106
107#[derive(Clone)]
108pub(crate) struct Client {
109    pool: Pool<ConnectionManager>,
110    pub(crate) config: WorkerConfig,
111}
112
113impl Client {
114    pub fn builder(config: WorkerConfig) -> ClientBuilder {
115        ClientBuilder {
116            max_size: None,
117            config: Some(config),
118        }
119    }
120
121    pub async fn get(&self) -> FaucetResult<HttpConnection> {
122        Ok(HttpConnection {
123            inner: self.pool.get().await?,
124        })
125    }
126    pub fn is_online(&self) -> bool {
127        self.config
128            .is_online
129            .load(std::sync::atomic::Ordering::SeqCst)
130    }
131}
132
133pub trait ExtractSocketAddr {
134    fn socket_addr(&self) -> SocketAddr;
135}
136
137impl ExtractSocketAddr for Client {
138    #[inline(always)]
139    fn socket_addr(&self) -> SocketAddr {
140        self.config.addr
141    }
142}