1use std::time::Duration;
102
103use anyhow::{anyhow, ensure, Result};
104use futures_lite::stream::{Boxed as BoxStream, StreamExt};
105use iroh_base::node_addr::NodeAddr;
106use tokio::{sync::oneshot, task::JoinHandle};
107use tracing::{debug, error_span, warn, Instrument};
108
109use crate::{AddrInfo, Endpoint, NodeId};
110
111pub mod dns;
112
113#[cfg(feature = "discovery-local-network")]
114#[cfg_attr(iroh_docsrs, doc(cfg(feature = "discovery-local-network")))]
115pub mod local_swarm_discovery;
116pub mod pkarr;
117pub mod static_provider;
118
119pub trait Discovery: std::fmt::Debug + Send + Sync {
136 fn publish(&self, _info: &AddrInfo) {}
144
145 fn resolve(
150 &self,
151 _endpoint: Endpoint,
152 _node_id: NodeId,
153 ) -> Option<BoxStream<Result<DiscoveryItem>>> {
154 None
155 }
156
157 fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
180 None
181 }
182}
183
184#[derive(Debug, Clone)]
186pub struct DiscoveryItem {
187 pub node_id: NodeId,
189 pub provenance: &'static str,
193 pub last_updated: Option<u64>,
198 pub addr_info: AddrInfo,
200}
201
202#[derive(Debug, Default)]
206pub struct ConcurrentDiscovery {
207 services: Vec<Box<dyn Discovery>>,
208}
209
210impl ConcurrentDiscovery {
211 pub fn empty() -> Self {
213 Self::default()
214 }
215
216 pub fn from_services(services: Vec<Box<dyn Discovery>>) -> Self {
218 Self { services }
219 }
220
221 pub fn add(&mut self, service: impl Discovery + 'static) {
223 self.services.push(Box::new(service));
224 }
225}
226
227impl<T> From<T> for ConcurrentDiscovery
228where
229 T: IntoIterator<Item = Box<dyn Discovery>>,
230{
231 fn from(iter: T) -> Self {
232 let services = iter.into_iter().collect::<Vec<_>>();
233 Self { services }
234 }
235}
236
237impl Discovery for ConcurrentDiscovery {
238 fn publish(&self, info: &AddrInfo) {
239 for service in &self.services {
240 service.publish(info);
241 }
242 }
243
244 fn resolve(
245 &self,
246 endpoint: Endpoint,
247 node_id: NodeId,
248 ) -> Option<BoxStream<Result<DiscoveryItem>>> {
249 let streams = self
250 .services
251 .iter()
252 .filter_map(|service| service.resolve(endpoint.clone(), node_id));
253
254 let streams = futures_buffered::MergeBounded::from_iter(streams);
255 Some(Box::pin(streams))
256 }
257
258 fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
259 let mut streams = vec![];
260 for service in self.services.iter() {
261 if let Some(stream) = service.subscribe() {
262 streams.push(stream)
263 }
264 }
265
266 let streams = futures_buffered::MergeBounded::from_iter(streams);
267 Some(Box::pin(streams))
268 }
269}
270
271const MAX_AGE: Duration = Duration::from_secs(10);
274
275pub(super) struct DiscoveryTask {
277 on_first_rx: oneshot::Receiver<Result<()>>,
278 task: JoinHandle<()>,
279}
280
281impl DiscoveryTask {
282 pub(super) fn start(ep: Endpoint, node_id: NodeId) -> Result<Self> {
284 ensure!(ep.discovery().is_some(), "No discovery services configured");
285 let (on_first_tx, on_first_rx) = oneshot::channel();
286 let me = ep.node_id();
287 let task = tokio::task::spawn(
288 async move { Self::run(ep, node_id, on_first_tx).await }.instrument(
289 error_span!("discovery", me = %me.fmt_short(), node = %node_id.fmt_short()),
290 ),
291 );
292 Ok(Self { task, on_first_rx })
293 }
294
295 pub(super) fn maybe_start_after_delay(
304 ep: &Endpoint,
305 node_id: NodeId,
306 delay: Option<Duration>,
307 ) -> Result<Option<Self>> {
308 if !Self::needs_discovery(ep, node_id) {
310 return Ok(None);
311 }
312 ensure!(ep.discovery().is_some(), "No discovery services configured");
313 let (on_first_tx, on_first_rx) = oneshot::channel();
314 let ep = ep.clone();
315 let me = ep.node_id();
316 let task = tokio::task::spawn(
317 async move {
318 if let Some(delay) = delay {
320 tokio::time::sleep(delay).await;
321 if !Self::needs_discovery(&ep, node_id) {
322 debug!("no discovery needed, abort");
323 on_first_tx.send(Ok(())).ok();
324 return;
325 }
326 }
327 Self::run(ep, node_id, on_first_tx).await
328 }
329 .instrument(
330 error_span!("discovery", me = %me.fmt_short(), node = %node_id.fmt_short()),
331 ),
332 );
333 Ok(Some(Self { task, on_first_rx }))
334 }
335
336 pub(super) async fn first_arrived(&mut self) -> Result<()> {
338 let fut = &mut self.on_first_rx;
339 fut.await??;
340 Ok(())
341 }
342
343 pub(super) fn cancel(&self) {
345 self.task.abort();
346 }
347
348 fn create_stream(ep: &Endpoint, node_id: NodeId) -> Result<BoxStream<Result<DiscoveryItem>>> {
349 let discovery = ep
350 .discovery()
351 .ok_or_else(|| anyhow!("No discovery service configured"))?;
352 let stream = discovery
353 .resolve(ep.clone(), node_id)
354 .ok_or_else(|| anyhow!("No discovery service can resolve node {node_id}",))?;
355 Ok(stream)
356 }
357
358 fn needs_discovery(ep: &Endpoint, node_id: NodeId) -> bool {
361 match ep.remote_info(node_id) {
362 None => true,
364 Some(info) => {
365 match (
366 info.last_received(),
367 info.relay_url.as_ref().and_then(|r| r.last_alive),
368 ) {
369 (None, None) => true,
371 (Some(elapsed), Some(elapsed_relay)) => {
374 elapsed > MAX_AGE && elapsed_relay > MAX_AGE
375 }
376 (Some(elapsed), _) | (_, Some(elapsed)) => elapsed > MAX_AGE,
377 }
378 }
379 }
380 }
381
382 async fn run(ep: Endpoint, node_id: NodeId, on_first_tx: oneshot::Sender<Result<()>>) {
383 let mut stream = match Self::create_stream(&ep, node_id) {
384 Ok(stream) => stream,
385 Err(err) => {
386 on_first_tx.send(Err(err)).ok();
387 return;
388 }
389 };
390 let mut on_first_tx = Some(on_first_tx);
391 debug!("discovery: start");
392 loop {
393 let next = tokio::select! {
394 _ = ep.cancelled() => break,
395 next = stream.next() => next
396 };
397 match next {
398 Some(Ok(r)) => {
399 if r.addr_info.is_empty() {
400 debug!(provenance = %r.provenance, addr = ?r.addr_info, "discovery: empty address found");
401 continue;
402 }
403 debug!(provenance = %r.provenance, addr = ?r.addr_info, "discovery: new address found");
404 let addr = NodeAddr {
405 info: r.addr_info,
406 node_id,
407 };
408 ep.add_node_addr_with_source(addr, r.provenance).ok();
409 if let Some(tx) = on_first_tx.take() {
410 tx.send(Ok(())).ok();
411 }
412 }
413 Some(Err(err)) => {
414 warn!(?err, "discovery service produced error");
415 break;
416 }
417 None => break,
418 }
419 }
420 if let Some(tx) = on_first_tx.take() {
421 let err = anyhow!("Discovery produced no results for {}", node_id.fmt_short());
422 tx.send(Err(err)).ok();
423 }
424 }
425}
426
427impl Drop for DiscoveryTask {
428 fn drop(&mut self) {
429 self.task.abort();
430 }
431}
432
#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeSet, HashMap},
        net::SocketAddr,
        sync::Arc,
        time::SystemTime,
    };

    use parking_lot::Mutex;
    use rand::Rng;
    use tokio_util::task::AbortOnDropHandle;

    use super::*;
    use crate::{key::SecretKey, relay::RelayMode};

    /// In-memory registry shared by every [`TestDiscovery`] created from it.
    ///
    /// Maps a node id to the addressing info it last published and the publish time in
    /// microseconds since the UNIX epoch (see `system_time_now`).
    #[derive(Debug, Clone, Default)]
    struct TestDiscoveryShared {
        nodes: Arc<Mutex<HashMap<NodeId, (AddrInfo, u64)>>>,
    }
    impl TestDiscoveryShared {
        /// Creates an honest discovery for `node_id`. It publishes into the shared
        /// registry, resolves from it, and answers after 200ms.
        pub fn create_discovery(&self, node_id: NodeId) -> TestDiscovery {
            TestDiscovery {
                node_id,
                shared: self.clone(),
                publish: true,
                resolve_wrong: false,
                delay: Duration::from_millis(200),
            }
        }

        /// Creates a "lying" discovery for `node_id`. It never publishes and always
        /// resolves to a random port on 240.0.0.1. It answers after 100ms, i.e. before an
        /// honest discovery does.
        pub fn create_lying_discovery(&self, node_id: NodeId) -> TestDiscovery {
            TestDiscovery {
                node_id,
                shared: self.clone(),
                publish: false,
                resolve_wrong: true,
                delay: Duration::from_millis(100),
            }
        }
    }
    /// Test [`Discovery`] backed by a [`TestDiscoveryShared`] registry.
    #[derive(Debug)]
    struct TestDiscovery {
        /// The node this discovery publishes for.
        node_id: NodeId,
        /// Registry shared with the other test discoveries.
        shared: TestDiscoveryShared,
        /// Whether `publish` writes into the shared registry.
        publish: bool,
        /// Whether `resolve` returns a made-up address instead of the registry entry.
        resolve_wrong: bool,
        /// How long `resolve` waits before yielding its single item.
        delay: Duration,
    }

    impl Discovery for TestDiscovery {
        /// Stores `info` under this discovery's node id, unless publishing is disabled.
        fn publish(&self, info: &AddrInfo) {
            if !self.publish {
                return;
            }
            let now = system_time_now();
            self.shared
                .nodes
                .lock()
                .insert(self.node_id, (info.clone(), now));
        }

        /// Yields at most one item, after `self.delay`. Normally that item is the
        /// registry entry for `node_id`, or nothing if there is none. When
        /// `resolve_wrong` is set, it is a fabricated entry instead.
        fn resolve(
            &self,
            endpoint: Endpoint,
            node_id: NodeId,
        ) -> Option<BoxStream<Result<DiscoveryItem>>> {
            let addr_info = match self.resolve_wrong {
                false => self.shared.nodes.lock().get(&node_id).cloned(),
                true => {
                    // Timestamp 100_000µs (0.1s) in the past.
                    let ts = system_time_now() - 100_000;
                    let port: u16 = rand::thread_rng().gen_range(10_000..20_000);
                    // 240.0.0.0/4 is reserved address space. It is presumably chosen so
                    // that dialing this address never succeeds.
                    let addr: SocketAddr = format!("240.0.0.1:{port}").parse().unwrap();
                    let addr_info = AddrInfo {
                        relay_url: None,
                        direct_addresses: BTreeSet::from([addr]),
                    };
                    Some((addr_info, ts))
                }
            };
            let stream = match addr_info {
                Some((addr_info, ts)) => {
                    let item = DiscoveryItem {
                        node_id,
                        provenance: "test-disco",
                        last_updated: Some(ts),
                        addr_info,
                    };
                    let delay = self.delay;
                    let fut = async move {
                        tokio::time::sleep(delay).await;
                        tracing::debug!(
                            "resolve on {}: {} = {item:?}",
                            endpoint.node_id().fmt_short(),
                            node_id.fmt_short()
                        );
                        Ok(item)
                    };
                    futures_lite::stream::once_future(fut).boxed()
                }
                // Unknown node: an empty stream, i.e. "resolved nothing".
                None => futures_lite::stream::empty().boxed(),
            };
            Some(stream)
        }
    }

    /// A discovery that never publishes and whose resolve stream is always empty.
    #[derive(Debug)]
    struct EmptyDiscovery;
    impl Discovery for EmptyDiscovery {
        fn publish(&self, _info: &AddrInfo) {}

        fn resolve(
            &self,
            _endpoint: Endpoint,
            _node_id: NodeId,
        ) -> Option<BoxStream<Result<DiscoveryItem>>> {
            Some(futures_lite::stream::empty().boxed())
        }
    }

    /// ALPN accepted by the endpoints built in `new_endpoint`.
    const TEST_ALPN: &[u8] = b"n0/iroh/test";

    /// ep2 dials ep1 knowing only its node id. Both use honest discoveries on a shared
    /// registry.
    #[tokio::test]
    async fn endpoint_discovery_simple_shared() -> anyhow::Result<()> {
        let _guard = iroh_test::logging::setup();
        let disco_shared = TestDiscoveryShared::default();
        let (ep1, _guard1) = {
            let secret = SecretKey::generate();
            let disco = disco_shared.create_discovery(secret.public());
            new_endpoint(secret, disco).await
        };
        let (ep2, _guard2) = {
            let secret = SecretKey::generate();
            let disco = disco_shared.create_discovery(secret.public());
            new_endpoint(secret, disco).await
        };
        let ep1_addr = NodeAddr::new(ep1.node_id());
        // Presumably waits until ep1 knows (and has published) its own addresses before
        // ep2 dials. Confirm against `Endpoint::node_addr`.
        ep1.node_addr().await?;
        let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?;
        Ok(())
    }

    /// Like `endpoint_discovery_simple_shared`, but ep2 combines an empty discovery with
    /// an honest one via `ConcurrentDiscovery`.
    #[tokio::test]
    async fn endpoint_discovery_combined_with_empty() -> anyhow::Result<()> {
        let _guard = iroh_test::logging::setup();
        let disco_shared = TestDiscoveryShared::default();
        let (ep1, _guard1) = {
            let secret = SecretKey::generate();
            let disco = disco_shared.create_discovery(secret.public());
            new_endpoint(secret, disco).await
        };
        let (ep2, _guard2) = {
            let secret = SecretKey::generate();
            let disco1 = EmptyDiscovery;
            let disco2 = disco_shared.create_discovery(secret.public());
            let mut disco = ConcurrentDiscovery::empty();
            disco.add(disco1);
            disco.add(disco2);
            new_endpoint(secret, disco).await
        };
        let ep1_addr = NodeAddr::new(ep1.node_id());
        ep1.node_addr().await?;
        let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?;
        Ok(())
    }

    /// ep2 combines empty, lying and honest discoveries. The lying one answers first
    /// with a bogus address, yet the connection is expected to succeed.
    #[tokio::test]
    async fn endpoint_discovery_combined_with_empty_and_wrong() -> anyhow::Result<()> {
        let _guard = iroh_test::logging::setup();
        let disco_shared = TestDiscoveryShared::default();
        let (ep1, _guard1) = {
            let secret = SecretKey::generate();
            let disco = disco_shared.create_discovery(secret.public());
            new_endpoint(secret, disco).await
        };
        let (ep2, _guard2) = {
            let secret = SecretKey::generate();
            let disco1 = EmptyDiscovery;
            let disco2 = disco_shared.create_lying_discovery(secret.public());
            let disco3 = disco_shared.create_discovery(secret.public());
            let mut disco = ConcurrentDiscovery::empty();
            disco.add(disco1);
            disco.add(disco2);
            disco.add(disco3);
            new_endpoint(secret, disco).await
        };
        let ep1_addr = NodeAddr::new(ep1.node_id());
        ep1.node_addr().await?;
        let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?;
        Ok(())
    }

    /// ep2 only has a lying discovery, so connecting to ep1 must fail.
    #[tokio::test]
    async fn endpoint_discovery_combined_wrong_only() -> anyhow::Result<()> {
        let _guard = iroh_test::logging::setup();
        let disco_shared = TestDiscoveryShared::default();
        let (ep1, _guard1) = {
            let secret = SecretKey::generate();
            let disco = disco_shared.create_discovery(secret.public());
            new_endpoint(secret, disco).await
        };
        let (ep2, _guard2) = {
            let secret = SecretKey::generate();
            let disco1 = disco_shared.create_lying_discovery(secret.public());
            let disco = ConcurrentDiscovery::from_services(vec![Box::new(disco1)]);
            new_endpoint(secret, disco).await
        };
        let ep1_addr = NodeAddr::new(ep1.node_id());
        ep1.node_addr().await?;
        let res = ep2.connect(ep1_addr, TEST_ALPN).await;
        assert!(res.is_err());
        Ok(())
    }

    /// ep2 is handed only a wrong direct address (240.0.0.1:1000) for ep1. The connection
    /// is expected to succeed anyway, presumably because discovery finds ep1's real
    /// addresses when the given one stays silent.
    #[tokio::test]
    async fn endpoint_discovery_with_wrong_existing_addr() -> anyhow::Result<()> {
        let _guard = iroh_test::logging::setup();
        let disco_shared = TestDiscoveryShared::default();
        let (ep1, _guard1) = {
            let secret = SecretKey::generate();
            let disco = disco_shared.create_discovery(secret.public());
            new_endpoint(secret, disco).await
        };
        let (ep2, _guard2) = {
            let secret = SecretKey::generate();
            let disco = disco_shared.create_discovery(secret.public());
            new_endpoint(secret, disco).await
        };
        ep1.node_addr().await?;
        let ep1_wrong_addr = NodeAddr {
            node_id: ep1.node_id(),
            info: AddrInfo {
                relay_url: None,
                direct_addresses: BTreeSet::from(["240.0.0.1:1000".parse().unwrap()]),
            },
        };
        let _conn = ep2.connect(ep1_wrong_addr, TEST_ALPN).await?;
        Ok(())
    }

    /// Binds an endpoint with the given key and discovery, relays disabled, accepting
    /// `TEST_ALPN`. Also spawns an accept loop.
    ///
    /// The accept loop awaits each incoming connection and then drops it. It stops when
    /// `accept()` yields `None`, when an incoming connection fails to accept, or with an
    /// error when a connection fails to complete. The returned handle aborts the loop
    /// when dropped.
    async fn new_endpoint(
        secret: SecretKey,
        disco: impl Discovery + 'static,
    ) -> (Endpoint, AbortOnDropHandle<anyhow::Result<()>>) {
        let ep = Endpoint::builder()
            .secret_key(secret)
            .discovery(Box::new(disco))
            .relay_mode(RelayMode::Disabled)
            .alpns(vec![TEST_ALPN.to_vec()])
            .bind()
            .await
            .unwrap();

        let handle = tokio::spawn({
            let ep = ep.clone();
            async move {
                while let Some(connecting) = ep.accept().await.and_then(|inc| inc.accept().ok()) {
                    let _conn = connecting.await?;
                }

                anyhow::Ok(())
            }
        });

        (ep, AbortOnDropHandle::new(handle))
    }

    /// Current wall-clock time in microseconds since the UNIX epoch.
    ///
    /// Panics if the system clock is set before the epoch.
    fn system_time_now() -> u64 {
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("time drift")
            .as_micros() as u64
    }
}
723
/// Tests for DNS resolution of node info and for pkarr publishing, run against the local
/// DNS, pkarr and relay test servers from `test_utils`.
#[cfg(test)]
mod test_dns_pkarr {
    use std::time::Duration;

    use anyhow::Result;
    use iroh_base::key::SecretKey;
    use tokio_util::task::AbortOnDropHandle;

    use crate::{
        discovery::pkarr::PkarrPublisher,
        dns::{node_info::NodeInfo, ResolverExt},
        relay::{RelayMap, RelayMode},
        test_utils::{
            dns_server::{create_dns_resolver, run_dns_server},
            pkarr_dns_state::State,
            run_relay_server, DnsPkarrServer,
        },
        AddrInfo, Endpoint, NodeAddr,
    };

    /// Upper bound passed to `DnsPkarrServer::on_node` when waiting for a published
    /// record.
    const PUBLISH_TIMEOUT: Duration = Duration::from_secs(10);

    /// A signed packet inserted directly into the DNS server state resolves back to the
    /// same node info via `lookup_by_id`.
    #[tokio::test]
    async fn dns_resolve() -> Result<()> {
        let _logging_guard = iroh_test::logging::setup();

        let origin = "testdns.example".to_string();
        let state = State::new(origin.clone());
        let (nameserver, _dns_drop_guard) = run_dns_server(state.clone()).await?;

        let secret_key = SecretKey::generate();
        let node_info = NodeInfo::new(
            secret_key.public(),
            Some("https://relay.example".parse().unwrap()),
            Default::default(),
        );
        let signed_packet = node_info.to_pkarr_signed_packet(&secret_key, 30)?;
        state.upsert(signed_packet)?;

        let resolver = create_dns_resolver(nameserver)?;
        let resolved = resolver.lookup_by_id(&node_info.node_id, &origin).await?;

        assert_eq!(resolved, node_info.into());

        Ok(())
    }

    /// Addressing info published through `PkarrPublisher` can be looked up through DNS.
    #[tokio::test]
    async fn pkarr_publish_dns_resolve() -> Result<()> {
        let _logging_guard = iroh_test::logging::setup();

        let origin = "testdns.example".to_string();

        let dns_pkarr_server = DnsPkarrServer::run_with_origin(origin.clone()).await?;

        let secret_key = SecretKey::generate();
        let node_id = secret_key.public();

        let addr_info = AddrInfo {
            relay_url: Some("https://relay.example".parse().unwrap()),
            ..Default::default()
        };

        let resolver = create_dns_resolver(dns_pkarr_server.nameserver)?;
        let publisher = PkarrPublisher::new(secret_key, dns_pkarr_server.pkarr_url.clone());
        publisher.update_addr_info(&addr_info);
        // Presumably waits (up to PUBLISH_TIMEOUT) until the server holds a record for
        // `node_id`, since publishing happens asynchronously.
        dns_pkarr_server.on_node(&node_id, PUBLISH_TIMEOUT).await?;
        let resolved = resolver.lookup_by_id(&node_id, &origin).await?;

        let expected = NodeAddr {
            info: addr_info,
            node_id,
        };

        assert_eq!(resolved, expected);
        Ok(())
    }

    /// ALPN accepted by the endpoints built in `ep_with_discovery`.
    const TEST_ALPN: &[u8] = b"TEST";

    /// Two endpoints using the server's discovery: ep2 connects to ep1 by node id alone.
    #[tokio::test]
    async fn pkarr_publish_dns_discover() -> Result<()> {
        let _logging_guard = iroh_test::logging::setup();

        let dns_pkarr_server = DnsPkarrServer::run().await?;
        let (relay_map, _relay_url, _relay_guard) = run_relay_server().await?;

        let (ep1, _guard1) = ep_with_discovery(&relay_map, &dns_pkarr_server).await?;
        let (ep2, _guard2) = ep_with_discovery(&relay_map, &dns_pkarr_server).await?;

        // Presumably waits until ep1's record has reached the server before dialing.
        dns_pkarr_server
            .on_node(&ep1.node_id(), PUBLISH_TIMEOUT)
            .await?;

        let res = ep2.connect(ep1.node_id(), TEST_ALPN).await;
        assert!(res.is_ok(), "connection established");
        Ok(())
    }

    /// NOTE(review): the body is identical to `pkarr_publish_dns_discover`. The "empty
    /// node addr" aspect the name refers to is not visible here, so confirm the intended
    /// difference.
    #[tokio::test]
    async fn pkarr_publish_dns_discover_empty_node_addr() -> Result<()> {
        let _logging_guard = iroh_test::logging::setup();

        let dns_pkarr_server = DnsPkarrServer::run().await?;
        let (relay_map, _relay_url, _relay_guard) = run_relay_server().await?;

        let (ep1, _guard1) = ep_with_discovery(&relay_map, &dns_pkarr_server).await?;
        let (ep2, _guard2) = ep_with_discovery(&relay_map, &dns_pkarr_server).await?;

        dns_pkarr_server
            .on_node(&ep1.node_id(), PUBLISH_TIMEOUT)
            .await?;

        let res = ep2.connect(ep1.node_id(), TEST_ALPN).await;
        assert!(res.is_ok(), "connection established");
        Ok(())
    }

    /// Binds an endpoint that uses the given relays (certificate checks skipped) and the
    /// test server's DNS resolver and discovery, and that accepts `TEST_ALPN`. Also
    /// spawns an accept loop whose handle aborts the loop when dropped.
    ///
    /// The accept loop awaits each incoming connection and drops it. It stops when
    /// `accept()` yields `None`, when an incoming connection fails to accept, or with an
    /// error when a connection fails to complete.
    async fn ep_with_discovery(
        relay_map: &RelayMap,
        dns_pkarr_server: &DnsPkarrServer,
    ) -> Result<(Endpoint, AbortOnDropHandle<Result<()>>)> {
        let secret_key = SecretKey::generate();
        let ep = Endpoint::builder()
            .relay_mode(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true)
            .secret_key(secret_key.clone())
            .alpns(vec![TEST_ALPN.to_vec()])
            .dns_resolver(dns_pkarr_server.dns_resolver())
            .discovery(dns_pkarr_server.discovery(secret_key))
            .bind()
            .await?;

        let handle = tokio::spawn({
            let ep = ep.clone();
            async move {
                while let Some(connecting) = ep.accept().await.and_then(|inc| inc.accept().ok()) {
                    let _conn = connecting.await?;
                }

                anyhow::Ok(())
            }
        });

        Ok((ep, AbortOnDropHandle::new(handle)))
    }
}