ftnet_utils/
http_connection.rs

1pub type ConnectionPool = bb8::Pool<ConnectionManager>;
2pub type ConnectionPools =
3    std::sync::Arc<tokio::sync::Mutex<std::collections::HashMap<String, ConnectionPool>>>;
4
5pub struct ConnectionManager {
6    addr: String,
7}
8
9impl ConnectionManager {
10    pub fn new(addr: String) -> Self {
11        Self { addr }
12    }
13
14    pub async fn connect(
15        &self,
16    ) -> eyre::Result<
17        hyper::client::conn::http1::SendRequest<
18            http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>,
19        >,
20    > {
21        use eyre::WrapErr;
22
23        let stream = tokio::net::TcpStream::connect(&self.addr)
24            .await
25            .wrap_err_with(|| "failed to open tcp connection")?;
26        let io = hyper_util::rt::TokioIo::new(stream);
27
28        let (sender, conn) = hyper::client::conn::http1::handshake(io)
29            .await
30            .wrap_err_with(|| "failed to do http1 handshake")?;
31        tokio::task::spawn(async move {
32            if let Err(err) = conn.await.wrap_err_with(|| "connection failed") {
33                tracing::error!("Connection failed: {err:?}");
34            }
35        });
36
37        Ok(sender)
38    }
39}
40
41impl bb8::ManageConnection for ConnectionManager {
42    type Connection = hyper::client::conn::http1::SendRequest<
43        http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>,
44    >;
45    type Error = eyre::Error;
46
47    fn connect(&self) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send {
48        Box::pin(async move { self.connect().await })
49    }
50
51    fn is_valid(
52        &self,
53        conn: &mut Self::Connection,
54    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
55        Box::pin(async {
56            if conn.is_closed() {
57                return Err(eyre::anyhow!("connection is closed"));
58            }
59
60            Ok(())
61        })
62    }
63
64    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
65        conn.is_closed()
66    }
67}