faucet_server/client/
pool.rs

1use super::body::ExclusiveBody;
2use super::worker::WorkerConfig;
3use crate::error::{FaucetError, FaucetResult};
4use crate::global_conn::{add_connection, remove_connection};
5use deadpool::managed::{self, Object, Pool, RecycleError};
6use http_body_util::BodyExt;
7use hyper::body::Incoming;
8use hyper::client::conn::http1::SendRequest;
9use hyper::{Request, Response};
10use hyper_util::rt::TokioIo;
11use std::net::SocketAddr;
12use tokio::net::TcpStream;
13
14struct ConnectionHandle {
15    sender: SendRequest<Incoming>,
16}
17
18struct ConnectionManager {
19    config: &'static WorkerConfig,
20}
21
22impl ConnectionManager {
23    fn new(config: &'static WorkerConfig) -> Self {
24        Self { config }
25    }
26}
27
28const RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(20);
29
30impl managed::Manager for ConnectionManager {
31    type Type = ConnectionHandle;
32    type Error = FaucetError;
33
34    async fn create(&self) -> FaucetResult<Self::Type> {
35        log::debug!(target: "faucet", "Establishing TCP connection to {}", self.config.target);
36        let connection_res = loop {
37            match TcpStream::connect(self.config.addr).await {
38                Ok(stream) => break stream,
39                Err(_) => tokio::time::sleep(RETRY_DELAY).await,
40            }
41        };
42        let stream = TokioIo::new(connection_res);
43        let (sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
44        tokio::spawn(async move {
45            match conn.await {
46                Ok(_) => (),
47                Err(err) => {
48                    log::debug!(target: "faucet", "{err}");
49                }
50            }
51        });
52        log::debug!(target: "faucet", "Established TCP connection to {}", self.config.target);
53        Ok(ConnectionHandle { sender })
54    }
55
56    async fn recycle(
57        &self,
58        conn: &mut ConnectionHandle,
59        _: &managed::Metrics,
60    ) -> managed::RecycleResult<FaucetError> {
61        if !self
62            .config
63            .is_online
64            .load(std::sync::atomic::Ordering::SeqCst)
65        {
66            return Err(RecycleError::message("Worker is offline"));
67        }
68        if conn.sender.is_closed() {
69            Err(RecycleError::message("Connection closed"))
70        } else {
71            log::debug!(target: "faucet", "Recycling TCP connection to {}", self.config.target);
72            Ok(())
73        }
74    }
75}
76
77pub struct HttpConnection {
78    inner: Object<ConnectionManager>,
79}
80
81impl HttpConnection {
82    pub async fn send_request(
83        mut self,
84        request: Request<Incoming>,
85    ) -> FaucetResult<Response<ExclusiveBody>> {
86        add_connection();
87        let (parts, body) = self.inner.sender.send_request(request).await?.into_parts();
88        let body = ExclusiveBody::new(body.map_err(Into::into), Some(self));
89        Ok(Response::from_parts(parts, body))
90    }
91}
92
93impl Drop for HttpConnection {
94    fn drop(&mut self) {
95        remove_connection();
96    }
97}
98
99const DEFAULT_MAX_SIZE: usize = 1024;
100
101#[derive(Clone)]
102pub(crate) struct Client {
103    pool: Pool<ConnectionManager>,
104    pub(crate) config: &'static WorkerConfig,
105}
106
107impl Client {
108    pub fn new(config: &'static WorkerConfig) -> Self {
109        let builder = Pool::builder(ConnectionManager::new(config)).max_size(DEFAULT_MAX_SIZE);
110        let pool = builder
111            .build()
112            .expect("Failed to create connection pool. This is a bug");
113        Self { pool, config }
114    }
115
116    pub async fn get(&self) -> FaucetResult<HttpConnection> {
117        Ok(HttpConnection {
118            inner: self.pool.get().await?,
119        })
120    }
121    pub fn is_online(&self) -> bool {
122        self.config
123            .is_online
124            .load(std::sync::atomic::Ordering::SeqCst)
125    }
126}
127
128pub trait ExtractSocketAddr {
129    fn socket_addr(&self) -> SocketAddr;
130}
131
132impl ExtractSocketAddr for Client {
133    #[inline(always)]
134    fn socket_addr(&self) -> SocketAddr {
135        self.config.addr
136    }
137}