iroh_pkarr_node_discovery/
lib.rs

1//! # Pkarr based node discovery for iroh-net
2//!
//! Node discovery is the ability to find connection information for an iroh node given only its node id.
4//!
5//! This crate implements a discovery mechanism for iroh-net based on <https://pkarr.org/>.
6//!
7//! TLDR: Each node publishes its address to the mainline DHT as a DNS packet, signed with its private key.
//! The DNS packet contains the node's relay URL (if it has one) and, optionally, its direct addresses.
9use std::{
10    sync::{Arc, Mutex},
11    time::Duration,
12};
13
14use futures_lite::StreamExt;
15use genawaiter::sync::{Co, Gen};
16use iroh_net::{
17    discovery::{
18        pkarr::{DEFAULT_PKARR_TTL, N0_DNS_PKARR_RELAY_PROD},
19        Discovery, DiscoveryItem,
20    },
21    dns::node_info::NodeInfo,
22    key::SecretKey,
23    util::AbortingJoinHandle,
24    AddrInfo, Endpoint, NodeId,
25};
26use pkarr::{
27    PkarrClient, PkarrClientAsync, PkarrRelayClient, PkarrRelayClientAsync, PublicKey,
28    RelaySettings, SignedPacket,
29};
30use url::Url;
31
/// How long to wait between two publishes of unchanged info (one hour).
///
/// A change of the info does not wait for this delay: it is published
/// right away by a freshly started publish task.
const REPUBLISH_DELAY: Duration = Duration::from_secs(3_600);
/// Grace period before the very first publish of a task.
///
/// Network changes tend to come in bursts at startup; waiting a little keeps
/// us from spamming the DHT with packets that are outdated immediately.
const INITIAL_PUBLISH_DELAY: Duration = Duration::from_millis(500);
38
/// A discovery mechanism for iroh-net based on <https://pkarr.org/>.
///
/// TLDR: it stores node addresses in DNS records, signed by the node's private key,
/// and publishes them to the BitTorrent mainline DHT and/or a pkarr relay.
///
/// Calling publish will start a background task that periodically publishes the node address.
/// Each call to publish replaces the previously started task.
///
/// All clones share the same state through an [`Arc`].
#[derive(Debug, Clone)]
pub struct PkarrNodeDiscovery(Arc<Inner>);
47
48impl Default for PkarrNodeDiscovery {
49    fn default() -> Self {
50        Self::builder().build().expect("valid builder")
51    }
52}
53
/// Shared state behind [`PkarrNodeDiscovery`]; every clone of the discovery
/// points at the same `Inner`.
#[derive(derive_more::Debug)]
struct Inner {
    /// Pkarr client for interacting with the DHT.
    pkarr: PkarrClientAsync,
    /// Pkarr client for interacting with a pkarr relay.
    ///
    /// `None` when no relay URL was configured.
    #[debug("Option<PkarrRelayClientAsync>")]
    pkarr_relay: Option<PkarrRelayClientAsync>,
    /// The background task that periodically publishes the node address.
    /// Due to AbortingJoinHandle, this will be aborted when the discovery is dropped.
    ///
    /// Storing a new task here drops (and thereby aborts) the previous one.
    task: Mutex<Option<AbortingJoinHandle<()>>>,
    /// Optional keypair for signing the DNS packets.
    ///
    /// If this is None, the node will not publish its address to the DHT.
    secret_key: Option<SecretKey>,
    /// Optional pkarr relay URL to use.
    ///
    /// Set from the same builder option as `pkarr_relay`; used for log messages.
    relay_url: Option<Url>,
    /// Whether to publish to (and resolve from) the mainline DHT.
    dht: bool,
    /// Time-to-live value for the DNS packets.
    ttl: u32,
    /// True to include the direct addresses in the DNS packet.
    include_direct_addresses: bool,
}
77
/// Builder for PkarrNodeDiscovery.
///
/// By default, publishing to the DHT is enabled, and relay publishing is disabled.
/// Direct addresses are not included in the published packet by default.
#[derive(Debug)]
pub struct Builder {
    /// Explicit pkarr client; a client with default settings is created when `None`.
    client: Option<PkarrClient>,
    /// Key used to sign published packets; without it nothing is published.
    secret_key: Option<SecretKey>,
    /// Time-to-live for the DNS packets; falls back to `DEFAULT_PKARR_TTL` when `None`.
    ttl: Option<u32>,
    /// Pkarr relay URL; relay usage is disabled when `None`.
    pkarr_relay: Option<Url>,
    /// Whether to use the mainline DHT.
    dht: bool,
    /// Whether to include the direct addresses in the DNS packet.
    include_direct_addresses: bool,
}
90
91impl Default for Builder {
92    fn default() -> Self {
93        Self {
94            client: None,
95            secret_key: None,
96            ttl: None,
97            pkarr_relay: None,
98            dht: true,
99            include_direct_addresses: false,
100        }
101    }
102}
103
104impl Builder {
105    /// Explicitly set the pkarr client to use.
106    pub fn client(mut self, client: PkarrClient) -> Self {
107        self.client = Some(client);
108        self
109    }
110
111    /// Set the secret key to use for signing the DNS packets.
112    ///
113    /// Without a secret key, the node will not publish its address to the DHT.
114    pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
115        self.secret_key = Some(secret_key);
116        self
117    }
118
119    /// Set the time-to-live value for the DNS packets.
120    pub fn ttl(mut self, ttl: u32) -> Self {
121        self.ttl = Some(ttl);
122        self
123    }
124
125    /// Set the pkarr relay URL to use.
126    pub fn pkarr_relay(mut self, pkarr_relay: Url) -> Self {
127        self.pkarr_relay = Some(pkarr_relay);
128        self
129    }
130
131    /// Use the default pkarr relay URL.
132    pub fn n0_dns_pkarr_relay(mut self) -> Self {
133        self.pkarr_relay = Some(N0_DNS_PKARR_RELAY_PROD.parse().expect("valid URL"));
134        self
135    }
136
137    /// Set whether to publish to the mainline DHT.
138    pub fn dht(mut self, dht: bool) -> Self {
139        self.dht = dht;
140        self
141    }
142
143    /// Set whether to include the direct addresses in the DNS packet.
144    pub fn include_direct_addresses(mut self, include_direct_addresses: bool) -> Self {
145        self.include_direct_addresses = include_direct_addresses;
146        self
147    }
148
149    /// Build the discovery mechanism.
150    pub fn build(self) -> anyhow::Result<PkarrNodeDiscovery> {
151        let pkarr = self
152            .client
153            .unwrap_or_else(|| PkarrClient::new(Default::default()).unwrap())
154            .as_async();
155        let ttl = self.ttl.unwrap_or(DEFAULT_PKARR_TTL);
156        let relay_url = self.pkarr_relay;
157        let dht = self.dht;
158        let include_direct_addresses = self.include_direct_addresses;
159        anyhow::ensure!(
160            dht || relay_url.is_some(),
161            "at least one of DHT or relay must be enabled"
162        );
163
164        let pkarr_relay = match relay_url.clone() {
165            Some(url) => Some(
166                PkarrRelayClient::new(RelaySettings {
167                    relays: vec![url.to_string()],
168                    ..RelaySettings::default()
169                })?
170                .as_async(),
171            ),
172            None => None,
173        };
174
175        Ok(PkarrNodeDiscovery(Arc::new(Inner {
176            pkarr,
177            pkarr_relay,
178            secret_key: self.secret_key,
179            ttl,
180            relay_url,
181            dht,
182            include_direct_addresses,
183            task: Default::default(),
184        })))
185    }
186}
187
188impl PkarrNodeDiscovery {
189    /// Create a new builder for PkarrNodeDiscovery.
190    pub fn builder() -> Builder {
191        Builder::default()
192    }
193
194    /// Periodically publish the node address to the DHT and relay.
195    async fn publish_loop(self, keypair: SecretKey, signed_packet: SignedPacket) {
196        let this = self;
197        let z32 = pkarr::PublicKey::try_from(keypair.public().as_bytes())
198            .expect("valid public key")
199            .to_z32();
200        // initial delay. If the task gets aborted before this delay is over,
201        // we have not published anything to the DHT yet.
202        tokio::time::sleep(INITIAL_PUBLISH_DELAY).await;
203        loop {
204            // publish to the DHT if enabled
205            let dht_publish = async {
206                if this.0.dht {
207                    let res = this.0.pkarr.publish(&signed_packet).await;
208                    match res {
209                        Ok(()) => {
210                            tracing::debug!("pkarr publish success. published under {z32}",);
211                        }
212                        Err(e) => {
213                            // we could do a smaller delay here, but in general DHT publish
214                            // not working is due to a network issue, and if the network changes
215                            // the task will be restarted anyway.
216                            //
217                            // Being unable to publish to the DHT is something that is expected
218                            // to happen from time to time, so this does not warrant a error log.
219                            tracing::warn!("pkarr publish error: {}", e);
220                        }
221                    }
222                }
223            };
224            // publish to the relay if enabled
225            let relay_publish = async {
226                if let Some(relay) = this.0.pkarr_relay.as_ref() {
227                    tracing::info!(
228                        "publishing to relay: {}",
229                        this.0.relay_url.as_ref().unwrap().to_string()
230                    );
231                    match relay.publish(&signed_packet).await {
232                        Ok(_) => {
233                            tracing::debug!("pkarr publish to relay success");
234                        }
235                        Err(e) => {
236                            tracing::warn!("pkarr publish to relay error: {}", e);
237                        }
238                    }
239                }
240            };
241            // do both at the same time
242            tokio::join!(relay_publish, dht_publish);
243            tokio::time::sleep(REPUBLISH_DELAY).await;
244        }
245    }
246
247    async fn resolve_relay(
248        &self,
249        pkarr_public_key: PublicKey,
250        co: &Co<anyhow::Result<DiscoveryItem>>,
251    ) {
252        let Some(relay) = &self.0.pkarr_relay else {
253            return;
254        };
255        let url = self.0.relay_url.as_ref().unwrap();
256        tracing::info!("resolving {} from relay {}", pkarr_public_key.to_z32(), url);
257        let response = relay.resolve(&pkarr_public_key).await;
258        match response {
259            Ok(Some(signed_packet)) => {
260                if let Ok(node_info) = NodeInfo::from_pkarr_signed_packet(&signed_packet) {
261                    let addr_info = node_info.into();
262                    tracing::info!("discovered node info from relay {:?}", addr_info);
263                    co.yield_(Ok(DiscoveryItem {
264                        provenance: "relay",
265                        last_updated: None,
266                        addr_info,
267                    }))
268                    .await;
269                } else {
270                    tracing::debug!("failed to parse signed packet as node info");
271                }
272            }
273            Ok(None) => {
274                tracing::debug!("no signed packet found in relay");
275            }
276            Err(e) => {
277                tracing::debug!("failed to get signed packet from relay: {}", e);
278                co.yield_(Err(e.into())).await;
279            }
280        }
281    }
282
283    /// Resolve a node id from the DHT.
284    async fn resolve_dht(
285        &self,
286        pkarr_public_key: PublicKey,
287        co: &Co<anyhow::Result<DiscoveryItem>>,
288    ) {
289        if !self.0.dht {
290            return;
291        };
292        tracing::info!("resolving {} from DHT", pkarr_public_key.to_z32());
293        let response = match self.0.pkarr.resolve(&pkarr_public_key).await {
294            Ok(r) => r,
295            Err(e) => {
296                co.yield_(Err(e.into())).await;
297                return;
298            }
299        };
300        let Some(signed_packet) = response else {
301            tracing::debug!("no signed packet found in DHT");
302            return;
303        };
304        if let Ok(node_info) = NodeInfo::from_pkarr_signed_packet(&signed_packet) {
305            let addr_info = node_info.into();
306            tracing::info!("discovered node info from DHT {:?}", addr_info);
307            co.yield_(Ok(DiscoveryItem {
308                provenance: "mainline",
309                last_updated: None,
310                addr_info,
311            }))
312            .await;
313        } else {
314            tracing::debug!("failed to parse signed packet as node info");
315        }
316    }
317
318    async fn resolve(self, node_id: NodeId, co: Co<anyhow::Result<DiscoveryItem>>) {
319        let pkarr_public_key =
320            pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key");
321        tokio::join!(
322            self.resolve_dht(pkarr_public_key.clone(), &co),
323            self.resolve_relay(pkarr_public_key, &co)
324        );
325    }
326}
327
328impl Discovery for PkarrNodeDiscovery {
329    fn publish(&self, info: &AddrInfo) {
330        let Some(keypair) = &self.0.secret_key else {
331            tracing::debug!("no keypair set, not publishing");
332            return;
333        };
334        tracing::debug!("publishing {:?}", info);
335        let info = NodeInfo {
336            node_id: keypair.public(),
337            relay_url: info.relay_url.clone().map(Url::from),
338            direct_addresses: if self.0.include_direct_addresses {
339                info.direct_addresses.clone()
340            } else {
341                Default::default()
342            },
343        };
344        let Ok(signed_packet) = info.to_pkarr_signed_packet(keypair, self.0.ttl) else {
345            tracing::warn!("failed to create signed packet");
346            return;
347        };
348        let this = self.clone();
349        let curr = tokio::spawn(this.publish_loop(keypair.clone(), signed_packet));
350        let mut task = self.0.task.lock().unwrap();
351        *task = Some(curr.into());
352    }
353
354    fn resolve(
355        &self,
356        _endpoint: Endpoint,
357        node_id: NodeId,
358    ) -> Option<futures_lite::stream::Boxed<anyhow::Result<DiscoveryItem>>> {
359        let this = self.clone();
360        let pkarr_public_key =
361            pkarr::PublicKey::try_from(node_id.as_bytes()).expect("valid public key");
362        tracing::info!("resolving {} as {}", node_id, pkarr_public_key.to_z32());
363        Some(Gen::new(|co| async move { this.resolve(node_id, co).await }).boxed())
364    }
365}