ftnet_utils/
http_peer_proxy.rs

1pub async fn http(
2    addr: &str,
3    client_pools: ftnet_utils::ConnectionPools,
4    send: &mut iroh::endpoint::SendStream,
5    mut recv: ftnet_utils::utils::FrameReader,
6) -> eyre::Result<()> {
7    use eyre::WrapErr;
8    use http_body_util::BodyExt;
9    use tokio_stream::StreamExt;
10
11    tracing::info!("http called with {addr}");
12    let req: ftnet_utils::http::Request = match recv.next().await {
13        Some(Ok(v)) => serde_json::from_str(&v)
14            .wrap_err_with(|| "failed to serialize json while reading http request")?,
15        Some(Err(e)) => {
16            tracing::error!("failed to read request: {e}");
17            return Err(eyre::anyhow!("failed to read request: {e}"));
18        }
19        None => {
20            tracing::error!("no request");
21            return Err(eyre::anyhow!("no request"));
22        }
23    };
24
25    tracing::info!("got request: {req:?}");
26
27    let mut body = recv.read_buffer().to_vec();
28    let mut recv = recv.into_inner();
29
30    let mut buf = Vec::with_capacity(1024 * 64);
31
32    tracing::info!("reading body");
33    while let Some(v) = recv.read(&mut buf).await? {
34        if v == 0 {
35            tracing::info!("finished reading");
36            break;
37        }
38        tracing::info!("reading body, partial: {v}");
39        body.extend_from_slice(&buf);
40        buf.truncate(0);
41    }
42    tracing::info!("finished reading body");
43
44    let mut r = hyper::Request::builder()
45        .method(req.method.as_str())
46        .uri(req.uri);
47    for (name, value) in req.headers {
48        r = r.header(name, value);
49    }
50
51    tracing::info!("request: {r:?}");
52
53    let pool = get_pool(addr, client_pools).await?;
54    // tracing::info!("got pool");
55    let mut client = match pool.get().await {
56        Ok(v) => v,
57        Err(e) => {
58            tracing::error!("failed to get connection: {e:?}");
59            return Err(eyre::anyhow!("failed to get connection: {e:?}"));
60        }
61    };
62    // tracing::info!("got client");
63
64    let (resp, body) = client
65        .send_request(
66            r.body(
67                http_body_util::Full::new(hyper::body::Bytes::from(body))
68                    .map_err(|e| match e {})
69                    .boxed(),
70            )?,
71        )
72        .await
73        .wrap_err_with(|| "failed to send request")?
74        .into_parts();
75
76    let r = ftnet_utils::http::Response {
77        status: resp.status.as_u16(),
78        headers: resp
79            .headers
80            .iter()
81            .map(|(k, v)| (k.to_string(), v.as_bytes().to_vec()))
82            .collect(),
83    };
84
85    send.write_all(
86        serde_json::to_string(&r)
87            .wrap_err_with(|| "failed to serialize json while writing http response")?
88            .as_bytes(),
89    )
90    .await?;
91    send.write_all(b"\n").await?;
92    send.write_all(&(body.collect().await?.to_bytes())).await?;
93
94    Ok(())
95}
96
97async fn get_pool(
98    addr: &str,
99    client_pools: ftnet_utils::ConnectionPools,
100) -> eyre::Result<bb8::Pool<ftnet_utils::ConnectionManager>> {
101    tracing::info!("get client");
102    let mut pools = client_pools.lock().await;
103    tracing::info!("get client1");
104
105    Ok(match pools.get(addr) {
106        Some(v) => v.clone(),
107        None => {
108            let pool = bb8::Pool::builder()
109                .build(ftnet_utils::ConnectionManager::new(addr.to_string()))
110                .await?;
111
112            pools.insert(addr.to_string(), pool.clone());
113            pool
114        }
115    })
116}