fastn_net/
http_connection_manager.rs

1/// Connection pool for HTTP/1.1 connections.
2///
3/// Uses bb8 for connection pooling with automatic health checks and recycling.
4pub type HttpConnectionPool = bb8::Pool<HttpConnectionManager>;
5
6/// Collection of connection pools indexed by server address.
7///
8/// Allows maintaining separate pools for different HTTP servers,
9/// enabling efficient connection reuse across multiple targets.
10pub type HttpConnectionPools =
11    std::sync::Arc<tokio::sync::Mutex<std::collections::HashMap<String, HttpConnectionPool>>>;
12
/// Connection factory for pooled HTTP/1.1 client connections.
///
/// Holds the address that every connection of a pool is opened against. The
/// `bb8::ManageConnection` implementation for this type is what lets `bb8`
/// create connections and test whether an existing one is still usable.
///
/// `Debug` and `Clone` are derived so the manager can be logged and copied
/// like any other plain configuration value.
#[derive(Debug, Clone)]
pub struct HttpConnectionManager {
    /// Target address; passed as-is to `tokio::net::TcpStream::connect`.
    addr: String,
}
20
21impl HttpConnectionManager {
22    pub fn new(addr: String) -> Self {
23        Self { addr }
24    }
25
26    pub async fn connect(
27        &self,
28    ) -> eyre::Result<
29        hyper::client::conn::http1::SendRequest<
30            http_body_util::combinators::BoxBody<hyper::body::Bytes, eyre::Error>,
31        >,
32    > {
33        use eyre::WrapErr;
34
35        let stream = tokio::net::TcpStream::connect(&self.addr)
36            .await
37            .wrap_err_with(|| "failed to open tcp connection")?;
38        let io = hyper_util::rt::TokioIo::new(stream);
39
40        let (sender, conn) = hyper::client::conn::http1::handshake(io)
41            .await
42            .wrap_err_with(|| "failed to do http1 handshake")?;
43        tokio::task::spawn(async move {
44            if let Err(err) = conn.await.wrap_err_with(|| "connection failed") {
45                tracing::error!("Connection failed: {err:?}");
46            }
47        });
48
49        Ok(sender)
50    }
51}
52
53impl bb8::ManageConnection for HttpConnectionManager {
54    type Connection = hyper::client::conn::http1::SendRequest<
55        http_body_util::combinators::BoxBody<hyper::body::Bytes, eyre::Error>,
56    >;
57    type Error = eyre::Error;
58
59    fn connect(&self) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send {
60        Box::pin(async move { self.connect().await })
61    }
62
63    fn is_valid(
64        &self,
65        conn: &mut Self::Connection,
66    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
67        Box::pin(async {
68            if conn.is_closed() {
69                return Err(eyre::anyhow!("connection is closed"));
70            }
71
72            Ok(())
73        })
74    }
75
76    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
77        conn.is_closed()
78    }
79}