1pub async fn get_remote_id52(conn: &iroh::endpoint::Connection) -> eyre::Result<String> {
12 let remote_node_id = match conn.remote_node_id() {
13 Ok(id) => id,
14 Err(e) => {
15 tracing::error!("could not read remote node id: {e}, closing connection");
16 let e2 = conn.closed().await;
19 tracing::info!("connection closed: {e2}");
20 conn.close(0u8.into(), &[]);
22 return Err(eyre::anyhow!("could not read remote node id: {e}"));
23 }
24 };
25
26 let bytes = remote_node_id.as_bytes();
28 Ok(data_encoding::BASE32_DNSSEC.encode(bytes))
29}
30
31async fn ack(send: &mut iroh::endpoint::SendStream) -> eyre::Result<()> {
32 tracing::trace!("sending ack");
33 send.write_all(format!("{}\n", crate::ACK).as_bytes())
34 .await?;
35 tracing::trace!("sent ack");
36 Ok(())
37}
38
39pub async fn accept_bi(
48 conn: &iroh::endpoint::Connection,
49 expected: crate::Protocol,
50) -> eyre::Result<(iroh::endpoint::SendStream, iroh::endpoint::RecvStream)> {
51 loop {
52 tracing::trace!("accepting bidirectional stream");
53 match accept_bi_(conn).await? {
54 (mut send, _recv, crate::Protocol::Ping) => {
55 tracing::trace!("got ping");
56 tracing::trace!("sending PONG");
57 send.write_all(crate::PONG)
58 .await
59 .inspect_err(|e| tracing::error!("failed to write PONG: {e:?}"))?;
60 tracing::trace!("sent PONG");
61 }
62 (s, r, found) => {
63 tracing::trace!("got bidirectional stream: {found:?}");
64 if found != expected {
65 return Err(eyre::anyhow!("expected: {expected:?}, got {found:?}"));
66 }
67 return Ok((s, r));
68 }
69 }
70 }
71}
72
73pub async fn accept_bi_with<T: serde::de::DeserializeOwned>(
86 conn: &iroh::endpoint::Connection,
87 expected: crate::Protocol,
88) -> eyre::Result<(T, iroh::endpoint::SendStream, iroh::endpoint::RecvStream)> {
89 let (send, mut recv) = accept_bi(conn, expected).await?;
90 let next = next_json(&mut recv)
91 .await
92 .inspect_err(|e| tracing::error!("failed to read next message: {e}"))?;
93
94 Ok((next, send, recv))
95}
96
97async fn accept_bi_(
98 conn: &iroh::endpoint::Connection,
99) -> eyre::Result<(
100 iroh::endpoint::SendStream,
101 iroh::endpoint::RecvStream,
102 crate::Protocol,
103)> {
104 tracing::trace!("accept_bi_ called");
105 let (mut send, mut recv) = conn.accept_bi().await?;
106 tracing::trace!("accept_bi_ got send and recv");
107
108 let msg: crate::Protocol = next_json(&mut recv)
109 .await
110 .inspect_err(|e| tracing::error!("failed to read next message: {e}"))?;
111
112 tracing::trace!("msg: {msg:?}");
113
114 ack(&mut send).await?;
115
116 tracing::trace!("ack sent");
117 Ok((send, recv, msg))
118}
119
120pub async fn next_json<T: serde::de::DeserializeOwned>(
131 recv: &mut iroh::endpoint::RecvStream,
132) -> eyre::Result<T> {
133 let mut buffer = Vec::with_capacity(1024);
135
136 loop {
137 let mut byte = [0u8];
138 let n = recv.read(&mut byte).await?;
139
140 if n == Some(0) || n.is_none() {
141 return Err(eyre::anyhow!(
142 "connection closed while reading response header"
143 ));
144 }
145
146 if byte[0] == b'\n' {
147 break;
148 } else {
149 buffer.push(byte[0]);
150 }
151 }
152
153 Ok(serde_json::from_slice(&buffer)?)
154}
155
156pub async fn next_string(recv: &mut iroh::endpoint::RecvStream) -> eyre::Result<String> {
167 let mut buffer = Vec::with_capacity(1024);
169
170 loop {
171 let mut byte = [0u8];
172 let n = recv.read(&mut byte).await?;
173
174 if n == Some(0) || n.is_none() {
175 return Err(eyre::anyhow!(
176 "connection closed while reading response header"
177 ));
178 }
179
180 if byte[0] == b'\n' {
181 break;
182 } else {
183 buffer.push(byte[0]);
184 }
185 }
186
187 String::from_utf8(buffer).map_err(|e| eyre::anyhow!("failed to convert bytes to string: {e}"))
188}
189
190pub async fn global_iroh_endpoint() -> iroh::Endpoint {
202 async fn new_iroh_endpoint() -> iroh::Endpoint {
203 iroh::Endpoint::builder()
205 .discovery_n0()
206 .discovery_local_network()
207 .alpns(vec![crate::APNS_IDENTITY.into()])
208 .bind()
209 .await
210 .expect("failed to create iroh Endpoint")
211 }
212
213 static IROH_ENDPOINT: tokio::sync::OnceCell<iroh::Endpoint> =
214 tokio::sync::OnceCell::const_new();
215 IROH_ENDPOINT.get_or_init(new_iroh_endpoint).await.clone()
216}