iroh_net/
discovery.rs

1//! Node address discovery.
2//!
3//! To connect to an iroh-net node a [`NodeAddr`] is needed, which may contain a
4//! [`RelayUrl`] or one or more *direct addresses* in addition to the [`NodeId`].
5//!
//! Since there is a conversion from [`NodeId`] to [`NodeAddr`], you can also connect
//! directly with a [`NodeId`].
8//!
//! For this to work, however, the endpoint has to get the addressing information by
//! other means.  This can be done by manually calling [`Endpoint::add_node_addr`],
//! but that still requires knowing the other node's addressing information.
12//!
//! Node discovery is an automated system for an [`Endpoint`] to retrieve this addressing
//! information.  Each iroh-net node will automatically publish its own addressing
//! information.  Usually this means publishing which [`RelayUrl`] to use for its
//! [`NodeId`], but it could also publish its direct addresses.
17//!
18//! The [`Discovery`] trait is used to define node discovery.  This allows multiple
19//! implementations to co-exist because there are many possible ways to implement this.
20//! Each [`Endpoint`] can use the discovery mechanisms most suitable to the application.
21//! The [`Builder::discovery`] method is used to add a discovery mechanism to an
22//! [`Endpoint`].
23//!
24//! Some generally useful discovery implementations are provided:
25//!
26//! - The [`DnsDiscovery`] which performs lookups via the standard DNS systems.  To publish
27//!   to this DNS server a [`PkarrPublisher`] is needed.  [Number 0] runs a public instance
28//!   of a [`PkarrPublisher`] with attached DNS server which is globally available and a
29//!   reliable default choice.
30//!
31//! - The [`PkarrResolver`] which can perform lookups from designated [pkarr relay servers]
32//!   using HTTP.
33//!
//! - The [`LocalSwarmDiscovery`] discovers iroh-net nodes present on the local network,
//!   very similar to mDNS.
36//!
37//! - The [`DhtDiscovery`] also uses the [`pkarr`] system but can also publish and lookup
38//!   records to/from the Mainline DHT.
39//!
//! To use multiple discovery systems simultaneously, use [`ConcurrentDiscovery`], which will
//! perform lookups to all discovery systems at the same time.
42//!
43//! # Examples
44//!
45//! A very common setup is to enable DNS discovery, which needs to be done in two parts as a
46//! [`PkarrPublisher`] and [`DnsDiscovery`]:
47//!
48//! ```no_run
49//! use iroh_net::discovery::dns::DnsDiscovery;
50//! use iroh_net::discovery::pkarr::PkarrPublisher;
51//! use iroh_net::discovery::ConcurrentDiscovery;
52//! use iroh_net::key::SecretKey;
53//! use iroh_net::Endpoint;
54//!
55//! # async fn wrapper() -> anyhow::Result<()> {
56//! let secret_key = SecretKey::generate();
57//! let discovery = ConcurrentDiscovery::from_services(vec![
58//!     Box::new(PkarrPublisher::n0_dns(secret_key.clone())),
59//!     Box::new(DnsDiscovery::n0_dns()),
60//! ]);
61//! let ep = Endpoint::builder()
62//!     .secret_key(secret_key)
63//!     .discovery(Box::new(discovery))
64//!     .bind()
65//!     .await?;
66//! # Ok(())
67//! # }
68//! ```
69//!
70//! To also enable [`LocalSwarmDiscovery`], it can be added as another service in the
71//! [`ConcurrentDiscovery`]:
72//!
73//! ```no_run
74//! # use iroh_net::discovery::dns::DnsDiscovery;
75//! # use iroh_net::discovery::local_swarm_discovery::LocalSwarmDiscovery;
76//! # use iroh_net::discovery::pkarr::PkarrPublisher;
77//! # use iroh_net::discovery::ConcurrentDiscovery;
78//! # use iroh_net::key::SecretKey;
79//! #
80//! # async fn wrapper() -> anyhow::Result<()> {
81//! # let secret_key = SecretKey::generate();
82//! let discovery = ConcurrentDiscovery::from_services(vec![
83//!     Box::new(PkarrPublisher::n0_dns(secret_key.clone())),
84//!     Box::new(DnsDiscovery::n0_dns()),
85//!     Box::new(LocalSwarmDiscovery::new(secret_key.public())?),
86//! ]);
87//! # Ok(())
88//! # }
89//! ```
90//!
91//! [`RelayUrl`]: crate::relay::RelayUrl
92//! [`Builder::discovery`]: crate::endpoint::Builder::discovery
93//! [`DnsDiscovery`]: dns::DnsDiscovery
94//! [Number 0]: https://n0.computer
95//! [`PkarrResolver`]: pkarr::PkarrResolver
96//! [`PkarrPublisher`]: pkarr::PkarrPublisher
97//! [`LocalSwarmDiscovery`]: local_swarm_discovery::LocalSwarmDiscovery
98//! [`DhtDiscovery`]: pkarr::dht::DhtDiscovery
99//! [pkarr relay servers]: https://pkarr.org/#servers
100
101use std::time::Duration;
102
103use anyhow::{anyhow, ensure, Result};
104use futures_lite::stream::{Boxed as BoxStream, StreamExt};
105use iroh_base::node_addr::NodeAddr;
106use tokio::{sync::oneshot, task::JoinHandle};
107use tracing::{debug, error_span, warn, Instrument};
108
109use crate::{AddrInfo, Endpoint, NodeId};
110
111pub mod dns;
112
113#[cfg(feature = "discovery-local-network")]
114#[cfg_attr(iroh_docsrs, doc(cfg(feature = "discovery-local-network")))]
115pub mod local_swarm_discovery;
116pub mod pkarr;
117pub mod static_provider;
118
119/// Node discovery for [`super::Endpoint`].
120///
121/// This trait defines publishing and resolving addressing information for a [`NodeId`].
122/// This enables connecting to other nodes with only knowing the [`NodeId`], by using this
123/// [`Discovery`] system to look up the actual addressing information.  It is common for
124/// implementations to require each node to publish their own information before it can be
125/// looked up by other nodes.
126///
127/// The published addressing information can include both a [`RelayUrl`] and/or direct
128/// addresses.
129///
130/// To allow for discovery, the [`super::Endpoint`] will call `publish` whenever
131/// discovery information changes. If a discovery mechanism requires a periodic
132/// refresh, it should start its own task.
133///
134/// [`RelayUrl`]: crate::relay::RelayUrl
135pub trait Discovery: std::fmt::Debug + Send + Sync {
136    /// Publishes the given [`AddrInfo`] to the discovery mechanism.
137    ///
138    /// This is fire and forget, since the [`Endpoint`] can not wait for successful
139    /// publishing. If publishing is async, the implementation should start it's own task.
140    ///
141    /// This will be called from a tokio task, so it is safe to spawn new tasks.
142    /// These tasks will be run on the runtime of the [`super::Endpoint`].
143    fn publish(&self, _info: &AddrInfo) {}
144
145    /// Resolves the [`AddrInfo`] for the given [`NodeId`].
146    ///
147    /// Once the returned [`BoxStream`] is dropped, the service should stop any pending
148    /// work.
149    fn resolve(
150        &self,
151        _endpoint: Endpoint,
152        _node_id: NodeId,
153    ) -> Option<BoxStream<Result<DiscoveryItem>>> {
154        None
155    }
156
157    /// Subscribe to all addresses that get *passively* discovered.
158    ///
159    /// An implementation may choose to defer emitting passively discovered nodes
160    /// until the stream is actually polled. To avoid missing discovered nodes,
161    /// poll the stream as soon as possible.
162    ///
163    /// If you do not regularly poll the stream, you may miss discovered nodes.
164    ///
165    /// Any discovery systems that only discover when explicitly resolving a
166    /// specific [`NodeId`] do not need to implement this method. Any nodes or
167    /// addresses that are discovered by calling `resolve` should NOT be added
168    /// to the `subscribe` stream.
169    ///
170    /// Discovery systems that are capable of receiving information about [`NodeId`]s
171    /// and their [`AddrInfo`]s without explicitly calling `resolve`, i.e.,
172    /// systems that do "passive" discovery, should implement this method. If
173    /// `subscribe` is called multiple times, the passively discovered addresses
174    /// should be sent on all streams.
175    ///
176    /// The [`crate::endpoint::Endpoint`] will `subscribe` to the discovery system
177    /// and add the discovered addresses to the internal address book as they arrive
178    /// on this stream.
179    fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
180        None
181    }
182}
183
184/// The results returned from [`Discovery::resolve`].
185#[derive(Debug, Clone)]
186pub struct DiscoveryItem {
187    /// The [`NodeId`] whose address we have discovered
188    pub node_id: NodeId,
189    /// A static string to identify the discovery source.
190    ///
191    /// Should be uniform per discovery service.
192    pub provenance: &'static str,
193    /// Optional timestamp when this node address info was last updated.
194    ///
195    /// Must be microseconds since the unix epoch.
196    // TODO(ramfox): this is currently unused. As we develop more `DiscoveryService`s, we may discover that we do not need this. It is only truly relevant when comparing `relay_urls`, since we can attempt to dial any number of socket addresses, but expect each node to have one "home relay" that we will attempt to contact them on. This means we would need some way to determine which relay url to choose between, if more than one relay url is reported.
197    pub last_updated: Option<u64>,
198    /// The address info for the node being resolved.
199    pub addr_info: AddrInfo,
200}
201
202/// A discovery service that combines multiple discovery sources.
203///
204/// The discovery services will resolve concurrently.
205#[derive(Debug, Default)]
206pub struct ConcurrentDiscovery {
207    services: Vec<Box<dyn Discovery>>,
208}
209
210impl ConcurrentDiscovery {
211    /// Creates an empty [`ConcurrentDiscovery`].
212    pub fn empty() -> Self {
213        Self::default()
214    }
215
216    /// Creates a new [`ConcurrentDiscovery`].
217    pub fn from_services(services: Vec<Box<dyn Discovery>>) -> Self {
218        Self { services }
219    }
220
221    /// Adds a [`Discovery`] service.
222    pub fn add(&mut self, service: impl Discovery + 'static) {
223        self.services.push(Box::new(service));
224    }
225}
226
227impl<T> From<T> for ConcurrentDiscovery
228where
229    T: IntoIterator<Item = Box<dyn Discovery>>,
230{
231    fn from(iter: T) -> Self {
232        let services = iter.into_iter().collect::<Vec<_>>();
233        Self { services }
234    }
235}
236
237impl Discovery for ConcurrentDiscovery {
238    fn publish(&self, info: &AddrInfo) {
239        for service in &self.services {
240            service.publish(info);
241        }
242    }
243
244    fn resolve(
245        &self,
246        endpoint: Endpoint,
247        node_id: NodeId,
248    ) -> Option<BoxStream<Result<DiscoveryItem>>> {
249        let streams = self
250            .services
251            .iter()
252            .filter_map(|service| service.resolve(endpoint.clone(), node_id));
253
254        let streams = futures_buffered::MergeBounded::from_iter(streams);
255        Some(Box::pin(streams))
256    }
257
258    fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
259        let mut streams = vec![];
260        for service in self.services.iter() {
261            if let Some(stream) = service.subscribe() {
262                streams.push(stream)
263            }
264        }
265
266        let streams = futures_buffered::MergeBounded::from_iter(streams);
267        Some(Box::pin(streams))
268    }
269}
270
/// How long an endpoint may stay silent (no control or data message received on any
/// path) before we start a discovery task for it.
const MAX_AGE: Duration = Duration::from_millis(10_000);
274
275/// A wrapper around a tokio task which runs a node discovery.
276pub(super) struct DiscoveryTask {
277    on_first_rx: oneshot::Receiver<Result<()>>,
278    task: JoinHandle<()>,
279}
280
281impl DiscoveryTask {
282    /// Starts a discovery task.
283    pub(super) fn start(ep: Endpoint, node_id: NodeId) -> Result<Self> {
284        ensure!(ep.discovery().is_some(), "No discovery services configured");
285        let (on_first_tx, on_first_rx) = oneshot::channel();
286        let me = ep.node_id();
287        let task = tokio::task::spawn(
288            async move { Self::run(ep, node_id, on_first_tx).await }.instrument(
289                error_span!("discovery", me = %me.fmt_short(), node = %node_id.fmt_short()),
290            ),
291        );
292        Ok(Self { task, on_first_rx })
293    }
294
295    /// Starts a discovery task after a delay and only if no path to the node was recently active.
296    ///
297    /// This returns `None` if we received data or control messages from the remote endpoint
298    /// recently enough. If not it returns a [`DiscoveryTask`].
299    ///
300    /// If `delay` is set, the [`DiscoveryTask`] will first wait for `delay` and then check again
301    /// if we recently received messages from remote endpoint. If true, the task will abort.
302    /// Otherwise, or if no `delay` is set, the discovery will be started.
303    pub(super) fn maybe_start_after_delay(
304        ep: &Endpoint,
305        node_id: NodeId,
306        delay: Option<Duration>,
307    ) -> Result<Option<Self>> {
308        // If discovery is not needed, don't even spawn a task.
309        if !Self::needs_discovery(ep, node_id) {
310            return Ok(None);
311        }
312        ensure!(ep.discovery().is_some(), "No discovery services configured");
313        let (on_first_tx, on_first_rx) = oneshot::channel();
314        let ep = ep.clone();
315        let me = ep.node_id();
316        let task = tokio::task::spawn(
317            async move {
318                // If delay is set, wait and recheck if discovery is needed. If not, early-exit.
319                if let Some(delay) = delay {
320                    tokio::time::sleep(delay).await;
321                    if !Self::needs_discovery(&ep, node_id) {
322                        debug!("no discovery needed, abort");
323                        on_first_tx.send(Ok(())).ok();
324                        return;
325                    }
326                }
327                Self::run(ep, node_id, on_first_tx).await
328            }
329            .instrument(
330                error_span!("discovery", me = %me.fmt_short(), node = %node_id.fmt_short()),
331            ),
332        );
333        Ok(Some(Self { task, on_first_rx }))
334    }
335
336    /// Waits until the discovery task produced at least one result.
337    pub(super) async fn first_arrived(&mut self) -> Result<()> {
338        let fut = &mut self.on_first_rx;
339        fut.await??;
340        Ok(())
341    }
342
343    /// Cancels the discovery task.
344    pub(super) fn cancel(&self) {
345        self.task.abort();
346    }
347
348    fn create_stream(ep: &Endpoint, node_id: NodeId) -> Result<BoxStream<Result<DiscoveryItem>>> {
349        let discovery = ep
350            .discovery()
351            .ok_or_else(|| anyhow!("No discovery service configured"))?;
352        let stream = discovery
353            .resolve(ep.clone(), node_id)
354            .ok_or_else(|| anyhow!("No discovery service can resolve node {node_id}",))?;
355        Ok(stream)
356    }
357
358    /// We need discovery if we have no paths to the node, or if the paths we do have
359    /// have timed out.
360    fn needs_discovery(ep: &Endpoint, node_id: NodeId) -> bool {
361        match ep.remote_info(node_id) {
362            // No info means no path to node -> start discovery.
363            None => true,
364            Some(info) => {
365                match (
366                    info.last_received(),
367                    info.relay_url.as_ref().and_then(|r| r.last_alive),
368                ) {
369                    // No path to node -> start discovery.
370                    (None, None) => true,
371                    // If we haven't received on direct addresses or the relay for MAX_AGE,
372                    // start discovery.
373                    (Some(elapsed), Some(elapsed_relay)) => {
374                        elapsed > MAX_AGE && elapsed_relay > MAX_AGE
375                    }
376                    (Some(elapsed), _) | (_, Some(elapsed)) => elapsed > MAX_AGE,
377                }
378            }
379        }
380    }
381
382    async fn run(ep: Endpoint, node_id: NodeId, on_first_tx: oneshot::Sender<Result<()>>) {
383        let mut stream = match Self::create_stream(&ep, node_id) {
384            Ok(stream) => stream,
385            Err(err) => {
386                on_first_tx.send(Err(err)).ok();
387                return;
388            }
389        };
390        let mut on_first_tx = Some(on_first_tx);
391        debug!("discovery: start");
392        loop {
393            let next = tokio::select! {
394                _ = ep.cancelled() => break,
395                next = stream.next() => next
396            };
397            match next {
398                Some(Ok(r)) => {
399                    if r.addr_info.is_empty() {
400                        debug!(provenance = %r.provenance, addr = ?r.addr_info, "discovery: empty address found");
401                        continue;
402                    }
403                    debug!(provenance = %r.provenance, addr = ?r.addr_info, "discovery: new address found");
404                    let addr = NodeAddr {
405                        info: r.addr_info,
406                        node_id,
407                    };
408                    ep.add_node_addr_with_source(addr, r.provenance).ok();
409                    if let Some(tx) = on_first_tx.take() {
410                        tx.send(Ok(())).ok();
411                    }
412                }
413                Some(Err(err)) => {
414                    warn!(?err, "discovery service produced error");
415                    break;
416                }
417                None => break,
418            }
419        }
420        if let Some(tx) = on_first_tx.take() {
421            let err = anyhow!("Discovery produced no results for {}", node_id.fmt_short());
422            tx.send(Err(err)).ok();
423        }
424    }
425}
426
427impl Drop for DiscoveryTask {
428    fn drop(&mut self) {
429        self.task.abort();
430    }
431}
432
#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeSet, HashMap},
        net::SocketAddr,
        sync::Arc,
        time::SystemTime,
    };

    use parking_lot::Mutex;
    use rand::Rng;
    use tokio_util::task::AbortOnDropHandle;

    use super::*;
    use crate::{key::SecretKey, relay::RelayMode};

    /// In-memory registry shared between [`TestDiscovery`] instances.
    ///
    /// Maps each published [`NodeId`] to its [`AddrInfo`] and the publish timestamp
    /// (microseconds since the unix epoch, see `system_time_now`).
    #[derive(Debug, Clone, Default)]
    struct TestDiscoveryShared {
        nodes: Arc<Mutex<HashMap<NodeId, (AddrInfo, u64)>>>,
    }
    impl TestDiscoveryShared {
        /// Creates a discovery for `node_id` that publishes into the shared registry and
        /// resolves from it after a 200ms delay.
        pub fn create_discovery(&self, node_id: NodeId) -> TestDiscovery {
            TestDiscovery {
                node_id,
                shared: self.clone(),
                publish: true,
                resolve_wrong: false,
                delay: Duration::from_millis(200),
            }
        }

        /// Creates a discovery that never publishes and resolves every node to an
        /// unreachable address after a 100ms delay.
        pub fn create_lying_discovery(&self, node_id: NodeId) -> TestDiscovery {
            TestDiscovery {
                node_id,
                shared: self.clone(),
                publish: false,
                resolve_wrong: true,
                delay: Duration::from_millis(100),
            }
        }
    }

    /// A [`Discovery`] backed by a [`TestDiscoveryShared`] registry.
    #[derive(Debug)]
    struct TestDiscovery {
        node_id: NodeId,
        shared: TestDiscoveryShared,
        publish: bool,
        resolve_wrong: bool,
        delay: Duration,
    }

    impl Discovery for TestDiscovery {
        fn publish(&self, info: &AddrInfo) {
            if !self.publish {
                return;
            }
            let now = system_time_now();
            self.shared
                .nodes
                .lock()
                .insert(self.node_id, (info.clone(), now));
        }

        fn resolve(
            &self,
            endpoint: Endpoint,
            node_id: NodeId,
        ) -> Option<BoxStream<Result<DiscoveryItem>>> {
            let addr_info = match self.resolve_wrong {
                false => self.shared.nodes.lock().get(&node_id).cloned(),
                true => {
                    let ts = system_time_now() - 100_000;
                    let port: u16 = rand::thread_rng().gen_range(10_000..20_000);
                    // "240.0.0.0/4" is reserved and unreachable
                    let addr: SocketAddr = format!("240.0.0.1:{port}").parse().unwrap();
                    let addr_info = AddrInfo {
                        relay_url: None,
                        direct_addresses: BTreeSet::from([addr]),
                    };
                    Some((addr_info, ts))
                }
            };
            let stream = match addr_info {
                Some((addr_info, ts)) => {
                    let item = DiscoveryItem {
                        node_id,
                        provenance: "test-disco",
                        last_updated: Some(ts),
                        addr_info,
                    };
                    let delay = self.delay;
                    let fut = async move {
                        tokio::time::sleep(delay).await;
                        tracing::debug!(
                            "resolve on {}: {} = {item:?}",
                            endpoint.node_id().fmt_short(),
                            node_id.fmt_short()
                        );
                        Ok(item)
                    };
                    futures_lite::stream::once_future(fut).boxed()
                }
                None => futures_lite::stream::empty().boxed(),
            };
            Some(stream)
        }
    }

    /// A [`Discovery`] whose resolve stream ends immediately without any result.
    #[derive(Debug)]
    struct EmptyDiscovery;
    impl Discovery for EmptyDiscovery {
        fn publish(&self, _info: &AddrInfo) {}

        fn resolve(
            &self,
            _endpoint: Endpoint,
            _node_id: NodeId,
        ) -> Option<BoxStream<Result<DiscoveryItem>>> {
            Some(futures_lite::stream::empty().boxed())
        }
    }

    const TEST_ALPN: &[u8] = b"n0/iroh/test";

    /// This is a smoke test for our discovery mechanism.
    #[tokio::test]
    async fn endpoint_discovery_simple_shared() -> anyhow::Result<()> {
        let _guard = iroh_test::logging::setup();
        let disco_shared = TestDiscoveryShared::default();
        let (ep1, _guard1) = {
            let secret = SecretKey::generate();
            let disco = disco_shared.create_discovery(secret.public());
            new_endpoint(secret, disco).await
        };
        let (ep2, _guard2) = {
            let secret = SecretKey::generate();
            let disco = disco_shared.create_discovery(secret.public());
            new_endpoint(secret, disco).await
        };
        let ep1_addr = NodeAddr::new(ep1.node_id());
        // wait for our address to be updated and thus published at least once
        ep1.node_addr().await?;
        let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?;
        Ok(())
    }

    /// This test adds an empty discovery which provides no addresses.
    #[tokio::test]
    async fn endpoint_discovery_combined_with_empty() -> anyhow::Result<()> {
        let _guard = iroh_test::logging::setup();
        let disco_shared = TestDiscoveryShared::default();
        let (ep1, _guard1) = {
            let secret = SecretKey::generate();
            let disco = disco_shared.create_discovery(secret.public());
            new_endpoint(secret, disco).await
        };
        let (ep2, _guard2) = {
            let secret = SecretKey::generate();
            let disco1 = EmptyDiscovery;
            let disco2 = disco_shared.create_discovery(secret.public());
            let mut disco = ConcurrentDiscovery::empty();
            disco.add(disco1);
            disco.add(disco2);
            new_endpoint(secret, disco).await
        };
        let ep1_addr = NodeAddr::new(ep1.node_id());
        // wait for our address to be updated and thus published at least once
        ep1.node_addr().await?;
        let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?;
        Ok(())
    }

    /// This test adds a "lying" discovery which provides a wrong address.
    /// This is to make sure that as long as one of the discoveries returns a working address, we
    /// will connect successfully.
    #[tokio::test]
    async fn endpoint_discovery_combined_with_empty_and_wrong() -> anyhow::Result<()> {
        let _guard = iroh_test::logging::setup();
        let disco_shared = TestDiscoveryShared::default();
        let (ep1, _guard1) = {
            let secret = SecretKey::generate();
            let disco = disco_shared.create_discovery(secret.public());
            new_endpoint(secret, disco).await
        };
        let (ep2, _guard2) = {
            let secret = SecretKey::generate();
            let disco1 = EmptyDiscovery;
            let disco2 = disco_shared.create_lying_discovery(secret.public());
            let disco3 = disco_shared.create_discovery(secret.public());
            let mut disco = ConcurrentDiscovery::empty();
            disco.add(disco1);
            disco.add(disco2);
            disco.add(disco3);
            new_endpoint(secret, disco).await
        };
        let ep1_addr = NodeAddr::new(ep1.node_id());
        // wait for our address to be updated and thus published at least once
        ep1.node_addr().await?;
        let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?;
        Ok(())
    }

    /// This test only has the "lying" discovery. It is here to make sure that this actually fails.
    #[tokio::test]
    async fn endpoint_discovery_combined_wrong_only() -> anyhow::Result<()> {
        let _guard = iroh_test::logging::setup();
        let disco_shared = TestDiscoveryShared::default();
        let (ep1, _guard1) = {
            let secret = SecretKey::generate();
            let disco = disco_shared.create_discovery(secret.public());
            new_endpoint(secret, disco).await
        };
        let (ep2, _guard2) = {
            let secret = SecretKey::generate();
            let disco1 = disco_shared.create_lying_discovery(secret.public());
            let disco = ConcurrentDiscovery::from_services(vec![Box::new(disco1)]);
            new_endpoint(secret, disco).await
        };
        let ep1_addr = NodeAddr::new(ep1.node_id());
        // wait for our address to be updated and thus published at least once
        ep1.node_addr().await?;
        let res = ep2.connect(ep1_addr, TEST_ALPN).await;
        assert!(res.is_err());
        Ok(())
    }

    /// This test first adds a wrong address manually (e.g. from an outdated ticket).
    /// Connect should still succeed because the discovery service will be invoked (after a delay).
    #[tokio::test]
    async fn endpoint_discovery_with_wrong_existing_addr() -> anyhow::Result<()> {
        let _guard = iroh_test::logging::setup();
        let disco_shared = TestDiscoveryShared::default();
        let (ep1, _guard1) = {
            let secret = SecretKey::generate();
            let disco = disco_shared.create_discovery(secret.public());
            new_endpoint(secret, disco).await
        };
        let (ep2, _guard2) = {
            let secret = SecretKey::generate();
            let disco = disco_shared.create_discovery(secret.public());
            new_endpoint(secret, disco).await
        };
        // wait for our address to be updated and thus published at least once
        ep1.node_addr().await?;
        let ep1_wrong_addr = NodeAddr {
            node_id: ep1.node_id(),
            info: AddrInfo {
                relay_url: None,
                direct_addresses: BTreeSet::from(["240.0.0.1:1000".parse().unwrap()]),
            },
        };
        let _conn = ep2.connect(ep1_wrong_addr, TEST_ALPN).await?;
        Ok(())
    }

    /// Binds a relay-less endpoint using `disco`, and spawns a task that accepts (and
    /// otherwise ignores) incoming connections for as long as the returned handle lives.
    async fn new_endpoint(
        secret: SecretKey,
        disco: impl Discovery + 'static,
    ) -> (Endpoint, AbortOnDropHandle<anyhow::Result<()>>) {
        let ep = Endpoint::builder()
            .secret_key(secret)
            .discovery(Box::new(disco))
            .relay_mode(RelayMode::Disabled)
            .alpns(vec![TEST_ALPN.to_vec()])
            .bind()
            .await
            .unwrap();

        let handle = tokio::spawn({
            let ep = ep.clone();
            async move {
                // we skip accept() errors, they can be caused by retransmits
                while let Some(connecting) = ep.accept().await.and_then(|inc| inc.accept().ok()) {
                    let _conn = connecting.await?;
                    // Just accept incoming connections, but don't do anything with them.
                }

                anyhow::Ok(())
            }
        });

        (ep, AbortOnDropHandle::new(handle))
    }

    /// Current time in microseconds since the unix epoch.
    fn system_time_now() -> u64 {
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("time drift")
            .as_micros() as u64
    }
}
723
/// This module contains end-to-end tests for DNS node discovery.
///
/// The tests run a minimal test DNS server to resolve against, and a minimal pkarr relay to
/// publish to. The DNS and pkarr servers share their state.
#[cfg(test)]
mod test_dns_pkarr {
    use std::time::Duration;

    use anyhow::Result;
    use iroh_base::key::SecretKey;
    use tokio_util::task::AbortOnDropHandle;

    use crate::{
        discovery::pkarr::PkarrPublisher,
        dns::{node_info::NodeInfo, ResolverExt},
        relay::{RelayMap, RelayMode},
        test_utils::{
            dns_server::{create_dns_resolver, run_dns_server},
            pkarr_dns_state::State,
            run_relay_server, DnsPkarrServer,
        },
        AddrInfo, Endpoint, NodeAddr,
    };

    const PUBLISH_TIMEOUT: Duration = Duration::from_secs(10);

    #[tokio::test]
    async fn dns_resolve() -> Result<()> {
        let _logging_guard = iroh_test::logging::setup();

        let origin = "testdns.example".to_string();
        let state = State::new(origin.clone());
        let (nameserver, _dns_drop_guard) = run_dns_server(state.clone()).await?;

        let secret_key = SecretKey::generate();
        let node_info = NodeInfo::new(
            secret_key.public(),
            Some("https://relay.example".parse().unwrap()),
            Default::default(),
        );
        let signed_packet = node_info.to_pkarr_signed_packet(&secret_key, 30)?;
        state.upsert(signed_packet)?;

        let resolver = create_dns_resolver(nameserver)?;
        let resolved = resolver.lookup_by_id(&node_info.node_id, &origin).await?;

        assert_eq!(resolved, node_info.into());

        Ok(())
    }

    #[tokio::test]
    async fn pkarr_publish_dns_resolve() -> Result<()> {
        let _logging_guard = iroh_test::logging::setup();

        let origin = "testdns.example".to_string();

        let dns_pkarr_server = DnsPkarrServer::run_with_origin(origin.clone()).await?;

        let secret_key = SecretKey::generate();
        let node_id = secret_key.public();

        let addr_info = AddrInfo {
            relay_url: Some("https://relay.example".parse().unwrap()),
            ..Default::default()
        };

        let resolver = create_dns_resolver(dns_pkarr_server.nameserver)?;
        let publisher = PkarrPublisher::new(secret_key, dns_pkarr_server.pkarr_url.clone());
        // does not block, update happens in background task
        publisher.update_addr_info(&addr_info);
        // wait until our shared state received the update from pkarr publishing
        dns_pkarr_server.on_node(&node_id, PUBLISH_TIMEOUT).await?;
        let resolved = resolver.lookup_by_id(&node_id, &origin).await?;

        let expected = NodeAddr {
            info: addr_info,
            node_id,
        };

        assert_eq!(resolved, expected);
        Ok(())
    }

    const TEST_ALPN: &[u8] = b"TEST";

    #[tokio::test]
    async fn pkarr_publish_dns_discover() -> Result<()> {
        let _logging_guard = iroh_test::logging::setup();

        let dns_pkarr_server = DnsPkarrServer::run().await?;
        let (relay_map, _relay_url, _relay_guard) = run_relay_server().await?;

        let (ep1, _guard1) = ep_with_discovery(&relay_map, &dns_pkarr_server).await?;
        let (ep2, _guard2) = ep_with_discovery(&relay_map, &dns_pkarr_server).await?;

        // wait until our shared state received the update from pkarr publishing
        dns_pkarr_server
            .on_node(&ep1.node_id(), PUBLISH_TIMEOUT)
            .await?;

        // we connect only by node id!
        let res = ep2.connect(ep1.node_id(), TEST_ALPN).await;
        assert!(res.is_ok(), "connection established");
        Ok(())
    }

    /// Like `pkarr_publish_dns_discover`, but connects with an explicit [`NodeAddr`] that
    /// carries no addressing information at all, so discovery has to fill it in.
    #[tokio::test]
    async fn pkarr_publish_dns_discover_empty_node_addr() -> Result<()> {
        let _logging_guard = iroh_test::logging::setup();

        let dns_pkarr_server = DnsPkarrServer::run().await?;
        let (relay_map, _relay_url, _relay_guard) = run_relay_server().await?;

        let (ep1, _guard1) = ep_with_discovery(&relay_map, &dns_pkarr_server).await?;
        let (ep2, _guard2) = ep_with_discovery(&relay_map, &dns_pkarr_server).await?;

        // wait until our shared state received the update from pkarr publishing
        dns_pkarr_server
            .on_node(&ep1.node_id(), PUBLISH_TIMEOUT)
            .await?;

        // we connect with a node address that contains neither relay nor direct addresses!
        let empty_addr = NodeAddr::new(ep1.node_id());
        let res = ep2.connect(empty_addr, TEST_ALPN).await;
        assert!(res.is_ok(), "connection established");
        Ok(())
    }

    /// Binds an endpoint that uses the test relay and the test DNS/pkarr discovery, and
    /// spawns a task that accepts (and otherwise ignores) incoming connections for as long
    /// as the returned handle lives.
    async fn ep_with_discovery(
        relay_map: &RelayMap,
        dns_pkarr_server: &DnsPkarrServer,
    ) -> Result<(Endpoint, AbortOnDropHandle<Result<()>>)> {
        let secret_key = SecretKey::generate();
        let ep = Endpoint::builder()
            .relay_mode(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true)
            .secret_key(secret_key.clone())
            .alpns(vec![TEST_ALPN.to_vec()])
            .dns_resolver(dns_pkarr_server.dns_resolver())
            .discovery(dns_pkarr_server.discovery(secret_key))
            .bind()
            .await?;

        let handle = tokio::spawn({
            let ep = ep.clone();
            async move {
                // we skip accept() errors, they can be caused by retransmits
                while let Some(connecting) = ep.accept().await.and_then(|inc| inc.accept().ok()) {
                    let _conn = connecting.await?;
                    // Just accept incoming connections, but don't do anything with them.
                }

                anyhow::Ok(())
            }
        });

        Ok((ep, AbortOnDropHandle::new(handle)))
    }
}