1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
58
59use ant_libp2p_core as libp2p_core;
60
61#[cfg(feature = "async-std")]
62pub mod async_std {
63 use std::{io, sync::Arc};
64
65 use async_std_resolver::AsyncStdResolver;
66 use futures::FutureExt;
67 use hickory_resolver::{
68 config::{ResolverConfig, ResolverOpts},
69 system_conf,
70 };
71 use parking_lot::Mutex;
72
73 pub type Transport<T> = crate::Transport<T, AsyncStdResolver>;
76
77 impl<T> Transport<T> {
78 pub async fn system(inner: T) -> Result<Transport<T>, io::Error> {
80 let (cfg, opts) = system_conf::read_system_conf()?;
81 Ok(Self::custom(inner, cfg, opts).await)
82 }
83
84 pub async fn custom(inner: T, cfg: ResolverConfig, opts: ResolverOpts) -> Transport<T> {
86 Transport {
87 inner: Arc::new(Mutex::new(inner)),
88 resolver: async_std_resolver::resolver(cfg, opts).await,
89 }
90 }
91
92 #[doc(hidden)]
94 pub fn system2(inner: T) -> Result<Transport<T>, io::Error> {
95 Ok(Transport {
96 inner: Arc::new(Mutex::new(inner)),
97 resolver: async_std_resolver::resolver_from_system_conf()
98 .now_or_never()
99 .expect(
100 "async_std_resolver::resolver_from_system_conf did not resolve immediately",
101 )?,
102 })
103 }
104
105 #[doc(hidden)]
107 pub fn custom2(inner: T, cfg: ResolverConfig, opts: ResolverOpts) -> Transport<T> {
108 Transport {
109 inner: Arc::new(Mutex::new(inner)),
110 resolver: async_std_resolver::resolver(cfg, opts)
111 .now_or_never()
112 .expect("async_std_resolver::resolver did not resolve immediately"),
113 }
114 }
115 }
116}
117
118#[cfg(feature = "tokio")]
119pub mod tokio {
120 use std::sync::Arc;
121
122 use hickory_resolver::{system_conf, TokioAsyncResolver};
123 use parking_lot::Mutex;
124
125 pub type Transport<T> = crate::Transport<T, TokioAsyncResolver>;
128
129 impl<T> Transport<T> {
130 pub fn system(inner: T) -> Result<Transport<T>, std::io::Error> {
132 let (cfg, opts) = system_conf::read_system_conf()?;
133 Ok(Self::custom(inner, cfg, opts))
134 }
135
136 pub fn custom(
139 inner: T,
140 cfg: hickory_resolver::config::ResolverConfig,
141 opts: hickory_resolver::config::ResolverOpts,
142 ) -> Transport<T> {
143 Transport {
144 inner: Arc::new(Mutex::new(inner)),
145 resolver: TokioAsyncResolver::tokio(cfg, opts),
146 }
147 }
148 }
149}
150
151use std::{
152 error, fmt, io, iter,
153 net::{Ipv4Addr, Ipv6Addr},
154 ops::DerefMut,
155 pin::Pin,
156 str,
157 sync::Arc,
158 task::{Context, Poll},
159};
160
161use async_trait::async_trait;
162use futures::{future::BoxFuture, prelude::*};
163pub use hickory_resolver::{
164 config::{ResolverConfig, ResolverOpts},
165 error::{ResolveError, ResolveErrorKind},
166};
167use hickory_resolver::{
168 lookup::{Ipv4Lookup, Ipv6Lookup, TxtLookup},
169 lookup_ip::LookupIp,
170 name_server::ConnectionProvider,
171 AsyncResolver,
172};
173use libp2p_core::{
174 multiaddr::{Multiaddr, Protocol},
175 transport::{DialOpts, ListenerId, TransportError, TransportEvent},
176};
177use parking_lot::Mutex;
178use smallvec::SmallVec;
179
180const DNSADDR_PREFIX: &str = "_dnsaddr.";
182
183const MAX_DIAL_ATTEMPTS: usize = 16;
185
186const MAX_DNS_LOOKUPS: usize = 32;
192
193const MAX_TXT_RECORDS: usize = 16;
197
198#[derive(Debug)]
202pub struct Transport<T, R> {
203 inner: Arc<Mutex<T>>,
205 resolver: R,
207}
208
209impl<T, R> libp2p_core::Transport for Transport<T, R>
210where
211 T: libp2p_core::Transport + Send + Unpin + 'static,
212 T::Error: Send,
213 T::Dial: Send,
214 R: Clone + Send + Sync + Resolver + 'static,
215{
216 type Output = T::Output;
217 type Error = Error<T::Error>;
218 type ListenerUpgrade = future::MapErr<T::ListenerUpgrade, fn(T::Error) -> Self::Error>;
219 type Dial = future::Either<
220 future::MapErr<T::Dial, fn(T::Error) -> Self::Error>,
221 BoxFuture<'static, Result<Self::Output, Self::Error>>,
222 >;
223
224 fn listen_on(
225 &mut self,
226 id: ListenerId,
227 addr: Multiaddr,
228 ) -> Result<(), TransportError<Self::Error>> {
229 self.inner
230 .lock()
231 .listen_on(id, addr)
232 .map_err(|e| e.map(Error::Transport))
233 }
234
235 fn remove_listener(&mut self, id: ListenerId) -> bool {
236 self.inner.lock().remove_listener(id)
237 }
238
239 fn dial(
240 &mut self,
241 addr: Multiaddr,
242 dial_opts: DialOpts,
243 ) -> Result<Self::Dial, TransportError<Self::Error>> {
244 Ok(self.do_dial(addr, dial_opts))
245 }
246
247 fn poll(
248 self: Pin<&mut Self>,
249 cx: &mut Context<'_>,
250 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
251 let mut inner = self.inner.lock();
252 libp2p_core::Transport::poll(Pin::new(inner.deref_mut()), cx).map(|event| {
253 event
254 .map_upgrade(|upgr| upgr.map_err::<_, fn(_) -> _>(Error::Transport))
255 .map_err(Error::Transport)
256 })
257 }
258}
259
260impl<T, R> Transport<T, R>
261where
262 T: libp2p_core::Transport + Send + Unpin + 'static,
263 T::Error: Send,
264 T::Dial: Send,
265 R: Clone + Send + Sync + Resolver + 'static,
266{
267 fn do_dial(
268 &mut self,
269 addr: Multiaddr,
270 dial_opts: DialOpts,
271 ) -> <Self as libp2p_core::Transport>::Dial {
272 let resolver = self.resolver.clone();
273 let inner = self.inner.clone();
274
275 async move {
278 let mut last_err = None;
279 let mut dns_lookups = 0;
280 let mut dial_attempts = 0;
281 let mut unresolved = SmallVec::<[Multiaddr; 1]>::new();
284 unresolved.push(addr.clone());
285
286 while let Some(addr) = unresolved.pop() {
290 if let Some((i, name)) = addr.iter().enumerate().find(|(_, p)| {
291 matches!(
292 p,
293 Protocol::Dns(_)
294 | Protocol::Dns4(_)
295 | Protocol::Dns6(_)
296 | Protocol::Dnsaddr(_)
297 )
298 }) {
299 if dns_lookups == MAX_DNS_LOOKUPS {
300 tracing::debug!(address=%addr, "Too many DNS lookups, dropping unresolved address");
301 last_err = Some(Error::TooManyLookups);
302 continue;
305 }
306 dns_lookups += 1;
307 match resolve(&name, &resolver).await {
308 Err(e) => {
309 if unresolved.is_empty() {
310 return Err(e);
311 }
312 last_err = Some(e);
315 }
316 Ok(Resolved::One(ip)) => {
317 tracing::trace!(protocol=%name, resolved=%ip);
318 let addr = addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
319 unresolved.push(addr);
320 }
321 Ok(Resolved::Many(ips)) => {
322 for ip in ips {
323 tracing::trace!(protocol=%name, resolved=%ip);
324 let addr =
325 addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
326 unresolved.push(addr);
327 }
328 }
329 Ok(Resolved::Addrs(addrs)) => {
330 let suffix = addr.iter().skip(i + 1).collect::<Multiaddr>();
331 let prefix = addr.iter().take(i).collect::<Multiaddr>();
332 let mut n = 0;
333 for a in addrs {
334 if a.ends_with(&suffix) {
335 if n < MAX_TXT_RECORDS {
336 n += 1;
337 tracing::trace!(protocol=%name, resolved=%a);
338 let addr =
339 prefix.iter().chain(a.iter()).collect::<Multiaddr>();
340 unresolved.push(addr);
341 } else {
342 tracing::debug!(
343 resolved=%a,
344 "Too many TXT records, dropping resolved"
345 );
346 }
347 }
348 }
349 }
350 }
351 } else {
352 tracing::debug!(address=%addr, "Dialing address");
354
355 let transport = inner.clone();
356 let dial = transport.lock().dial(addr, dial_opts);
357 let result = match dial {
358 Ok(out) => {
359 dial_attempts += 1;
363 out.await.map_err(Error::Transport)
364 }
365 Err(TransportError::MultiaddrNotSupported(a)) => {
366 Err(Error::MultiaddrNotSupported(a))
367 }
368 Err(TransportError::Other(err)) => Err(Error::Transport(err)),
369 };
370
371 match result {
372 Ok(out) => return Ok(out),
373 Err(err) => {
374 tracing::debug!("Dial error: {:?}.", err);
375 if unresolved.is_empty() {
376 return Err(err);
377 }
378 if dial_attempts == MAX_DIAL_ATTEMPTS {
379 tracing::debug!(
380 "Aborting dialing after {} attempts.",
381 MAX_DIAL_ATTEMPTS
382 );
383 return Err(err);
384 }
385 last_err = Some(err);
386 }
387 }
388 }
389 }
390
391 Err(last_err.unwrap_or_else(|| {
396 Error::ResolveError(ResolveErrorKind::Message("No matching records found.").into())
397 }))
398 }
399 .boxed()
400 .right_future()
401 }
402}
403
404#[derive(Debug)]
406#[allow(clippy::large_enum_variant)]
407pub enum Error<TErr> {
408 Transport(TErr),
410 #[allow(clippy::enum_variant_names)]
412 ResolveError(ResolveError),
413 MultiaddrNotSupported(Multiaddr),
415 TooManyLookups,
422}
423
424impl<TErr> fmt::Display for Error<TErr>
425where
426 TErr: fmt::Display,
427{
428 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
429 match self {
430 Error::Transport(err) => write!(f, "{err}"),
431 Error::ResolveError(err) => write!(f, "{err}"),
432 Error::MultiaddrNotSupported(a) => write!(f, "Unsupported resolved address: {a}"),
433 Error::TooManyLookups => write!(f, "Too many DNS lookups"),
434 }
435 }
436}
437
438impl<TErr> error::Error for Error<TErr>
439where
440 TErr: error::Error + 'static,
441{
442 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
443 match self {
444 Error::Transport(err) => Some(err),
445 Error::ResolveError(err) => Some(err),
446 Error::MultiaddrNotSupported(_) => None,
447 Error::TooManyLookups => None,
448 }
449 }
450}
451
452enum Resolved<'a> {
454 One(Protocol<'a>),
458 Many(Vec<Protocol<'a>>),
461 Addrs(Vec<Multiaddr>),
465}
466
467fn resolve<'a, E: 'a + Send, R: Resolver>(
471 proto: &Protocol<'a>,
472 resolver: &'a R,
473) -> BoxFuture<'a, Result<Resolved<'a>, Error<E>>> {
474 match proto {
475 Protocol::Dns(ref name) => resolver
476 .lookup_ip(name.clone().into_owned())
477 .map(move |res| match res {
478 Ok(ips) => {
479 let mut ips = ips.into_iter();
480 let one = ips
481 .next()
482 .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
483 if let Some(two) = ips.next() {
484 Ok(Resolved::Many(
485 iter::once(one)
486 .chain(iter::once(two))
487 .chain(ips)
488 .map(Protocol::from)
489 .collect(),
490 ))
491 } else {
492 Ok(Resolved::One(Protocol::from(one)))
493 }
494 }
495 Err(e) => Err(Error::ResolveError(e)),
496 })
497 .boxed(),
498 Protocol::Dns4(ref name) => resolver
499 .ipv4_lookup(name.clone().into_owned())
500 .map(move |res| match res {
501 Ok(ips) => {
502 let mut ips = ips.into_iter();
503 let one = ips
504 .next()
505 .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
506 if let Some(two) = ips.next() {
507 Ok(Resolved::Many(
508 iter::once(one)
509 .chain(iter::once(two))
510 .chain(ips)
511 .map(Ipv4Addr::from)
512 .map(Protocol::from)
513 .collect(),
514 ))
515 } else {
516 Ok(Resolved::One(Protocol::from(Ipv4Addr::from(one))))
517 }
518 }
519 Err(e) => Err(Error::ResolveError(e)),
520 })
521 .boxed(),
522 Protocol::Dns6(ref name) => resolver
523 .ipv6_lookup(name.clone().into_owned())
524 .map(move |res| match res {
525 Ok(ips) => {
526 let mut ips = ips.into_iter();
527 let one = ips
528 .next()
529 .expect("If there are no results, `Err(NoRecordsFound)` is expected.");
530 if let Some(two) = ips.next() {
531 Ok(Resolved::Many(
532 iter::once(one)
533 .chain(iter::once(two))
534 .chain(ips)
535 .map(Ipv6Addr::from)
536 .map(Protocol::from)
537 .collect(),
538 ))
539 } else {
540 Ok(Resolved::One(Protocol::from(Ipv6Addr::from(one))))
541 }
542 }
543 Err(e) => Err(Error::ResolveError(e)),
544 })
545 .boxed(),
546 Protocol::Dnsaddr(ref name) => {
547 let name = [DNSADDR_PREFIX, name].concat();
548 resolver
549 .txt_lookup(name)
550 .map(move |res| match res {
551 Ok(txts) => {
552 let mut addrs = Vec::new();
553 for txt in txts {
554 if let Some(chars) = txt.txt_data().first() {
555 match parse_dnsaddr_txt(chars) {
556 Err(e) => {
557 tracing::debug!("Invalid TXT record: {:?}", e);
559 }
560 Ok(a) => {
561 addrs.push(a);
562 }
563 }
564 }
565 }
566 Ok(Resolved::Addrs(addrs))
567 }
568 Err(e) => Err(Error::ResolveError(e)),
569 })
570 .boxed()
571 }
572 proto => future::ready(Ok(Resolved::One(proto.clone()))).boxed(),
573 }
574}
575
576fn parse_dnsaddr_txt(txt: &[u8]) -> io::Result<Multiaddr> {
578 let s = str::from_utf8(txt).map_err(invalid_data)?;
579 match s.strip_prefix("dnsaddr=") {
580 None => Err(invalid_data("Missing `dnsaddr=` prefix.")),
581 Some(a) => Ok(Multiaddr::try_from(a).map_err(invalid_data)?),
582 }
583}
584
585fn invalid_data(e: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
586 io::Error::new(io::ErrorKind::InvalidData, e)
587}
588
589#[async_trait::async_trait]
590#[doc(hidden)]
591pub trait Resolver {
592 async fn lookup_ip(&self, name: String) -> Result<LookupIp, ResolveError>;
593 async fn ipv4_lookup(&self, name: String) -> Result<Ipv4Lookup, ResolveError>;
594 async fn ipv6_lookup(&self, name: String) -> Result<Ipv6Lookup, ResolveError>;
595 async fn txt_lookup(&self, name: String) -> Result<TxtLookup, ResolveError>;
596}
597
598#[async_trait]
599impl<C> Resolver for AsyncResolver<C>
600where
601 C: ConnectionProvider,
602{
603 async fn lookup_ip(&self, name: String) -> Result<LookupIp, ResolveError> {
604 self.lookup_ip(name).await
605 }
606
607 async fn ipv4_lookup(&self, name: String) -> Result<Ipv4Lookup, ResolveError> {
608 self.ipv4_lookup(name).await
609 }
610
611 async fn ipv6_lookup(&self, name: String) -> Result<Ipv6Lookup, ResolveError> {
612 self.ipv6_lookup(name).await
613 }
614
615 async fn txt_lookup(&self, name: String) -> Result<TxtLookup, ResolveError> {
616 self.txt_lookup(name).await
617 }
618}
619
620#[cfg(all(test, any(feature = "tokio", feature = "async-std")))]
621mod tests {
622 use futures::future::BoxFuture;
623 use libp2p_core::{
624 multiaddr::{Multiaddr, Protocol},
625 transport::{PortUse, TransportError, TransportEvent},
626 Endpoint, Transport,
627 };
628 use libp2p_identity::PeerId;
629
630 use super::*;
631
632 #[test]
633 fn basic_resolve() {
634 let _ = tracing_subscriber::fmt()
635 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
636 .try_init();
637
638 #[derive(Clone)]
639 struct CustomTransport;
640
641 impl Transport for CustomTransport {
642 type Output = ();
643 type Error = std::io::Error;
644 type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
645 type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
646
647 fn listen_on(
648 &mut self,
649 _: ListenerId,
650 _: Multiaddr,
651 ) -> Result<(), TransportError<Self::Error>> {
652 unreachable!()
653 }
654
655 fn remove_listener(&mut self, _: ListenerId) -> bool {
656 false
657 }
658
659 fn dial(
660 &mut self,
661 addr: Multiaddr,
662 _: DialOpts,
663 ) -> Result<Self::Dial, TransportError<Self::Error>> {
664 assert!(!addr.iter().any(|p| matches!(
666 p,
667 Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Dnsaddr(_)
668 )));
669 Ok(Box::pin(future::ready(Ok(()))))
670 }
671
672 fn poll(
673 self: Pin<&mut Self>,
674 _: &mut Context<'_>,
675 ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
676 unreachable!()
677 }
678 }
679
680 async fn run<T, R>(mut transport: super::Transport<T, R>)
681 where
682 T: Transport + Clone + Send + Unpin + 'static,
683 T::Error: Send,
684 T::Dial: Send,
685 R: Clone + Send + Sync + Resolver + 'static,
686 {
687 let dial_opts = DialOpts {
688 role: Endpoint::Dialer,
689 port_use: PortUse::Reuse,
690 };
691 let _ = transport
693 .dial("/dns4/example.com/tcp/20000".parse().unwrap(), dial_opts)
694 .unwrap()
695 .await
696 .unwrap();
697
698 let _ = transport
700 .dial("/dns6/example.com/tcp/20000".parse().unwrap(), dial_opts)
701 .unwrap()
702 .await
703 .unwrap();
704
705 let _ = transport
707 .dial("/ip4/1.2.3.4/tcp/20000".parse().unwrap(), dial_opts)
708 .unwrap()
709 .await
710 .unwrap();
711
712 let _ = transport
714 .dial("/dnsaddr/bootstrap.libp2p.io".parse().unwrap(), dial_opts)
715 .unwrap()
716 .await
717 .unwrap();
718
719 let _ = transport
723 .dial("/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap(), dial_opts)
724 .unwrap()
725 .await
726 .unwrap();
727
728 match transport
731 .dial(
732 format!("/dnsaddr/bootstrap.libp2p.io/p2p/{}", PeerId::random())
733 .parse()
734 .unwrap(),
735 dial_opts,
736 )
737 .unwrap()
738 .await
739 {
740 Err(Error::ResolveError(_)) => {}
741 Err(e) => panic!("Unexpected error: {e:?}"),
742 Ok(_) => panic!("Unexpected success."),
743 }
744
745 match transport
747 .dial(
748 "/dns4/example.invalid/tcp/20000".parse().unwrap(),
749 dial_opts,
750 )
751 .unwrap()
752 .await
753 {
754 Err(Error::ResolveError(e)) => match e.kind() {
755 ResolveErrorKind::NoRecordsFound { .. } => {}
756 _ => panic!("Unexpected DNS error: {e:?}"),
757 },
758 Err(e) => panic!("Unexpected error: {e:?}"),
759 Ok(_) => panic!("Unexpected success."),
760 }
761 }
762
763 #[cfg(feature = "async-std")]
764 {
765 let config = ResolverConfig::quad9();
768 let opts = ResolverOpts::default();
769 async_std_crate::task::block_on(
770 async_std::Transport::custom(CustomTransport, config, opts).then(run),
771 );
772 }
773
774 #[cfg(feature = "tokio")]
775 {
776 let config = ResolverConfig::quad9();
779 let opts = ResolverOpts::default();
780 let rt = ::tokio::runtime::Builder::new_current_thread()
781 .enable_io()
782 .enable_time()
783 .build()
784 .unwrap();
785
786 rt.block_on(run(tokio::Transport::custom(CustomTransport, config, opts)));
787 }
788 }
789}