1pub async fn get_remote_id52(conn: &iroh::endpoint::Connection) -> eyre::Result<String> {
12    let remote_node_id = match conn.remote_node_id() {
13        Ok(id) => id,
14        Err(e) => {
15            tracing::error!("could not read remote node id: {e}, closing connection");
16            let e2 = conn.closed().await;
19            tracing::info!("connection closed: {e2}");
20            conn.close(0u8.into(), &[]);
22            return Err(eyre::anyhow!("could not read remote node id: {e}"));
23        }
24    };
25
26    let bytes = remote_node_id.as_bytes();
28    Ok(data_encoding::BASE32_DNSSEC.encode(bytes))
29}
30
31async fn ack(send: &mut iroh::endpoint::SendStream) -> eyre::Result<()> {
32    tracing::trace!("sending ack");
33    send.write_all(format!("{}\n", crate::ACK).as_bytes())
34        .await?;
35    tracing::trace!("sent ack");
36    Ok(())
37}
38
39pub async fn accept_bi(
48    conn: &iroh::endpoint::Connection,
49    expected: crate::Protocol,
50) -> eyre::Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream)> {
51    loop {
52        tracing::trace!("accepting bidirectional stream");
53        match accept_bi_(conn).await? {
54            (mut send, _recv, crate::Protocol::Ping) => {
55                tracing::trace!("got ping");
56                tracing::trace!("sending PONG");
57                send.write_all(crate::PONG)
58                    .await
59                    .inspect_err(|e| tracing::error!("failed to write PONG: {e:?}"))?;
60                tracing::trace!("sent PONG");
61            }
62            (s, r, found) => {
63                tracing::trace!("got bidirectional stream: {found:?}");
64                if found != expected {
65                    return Err(eyre::anyhow!("expected: {expected:?}, got {found:?}"));
66                }
67                return Ok((s, r));
68            }
69        }
70    }
71}
72
73pub async fn accept_bi_with<T: serde::de::DeserializeOwned>(
86    conn: &iroh::endpoint::Connection,
87    expected: crate::Protocol,
88) -> eyre::Result<(T, iroh::endpoint::SendStream, iroh::endpoint::RecvStream)> {
89    let (send, mut recv) = accept_bi(conn, expected).await?;
90    let next = next_json(&mut recv)
91        .await
92        .inspect_err(|e| tracing::error!("failed to read next message: {e}"))?;
93
94    Ok((next, send, recv))
95}
96
97async fn accept_bi_(
98    conn: &iroh::endpoint::Connection,
99) -> eyre::Result<(
100    iroh::endpoint::SendStream,
101    iroh::endpoint::RecvStream,
102    crate::Protocol,
103)> {
104    tracing::trace!("accept_bi_ called");
105    let (mut send, mut recv) = conn.accept_bi().await?;
106    tracing::trace!("accept_bi_ got send and recv");
107
108    let msg: crate::Protocol = next_json(&mut recv)
109        .await
110        .inspect_err(|e| tracing::error!("failed to read next message: {e}"))?;
111
112    tracing::trace!("msg: {msg:?}");
113
114    ack(&mut send).await?;
115
116    tracing::trace!("ack sent");
117    Ok((send, recv, msg))
118}
119
120pub async fn next_json<T: serde::de::DeserializeOwned>(
131    recv: &mut iroh::endpoint::RecvStream,
132) -> eyre::Result<T> {
133    let mut buffer = Vec::with_capacity(1024);
135
136    loop {
137        let mut byte = [0u8];
138        let n = recv.read(&mut byte).await?;
139
140        if n == Some(0) || n.is_none() {
141            return Err(eyre::anyhow!(
142                "connection closed while reading response header"
143            ));
144        }
145
146        if byte[0] == b'\n' {
147            break;
148        } else {
149            buffer.push(byte[0]);
150        }
151    }
152
153    Ok(serde_json::from_slice(&buffer)?)
154}
155
156pub async fn next_string(recv: &mut iroh::endpoint::RecvStream) -> eyre::Result<String> {
167    let mut buffer = Vec::with_capacity(1024);
169
170    loop {
171        let mut byte = [0u8];
172        let n = recv.read(&mut byte).await?;
173
174        if n == Some(0) || n.is_none() {
175            return Err(eyre::anyhow!(
176                "connection closed while reading response header"
177            ));
178        }
179
180        if byte[0] == b'\n' {
181            break;
182        } else {
183            buffer.push(byte[0]);
184        }
185    }
186
187    String::from_utf8(buffer).map_err(|e| eyre::anyhow!("failed to convert bytes to string: {e}"))
188}
189
190pub async fn global_iroh_endpoint() -> iroh::Endpoint {
202    async fn new_iroh_endpoint() -> iroh::Endpoint {
203        iroh::Endpoint::builder()
205            .discovery_n0()
206            .discovery_local_network()
207            .alpns(vec![crate::APNS_IDENTITY.into()])
208            .bind()
209            .await
210            .expect("failed to create iroh Endpoint")
211    }
212
213    static IROH_ENDPOINT: tokio::sync::OnceCell<iroh::Endpoint> =
214        tokio::sync::OnceCell::const_new();
215    IROH_ENDPOINT.get_or_init(new_iroh_endpoint).await.clone()
216}