fastn_net/
utils_iroh.rs

1// Functions that work with iroh types
2
3/// Gets the ID52 string of the remote peer from a connection.
4///
5/// Extracts the remote node's public key and converts it to ID52 format
6/// (52-character BASE32_DNSSEC encoding).
7///
8/// # Errors
9///
10/// Returns an error if the remote node ID cannot be read from the connection.
11pub async fn get_remote_id52(conn: &iroh::endpoint::Connection) -> eyre::Result<String> {
12    let remote_node_id = match conn.remote_node_id() {
13        Ok(id) => id,
14        Err(e) => {
15            tracing::error!("could not read remote node id: {e}, closing connection");
16            // TODO: is this how we close the connection in error cases or do we send some error
17            //       and wait for other side to close the connection?
18            let e2 = conn.closed().await;
19            tracing::info!("connection closed: {e2}");
20            // TODO: send another error_code to indicate bad remote node id?
21            conn.close(0u8.into(), &[]);
22            return Err(eyre::anyhow!("could not read remote node id: {e}"));
23        }
24    };
25
26    // Convert iroh::PublicKey to ID52 string
27    let bytes = remote_node_id.as_bytes();
28    Ok(data_encoding::BASE32_DNSSEC.encode(bytes))
29}
30
31async fn ack(send: &mut iroh::endpoint::SendStream) -> eyre::Result<()> {
32    tracing::trace!("sending ack");
33    send.write_all(format!("{}\n", crate::ACK).as_bytes())
34        .await?;
35    tracing::trace!("sent ack");
36    Ok(())
37}
38
39/// Accepts an incoming bidirectional stream with the expected protocol.
40///
41/// Continuously accepts incoming streams until one matches the expected protocol.
42/// Automatically handles and responds to ping messages.
43///
44/// # Errors
45///
46/// Returns an error if a non-ping stream has unexpected protocol.
47pub async fn accept_bi(
48    conn: &iroh::endpoint::Connection,
49    expected: crate::Protocol,
50) -> eyre::Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream)> {
51    loop {
52        tracing::trace!("accepting bidirectional stream");
53        match accept_bi_(conn).await? {
54            (mut send, _recv, crate::Protocol::Ping) => {
55                tracing::trace!("got ping");
56                tracing::trace!("sending PONG");
57                send.write_all(crate::PONG)
58                    .await
59                    .inspect_err(|e| tracing::error!("failed to write PONG: {e:?}"))?;
60                tracing::trace!("sent PONG");
61            }
62            (s, r, found) => {
63                tracing::trace!("got bidirectional stream: {found:?}");
64                if found != expected {
65                    return Err(eyre::anyhow!("expected: {expected:?}, got {found:?}"));
66                }
67                return Ok((s, r));
68            }
69        }
70    }
71}
72
73/// Accepts an incoming bidirectional stream and reads additional data.
74///
75/// Like `accept_bi` but also reads and deserializes the next JSON message
76/// from the stream after protocol negotiation.
77///
78/// # Type Parameters
79///
80/// * `T` - The type to deserialize from the stream
81///
82/// # Errors
83///
84/// Returns an error if protocol doesn't match or deserialization fails.
85pub async fn accept_bi_with<T: serde::de::DeserializeOwned>(
86    conn: &iroh::endpoint::Connection,
87    expected: crate::Protocol,
88) -> eyre::Result<(T, iroh::endpoint::SendStream, iroh::endpoint::RecvStream)> {
89    let (send, mut recv) = accept_bi(conn, expected).await?;
90    let next = next_json(&mut recv)
91        .await
92        .inspect_err(|e| tracing::error!("failed to read next message: {e}"))?;
93
94    Ok((next, send, recv))
95}
96
97async fn accept_bi_(
98    conn: &iroh::endpoint::Connection,
99) -> eyre::Result<(
100    iroh::endpoint::SendStream,
101    iroh::endpoint::RecvStream,
102    crate::Protocol,
103)> {
104    tracing::trace!("accept_bi_ called");
105    let (mut send, mut recv) = conn.accept_bi().await?;
106    tracing::trace!("accept_bi_ got send and recv");
107
108    let msg: crate::Protocol = next_json(&mut recv)
109        .await
110        .inspect_err(|e| tracing::error!("failed to read next message: {e}"))?;
111
112    tracing::trace!("msg: {msg:?}");
113
114    ack(&mut send).await?;
115
116    tracing::trace!("ack sent");
117    Ok((send, recv, msg))
118}
119
120/// Reads a newline-terminated JSON message from a stream.
121///
122/// Reads bytes until a newline character is encountered, then deserializes
123/// the buffer as JSON into the specified type.
124///
125/// # Errors
126///
127/// Returns an error if:
128/// - Connection is closed while reading
129/// - JSON deserialization fails
130pub async fn next_json<T: serde::de::DeserializeOwned>(
131    recv: &mut iroh::endpoint::RecvStream,
132) -> eyre::Result<T> {
133    // NOTE: the capacity is just a guess to avoid reallocations
134    let mut buffer = Vec::with_capacity(1024);
135
136    loop {
137        let mut byte = [0u8];
138        let n = recv.read(&mut byte).await?;
139
140        if n == Some(0) || n.is_none() {
141            return Err(eyre::anyhow!(
142                "connection closed while reading response header"
143            ));
144        }
145
146        if byte[0] == b'\n' {
147            break;
148        } else {
149            buffer.push(byte[0]);
150        }
151    }
152
153    Ok(serde_json::from_slice(&buffer)?)
154}
155
156/// Reads a newline-terminated string from a stream.
157///
158/// Reads bytes until a newline character is encountered and returns
159/// the result as a UTF-8 string.
160///
161/// # Errors
162///
163/// Returns an error if:
164/// - Connection is closed while reading
165/// - Bytes are not valid UTF-8
166pub async fn next_string(recv: &mut iroh::endpoint::RecvStream) -> eyre::Result<String> {
167    // NOTE: the capacity is just a guess to avoid reallocations
168    let mut buffer = Vec::with_capacity(1024);
169
170    loop {
171        let mut byte = [0u8];
172        let n = recv.read(&mut byte).await?;
173
174        if n == Some(0) || n.is_none() {
175            return Err(eyre::anyhow!(
176                "connection closed while reading response header"
177            ));
178        }
179
180        if byte[0] == b'\n' {
181            break;
182        } else {
183            buffer.push(byte[0]);
184        }
185    }
186
187    String::from_utf8(buffer).map_err(|e| eyre::anyhow!("failed to convert bytes to string: {e}"))
188}
189
190/// Returns a global singleton Iroh endpoint.
191///
192/// Creates the endpoint on first call and returns the same instance
193/// on subsequent calls. Configured with:
194/// - Local network discovery
195/// - N0 discovery (DHT-based)
196/// - ALPN: `/fastn/identity/0.1`
197///
198/// # Panics
199///
200/// Panics if endpoint creation fails.
201pub async fn global_iroh_endpoint() -> iroh::Endpoint {
202    async fn new_iroh_endpoint() -> iroh::Endpoint {
203        // TODO: read secret key from ENV VAR
204        iroh::Endpoint::builder()
205            .discovery_n0()
206            .discovery_local_network()
207            .alpns(vec![crate::APNS_IDENTITY.into()])
208            .bind()
209            .await
210            .expect("failed to create iroh Endpoint")
211    }
212
213    static IROH_ENDPOINT: tokio::sync::OnceCell<iroh::Endpoint> =
214        tokio::sync::OnceCell::const_new();
215    IROH_ENDPOINT.get_or_init(new_iroh_endpoint).await.clone()
216}