Skip to main content

iroh_mainline_address_lookup/
lib.rs

1//! Pkarr based address lookup for iroh, supporting both relay servers and the DHT.
2//!
3//! This module contains pkarr-based address lookup for iroh which can use both pkarr
4//! relay servers as well as the Mainline DHT directly.  See the [pkarr module] in the
5//! `iroh-dns` crate for an overview of pkarr.
6//!
7//! [pkarr module]: iroh_dns::pkarr
8use std::sync::{Arc, Mutex};
9
10use iroh::{
11    Endpoint,
12    address_lookup::{
13        AddrFilter, AddressLookup, AddressLookupBuilder, AddressLookupBuilderError, EndpointData,
14        Error as AddressLookupError, Item as AddressLookupItem,
15    },
16    endpoint_info::EndpointInfo,
17};
18use iroh_base::{EndpointId, SecretKey};
19use iroh_dns::pkarr::{SignedPacket, SignedPacketVerifyError, Timestamp};
20use n0_future::{
21    boxed::BoxStream,
22    stream::StreamExt,
23    task::{self, AbortOnDropHandle},
24    time::{self, Duration},
25};
26use n0_mainline::{Dht, DhtBuilder, MutableItem};
/// Time-to-live applied to published DNS packets when [`Builder::ttl`] is not called.
///
/// The equivalent constant in iroh's pkarr module is private, so it is redefined here.
// NOTE(review): presumably expressed in seconds, as is usual for DNS TTLs - confirm.
const DEFAULT_PKARR_TTL: u32 = 30;

/// Default pause between two publishes of unchanged endpoint info to the DHT.
///
/// This interval only matters while the info stays the same: every change goes through
/// a fresh `publish` call, which starts a new publish task right away.
const REPUBLISH_DELAY: Duration = Duration::from_secs(60 * 60);

/// Short wait inserted before a publish loop performs its first put.
///
/// Endpoint updates often arrive in bursts, and each one triggers `publish`, which
/// replaces the previously spawned task.  Waiting briefly gives superseded tasks the
/// chance to be aborted, so that only the most recent endpoint data reaches the DHT.
const PUBLISH_DEBOUNCE_DELAY: Duration = Duration::from_millis(50);
42
43/// Convert a [`SignedPacket`] to a mainline [`MutableItem`].
44fn signed_packet_to_mutable_item(packet: &SignedPacket) -> MutableItem {
45    MutableItem::new_signed_unchecked(
46        *packet.public_key().as_bytes(),
47        packet.signature().to_bytes(),
48        packet.encoded_packet(),
49        packet.timestamp().as_micros() as i64,
50        None,
51    )
52}
53
54/// Convert a mainline [`MutableItem`] to a [`SignedPacket`].
55fn mutable_item_to_signed_packet(
56    item: &MutableItem,
57) -> Result<SignedPacket, SignedPacketVerifyError> {
58    SignedPacket::from_parts_unchecked(
59        item.key(),
60        item.signature(),
61        Timestamp::from_micros(item.seq() as u64),
62        item.value(),
63    )
64}
65
66/// Pkarr Mainline DHT and relay server address lookup.
67///
68/// It stores endpoint addresses in DNS records, signed by the endpoint's private key, and publishes
69/// them to the BitTorrent Mainline DHT.  See the [pkarr module] in the `iroh-dns` crate
70/// for more details.
71///
72/// This implements the [`AddressLookup`] trait to be used as an address lookup service which can
73/// be used as both a publisher and resolver.  Calling [`DhtAddressLookup::publish`] will start
74/// a background task that periodically publishes the endpoint address.
75///
76/// [`DhtAddressLookup`] filters published addresses: only relay addresses are published by default.
77/// To change this behavior, use [`Builder::addr_filter`] and set it to e.g. [`AddrFilter::unfiltered`].
78/// This can be useful to enable publishing IP addresses if the iroh endpoint is reachable via public
79/// IP addresses.
80///
81/// [pkarr module]: iroh_dns::pkarr
82/// [`AddrFilter::relay_only`]: iroh::address_lookup::AddrFilter::relay_only
83/// [`AddrFilter::unfiltered`]: iroh::address_lookup::AddrFilter::unfiltered
84#[derive(Debug, Clone)]
85pub struct DhtAddressLookup(Arc<Inner>);
86
87#[derive(derive_more::Debug)]
88struct Inner {
89    /// Mainline DHT node.
90    dht: Dht,
91    /// The background task that periodically publishes the endpoint address.
92    ///
93    /// Due to [`AbortOnDropHandle`], this will be aborted when the Address Lookup is dropped.
94    task: Mutex<Option<AbortOnDropHandle<()>>>,
95    /// Optional keypair for signing the DNS packets.
96    ///
97    /// If this is None, the endpoint will not publish its address to the DHT.
98    secret_key: Option<SecretKey>,
99    /// Time-to-live value for the DNS packets.
100    ttl: u32,
101    /// Republish delay for the DHT.
102    republish_delay: Duration,
103    /// User supplied filter to filter and reorder addresses for publishing
104    filter: AddrFilter,
105}
106
107impl Inner {
108    async fn resolve_dht(
109        &self,
110        public_key: EndpointId,
111    ) -> Option<Result<AddressLookupItem, AddressLookupError>> {
112        tracing::info!("resolving {} from DHT", public_key.to_z32());
113
114        let maybe_item = self
115            .dht
116            .get_mutable_most_recent(public_key.as_bytes(), None)
117            .await
118            .ok()
119            .flatten();
120        match maybe_item {
121            Some(item) => {
122                let signed_packet = match mutable_item_to_signed_packet(&item) {
123                    Ok(packet) => packet,
124                    Err(err) => {
125                        tracing::debug!("failed to parse mutable item as signed packet: {err}");
126                        return None;
127                    }
128                };
129                match EndpointInfo::from_pkarr_signed_packet(&signed_packet) {
130                    Ok(endpoint_info) => {
131                        tracing::info!("discovered endpoint info {:?}", endpoint_info);
132                        Some(Ok(AddressLookupItem::new(endpoint_info, "pkarr", None)))
133                    }
134                    Err(_err) => {
135                        tracing::debug!("failed to parse signed packet as endpoint info");
136                        None
137                    }
138                }
139            }
140            None => {
141                tracing::debug!("no signed packet found");
142                None
143            }
144        }
145    }
146}
147
148/// Builder for [`DhtAddressLookup`].
149///
150/// By default, publishing to the DHT is enabled, and relay publishing is disabled.
151#[derive(Debug)]
152pub struct Builder {
153    dht_builder: Option<DhtBuilder>,
154    secret_key: Option<SecretKey>,
155    ttl: Option<u32>,
156    republish_delay: Duration,
157    enable_publish: bool,
158    addr_filter: AddrFilter,
159}
160
161impl Default for Builder {
162    fn default() -> Self {
163        Self {
164            dht_builder: None,
165            secret_key: None,
166            ttl: None,
167            republish_delay: REPUBLISH_DELAY,
168            enable_publish: true,
169            addr_filter: AddrFilter::relay_only(),
170        }
171    }
172}
173
174impl Builder {
175    /// Explicitly sets the DHT builder to use.
176    pub fn dht_builder(mut self, builder: DhtBuilder) -> Self {
177        self.dht_builder = Some(builder);
178        self
179    }
180
181    /// Sets the secret key to use for signing the DNS packets.
182    ///
183    /// Without a secret key, the endpoint will not publish its address to the DHT.
184    pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
185        self.secret_key = Some(secret_key);
186        self
187    }
188
189    /// Sets the time-to-live value for the DNS packets.
190    pub fn ttl(mut self, ttl: u32) -> Self {
191        self.ttl = Some(ttl);
192        self
193    }
194
195    /// Sets the republish delay for the DHT.
196    pub fn republish_delay(mut self, republish_delay: Duration) -> Self {
197        self.republish_delay = republish_delay;
198        self
199    }
200
201    /// Disables publishing even if a secret key is set.
202    pub fn no_publish(mut self) -> Self {
203        self.enable_publish = false;
204        self
205    }
206
207    /// Sets the address filter to control which addresses are published to the DHT.
208    ///
209    /// By default [`AddrFilter::relay_only`] is used. This avoids leaking IP addresses
210    /// to the public DHT.
211    ///
212    /// It can be useful to override this with [`AddrFilter::unfiltered`], if this is
213    /// not a concern, e.g. when this endpoint runs on a machine with public IP
214    /// addresses and without a firewall. In such cases connecting to this endpoint
215    /// with just an [`EndpointId`] and DHT lookup can become faster and potentially
216    /// even bypass a relay connection entirely.
217    pub fn addr_filter(mut self, filter: AddrFilter) -> Self {
218        self.addr_filter = filter;
219        self
220    }
221
222    /// Builds the address lookup mechanism.
223    ///
224    /// Must be called from within a Tokio runtime context: the DHT's UDP socket
225    /// is registered with the Tokio reactor during construction.
226    pub fn build(self) -> Result<DhtAddressLookup, AddressLookupBuilderError> {
227        let dht_builder = self.dht_builder.unwrap_or_default();
228        let dht = dht_builder
229            .build()
230            .map_err(|e| AddressLookupBuilderError::from_err("pkarr-dht", e))?;
231        let ttl = self.ttl.unwrap_or(DEFAULT_PKARR_TTL);
232        let secret_key = self.secret_key.filter(|_| self.enable_publish);
233
234        Ok(DhtAddressLookup(Arc::new(Inner {
235            dht,
236            ttl,
237            secret_key,
238            republish_delay: self.republish_delay,
239            task: Default::default(),
240            filter: self.addr_filter,
241        })))
242    }
243}
244
245impl AddressLookupBuilder for Builder {
246    fn into_address_lookup(
247        self,
248        endpoint: &Endpoint,
249    ) -> Result<impl AddressLookup, AddressLookupBuilderError> {
250        self.secret_key(endpoint.secret_key().clone()).build()
251    }
252}
253
254impl DhtAddressLookup {
255    /// Creates a new builder for [`DhtAddressLookup`].
256    pub fn builder() -> Builder {
257        Builder::default()
258    }
259
260    /// Periodically publishes the endpoint address to the DHT.
261    ///
262    /// Publishes the current endpoint information to the DHT and refreshes it periodically.
263    ///
264    /// We publish without CAS. We assume a single logical writer per endpoint key.
265    async fn publish_loop(self, signed_packet: SignedPacket) {
266        let this = self;
267        let public_key = signed_packet.public_key();
268        let z32 = public_key.to_z32();
269        let item = signed_packet_to_mutable_item(&signed_packet);
270        let Ok(info) = this.0.dht.info().await else {
271            tracing::error!("failed to read dht info; stopping publish task");
272            return;
273        };
274        if info.routing_table_size() == 0 {
275            let Ok(bootstrapped) = this.0.dht.bootstrapped().await else {
276                tracing::error!("dht bootstrap probe failed; stopping publish task");
277                return;
278            };
279            if !bootstrapped {
280                tracing::warn!("dht bootstrap probe returned not ready");
281            }
282        } else {
283            // Coalesce bursts of updates: publish() replaces the previous task, so a short
284            // initial delay lets only the latest endpoint state proceed.
285            //
286            // We frequently see address changes within milliseconds, and we don't want to
287            // publish every intermediate state to the DHT.
288            //
289            // We only do this if we are bootstrapped, otherwise the bootstrapped call
290            // provides enough debouncing.
291            time::sleep(PUBLISH_DEBOUNCE_DELAY).await;
292        }
293
294        loop {
295            let res = this.0.dht.put_mutable(item.clone(), None).await;
296            match res {
297                Ok(_) => {
298                    tracing::debug!("pkarr publish success. published under {z32}");
299                }
300                Err(e) => {
301                    // we could do a smaller delay here, but in general DHT publish
302                    // not working is due to a network issue, and if the network changes
303                    // the task will be restarted anyway.
304                    //
305                    // Being unable to publish to the DHT is something that is expected
306                    // to happen from time to time, so this does not warrant a error log.
307                    tracing::warn!("pkarr publish error: {}", e);
308                }
309            }
310            time::sleep(this.0.republish_delay).await;
311        }
312    }
313}
314
315impl AddressLookup for DhtAddressLookup {
316    fn publish(&self, data: &EndpointData) {
317        let Some(keypair) = &self.0.secret_key else {
318            tracing::debug!("no keypair set, not publishing");
319            return;
320        };
321
322        // apply user-supplied filter
323        let data = data.apply_filter(&self.0.filter).into_owned();
324
325        if !data.has_addrs() {
326            tracing::debug!("no relay url or direct addresses in endpoint data, not publishing");
327            return;
328        }
329
330        tracing::debug!("publishing {data:?}");
331        let info = EndpointInfo::from_parts(keypair.public(), data);
332        let Ok(signed_packet) = info.to_pkarr_signed_packet(keypair, self.0.ttl) else {
333            tracing::warn!("failed to create signed packet");
334            return;
335        };
336        let this = self.clone();
337        let curr = task::spawn(this.publish_loop(signed_packet));
338        let mut task = self.0.task.lock().expect("poisoned");
339        *task = Some(AbortOnDropHandle::new(curr));
340    }
341
342    fn resolve(
343        &self,
344        endpoint_id: EndpointId,
345    ) -> Option<BoxStream<Result<AddressLookupItem, AddressLookupError>>> {
346        let z32 = endpoint_id.to_z32();
347        tracing::info!("resolving {} as {}", endpoint_id, z32);
348        let address_lookup = self.0.clone();
349        let stream =
350            n0_future::stream::once_future(
351                async move { address_lookup.resolve_dht(endpoint_id).await },
352            )
353            .filter_map(|x| x)
354            .boxed();
355        Some(stream)
356    }
357}
358
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use iroh_base::{RelayUrl, TransportAddr};
    use n0_error::{Result, StdResultExt};
    use n0_mainline::Testnet;
    use n0_tracing_test::traced_test;
    use url::Url;

    use super::*;

    /// Publishes a relay URL on a local test DHT and polls until it can be resolved.
    #[tokio::test]
    #[ignore = "flaky"]
    #[traced_test]
    async fn dht_address_lookup_smoke() -> Result {
        let secret = SecretKey::generate();
        let testnet = Testnet::new(3).await.anyerr()?;

        let mut dht_builder = DhtBuilder::default();
        dht_builder.bootstrap(&testnet.bootstrap);

        let address_lookup = DhtAddressLookup::builder()
            .secret_key(secret.clone())
            .dht_builder(dht_builder)
            .addr_filter(AddrFilter::unfiltered())
            .build()?;

        let relay_url: RelayUrl = Url::parse("https://example.com").anyerr()?.into();
        let data = EndpointData::from_iter([TransportAddr::Relay(relay_url.clone())]);
        address_lookup.publish(&data);

        // publish is fire and forget, so the only option is to poll until the record
        // shows up on the DHT.
        let poll_until_found = async move {
            loop {
                tokio::time::sleep(Duration::from_millis(200)).await;

                let results = address_lookup
                    .resolve(secret.public())
                    .unwrap()
                    .collect::<Vec<_>>()
                    .await;

                let mut seen = BTreeSet::new();
                for result in results {
                    // Failed lookups are skipped; only successful items are inspected.
                    let Ok(item) = result else {
                        continue;
                    };
                    for url in item.relay_urls() {
                        seen.insert(url.clone());
                    }
                }

                if seen.contains(&relay_url) {
                    break;
                }
            }
        };
        tokio::time::timeout(Duration::from_secs(30), poll_until_found)
            .await
            .expect("timeout, relay_url not found on DHT");
        Ok(())
    }
}