faucet_server/client/
pool.rs1use super::body::ExclusiveBody;
2use super::worker::WorkerConfig;
3use crate::error::{FaucetError, FaucetResult};
4use crate::global_conn::{add_connection, remove_connection};
5use deadpool::managed::{self, Object, Pool, RecycleError};
6use http_body_util::BodyExt;
7use hyper::body::Incoming;
8use hyper::client::conn::http1::SendRequest;
9use hyper::{Request, Response};
10use hyper_util::rt::TokioIo;
11use std::net::SocketAddr;
12use tokio::net::TcpStream;
13
14struct ConnectionHandle {
15 sender: SendRequest<Incoming>,
16}
17
18async fn create_http_client(config: WorkerConfig) -> FaucetResult<ConnectionHandle> {
19 log::debug!(target: "faucet", "Establishing TCP connection to {}", config.target);
20 let stream = TokioIo::new(TcpStream::connect(config.addr).await?);
21 let (sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
22 tokio::spawn(async move {
23 match conn.await {
24 Ok(_) => (),
25 Err(err) => {
26 log::debug!(target: "faucet", "{err}");
27 }
28 }
29 });
30 log::debug!(target: "faucet", "Established TCP connection to {}", config.target);
31 Ok(ConnectionHandle { sender })
32}
33
34struct ConnectionManager {
35 config: WorkerConfig,
36}
37
38impl ConnectionManager {
39 fn new(config: WorkerConfig) -> Self {
40 Self { config }
41 }
42}
43
44impl managed::Manager for ConnectionManager {
45 type Type = ConnectionHandle;
46 type Error = FaucetError;
47
48 async fn create(&self) -> FaucetResult<Self::Type> {
49 create_http_client(self.config).await
50 }
51
52 async fn recycle(
53 &self,
54 conn: &mut ConnectionHandle,
55 _: &managed::Metrics,
56 ) -> managed::RecycleResult<FaucetError> {
57 if conn.sender.is_closed() {
58 Err(RecycleError::message("Connection closed"))
59 } else {
60 log::debug!(target: "faucet", "Recycling TCP connection to {}", self.config.target);
61 Ok(())
62 }
63 }
64}
65
66pub struct HttpConnection {
67 inner: Object<ConnectionManager>,
68}
69
70impl HttpConnection {
71 pub async fn send_request(
72 mut self,
73 request: Request<Incoming>,
74 ) -> FaucetResult<Response<ExclusiveBody>> {
75 add_connection();
76 let (parts, body) = self.inner.sender.send_request(request).await?.into_parts();
77 let body = ExclusiveBody::new(body.map_err(Into::into), Some(self));
78 Ok(Response::from_parts(parts, body))
79 }
80}
81
82impl Drop for HttpConnection {
83 fn drop(&mut self) {
84 remove_connection();
85 }
86}
87
88pub(crate) struct ClientBuilder {
89 max_size: Option<usize>,
90 config: Option<WorkerConfig>,
91}
92
/// Pool size limit applied when the builder has no explicit `max_size`.
const DEFAULT_MAX_SIZE: usize = 32;
94
95impl ClientBuilder {
96 pub fn build(self) -> FaucetResult<Client> {
97 let config = self
98 .config
99 .expect("Unable to create connection without worker state");
100 let builder = Pool::builder(ConnectionManager::new(config))
101 .max_size(self.max_size.unwrap_or(DEFAULT_MAX_SIZE));
102 let pool = builder.build()?;
103 Ok(Client { pool, config })
104 }
105}
106
107#[derive(Clone)]
108pub(crate) struct Client {
109 pool: Pool<ConnectionManager>,
110 pub(crate) config: WorkerConfig,
111}
112
113impl Client {
114 pub fn builder(config: WorkerConfig) -> ClientBuilder {
115 ClientBuilder {
116 max_size: None,
117 config: Some(config),
118 }
119 }
120
121 pub async fn get(&self) -> FaucetResult<HttpConnection> {
122 Ok(HttpConnection {
123 inner: self.pool.get().await?,
124 })
125 }
126 pub fn is_online(&self) -> bool {
127 self.config
128 .is_online
129 .load(std::sync::atomic::Ordering::SeqCst)
130 }
131}
132
/// Implemented by types that can report the socket address they refer to.
pub trait ExtractSocketAddr {
    /// Returns the socket address associated with `self`.
    fn socket_addr(&self) -> SocketAddr;
}
136
137impl ExtractSocketAddr for Client {
138 #[inline(always)]
139 fn socket_addr(&self) -> SocketAddr {
140 self.config.addr
141 }
142}