ftnet_utils/
peer_to_http.rs1pub async fn peer_to_http(
2 addr: &str,
3 client_pools: ftnet_utils::HttpConnectionPools,
4 send: &mut iroh::endpoint::SendStream,
5 mut recv: ftnet_utils::FrameReader,
6) -> eyre::Result<()> {
7 use eyre::WrapErr;
8 use http_body_util::BodyExt;
9 use tokio_stream::StreamExt;
10
11 tracing::info!("http request with {addr}");
12 let start = std::time::Instant::now();
13
14 let req: ftnet_utils::http::Request = match recv.next().await {
15 Some(Ok(v)) => serde_json::from_str(&v)
16 .wrap_err_with(|| "failed to serialize json while reading http request")?,
17 Some(Err(e)) => {
18 tracing::error!("failed to read request: {e}");
19 return Err(eyre::anyhow!("failed to read request: {e}"));
20 }
21 None => {
22 tracing::error!("no request");
23 return Err(eyre::anyhow!("no request"));
24 }
25 };
26
27 tracing::info!("got request: {req:?}");
28
29 let mut body = recv.read_buffer().to_vec();
30 let mut recv = recv.into_inner();
31
32 let mut buf = Vec::with_capacity(1024 * 64);
33
34 tracing::trace!("reading body");
35 while let Some(v) = recv.read(&mut buf).await? {
36 if v == 0 {
37 tracing::trace!("finished reading");
38 break;
39 }
40 tracing::trace!("reading body, partial: {v}");
41 body.extend_from_slice(&buf);
42 buf.truncate(0);
43 }
44 tracing::debug!("read {} bytes of body", body.len());
45
46 let mut r = hyper::Request::builder()
47 .method(req.method.as_str())
48 .uri(req.uri);
49 for (name, value) in req.headers {
50 r = r.header(name, value);
51 }
52
53 tracing::debug!("request: {r:?}");
54
55 let pool = get_pool(addr, client_pools).await?;
56 tracing::trace!("got pool");
57 let mut client = match pool.get().await {
58 Ok(v) => v,
59 Err(e) => {
60 tracing::error!("failed to get connection: {e:?}");
61 return Err(eyre::anyhow!("failed to get connection: {e:?}"));
62 }
63 };
64 let (resp, body) = client
67 .send_request(
68 r.body(
69 http_body_util::Full::new(hyper::body::Bytes::from(body))
70 .map_err(|e| match e {})
71 .boxed(),
72 )?,
73 )
74 .await
75 .wrap_err_with(|| "failed to send request")?
76 .into_parts();
77
78 let r = ftnet_utils::http::Response {
79 status: resp.status.as_u16(),
80 headers: resp
81 .headers
82 .iter()
83 .map(|(k, v)| (k.to_string(), v.as_bytes().to_vec()))
84 .collect(),
85 };
86
87 send.write_all(
88 serde_json::to_string(&r)
89 .wrap_err_with(|| "failed to serialize json while writing http response")?
90 .as_bytes(),
91 )
92 .await?;
93 send.write_all(b"\n").await?;
94 let bytes = body.collect().await?.to_bytes();
95 tracing::debug!("got response body: {} bytes", bytes.len());
96 send.write_all(&bytes).await?;
97
98 tracing::info!("handled http request in {:?}", start.elapsed());
99 Ok(())
100}
101
102async fn get_pool(
103 addr: &str,
104 client_pools: ftnet_utils::HttpConnectionPools,
105) -> eyre::Result<bb8::Pool<ftnet_utils::HttpConnectionManager>> {
106 tracing::trace!("get pool called");
107 let mut pools = client_pools.lock().await;
108
109 Ok(match pools.get(addr) {
110 Some(v) => {
111 tracing::debug!("found existing pool for {addr}");
112 v.clone()
113 }
114 None => {
115 tracing::debug!("creating new pool for {addr}");
116
117 let pool = bb8::Pool::builder()
118 .build(ftnet_utils::HttpConnectionManager::new(addr.to_string()))
119 .await?;
120
121 pools.insert(addr.to_string(), pool.clone());
122 pool
123 }
124 })
125}