iroh_pkarr_node_discovery/
lib.rs1use std::{
10 sync::{Arc, Mutex},
11 time::Duration,
12};
13
14use futures_lite::StreamExt;
15use genawaiter::sync::{Co, Gen};
16use iroh_net::{
17 discovery::{
18 pkarr::{DEFAULT_PKARR_TTL, N0_DNS_PKARR_RELAY_PROD},
19 Discovery, DiscoveryItem,
20 },
21 dns::node_info::NodeInfo,
22 key::SecretKey,
23 util::AbortingJoinHandle,
24 AddrInfo, Endpoint, NodeId,
25};
26use pkarr::{
27 PkarrClient, PkarrClientAsync, PkarrRelayClient, PkarrRelayClientAsync, PublicKey,
28 RelaySettings, SignedPacket,
29};
30use url::Url;
31
/// Time the publish loop sleeps between two consecutive publish rounds.
///
/// The same signed packet is re-sent after every such pause (see
/// `publish_loop`).
const REPUBLISH_DELAY: Duration = Duration::from_secs(60 * 60);
/// Time the publish loop waits before its very first publish round.
///
/// NOTE(review): presumably this debounces bursts of `publish` calls, since
/// every call replaces the previously spawned loop -- confirm.
const INITIAL_PUBLISH_DELAY: Duration = Duration::from_millis(500);
38
/// Node discovery that publishes and resolves node information as pkarr
/// signed packets, via the DHT client and/or a pkarr relay.
///
/// This is a cheaply clonable handle: all state lives in a shared `Inner`
/// behind an [`Arc`]. Create one with [`PkarrNodeDiscovery::builder`].
#[derive(Debug, Clone)]
pub struct PkarrNodeDiscovery(Arc<Inner>);
47
48impl Default for PkarrNodeDiscovery {
49 fn default() -> Self {
50 Self::builder().build().expect("valid builder")
51 }
52}
53
/// Shared state behind a [`PkarrNodeDiscovery`] handle.
#[derive(derive_more::Debug)]
struct Inner {
    /// Client used for publishing to and resolving from the DHT; only used
    /// when `dht` is `true`.
    pkarr: PkarrClientAsync,
    /// Client for the configured pkarr relay, if any. Rendered as a fixed
    /// placeholder string in `Debug` output.
    #[debug("Option<PkarrRelayClientAsync>")]
    pkarr_relay: Option<PkarrRelayClientAsync>,
    /// Handle of the most recently spawned publish loop; overwritten on
    /// every `publish` call.
    // NOTE(review): presumably dropping the previous `AbortingJoinHandle`
    // aborts the old loop -- confirm against `iroh_net::util`.
    task: Mutex<Option<AbortingJoinHandle<()>>>,
    /// Key used to sign published packets. Without it nothing is published.
    secret_key: Option<SecretKey>,
    /// URL of the pkarr relay; set from the same builder option that
    /// `pkarr_relay` is created from, and used in log messages.
    relay_url: Option<Url>,
    /// Whether the DHT is used for publishing and resolving.
    dht: bool,
    /// TTL passed on when creating the signed packet to publish.
    ttl: u32,
    /// Whether direct addresses are included in the published node info.
    include_direct_addresses: bool,
}
77
/// Builder for [`PkarrNodeDiscovery`].
///
/// Obtain one via [`PkarrNodeDiscovery::builder`] (or `Builder::default()`),
/// adjust it with the chained setters and finish with `build`.
#[derive(Debug)]
pub struct Builder {
    /// Explicit pkarr client; a default one is created by `build` if unset.
    client: Option<PkarrClient>,
    /// Secret key used for signing; publishing is skipped when unset.
    secret_key: Option<SecretKey>,
    /// TTL for published packets; `DEFAULT_PKARR_TTL` is used when unset.
    ttl: Option<u32>,
    /// URL of the pkarr relay to use, if any.
    pkarr_relay: Option<Url>,
    /// Whether to use the DHT.
    dht: bool,
    /// Whether to publish direct addresses.
    include_direct_addresses: bool,
}
90
impl Default for Builder {
    /// Returns a builder with the DHT enabled, no relay, no secret key, no
    /// explicit client or TTL, and direct addresses excluded.
    fn default() -> Self {
        Self {
            client: None,
            secret_key: None,
            ttl: None,
            pkarr_relay: None,
            // The DHT is the only transport enabled out of the box.
            dht: true,
            include_direct_addresses: false,
        }
    }
}
103
104impl Builder {
105 pub fn client(mut self, client: PkarrClient) -> Self {
107 self.client = Some(client);
108 self
109 }
110
111 pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
115 self.secret_key = Some(secret_key);
116 self
117 }
118
119 pub fn ttl(mut self, ttl: u32) -> Self {
121 self.ttl = Some(ttl);
122 self
123 }
124
125 pub fn pkarr_relay(mut self, pkarr_relay: Url) -> Self {
127 self.pkarr_relay = Some(pkarr_relay);
128 self
129 }
130
131 pub fn n0_dns_pkarr_relay(mut self) -> Self {
133 self.pkarr_relay = Some(N0_DNS_PKARR_RELAY_PROD.parse().expect("valid URL"));
134 self
135 }
136
137 pub fn dht(mut self, dht: bool) -> Self {
139 self.dht = dht;
140 self
141 }
142
143 pub fn include_direct_addresses(mut self, include_direct_addresses: bool) -> Self {
145 self.include_direct_addresses = include_direct_addresses;
146 self
147 }
148
149 pub fn build(self) -> anyhow::Result<PkarrNodeDiscovery> {
151 let pkarr = self
152 .client
153 .unwrap_or_else(|| PkarrClient::new(Default::default()).unwrap())
154 .as_async();
155 let ttl = self.ttl.unwrap_or(DEFAULT_PKARR_TTL);
156 let relay_url = self.pkarr_relay;
157 let dht = self.dht;
158 let include_direct_addresses = self.include_direct_addresses;
159 anyhow::ensure!(
160 dht || relay_url.is_some(),
161 "at least one of DHT or relay must be enabled"
162 );
163
164 let pkarr_relay = match relay_url.clone() {
165 Some(url) => Some(
166 PkarrRelayClient::new(RelaySettings {
167 relays: vec![url.to_string()],
168 ..RelaySettings::default()
169 })?
170 .as_async(),
171 ),
172 None => None,
173 };
174
175 Ok(PkarrNodeDiscovery(Arc::new(Inner {
176 pkarr,
177 pkarr_relay,
178 secret_key: self.secret_key,
179 ttl,
180 relay_url,
181 dht,
182 include_direct_addresses,
183 task: Default::default(),
184 })))
185 }
186}
187
188impl PkarrNodeDiscovery {
189 pub fn builder() -> Builder {
191 Builder::default()
192 }
193
194 async fn publish_loop(self, keypair: SecretKey, signed_packet: SignedPacket) {
196 let this = self;
197 let z32 = pkarr::PublicKey::try_from(keypair.public().as_bytes())
198 .expect("valid public key")
199 .to_z32();
200 tokio::time::sleep(INITIAL_PUBLISH_DELAY).await;
203 loop {
204 let dht_publish = async {
206 if this.0.dht {
207 let res = this.0.pkarr.publish(&signed_packet).await;
208 match res {
209 Ok(()) => {
210 tracing::debug!("pkarr publish success. published under {z32}",);
211 }
212 Err(e) => {
213 tracing::warn!("pkarr publish error: {}", e);
220 }
221 }
222 }
223 };
224 let relay_publish = async {
226 if let Some(relay) = this.0.pkarr_relay.as_ref() {
227 tracing::info!(
228 "publishing to relay: {}",
229 this.0.relay_url.as_ref().unwrap().to_string()
230 );
231 match relay.publish(&signed_packet).await {
232 Ok(_) => {
233 tracing::debug!("pkarr publish to relay success");
234 }
235 Err(e) => {
236 tracing::warn!("pkarr publish to relay error: {}", e);
237 }
238 }
239 }
240 };
241 tokio::join!(relay_publish, dht_publish);
243 tokio::time::sleep(REPUBLISH_DELAY).await;
244 }
245 }
246
247 async fn resolve_relay(
248 &self,
249 pkarr_public_key: PublicKey,
250 co: &Co<anyhow::Result<DiscoveryItem>>,
251 ) {
252 let Some(relay) = &self.0.pkarr_relay else {
253 return;
254 };
255 let url = self.0.relay_url.as_ref().unwrap();
256 tracing::info!("resolving {} from relay {}", pkarr_public_key.to_z32(), url);
257 let response = relay.resolve(&pkarr_public_key).await;
258 match response {
259 Ok(Some(signed_packet)) => {
260 if let Ok(node_info) = NodeInfo::from_pkarr_signed_packet(&signed_packet) {
261 let addr_info = node_info.into();
262 tracing::info!("discovered node info from relay {:?}", addr_info);
263 co.yield_(Ok(DiscoveryItem {
264 provenance: "relay",
265 last_updated: None,
266 addr_info,
267 }))
268 .await;
269 } else {
270 tracing::debug!("failed to parse signed packet as node info");
271 }
272 }
273 Ok(None) => {
274 tracing::debug!("no signed packet found in relay");
275 }
276 Err(e) => {
277 tracing::debug!("failed to get signed packet from relay: {}", e);
278 co.yield_(Err(e.into())).await;
279 }
280 }
281 }
282
283 async fn resolve_dht(
285 &self,
286 pkarr_public_key: PublicKey,
287 co: &Co<anyhow::Result<DiscoveryItem>>,
288 ) {
289 if !self.0.dht {
290 return;
291 };
292 tracing::info!("resolving {} from DHT", pkarr_public_key.to_z32());
293 let response = match self.0.pkarr.resolve(&pkarr_public_key).await {
294 Ok(r) => r,
295 Err(e) => {
296 co.yield_(Err(e.into())).await;
297 return;
298 }
299 };
300 let Some(signed_packet) = response else {
301 tracing::debug!("no signed packet found in DHT");
302 return;
303 };
304 if let Ok(node_info) = NodeInfo::from_pkarr_signed_packet(&signed_packet) {
305 let addr_info = node_info.into();
306 tracing::info!("discovered node info from DHT {:?}", addr_info);
307 co.yield_(Ok(DiscoveryItem {
308 provenance: "mainline",
309 last_updated: None,
310 addr_info,
311 }))
312 .await;
313 } else {
314 tracing::debug!("failed to parse signed packet as node info");
315 }
316 }
317
318 async fn resolve(self, node_id: NodeId, co: Co<anyhow::Result<DiscoveryItem>>) {
319 let pkarr_public_key =
320 pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key");
321 tokio::join!(
322 self.resolve_dht(pkarr_public_key.clone(), &co),
323 self.resolve_relay(pkarr_public_key, &co)
324 );
325 }
326}
327
328impl Discovery for PkarrNodeDiscovery {
329 fn publish(&self, info: &AddrInfo) {
330 let Some(keypair) = &self.0.secret_key else {
331 tracing::debug!("no keypair set, not publishing");
332 return;
333 };
334 tracing::debug!("publishing {:?}", info);
335 let info = NodeInfo {
336 node_id: keypair.public(),
337 relay_url: info.relay_url.clone().map(Url::from),
338 direct_addresses: if self.0.include_direct_addresses {
339 info.direct_addresses.clone()
340 } else {
341 Default::default()
342 },
343 };
344 let Ok(signed_packet) = info.to_pkarr_signed_packet(keypair, self.0.ttl) else {
345 tracing::warn!("failed to create signed packet");
346 return;
347 };
348 let this = self.clone();
349 let curr = tokio::spawn(this.publish_loop(keypair.clone(), signed_packet));
350 let mut task = self.0.task.lock().unwrap();
351 *task = Some(curr.into());
352 }
353
354 fn resolve(
355 &self,
356 _endpoint: Endpoint,
357 node_id: NodeId,
358 ) -> Option<futures_lite::stream::Boxed<anyhow::Result<DiscoveryItem>>> {
359 let this = self.clone();
360 let pkarr_public_key =
361 pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key");
362 tracing::info!("resolving {} as {}", node_id, pkarr_public_key.to_z32());
363 Some(Gen::new(|co| async move { this.resolve(node_id, co).await }).boxed())
364 }
365}