1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
57
#[cfg(feature = "tokio")]
pub mod tokio {
    use std::sync::Arc;

    use hickory_resolver::{name_server::TokioConnectionProvider, system_conf, TokioResolver};
    use parking_lot::Mutex;

    /// The DNS [`crate::Transport`] specialised to the `TokioResolver`.
    pub type Transport<T> = crate::Transport<T, TokioResolver>;

    impl<T> Transport<T> {
        /// Builds a [`Transport`] around `inner` using the resolver
        /// configuration and options returned by
        /// `system_conf::read_system_conf`.
        ///
        /// # Errors
        ///
        /// Fails if reading the system configuration fails.
        pub fn system(inner: T) -> Result<Transport<T>, std::io::Error> {
            let (config, options) = system_conf::read_system_conf()?;
            let transport = Self::custom(inner, config, options);
            Ok(transport)
        }

        /// Builds a [`Transport`] around `inner` whose resolver is created
        /// from the given configuration `cfg` and options `opts`.
        pub fn custom(
            inner: T,
            cfg: hickory_resolver::config::ResolverConfig,
            opts: hickory_resolver::config::ResolverOpts,
        ) -> Transport<T> {
            // The inner transport is shared behind a mutex.
            let shared_inner = Arc::new(Mutex::new(inner));

            let provider = TokioConnectionProvider::default();
            let resolver = TokioResolver::builder_with_config(cfg, provider)
                .with_options(opts)
                .build();

            Self {
                inner: shared_inner,
                resolver,
            }
        }
    }
}
95
96use std::{
97 error, fmt, io, iter,
98 net::{Ipv4Addr, Ipv6Addr},
99 ops::DerefMut,
100 pin::Pin,
101 str,
102 sync::Arc,
103 task::{Context, Poll},
104};
105
106use async_trait::async_trait;
107use futures::{future::BoxFuture, prelude::*};
108pub use hickory_resolver::{
109 config::{ResolverConfig, ResolverOpts},
110 ResolveError, ResolveErrorKind,
111};
112use hickory_resolver::{
113 lookup::{Ipv4Lookup, Ipv6Lookup, TxtLookup},
114 lookup_ip::LookupIp,
115 name_server::ConnectionProvider,
116};
117use libp2p_core::{
118 multiaddr::{Multiaddr, Protocol},
119 transport::{DialOpts, ListenerId, TransportError, TransportEvent},
120};
121use parking_lot::Mutex;
122use smallvec::SmallVec;
123
/// Prefix prepended to the name of a `/dnsaddr` component to form the
/// name used for the TXT record lookup.
const DNSADDR_PREFIX: &str = "_dnsaddr.";

/// Upper bound on the number of dial attempts accepted by the inner
/// transport within a single call to `do_dial`; once reached after a
/// failed attempt, dialing is aborted.
const MAX_DIAL_ATTEMPTS: usize = 16;

/// Upper bound on the number of DNS lookups performed within a single call
/// to `do_dial`. Addresses that would need a further lookup are dropped and
/// recorded as `Error::TooManyLookups`.
const MAX_DNS_LOOKUPS: usize = 32;

/// Upper bound on the number of matching addresses accepted from the TXT
/// records of a single `/dnsaddr` lookup; further matches are dropped.
const MAX_TXT_RECORDS: usize = 16;
141
/// A transport wrapper that resolves the DNS components (`/dns`, `/dns4`,
/// `/dns6`, `/dnsaddr`) of an address before dialing it on the wrapped
/// transport `T`, using the resolver `R`.
#[derive(Debug)]
pub struct Transport<T, R> {
    /// The wrapped transport. It is kept behind `Arc<Mutex<_>>` because the
    /// dial future created in `do_dial` holds its own clone of the handle.
    inner: Arc<Mutex<T>>,
    /// The resolver used for DNS lookups; cloned into each dial future.
    resolver: R,
}
151
152impl<T, R> libp2p_core::Transport for Transport<T, R>
153where
154 T: libp2p_core::Transport + Send + Unpin + 'static,
155 T::Error: Send,
156 T::Dial: Send,
157 R: Clone + Send + Sync + Resolver + 'static,
158{
159 type Output = T::Output;
160 type Error = Error<T::Error>;
161 type ListenerUpgrade = future::MapErr<T::ListenerUpgrade, fn(T::Error) -> Self::Error>;
162 type Dial = future::Either<
163 future::MapErr<T::Dial, fn(T::Error) -> Self::Error>,
164 BoxFuture<'static, Result<Self::Output, Self::Error>>,
165 >;
166
167 fn listen_on(
168 &mut self,
169 id: ListenerId,
170 addr: Multiaddr,
171 ) -> Result<(), TransportError<Self::Error>> {
172 self.inner
173 .lock()
174 .listen_on(id, addr)
175 .map_err(|e| e.map(Error::Transport))
176 }
177
178 fn remove_listener(&mut self, id: ListenerId) -> bool {
179 self.inner.lock().remove_listener(id)
180 }
181
182 fn dial(
183 &mut self,
184 addr: Multiaddr,
185 dial_opts: DialOpts,
186 ) -> Result<Self::Dial, TransportError<Self::Error>> {
187 Ok(self.do_dial(addr, dial_opts))
188 }
189
190 fn poll(
191 self: Pin<&mut Self>,
192 cx: &mut Context<'_>,
193 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
194 let mut inner = self.inner.lock();
195 libp2p_core::Transport::poll(Pin::new(inner.deref_mut()), cx).map(|event| {
196 event
197 .map_upgrade(|upgr| upgr.map_err::<_, fn(_) -> _>(Error::Transport))
198 .map_err(Error::Transport)
199 })
200 }
201}
202
impl<T, R> Transport<T, R>
where
    T: libp2p_core::Transport + Send + Unpin + 'static,
    T::Error: Send,
    T::Dial: Send,
    R: Clone + Send + Sync + Resolver + 'static,
{
    /// Builds the dial future for `addr`.
    ///
    /// The future repeatedly replaces the first DNS component of each pending
    /// address with the results of a lookup. As soon as an address contains
    /// no more DNS components it is dialed on the wrapped transport. The
    /// first successful dial resolves the future; otherwise all errors
    /// collected along the way are returned as [`Error::Dial`], or a
    /// [`Error::ResolveError`] if nothing was ever attempted or failed.
    fn do_dial(
        &mut self,
        addr: Multiaddr,
        dial_opts: DialOpts,
    ) -> <Self as libp2p_core::Transport>::Dial {
        // The future is `'static`, so it gets its own handles to the resolver
        // and to the shared inner transport.
        let resolver = self.resolver.clone();
        let inner = self.inner.clone();

        async move {
            // Every lookup and dial failure encountered, in order of occurrence.
            let mut dial_errors: Vec<Error<T::Error>> = Vec::new();
            let mut dns_lookups = 0;
            let mut dial_attempts = 0;
            // Work list of addresses still to be resolved or dialed. It has
            // inline room for one address and is used as a stack (`pop`), so
            // the most recently pushed address is processed first.
            let mut unresolved = SmallVec::<[Multiaddr; 1]>::new();
            unresolved.push(addr.clone());

            while let Some(addr) = unresolved.pop() {
                // Look for the first DNS component (and its index) in the address.
                if let Some((i, name)) = addr.iter().enumerate().find(|(_, p)| {
                    matches!(
                        p,
                        Protocol::Dns(_)
                            | Protocol::Dns4(_)
                            | Protocol::Dns6(_)
                            | Protocol::Dnsaddr(_)
                    )
                }) {
                    if dns_lookups == MAX_DNS_LOOKUPS {
                        tracing::debug!(address=%addr, "Too many DNS lookups, dropping unresolved address");
                        dial_errors.push(Error::TooManyLookups);
                        // Keep draining the work list: remaining entries may
                        // already be fully resolved and can still be dialed.
                        continue;
                    }
                    dns_lookups += 1;
                    match resolve(&name, &resolver).await {
                        Err(e) => {
                            // Record the failure and carry on with whatever
                            // else is left on the work list.
                            dial_errors.push(e);
                        }
                        Ok(Resolved::One(ip)) => {
                            tracing::trace!(protocol=%name, resolved=%ip);
                            // Substitute the DNS component in place and queue
                            // the address for another pass.
                            let addr = addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
                            unresolved.push(addr);
                        }
                        Ok(Resolved::Many(ips)) => {
                            // One candidate address per resolved alternative.
                            for ip in ips {
                                tracing::trace!(protocol=%name, resolved=%ip);
                                let addr =
                                    addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
                                unresolved.push(addr);
                            }
                        }
                        Ok(Resolved::Addrs(addrs)) => {
                            // Only resolved addresses ending with everything
                            // that followed the `/dnsaddr` component are
                            // accepted; the part before it is kept as prefix.
                            let suffix = addr.iter().skip(i + 1).collect::<Multiaddr>();
                            let prefix = addr.iter().take(i).collect::<Multiaddr>();
                            // Number of matching addresses accepted so far.
                            let mut n = 0;
                            for a in addrs {
                                if a.ends_with(&suffix) {
                                    if n < MAX_TXT_RECORDS {
                                        n += 1;
                                        tracing::trace!(protocol=%name, resolved=%a);
                                        let addr =
                                            prefix.iter().chain(a.iter()).collect::<Multiaddr>();
                                        unresolved.push(addr);
                                    } else {
                                        tracing::debug!(
                                            resolved=%a,
                                            "Too many TXT records, dropping resolved"
                                        );
                                    }
                                }
                            }
                        }
                    }
                } else {
                    // No DNS component left: hand the address to the inner transport.
                    tracing::debug!(address=%addr, "Dialing address");

                    let transport = inner.clone();
                    // The lock guard is a temporary of this statement, so it
                    // is released before the dial future is awaited below.
                    let dial = transport.lock().dial(addr, dial_opts);
                    let result = match dial {
                        Ok(out) => {
                            // Only dials the inner transport accepted (i.e.
                            // returned a future for) count as attempts.
                            dial_attempts += 1;
                            out.await.map_err(Error::Transport)
                        }
                        Err(TransportError::MultiaddrNotSupported(a)) => {
                            Err(Error::MultiaddrNotSupported(a))
                        }
                        Err(TransportError::Other(err)) => Err(Error::Transport(err)),
                    };

                    match result {
                        // First successful dial wins.
                        Ok(out) => return Ok(out),
                        Err(err) => {
                            tracing::debug!("Dial error: {:?}.", err);
                            dial_errors.push(err);

                            // Nothing left to try.
                            if unresolved.is_empty() {
                                break;
                            }

                            if dial_attempts == MAX_DIAL_ATTEMPTS {
                                tracing::debug!(
                                    "Aborting dialing after {} attempts.",
                                    MAX_DIAL_ATTEMPTS
                                );
                                break;
                            }
                        }
                    }
                }
            }

            // No dial succeeded. Report everything that went wrong; if nothing
            // was recorded, no lookup produced a usable address (e.g. no
            // `/dnsaddr` record matched the suffix).
            if !dial_errors.is_empty() {
                Err(Error::Dial(dial_errors))
            } else {
                Err(Error::ResolveError(
                    ResolveErrorKind::Message("No Matching Records Found").into(),
                ))
            }
        }
        .boxed()
        // `Self::Dial` is an `Either`; this method always yields its right
        // (boxed future) variant.
        .right_future()
    }
}
347
/// The errors produced by the DNS [`Transport`] wrapper; `TErr` is the error
/// type of the wrapped transport.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Error<TErr> {
    /// An error reported by the wrapped transport.
    Transport(TErr),
    /// A DNS lookup failed, or no lookup produced a usable address.
    #[allow(clippy::enum_variant_names)]
    ResolveError(ResolveError),
    /// The wrapped transport rejected a fully resolved address as
    /// unsupported when asked to dial it.
    MultiaddrNotSupported(Multiaddr),
    /// An address was dropped because the per-dial limit on DNS lookups
    /// (`MAX_DNS_LOOKUPS`) had already been reached.
    TooManyLookups,
    /// The errors collected during a dial in which no attempt succeeded, in
    /// order of occurrence.
    Dial(Vec<Error<TErr>>),
}
369
370impl<TErr> fmt::Display for Error<TErr>
371where
372 TErr: fmt::Display,
373{
374 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
375 match self {
376 Error::Transport(err) => write!(f, "{err}"),
377 Error::ResolveError(err) => write!(f, "{err}"),
378 Error::MultiaddrNotSupported(a) => write!(f, "Unsupported resolved address: {a}"),
379 Error::TooManyLookups => write!(f, "Too many DNS lookups"),
380 Error::Dial(errs) => {
381 write!(f, "Multiple dial errors occurred:")?;
382 for err in errs {
383 write!(f, "\n - {err}")?;
384 }
385 Ok(())
386 }
387 }
388 }
389}
390
391impl<TErr> error::Error for Error<TErr>
392where
393 TErr: error::Error + 'static,
394{
395 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
396 match self {
397 Error::Transport(err) => Some(err),
398 Error::ResolveError(err) => Some(err),
399 Error::MultiaddrNotSupported(_) => None,
400 Error::TooManyLookups => None,
401 Error::Dial(errs) => errs.last().and_then(|e| e.source()),
402 }
403 }
404}
405
/// The successful outcome of `resolve` for a single [`Protocol`] component.
enum Resolved<'a> {
    /// The component maps to exactly one replacement. For a component that
    /// is not a DNS protocol this is a clone of the component itself.
    One(Protocol<'a>),
    /// The lookup returned two or more results, each an alternative
    /// replacement for the component.
    Many(Vec<Protocol<'a>>),
    /// Addresses parsed from the TXT records of a `/dnsaddr` lookup. They
    /// replace the component and everything after it, and may themselves
    /// contain DNS components.
    Addrs(Vec<Multiaddr>),
}
420
421fn resolve<'a, E: 'a + Send, R: Resolver>(
425 proto: &Protocol<'a>,
426 resolver: &'a R,
427) -> BoxFuture<'a, Result<Resolved<'a>, Error<E>>> {
428 match proto {
429 Protocol::Dns(ref name) => resolver
430 .lookup_ip(name.clone().into_owned())
431 .map(move |res| match res {
432 Ok(ips) => {
433 let mut ips = ips.into_iter();
434 let one = ips
435 .next()
436 .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
437 if let Some(two) = ips.next() {
438 Ok(Resolved::Many(
439 iter::once(one)
440 .chain(iter::once(two))
441 .chain(ips)
442 .map(Protocol::from)
443 .collect(),
444 ))
445 } else {
446 Ok(Resolved::One(Protocol::from(one)))
447 }
448 }
449 Err(e) => Err(Error::ResolveError(e)),
450 })
451 .boxed(),
452 Protocol::Dns4(ref name) => resolver
453 .ipv4_lookup(name.clone().into_owned())
454 .map(move |res| match res {
455 Ok(ips) => {
456 let mut ips = ips.into_iter();
457 let one = ips
458 .next()
459 .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
460 if let Some(two) = ips.next() {
461 Ok(Resolved::Many(
462 iter::once(one)
463 .chain(iter::once(two))
464 .chain(ips)
465 .map(Ipv4Addr::from)
466 .map(Protocol::from)
467 .collect(),
468 ))
469 } else {
470 Ok(Resolved::One(Protocol::from(Ipv4Addr::from(one))))
471 }
472 }
473 Err(e) => Err(Error::ResolveError(e)),
474 })
475 .boxed(),
476 Protocol::Dns6(ref name) => resolver
477 .ipv6_lookup(name.clone().into_owned())
478 .map(move |res| match res {
479 Ok(ips) => {
480 let mut ips = ips.into_iter();
481 let one = ips
482 .next()
483 .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
484 if let Some(two) = ips.next() {
485 Ok(Resolved::Many(
486 iter::once(one)
487 .chain(iter::once(two))
488 .chain(ips)
489 .map(Ipv6Addr::from)
490 .map(Protocol::from)
491 .collect(),
492 ))
493 } else {
494 Ok(Resolved::One(Protocol::from(Ipv6Addr::from(one))))
495 }
496 }
497 Err(e) => Err(Error::ResolveError(e)),
498 })
499 .boxed(),
500 Protocol::Dnsaddr(ref name) => {
501 let name = [DNSADDR_PREFIX, name].concat();
502 resolver
503 .txt_lookup(name)
504 .map(move |res| match res {
505 Ok(txts) => {
506 let mut addrs = Vec::new();
507 for txt in txts {
508 if let Some(chars) = txt.txt_data().first() {
509 match parse_dnsaddr_txt(chars) {
510 Err(e) => {
511 tracing::debug!("Invalid TXT record: {:?}", e);
513 }
514 Ok(a) => {
515 addrs.push(a);
516 }
517 }
518 }
519 }
520 Ok(Resolved::Addrs(addrs))
521 }
522 Err(e) => Err(Error::ResolveError(e)),
523 })
524 .boxed()
525 }
526 proto => future::ready(Ok(Resolved::One(proto.clone()))).boxed(),
527 }
528}
529
530fn parse_dnsaddr_txt(txt: &[u8]) -> io::Result<Multiaddr> {
532 let s = str::from_utf8(txt).map_err(invalid_data)?;
533 match s.strip_prefix("dnsaddr=") {
534 None => Err(invalid_data("Missing `dnsaddr=` prefix.")),
535 Some(a) => Ok(Multiaddr::try_from(a).map_err(invalid_data)?),
536 }
537}
538
/// Wraps `e` in an [`io::Error`] of kind [`io::ErrorKind::InvalidData`].
fn invalid_data(e: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
    let source: Box<dyn std::error::Error + Send + Sync> = e.into();
    io::Error::new(io::ErrorKind::InvalidData, source)
}
542
/// The lookups the DNS [`Transport`] needs from a resolver.
///
/// Hidden from the generated documentation via `#[doc(hidden)]`.
#[async_trait::async_trait]
#[doc(hidden)]
pub trait Resolver {
    /// Looks up the IP addresses for `name`; used for `/dns` components.
    async fn lookup_ip(&self, name: String) -> Result<LookupIp, ResolveError>;
    /// Looks up the IPv4 addresses for `name`; used for `/dns4` components.
    async fn ipv4_lookup(&self, name: String) -> Result<Ipv4Lookup, ResolveError>;
    /// Looks up the IPv6 addresses for `name`; used for `/dns6` components.
    async fn ipv6_lookup(&self, name: String) -> Result<Ipv6Lookup, ResolveError>;
    /// Looks up the TXT records for `name`; used for `/dnsaddr` components.
    async fn txt_lookup(&self, name: String) -> Result<TxtLookup, ResolveError>;
}
551
552#[async_trait]
553impl<C> Resolver for hickory_resolver::Resolver<C>
554where
555 C: ConnectionProvider,
556{
557 async fn lookup_ip(&self, name: String) -> Result<LookupIp, ResolveError> {
558 self.lookup_ip(name).await
559 }
560
561 async fn ipv4_lookup(&self, name: String) -> Result<Ipv4Lookup, ResolveError> {
562 self.ipv4_lookup(name).await
563 }
564
565 async fn ipv6_lookup(&self, name: String) -> Result<Ipv6Lookup, ResolveError> {
566 self.ipv6_lookup(name).await
567 }
568
569 async fn txt_lookup(&self, name: String) -> Result<TxtLookup, ResolveError> {
570 self.txt_lookup(name).await
571 }
572}
573
// Integration-style tests for the DNS transport wrapper.
// NOTE(review): these tests use a public resolver configuration
// (`ResolverConfig::quad9()`) and real domain names, so they presumably need
// network access and depend on live DNS data — confirm before running in CI.
#[cfg(all(test, feature = "tokio"))]
mod tests {
    use futures::future::BoxFuture;
    use hickory_resolver::proto::{ProtoError, ProtoErrorKind};
    use libp2p_core::{
        multiaddr::{Multiaddr, Protocol},
        transport::{PortUse, TransportError, TransportEvent},
        Endpoint, Transport,
    };
    use libp2p_identity::PeerId;

    use super::*;

    /// Wraps `transport` in a tokio-based DNS transport and drives the future
    /// returned by `test_fn` to completion on a fresh current-thread tokio
    /// runtime with I/O and time drivers enabled.
    fn test_tokio<T, F: Future<Output = ()>>(
        transport: T,
        test_fn: impl FnOnce(tokio::Transport<T>) -> F,
    ) {
        let config = ResolverConfig::quad9();
        let opts = ResolverOpts::default();
        let transport = tokio::Transport::custom(transport, config, opts);
        let rt = ::tokio::runtime::Builder::new_current_thread()
            .enable_io()
            .enable_time()
            .build()
            .unwrap();
        rt.block_on(test_fn(transport));
    }

    /// Dials a series of addresses through a transport that accepts every
    /// dial, checking which dials succeed and how the failures are reported.
    #[test]
    fn basic_resolve() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        /// Inner transport whose dials always succeed, provided the address
        /// it is given no longer contains any DNS component.
        #[derive(Clone)]
        struct CustomTransport;

        impl Transport for CustomTransport {
            type Output = ();
            type Error = std::io::Error;
            type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
            type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;

            fn listen_on(
                &mut self,
                _: ListenerId,
                _: Multiaddr,
            ) -> Result<(), TransportError<Self::Error>> {
                unreachable!()
            }

            fn remove_listener(&mut self, _: ListenerId) -> bool {
                false
            }

            fn dial(
                &mut self,
                addr: Multiaddr,
                _: DialOpts,
            ) -> Result<Self::Dial, TransportError<Self::Error>> {
                // The wrapper must have replaced every DNS component before
                // the address reaches the inner transport.
                assert!(!addr.iter().any(|p| matches!(
                    p,
                    Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Dnsaddr(_)
                )));
                Ok(Box::pin(future::ready(Ok(()))))
            }

            fn poll(
                self: Pin<&mut Self>,
                _: &mut Context<'_>,
            ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
                unreachable!()
            }
        }

        async fn run<T, R>(mut transport: super::Transport<T, R>)
        where
            T: Transport + Clone + Send + Unpin + 'static,
            T::Error: Send,
            T::Dial: Send,
            R: Clone + Send + Sync + Resolver + 'static,
        {
            let dial_opts = DialOpts {
                role: Endpoint::Dialer,
                port_use: PortUse::Reuse,
            };

            // Must succeed; presumably relies on `example.com` having an
            // IPv4 record at the time the test runs.
            let _ = transport
                .dial("/dns4/example.com/tcp/20000".parse().unwrap(), dial_opts)
                .unwrap()
                .await
                .unwrap();

            // Must succeed; presumably relies on `example.com` having an
            // IPv6 record at the time the test runs.
            let _ = transport
                .dial("/dns6/example.com/tcp/20000".parse().unwrap(), dial_opts)
                .unwrap()
                .await
                .unwrap();

            // Must succeed: there is no DNS component, so the address is
            // passed to the inner transport as is.
            let _ = transport
                .dial("/ip4/1.2.3.4/tcp/20000".parse().unwrap(), dial_opts)
                .unwrap()
                .await
                .unwrap();

            // Must succeed; presumably relies on the TXT records published
            // for `_dnsaddr.bootstrap.libp2p.io`.
            let _ = transport
                .dial("/dnsaddr/bootstrap.libp2p.io".parse().unwrap(), dial_opts)
                .unwrap()
                .await
                .unwrap();

            // Must succeed; presumably relies on one of those TXT records
            // ending with this exact `/p2p/...` suffix.
            let _ = transport
                .dial("/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap(), dial_opts)
                .unwrap()
                .await
                .unwrap();

            // Must fail with a `ResolveError`: no record is expected to end
            // with a randomly generated peer ID, so nothing is ever dialed.
            match transport
                .dial(
                    format!("/dnsaddr/bootstrap.libp2p.io/p2p/{}", PeerId::random())
                        .parse()
                        .unwrap(),
                    dial_opts,
                )
                .unwrap()
                .await
            {
                Err(Error::ResolveError(_)) => {}
                Err(e) => panic!("Unexpected error: {e:?}"),
                Ok(_) => panic!("Unexpected success."),
            }

            // Must fail with an aggregate holding exactly one "no records
            // found" resolve error for the unresolvable name.
            match transport
                .dial(
                    "/dns4/example.invalid/tcp/20000".parse().unwrap(),
                    dial_opts,
                )
                .unwrap()
                .await
            {
                Err(Error::Dial(dial_errs)) => {
                    assert_eq!(
                        dial_errs.len(),
                        1,
                        "Expected exactly 1 error for 'no records' scenario, got {dial_errs:?}"
                    );

                    match &dial_errs[0] {
                        Error::ResolveError(e) => match e.kind() {
                            ResolveErrorKind::Proto(ProtoError { kind, .. })
                                if matches!(
                                    kind.as_ref(),
                                    ProtoErrorKind::NoRecordsFound { .. }
                                ) => {}
                            _ => panic!("Unexpected DNS error: {e:?}"),
                        },
                        other => {
                            panic!("Expected a single ResolveError(...) sub-error, got {other:?}")
                        }
                    }
                }

                Err(e) => panic!("Unexpected error: {e:?}"),
                Ok(_) => panic!("Unexpected success."),
            }
        }

        test_tokio(CustomTransport, run);
    }

    /// Checks that when every dial attempt fails, all the individual failures
    /// are returned together as `Error::Dial`.
    #[test]
    fn aggregated_dial_errors() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        /// Inner transport that accepts every dial but whose dial futures
        /// always resolve to an `Unsupported` I/O error.
        #[derive(Clone)]
        struct AlwaysFailTransport;

        impl libp2p_core::Transport for AlwaysFailTransport {
            type Output = ();
            type Error = std::io::Error;
            type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
            type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;

            fn listen_on(
                &mut self,
                _id: ListenerId,
                _addr: Multiaddr,
            ) -> Result<(), TransportError<Self::Error>> {
                unimplemented!()
            }

            fn remove_listener(&mut self, _id: ListenerId) -> bool {
                false
            }

            fn dial(
                &mut self,
                addr: Multiaddr,
                _: DialOpts,
            ) -> Result<Self::Dial, TransportError<Self::Error>> {
                // The error message names the address that was dialed.
                Ok(Box::pin(future::ready(Err(io::Error::new(
                    io::ErrorKind::Unsupported,
                    format!("No support for dialing {addr}"),
                )))))
            }

            fn poll(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
            ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
                unimplemented!()
            }
        }

        async fn run_test<T, R>(mut transport: super::Transport<T, R>)
        where
            T: Transport<Error = std::io::Error> + Clone + Send + Unpin + 'static,
            T::Error: Send,
            T::Dial: Send,
            R: Clone + Send + Sync + Resolver + 'static,
        {
            let dial_opts = DialOpts {
                role: Endpoint::Dialer,
                port_use: PortUse::Reuse,
            };

            // Presumably resolves to several addresses, each of which is then
            // dialed and fails — the assertion below requires at least two.
            let addr: Multiaddr = "/dnsaddr/bootstrap.libp2p.io".parse().unwrap();
            let dial_future = transport.dial(addr, dial_opts).unwrap();
            let result = dial_future.await;

            match result {
                Err(Error::Dial(errs)) => {
                    assert!(
                        errs.len() >= 2,
                        "Expected multiple dial errors, but got {}",
                        errs.len()
                    );
                    // Every collected error must be the inner transport's
                    // `Unsupported` failure.
                    for e in errs {
                        match e {
                            Error::Transport(io_err) => {
                                assert_eq!(
                                    io_err.kind(),
                                    io::ErrorKind::Unsupported,
                                    "Expected Unsupported dial error, got: {io_err:?}"
                                );
                            }
                            _ => panic!("Expected Error::Transport(Unsupported), got: {e:?}"),
                        }
                    }
                }
                Err(e) => panic!("Expected aggregated dial errors, got {e:?}"),
                Ok(_) => panic!("Dial unexpectedly succeeded"),
            }
        }

        test_tokio(AlwaysFailTransport, run_test);
    }
}