ftnet_utils/
proxy.rs

1use crate::utils;
2use crate::{IDMap, PeerConnections, Protocol};
3
4pub async fn peer_to_peer(
5    req: hyper::Request<hyper::body::Incoming>,
6    self_id52: &str,
7    remote_node_id52: &str,
8    peer_connections: PeerConnections,
9    _patch: ftnet_sdk::RequestPatch,
10    id_map: IDMap,
11) -> crate::http::ProxyResult {
12    use http_body_util::BodyExt;
13    use tokio_stream::StreamExt;
14
15    tracing::info!("peer_proxy: {remote_node_id52}");
16
17    let (mut send, recv) =
18        get_stream(self_id52, remote_node_id52, peer_connections, id_map).await?;
19
20    tracing::info!("got stream");
21    send.write_all(&serde_json::to_vec(&Protocol::Identity)?)
22        .await?;
23    send.write(b"\n").await?;
24
25    tracing::info!("wrote protocol");
26
27    let (head, body) = req.into_parts();
28    send.write_all(&serde_json::to_vec(&crate::http::Request::from(head))?)
29        .await?;
30    send.write_all("\n".as_bytes()).await?;
31    tracing::info!("sent request header");
32
33    let mut body = http_body_util::BodyDataStream::new(body);
34    while let Some(v) = body.next().await {
35        send.write_all(&v?).await?;
36    }
37
38    tracing::info!("sent body");
39
40    let mut recv = crate::utils::frame_reader(recv);
41    let r: crate::http::Response = match recv.next().await {
42        Some(v) => serde_json::from_str(&v?)?,
43        None => {
44            tracing::error!("failed to read from incoming connection");
45            return Err(eyre::anyhow!("failed to read from incoming connection"));
46        }
47    };
48
49    tracing::info!("got response header: {r:?}");
50
51    let mut body = recv.read_buffer().to_vec();
52    let mut recv = recv.into_inner();
53
54    let mut buf = Vec::with_capacity(1024 * 64);
55
56    while let Some(v) = recv.read(&mut buf).await? {
57        if v == 0 {
58            tracing::info!("finished reading body");
59            break;
60        }
61        body.extend_from_slice(&buf);
62        buf.truncate(0);
63    }
64
65    tracing::info!("got body");
66
67    let mut res = hyper::Response::new(
68        http_body_util::Full::new(hyper::body::Bytes::from(body))
69            .map_err(|e| match e {})
70            .boxed(),
71    );
72    *res.status_mut() = hyper::http::StatusCode::from_u16(r.status)?;
73    for (k, v) in r.headers {
74        res.headers_mut().insert(
75            hyper::http::header::HeaderName::from_bytes(k.as_bytes())?,
76            hyper::http::header::HeaderValue::from_bytes(&v)?,
77        );
78    }
79
80    tracing::info!("all done");
81    Ok(res)
82}
83
84async fn get_stream(
85    self_id52: &str,
86    remote_node_id52: &str,
87    peer_connections: PeerConnections,
88    id_map: IDMap,
89) -> eyre::Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream)> {
90    let conn = get_connection(self_id52, remote_node_id52, id_map, peer_connections).await?;
91    // TODO: this is where we can check if the connection is healthy or not. if we fail to get the
92    //       bidirectional stream, probably we should try to recreate connection.
93    Ok(conn.open_bi().await?)
94}
95
96async fn get_connection(
97    self_id52: &str,
98    remote_node_id52: &str,
99    id_map: IDMap,
100    peer_connections: PeerConnections,
101) -> eyre::Result<iroh::endpoint::Connection> {
102    let connections = peer_connections.lock().await;
103    let connection = connections.get(remote_node_id52).map(ToOwned::to_owned);
104
105    // we drop the connections mutex guard so that we do not hold lock across await point.
106    drop(connections);
107
108    if let Some(conn) = connection {
109        return Ok(conn);
110    }
111
112    let ep = get_endpoint(self_id52, id_map).await?;
113    let conn = match ep
114        .connect(
115            utils::id52_to_public_key(remote_node_id52)?,
116            crate::APNS_IDENTITY,
117        )
118        .await
119    {
120        Ok(conn) => conn,
121        Err(e) => {
122            tracing::error!("failed to create connection: {e:?}");
123            return Err(eyre::anyhow!("failed to create connection: {e:?}"));
124        }
125    };
126
127    let mut connections = peer_connections.lock().await;
128    connections.insert(remote_node_id52.to_string(), conn.clone());
129
130    Ok(conn)
131}
132
133async fn get_endpoint(self_id52: &str, id_map: IDMap) -> eyre::Result<iroh::endpoint::Endpoint> {
134    let map = id_map.lock().await;
135
136    for (id, (_port, ep)) in map.iter() {
137        if id == self_id52 {
138            return Ok(ep.clone());
139        }
140    }
141
142    tracing::error!("no entry for {self_id52} in the id_map: {id_map:?}");
143    Err(eyre::anyhow!(
144        "no entry for {self_id52} in the id_map: {id_map:?}"
145    ))
146}