ftnet_utils/
http_connection.rs1pub type ConnectionPool = bb8::Pool<ConnectionManager>;
2pub type ConnectionPools =
3 std::sync::Arc<tokio::sync::Mutex<std::collections::HashMap<String, ConnectionPool>>>;
/// Connection factory used with a `bb8` pool: it stores the address that new
/// HTTP/1 connections are opened against.
///
/// `Debug` is derived so the manager can appear in logs and in `Debug` output
/// of types that embed it.
#[derive(Debug)]
pub struct ConnectionManager {
    /// Address passed to `tokio::net::TcpStream::connect` when a connection is opened.
    addr: String,
}
9impl ConnectionManager {
10 pub fn new(addr: String) -> Self {
11 Self { addr }
12 }
13
14 pub async fn connect(
15 &self,
16 ) -> eyre::Result<
17 hyper::client::conn::http1::SendRequest<
18 http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>,
19 >,
20 > {
21 use eyre::WrapErr;
22
23 let stream = tokio::net::TcpStream::connect(&self.addr)
24 .await
25 .wrap_err_with(|| "failed to open tcp connection")?;
26 let io = hyper_util::rt::TokioIo::new(stream);
27
28 let (sender, conn) = hyper::client::conn::http1::handshake(io)
29 .await
30 .wrap_err_with(|| "failed to do http1 handshake")?;
31 tokio::task::spawn(async move {
32 if let Err(err) = conn.await.wrap_err_with(|| "connection failed") {
33 tracing::error!("Connection failed: {err:?}");
34 }
35 });
36
37 Ok(sender)
38 }
39}
41impl bb8::ManageConnection for ConnectionManager {
42 type Connection = hyper::client::conn::http1::SendRequest<
43 http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>,
44 >;
45 type Error = eyre::Error;
46
47 fn connect(&self) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send {
48 Box::pin(async move { self.connect().await })
49 }
50
51 fn is_valid(
52 &self,
53 conn: &mut Self::Connection,
54 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
55 Box::pin(async {
56 if conn.is_closed() {
57 return Err(eyre::anyhow!("connection is closed"));
58 }
59
60 Ok(())
61 })
62 }
63
64 fn has_broken(&self, conn: &mut Self::Connection) -> bool {
65 conn.is_closed()
66 }
67}