ftnet-utils 0.1.0

ftnet utilities
Documentation
pub type ConnectionPool = bb8::Pool<ConnectionManager>;
pub type ConnectionPools =
    std::sync::Arc<tokio::sync::Mutex<std::collections::HashMap<String, ConnectionPool>>>;

pub struct ConnectionManager {
    addr: String,
}

impl ConnectionManager {
    pub fn new(addr: String) -> Self {
        Self { addr }
    }

    pub async fn connect(
        &self,
    ) -> eyre::Result<
        hyper::client::conn::http1::SendRequest<
            http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>,
        >,
    > {
        use eyre::WrapErr;

        let stream = tokio::net::TcpStream::connect(&self.addr)
            .await
            .wrap_err_with(|| "failed to open tcp connection")?;
        let io = hyper_util::rt::TokioIo::new(stream);

        let (sender, conn) = hyper::client::conn::http1::handshake(io)
            .await
            .wrap_err_with(|| "failed to do http1 handshake")?;
        tokio::task::spawn(async move {
            if let Err(err) = conn.await.wrap_err_with(|| "connection failed") {
                tracing::error!("Connection failed: {err:?}");
            }
        });

        Ok(sender)
    }
}

impl bb8::ManageConnection for ConnectionManager {
    type Connection = hyper::client::conn::http1::SendRequest<
        http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>,
    >;
    type Error = eyre::Error;

    fn connect(&self) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send {
        Box::pin(async move { self.connect().await })
    }

    fn is_valid(
        &self,
        conn: &mut Self::Connection,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
        Box::pin(async {
            if conn.is_closed() {
                return Err(eyre::anyhow!("connection is closed"));
            }

            Ok(())
        })
    }

    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
        conn.is_closed()
    }
}