ftnet_utils/
http_peer_proxy.rs1pub async fn http(
2 addr: &str,
3 client_pools: ftnet_utils::ConnectionPools,
4 send: &mut iroh::endpoint::SendStream,
5 mut recv: ftnet_utils::utils::FrameReader,
6) -> eyre::Result<()> {
7 use eyre::WrapErr;
8 use http_body_util::BodyExt;
9 use tokio_stream::StreamExt;
10
11 tracing::info!("http called with {addr}");
12 let req: ftnet_utils::http::Request = match recv.next().await {
13 Some(Ok(v)) => serde_json::from_str(&v)
14 .wrap_err_with(|| "failed to serialize json while reading http request")?,
15 Some(Err(e)) => {
16 tracing::error!("failed to read request: {e}");
17 return Err(eyre::anyhow!("failed to read request: {e}"));
18 }
19 None => {
20 tracing::error!("no request");
21 return Err(eyre::anyhow!("no request"));
22 }
23 };
24
25 tracing::info!("got request: {req:?}");
26
27 let mut body = recv.read_buffer().to_vec();
28 let mut recv = recv.into_inner();
29
30 let mut buf = Vec::with_capacity(1024 * 64);
31
32 tracing::info!("reading body");
33 while let Some(v) = recv.read(&mut buf).await? {
34 if v == 0 {
35 tracing::info!("finished reading");
36 break;
37 }
38 tracing::info!("reading body, partial: {v}");
39 body.extend_from_slice(&buf);
40 buf.truncate(0);
41 }
42 tracing::info!("finished reading body");
43
44 let mut r = hyper::Request::builder()
45 .method(req.method.as_str())
46 .uri(req.uri);
47 for (name, value) in req.headers {
48 r = r.header(name, value);
49 }
50
51 tracing::info!("request: {r:?}");
52
53 let pool = get_pool(addr, client_pools).await?;
54 let mut client = match pool.get().await {
56 Ok(v) => v,
57 Err(e) => {
58 tracing::error!("failed to get connection: {e:?}");
59 return Err(eyre::anyhow!("failed to get connection: {e:?}"));
60 }
61 };
62 let (resp, body) = client
65 .send_request(
66 r.body(
67 http_body_util::Full::new(hyper::body::Bytes::from(body))
68 .map_err(|e| match e {})
69 .boxed(),
70 )?,
71 )
72 .await
73 .wrap_err_with(|| "failed to send request")?
74 .into_parts();
75
76 let r = ftnet_utils::http::Response {
77 status: resp.status.as_u16(),
78 headers: resp
79 .headers
80 .iter()
81 .map(|(k, v)| (k.to_string(), v.as_bytes().to_vec()))
82 .collect(),
83 };
84
85 send.write_all(
86 serde_json::to_string(&r)
87 .wrap_err_with(|| "failed to serialize json while writing http response")?
88 .as_bytes(),
89 )
90 .await?;
91 send.write_all(b"\n").await?;
92 send.write_all(&(body.collect().await?.to_bytes())).await?;
93
94 Ok(())
95}
96
97async fn get_pool(
98 addr: &str,
99 client_pools: ftnet_utils::ConnectionPools,
100) -> eyre::Result<bb8::Pool<ftnet_utils::ConnectionManager>> {
101 tracing::info!("get client");
102 let mut pools = client_pools.lock().await;
103 tracing::info!("get client1");
104
105 Ok(match pools.get(addr) {
106 Some(v) => v.clone(),
107 None => {
108 let pool = bb8::Pool::builder()
109 .build(ftnet_utils::ConnectionManager::new(addr.to_string()))
110 .await?;
111
112 pools.insert(addr.to_string(), pool.clone());
113 pool
114 }
115 })
116}