#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#[cfg(feature = "async-std")]
use async_std_resolver::{AsyncStdConnection, AsyncStdConnectionProvider};
use futures::{future::BoxFuture, prelude::*};
use libp2p_core::{
connection::Endpoint,
multiaddr::{Multiaddr, Protocol},
transport::{ListenerId, TransportError, TransportEvent},
Transport,
};
use parking_lot::Mutex;
use smallvec::SmallVec;
use std::io;
use std::{
convert::TryFrom,
error, fmt, iter,
net::IpAddr,
ops::DerefMut,
pin::Pin,
str,
sync::Arc,
task::{Context, Poll},
};
#[cfg(any(feature = "async-std", feature = "tokio"))]
use trust_dns_resolver::system_conf;
use trust_dns_resolver::{proto::xfer::dns_handle::DnsHandle, AsyncResolver, ConnectionProvider};
#[cfg(feature = "tokio")]
use trust_dns_resolver::{TokioAsyncResolver, TokioConnection, TokioConnectionProvider};
pub use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
pub use trust_dns_resolver::error::{ResolveError, ResolveErrorKind};
const DNSADDR_PREFIX: &str = "_dnsaddr.";
const MAX_DIAL_ATTEMPTS: usize = 16;
const MAX_DNS_LOOKUPS: usize = 32;
const MAX_TXT_RECORDS: usize = 16;
#[cfg(feature = "async-std")]
pub type DnsConfig<T> = GenDnsConfig<T, AsyncStdConnection, AsyncStdConnectionProvider>;
#[cfg(feature = "tokio")]
pub type TokioDnsConfig<T> = GenDnsConfig<T, TokioConnection, TokioConnectionProvider>;
pub struct GenDnsConfig<T, C, P>
where
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>,
{
inner: Arc<Mutex<T>>,
resolver: AsyncResolver<C, P>,
}
#[cfg(feature = "async-std")]
impl<T> DnsConfig<T> {
pub async fn system(inner: T) -> Result<DnsConfig<T>, io::Error> {
let (cfg, opts) = system_conf::read_system_conf()?;
Self::custom(inner, cfg, opts).await
}
pub async fn custom(
inner: T,
cfg: ResolverConfig,
opts: ResolverOpts,
) -> Result<DnsConfig<T>, io::Error> {
Ok(DnsConfig {
inner: Arc::new(Mutex::new(inner)),
resolver: async_std_resolver::resolver(cfg, opts).await?,
})
}
}
#[cfg(feature = "tokio")]
impl<T> TokioDnsConfig<T> {
pub fn system(inner: T) -> Result<TokioDnsConfig<T>, io::Error> {
let (cfg, opts) = system_conf::read_system_conf()?;
Self::custom(inner, cfg, opts)
}
pub fn custom(
inner: T,
cfg: ResolverConfig,
opts: ResolverOpts,
) -> Result<TokioDnsConfig<T>, io::Error> {
Ok(TokioDnsConfig {
inner: Arc::new(Mutex::new(inner)),
resolver: TokioAsyncResolver::tokio(cfg, opts)?,
})
}
}
impl<T, C, P> fmt::Debug for GenDnsConfig<T, C, P>
where
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>,
T: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_tuple("GenDnsConfig").field(&self.inner).finish()
}
}
impl<T, C, P> Transport for GenDnsConfig<T, C, P>
where
T: Transport + Send + Unpin + 'static,
T::Error: Send,
T::Dial: Send,
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>,
{
type Output = T::Output;
type Error = DnsErr<T::Error>;
type ListenerUpgrade = future::MapErr<T::ListenerUpgrade, fn(T::Error) -> Self::Error>;
type Dial = future::Either<
future::MapErr<T::Dial, fn(T::Error) -> Self::Error>,
BoxFuture<'static, Result<Self::Output, Self::Error>>,
>;
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
self.inner
.lock()
.listen_on(addr)
.map_err(|e| e.map(DnsErr::Transport))
}
fn remove_listener(&mut self, id: ListenerId) -> bool {
self.inner.lock().remove_listener(id)
}
fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.do_dial(addr, Endpoint::Dialer)
}
fn dial_as_listener(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
self.do_dial(addr, Endpoint::Listener)
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.lock().address_translation(server, observed)
}
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
let mut inner = self.inner.lock();
Transport::poll(Pin::new(inner.deref_mut()), cx).map(|event| {
event
.map_upgrade(|upgr| upgr.map_err::<_, fn(_) -> _>(DnsErr::Transport))
.map_err(DnsErr::Transport)
})
}
}
impl<T, C, P> GenDnsConfig<T, C, P>
where
T: Transport + Send + Unpin + 'static,
T::Error: Send,
T::Dial: Send,
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>,
{
fn do_dial(
&mut self,
addr: Multiaddr,
role_override: Endpoint,
) -> Result<<Self as Transport>::Dial, TransportError<<Self as Transport>::Error>> {
let resolver = self.resolver.clone();
let inner = self.inner.clone();
Ok(async move {
let mut last_err = None;
let mut dns_lookups = 0;
let mut dial_attempts = 0;
let mut unresolved = SmallVec::<[Multiaddr; 1]>::new();
unresolved.push(addr.clone());
while let Some(addr) = unresolved.pop() {
if let Some((i, name)) = addr.iter().enumerate().find(|(_, p)| {
matches!(
p,
Protocol::Dns(_)
| Protocol::Dns4(_)
| Protocol::Dns6(_)
| Protocol::Dnsaddr(_)
)
}) {
if dns_lookups == MAX_DNS_LOOKUPS {
log::debug!("Too many DNS lookups. Dropping unresolved {}.", addr);
last_err = Some(DnsErr::TooManyLookups);
continue;
}
dns_lookups += 1;
match resolve(&name, &resolver).await {
Err(e) => {
if unresolved.is_empty() {
return Err(e);
}
last_err = Some(e);
}
Ok(Resolved::One(ip)) => {
log::trace!("Resolved {} -> {}", name, ip);
let addr = addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
unresolved.push(addr);
}
Ok(Resolved::Many(ips)) => {
for ip in ips {
log::trace!("Resolved {} -> {}", name, ip);
let addr =
addr.replace(i, |_| Some(ip)).expect("`i` is a valid index");
unresolved.push(addr);
}
}
Ok(Resolved::Addrs(addrs)) => {
let suffix = addr.iter().skip(i + 1).collect::<Multiaddr>();
let prefix = addr.iter().take(i).collect::<Multiaddr>();
let mut n = 0;
for a in addrs {
if a.ends_with(&suffix) {
if n < MAX_TXT_RECORDS {
n += 1;
log::trace!("Resolved {} -> {}", name, a);
let addr =
prefix.iter().chain(a.iter()).collect::<Multiaddr>();
unresolved.push(addr);
} else {
log::debug!(
"Too many TXT records. Dropping resolved {}.",
a
);
}
}
}
}
}
} else {
log::debug!("Dialing {}", addr);
let transport = inner.clone();
let dial = match role_override {
Endpoint::Dialer => transport.lock().dial(addr),
Endpoint::Listener => transport.lock().dial_as_listener(addr),
};
let result = match dial {
Ok(out) => {
dial_attempts += 1;
out.await.map_err(DnsErr::Transport)
}
Err(TransportError::MultiaddrNotSupported(a)) => {
Err(DnsErr::MultiaddrNotSupported(a))
}
Err(TransportError::Other(err)) => Err(DnsErr::Transport(err)),
};
match result {
Ok(out) => return Ok(out),
Err(err) => {
log::debug!("Dial error: {:?}.", err);
if unresolved.is_empty() {
return Err(err);
}
if dial_attempts == MAX_DIAL_ATTEMPTS {
log::debug!(
"Aborting dialing after {} attempts.",
MAX_DIAL_ATTEMPTS
);
return Err(err);
}
last_err = Some(err);
}
}
}
}
Err(last_err.unwrap_or_else(|| {
DnsErr::ResolveError(ResolveErrorKind::Message("No matching records found.").into())
}))
}
.boxed()
.right_future())
}
}
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum DnsErr<TErr> {
Transport(TErr),
ResolveError(ResolveError),
MultiaddrNotSupported(Multiaddr),
TooManyLookups,
}
impl<TErr> fmt::Display for DnsErr<TErr>
where
TErr: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DnsErr::Transport(err) => write!(f, "{}", err),
DnsErr::ResolveError(err) => write!(f, "{}", err),
DnsErr::MultiaddrNotSupported(a) => write!(f, "Unsupported resolved address: {}", a),
DnsErr::TooManyLookups => write!(f, "Too many DNS lookups"),
}
}
}
impl<TErr> error::Error for DnsErr<TErr>
where
TErr: error::Error + 'static,
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
DnsErr::Transport(err) => Some(err),
DnsErr::ResolveError(err) => Some(err),
DnsErr::MultiaddrNotSupported(_) => None,
DnsErr::TooManyLookups => None,
}
}
}
enum Resolved<'a> {
One(Protocol<'a>),
Many(Vec<Protocol<'a>>),
Addrs(Vec<Multiaddr>),
}
fn resolve<'a, E: 'a + Send, C, P>(
proto: &Protocol<'a>,
resolver: &'a AsyncResolver<C, P>,
) -> BoxFuture<'a, Result<Resolved<'a>, DnsErr<E>>>
where
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>,
{
match proto {
Protocol::Dns(ref name) => resolver
.lookup_ip(name.clone().into_owned())
.map(move |res| match res {
Ok(ips) => {
let mut ips = ips.into_iter();
let one = ips
.next()
.expect("If there are no results, `Err(NoRecordsFound)` is expected.");
if let Some(two) = ips.next() {
Ok(Resolved::Many(
iter::once(one)
.chain(iter::once(two))
.chain(ips)
.map(Protocol::from)
.collect(),
))
} else {
Ok(Resolved::One(Protocol::from(one)))
}
}
Err(e) => Err(DnsErr::ResolveError(e)),
})
.boxed(),
Protocol::Dns4(ref name) => resolver
.ipv4_lookup(name.clone().into_owned())
.map(move |res| match res {
Ok(ips) => {
let mut ips = ips.into_iter();
let one = ips
.next()
.expect("If there are no results, `Err(NoRecordsFound)` is expected.");
if let Some(two) = ips.next() {
Ok(Resolved::Many(
iter::once(one)
.chain(iter::once(two))
.chain(ips)
.map(IpAddr::from)
.map(Protocol::from)
.collect(),
))
} else {
Ok(Resolved::One(Protocol::from(IpAddr::from(one))))
}
}
Err(e) => Err(DnsErr::ResolveError(e)),
})
.boxed(),
Protocol::Dns6(ref name) => resolver
.ipv6_lookup(name.clone().into_owned())
.map(move |res| match res {
Ok(ips) => {
let mut ips = ips.into_iter();
let one = ips
.next()
.expect("If there are no results, `Err(NoRecordsFound)` is expected.");
if let Some(two) = ips.next() {
Ok(Resolved::Many(
iter::once(one)
.chain(iter::once(two))
.chain(ips)
.map(IpAddr::from)
.map(Protocol::from)
.collect(),
))
} else {
Ok(Resolved::One(Protocol::from(IpAddr::from(one))))
}
}
Err(e) => Err(DnsErr::ResolveError(e)),
})
.boxed(),
Protocol::Dnsaddr(ref name) => {
let name = [DNSADDR_PREFIX, name].concat();
resolver
.txt_lookup(name)
.map(move |res| match res {
Ok(txts) => {
let mut addrs = Vec::new();
for txt in txts {
if let Some(chars) = txt.txt_data().first() {
match parse_dnsaddr_txt(chars) {
Err(e) => {
log::debug!("Invalid TXT record: {:?}", e);
}
Ok(a) => {
addrs.push(a);
}
}
}
}
Ok(Resolved::Addrs(addrs))
}
Err(e) => Err(DnsErr::ResolveError(e)),
})
.boxed()
}
proto => future::ready(Ok(Resolved::One(proto.clone()))).boxed(),
}
}
fn parse_dnsaddr_txt(txt: &[u8]) -> io::Result<Multiaddr> {
let s = str::from_utf8(txt).map_err(invalid_data)?;
match s.strip_prefix("dnsaddr=") {
None => Err(invalid_data("Missing `dnsaddr=` prefix.")),
Some(a) => Ok(Multiaddr::try_from(a).map_err(invalid_data)?),
}
}
fn invalid_data(e: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
io::Error::new(io::ErrorKind::InvalidData, e)
}
#[cfg(test)]
mod tests {
use super::*;
use futures::future::BoxFuture;
use libp2p_core::{
multiaddr::{Multiaddr, Protocol},
transport::{TransportError, TransportEvent},
PeerId, Transport,
};
#[test]
fn basic_resolve() {
let _ = env_logger::try_init();
#[derive(Clone)]
struct CustomTransport;
impl Transport for CustomTransport {
type Output = ();
type Error = std::io::Error;
type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn listen_on(
&mut self,
_: Multiaddr,
) -> Result<ListenerId, TransportError<Self::Error>> {
unreachable!()
}
fn remove_listener(&mut self, _: ListenerId) -> bool {
false
}
fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
assert!(!addr.iter().any(|p| matches!(
p,
Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Dnsaddr(_)
)));
Ok(Box::pin(future::ready(Ok(()))))
}
fn dial_as_listener(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
self.dial(addr)
}
fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> {
None
}
fn poll(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
unreachable!()
}
}
async fn run<T, C, P>(mut transport: GenDnsConfig<T, C, P>)
where
C: DnsHandle<Error = ResolveError>,
P: ConnectionProvider<Conn = C>,
T: Transport + Clone + Send + Unpin + 'static,
T::Error: Send,
T::Dial: Send,
{
let _ = transport
.dial("/dns4/example.com/tcp/20000".parse().unwrap())
.unwrap()
.await
.unwrap();
let _ = transport
.dial("/dns6/example.com/tcp/20000".parse().unwrap())
.unwrap()
.await
.unwrap();
let _ = transport
.dial("/ip4/1.2.3.4/tcp/20000".parse().unwrap())
.unwrap()
.await
.unwrap();
let _ = transport
.dial("/dnsaddr/bootstrap.libp2p.io".parse().unwrap())
.unwrap()
.await
.unwrap();
let _ = transport
.dial("/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap())
.unwrap()
.await
.unwrap();
match transport
.dial(
format!("/dnsaddr/bootstrap.libp2p.io/p2p/{}", PeerId::random())
.parse()
.unwrap(),
)
.unwrap()
.await
{
Err(DnsErr::ResolveError(_)) => {}
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(_) => panic!("Unexpected success."),
}
match transport
.dial("/dns4/example.invalid/tcp/20000".parse().unwrap())
.unwrap()
.await
{
Err(DnsErr::ResolveError(e)) => match e.kind() {
ResolveErrorKind::NoRecordsFound { .. } => {}
_ => panic!("Unexpected DNS error: {:?}", e),
},
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(_) => panic!("Unexpected success."),
}
}
#[cfg(feature = "async-std")]
{
let config = ResolverConfig::quad9();
let opts = ResolverOpts::default();
async_std_crate::task::block_on(
DnsConfig::custom(CustomTransport, config, opts).then(|dns| run(dns.unwrap())),
);
}
#[cfg(feature = "tokio")]
{
let config = ResolverConfig::quad9();
let opts = ResolverOpts::default();
let rt = tokio_crate::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.unwrap();
rt.block_on(run(
TokioDnsConfig::custom(CustomTransport, config, opts).unwrap()
));
}
}
}