faucet_server/client/
pool.rs1use super::body::ExclusiveBody;
2use super::worker::WorkerConfig;
3use crate::error::{FaucetError, FaucetResult};
4use crate::global_conn::{add_connection, remove_connection};
5use deadpool::managed::{self, Object, Pool, RecycleError};
6use http_body_util::BodyExt;
7use hyper::body::Incoming;
8use hyper::client::conn::http1::SendRequest;
9use hyper::{Request, Response};
10use hyper_util::rt::TokioIo;
11use std::net::SocketAddr;
12use tokio::net::TcpStream;
13
14struct ConnectionHandle {
15 sender: SendRequest<Incoming>,
16}
17
18struct ConnectionManager {
19 config: &'static WorkerConfig,
20}
21
22impl ConnectionManager {
23 fn new(config: &'static WorkerConfig) -> Self {
24 Self { config }
25 }
26}
27
28const RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(20);
29
30impl managed::Manager for ConnectionManager {
31 type Type = ConnectionHandle;
32 type Error = FaucetError;
33
34 async fn create(&self) -> FaucetResult<Self::Type> {
35 log::debug!(target: "faucet", "Establishing TCP connection to {}", self.config.target);
36 let connection_res = loop {
37 match TcpStream::connect(self.config.addr).await {
38 Ok(stream) => break stream,
39 Err(_) => tokio::time::sleep(RETRY_DELAY).await,
40 }
41 };
42 let stream = TokioIo::new(connection_res);
43 let (sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
44 tokio::spawn(async move {
45 match conn.await {
46 Ok(_) => (),
47 Err(err) => {
48 log::debug!(target: "faucet", "{err}");
49 }
50 }
51 });
52 log::debug!(target: "faucet", "Established TCP connection to {}", self.config.target);
53 Ok(ConnectionHandle { sender })
54 }
55
56 async fn recycle(
57 &self,
58 conn: &mut ConnectionHandle,
59 _: &managed::Metrics,
60 ) -> managed::RecycleResult<FaucetError> {
61 if !self
62 .config
63 .is_online
64 .load(std::sync::atomic::Ordering::SeqCst)
65 {
66 return Err(RecycleError::message("Worker is offline"));
67 }
68 if conn.sender.is_closed() {
69 Err(RecycleError::message("Connection closed"))
70 } else {
71 log::debug!(target: "faucet", "Recycling TCP connection to {}", self.config.target);
72 Ok(())
73 }
74 }
75}
76
77pub struct HttpConnection {
78 inner: Object<ConnectionManager>,
79}
80
81impl HttpConnection {
82 pub async fn send_request(
83 mut self,
84 request: Request<Incoming>,
85 ) -> FaucetResult<Response<ExclusiveBody>> {
86 add_connection();
87 let (parts, body) = self.inner.sender.send_request(request).await?.into_parts();
88 let body = ExclusiveBody::new(body.map_err(Into::into), Some(self));
89 Ok(Response::from_parts(parts, body))
90 }
91}
92
93impl Drop for HttpConnection {
94 fn drop(&mut self) {
95 remove_connection();
96 }
97}
98
99const DEFAULT_MAX_SIZE: usize = 1024;
100
101#[derive(Clone)]
102pub(crate) struct Client {
103 pool: Pool<ConnectionManager>,
104 pub(crate) config: &'static WorkerConfig,
105}
106
107impl Client {
108 pub fn new(config: &'static WorkerConfig) -> Self {
109 let builder = Pool::builder(ConnectionManager::new(config)).max_size(DEFAULT_MAX_SIZE);
110 let pool = builder
111 .build()
112 .expect("Failed to create connection pool. This is a bug");
113 Self { pool, config }
114 }
115
116 pub async fn get(&self) -> FaucetResult<HttpConnection> {
117 Ok(HttpConnection {
118 inner: self.pool.get().await?,
119 })
120 }
121 pub fn is_online(&self) -> bool {
122 self.config
123 .is_online
124 .load(std::sync::atomic::Ordering::SeqCst)
125 }
126}
127
128pub trait ExtractSocketAddr {
129 fn socket_addr(&self) -> SocketAddr;
130}
131
132impl ExtractSocketAddr for Client {
133 #[inline(always)]
134 fn socket_addr(&self) -> SocketAddr {
135 self.config.addr
136 }
137}