ftnet_utils/
peer_to_http.rs

1pub async fn peer_to_http(
2    addr: &str,
3    client_pools: ftnet_utils::HttpConnectionPools,
4    send: &mut iroh::endpoint::SendStream,
5    mut recv: ftnet_utils::FrameReader,
6) -> eyre::Result<()> {
7    use eyre::WrapErr;
8    use http_body_util::BodyExt;
9    use tokio_stream::StreamExt;
10
11    tracing::info!("http request with {addr}");
12    let start = std::time::Instant::now();
13
14    let req: ftnet_utils::http::Request = match recv.next().await {
15        Some(Ok(v)) => serde_json::from_str(&v)
16            .wrap_err_with(|| "failed to serialize json while reading http request")?,
17        Some(Err(e)) => {
18            tracing::error!("failed to read request: {e}");
19            return Err(eyre::anyhow!("failed to read request: {e}"));
20        }
21        None => {
22            tracing::error!("no request");
23            return Err(eyre::anyhow!("no request"));
24        }
25    };
26
27    tracing::info!("got request: {req:?}");
28
29    let mut body = recv.read_buffer().to_vec();
30    let mut recv = recv.into_inner();
31
32    let mut buf = Vec::with_capacity(1024 * 64);
33
34    tracing::trace!("reading body");
35    while let Some(v) = recv.read(&mut buf).await? {
36        if v == 0 {
37            tracing::trace!("finished reading");
38            break;
39        }
40        tracing::trace!("reading body, partial: {v}");
41        body.extend_from_slice(&buf);
42        buf.truncate(0);
43    }
44    tracing::debug!("read {} bytes of body", body.len());
45
46    let mut r = hyper::Request::builder()
47        .method(req.method.as_str())
48        .uri(req.uri);
49    for (name, value) in req.headers {
50        r = r.header(name, value);
51    }
52
53    tracing::debug!("request: {r:?}");
54
55    let pool = get_pool(addr, client_pools).await?;
56    tracing::trace!("got pool");
57    let mut client = match pool.get().await {
58        Ok(v) => v,
59        Err(e) => {
60            tracing::error!("failed to get connection: {e:?}");
61            return Err(eyre::anyhow!("failed to get connection: {e:?}"));
62        }
63    };
64    // tracing::info!("got client");
65
66    let (resp, body) = client
67        .send_request(
68            r.body(
69                http_body_util::Full::new(hyper::body::Bytes::from(body))
70                    .map_err(|e| match e {})
71                    .boxed(),
72            )?,
73        )
74        .await
75        .wrap_err_with(|| "failed to send request")?
76        .into_parts();
77
78    let r = ftnet_utils::http::Response {
79        status: resp.status.as_u16(),
80        headers: resp
81            .headers
82            .iter()
83            .map(|(k, v)| (k.to_string(), v.as_bytes().to_vec()))
84            .collect(),
85    };
86
87    send.write_all(
88        serde_json::to_string(&r)
89            .wrap_err_with(|| "failed to serialize json while writing http response")?
90            .as_bytes(),
91    )
92    .await?;
93    send.write_all(b"\n").await?;
94    let bytes = body.collect().await?.to_bytes();
95    tracing::debug!("got response body: {} bytes", bytes.len());
96    send.write_all(&bytes).await?;
97
98    tracing::info!("handled http request in {:?}", start.elapsed());
99    Ok(())
100}
101
102async fn get_pool(
103    addr: &str,
104    client_pools: ftnet_utils::HttpConnectionPools,
105) -> eyre::Result<bb8::Pool<ftnet_utils::HttpConnectionManager>> {
106    tracing::trace!("get pool called");
107    let mut pools = client_pools.lock().await;
108
109    Ok(match pools.get(addr) {
110        Some(v) => {
111            tracing::debug!("found existing pool for {addr}");
112            v.clone()
113        }
114        None => {
115            tracing::debug!("creating new pool for {addr}");
116
117            let pool = bb8::Pool::builder()
118                .build(ftnet_utils::HttpConnectionManager::new(addr.to_string()))
119                .await?;
120
121            pools.insert(addr.to_string(), pool.clone());
122            pool
123        }
124    })
125}