fastn_net/
http_connection_manager.rs1pub type HttpConnectionPool = bb8::Pool<HttpConnectionManager>;
5
6pub type HttpConnectionPools =
11 std::sync::Arc<tokio::sync::Mutex<std::collections::HashMap<String, HttpConnectionPool>>>;
12
13pub struct HttpConnectionManager {
18 addr: String,
19}
20
21impl HttpConnectionManager {
22 pub fn new(addr: String) -> Self {
23 Self { addr }
24 }
25
26 pub async fn connect(
27 &self,
28 ) -> eyre::Result<
29 hyper::client::conn::http1::SendRequest<
30 http_body_util::combinators::BoxBody<hyper::body::Bytes, eyre::Error>,
31 >,
32 > {
33 use eyre::WrapErr;
34
35 let stream = tokio::net::TcpStream::connect(&self.addr)
36 .await
37 .wrap_err_with(|| "failed to open tcp connection")?;
38 let io = hyper_util::rt::TokioIo::new(stream);
39
40 let (sender, conn) = hyper::client::conn::http1::handshake(io)
41 .await
42 .wrap_err_with(|| "failed to do http1 handshake")?;
43 tokio::task::spawn(async move {
44 if let Err(err) = conn.await.wrap_err_with(|| "connection failed") {
45 tracing::error!("Connection failed: {err:?}");
46 }
47 });
48
49 Ok(sender)
50 }
51}
52
53impl bb8::ManageConnection for HttpConnectionManager {
54 type Connection = hyper::client::conn::http1::SendRequest<
55 http_body_util::combinators::BoxBody<hyper::body::Bytes, eyre::Error>,
56 >;
57 type Error = eyre::Error;
58
59 fn connect(&self) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send {
60 Box::pin(async move { self.connect().await })
61 }
62
63 fn is_valid(
64 &self,
65 conn: &mut Self::Connection,
66 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
67 Box::pin(async {
68 if conn.is_closed() {
69 return Err(eyre::anyhow!("connection is closed"));
70 }
71
72 Ok(())
73 })
74 }
75
76 fn has_broken(&self, conn: &mut Self::Connection) -> bool {
77 conn.is_closed()
78 }
79}