use std::{
borrow::{Borrow, Cow},
pin::Pin,
sync::{Arc, RwLock},
task::{Poll, ready},
};
use iroh_base::{EndpointAddr, EndpointId};
pub use iroh_relay::endpoint_info::AddrFilter;
use n0_error::{AnyError, e, stack_error};
use n0_future::{MergeBounded, Stream, boxed::BoxStream};
use tracing::debug;
pub use crate::endpoint_info::{EndpointData, EndpointInfo, ParseError, UserData};
use crate::{Endpoint, endpoint::EndpointError};
#[cfg(not(wasm_browser))]
pub mod dns;
#[cfg(feature = "address-lookup-mdns")]
pub mod mdns;
pub mod memory;
pub mod pkarr;
#[cfg(not(wasm_browser))]
pub use dns::*;
#[cfg(feature = "address-lookup-mdns")]
pub use mdns::*;
pub use memory::*;
#[cfg(feature = "address-lookup-pkarr-dht")]
pub use pkarr::dht::*;
pub use pkarr::*;
pub trait AddressLookupBuilder: Send + Sync + std::fmt::Debug + 'static {
fn into_address_lookup(
self,
endpoint: &Endpoint,
) -> Result<impl AddressLookup, AddressLookupBuilderError>;
}
#[derive(Debug, Clone)]
pub struct FilteredAddressLookup<T> {
inner: T,
filter: AddrFilter,
}
impl<T> FilteredAddressLookup<T> {
pub fn new(inner: T, filter: AddrFilter) -> Self {
Self { inner, filter }
}
pub fn into_inner(self) -> T {
self.inner
}
}
impl<T> AsRef<T> for FilteredAddressLookup<T> {
fn as_ref(&self) -> &T {
&self.inner
}
}
impl<T: AddressLookup> AddressLookup for FilteredAddressLookup<T> {
fn publish(&self, data: &EndpointData) {
let data = data.apply_filter(&self.filter);
self.inner.publish(data.borrow());
}
fn resolve(&self, endpoint_id: EndpointId) -> Option<BoxStream<Result<Item, Error>>> {
self.inner.resolve(endpoint_id)
}
}
impl<T: AddressLookup> AddressLookupBuilder for T {
fn into_address_lookup(
self,
_endpoint: &Endpoint,
) -> Result<impl AddressLookup, AddressLookupBuilderError> {
Ok(self)
}
}
pub(crate) trait DynAddressLookupBuilder: Send + Sync + std::fmt::Debug + 'static {
fn into_address_lookup(
self: Box<Self>,
endpoint: &Endpoint,
) -> Result<Box<dyn AddressLookup>, AddressLookupBuilderError>;
}
impl<T: AddressLookupBuilder> DynAddressLookupBuilder for T {
fn into_address_lookup(
self: Box<Self>,
endpoint: &Endpoint,
) -> Result<Box<dyn AddressLookup>, AddressLookupBuilderError> {
let addr_lookup: Box<dyn AddressLookup> =
Box::new(AddressLookupBuilder::into_address_lookup(*self, endpoint)?);
Ok(addr_lookup)
}
}
#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources, std_sources)]
#[non_exhaustive]
pub enum AddressLookupBuilderError {
#[error("Service '{provenance}' error")]
User {
provenance: &'static str,
source: AnyError,
},
#[error(transparent)]
EndpointClosed { source: EndpointError },
}
impl AddressLookupBuilderError {
pub fn from_err<T: std::error::Error + Send + Sync + 'static>(
provenance: &'static str,
source: T,
) -> Self {
e!(AddressLookupBuilderError::User {
provenance,
source: AnyError::from_std(source)
})
}
pub fn from_err_box(
provenance: &'static str,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
) -> Self {
e!(AddressLookupBuilderError::User {
provenance,
source: AnyError::from_std_box(source)
})
}
}
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
#[derive(Clone)]
pub enum AddressLookupFailed {
#[error("No address lookup configured")]
NoServiceConfigured,
#[error(
"All address lookup services failed or produced no results{}",
{
let errors = errors.iter().map(|err| format!("{err:#}")).collect::<Vec<_>>();
if !errors.is_empty() {
format!("\n {}", errors.join("\n "))
} else {
String::new()
}
}
)]
NoResults { errors: Vec<Error> },
}
#[stack_error(derive, add_meta)]
#[error("Service '{provenance}' failed")]
#[derive(Clone)]
pub struct Error {
provenance: &'static str,
#[error(source)]
source: Arc<AnyError>,
}
impl Error {
#[track_caller]
pub fn from_err<T: std::error::Error + Send + Sync + 'static>(
provenance: &'static str,
source: T,
) -> Self {
Self::from_err_any(provenance, AnyError::from_std(source))
}
#[track_caller]
pub fn from_err_box(
provenance: &'static str,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
) -> Self {
Self::from_err_any(provenance, AnyError::from_std_box(source))
}
#[track_caller]
pub fn from_err_any(provenance: &'static str, source: impl Into<AnyError>) -> Self {
Self::new(provenance, Arc::new(source.into()))
}
}
pub trait AddressLookup: std::fmt::Debug + Send + Sync + 'static {
fn publish(&self, _data: &EndpointData) {}
fn resolve(&self, _endpoint_id: EndpointId) -> Option<BoxStream<Result<Item, Error>>> {
None
}
}
impl<T: AddressLookup> AddressLookup for Arc<T> {
fn publish(&self, data: &EndpointData) {
self.as_ref().publish(data);
}
fn resolve(&self, endpoint_id: EndpointId) -> Option<BoxStream<Result<Item, Error>>> {
self.as_ref().resolve(endpoint_id)
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Item {
endpoint_info: EndpointInfo,
provenance: &'static str,
last_updated: Option<u64>,
}
impl Item {
pub fn new(
endpoint_info: EndpointInfo,
provenance: &'static str,
last_updated: Option<u64>,
) -> Self {
Self {
endpoint_info,
provenance,
last_updated,
}
}
pub fn endpoint_id(&self) -> EndpointId {
self.endpoint_info.endpoint_id
}
pub fn endpoint_info(&self) -> &EndpointInfo {
&self.endpoint_info
}
pub fn provenance(&self) -> &'static str {
self.provenance
}
pub fn last_updated(&self) -> Option<u64> {
self.last_updated
}
pub fn to_endpoint_addr(&self) -> EndpointAddr {
self.endpoint_info.to_endpoint_addr()
}
pub fn into_endpoint_addr(self) -> EndpointAddr {
self.endpoint_info.into_endpoint_addr()
}
pub fn user_data(&self) -> Option<UserData> {
self.endpoint_info().data.user_data().cloned()
}
}
impl std::ops::Deref for Item {
type Target = EndpointData;
fn deref(&self) -> &Self::Target {
&self.endpoint_info.data
}
}
impl From<Item> for EndpointInfo {
fn from(item: Item) -> Self {
item.endpoint_info
}
}
#[derive(Debug, Default, Clone)]
pub struct AddressLookupServices {
services: Arc<RwLock<Vec<Box<dyn AddressLookup>>>>,
last_data: Arc<RwLock<Option<EndpointData>>>,
addr_filter: Arc<RwLock<Option<AddrFilter>>>,
}
impl AddressLookupServices {
pub fn set_addr_filter(&self, filter: AddrFilter) {
*self.addr_filter.write().expect("poisoned") = Some(filter);
}
pub fn add(&self, service: impl AddressLookup + 'static) {
self.add_boxed(Box::new(service))
}
pub fn add_boxed(&self, service: Box<dyn AddressLookup>) {
{
let data = self.last_data.read().expect("poisoned");
if let Some(data) = &*data {
service.publish(data)
}
}
self.services.write().expect("poisoned").push(service);
}
pub fn is_empty(&self) -> bool {
self.services.read().expect("poisoned").is_empty()
}
pub fn len(&self) -> usize {
self.services.read().expect("poisoned").len()
}
pub fn clear(&self) {
let mut services = self.services.write().expect("poisoned");
services.clear();
}
pub(crate) fn publish(&self, data: &EndpointData) {
let data = match &*self.addr_filter.read().expect("poisoned") {
Some(filter) => data.apply_filter(filter),
None => Cow::Borrowed(data),
};
let services = self.services.read().expect("poisoned");
for service in &*services {
service.publish(&data);
}
self.last_data
.write()
.expect("poisoned")
.replace(data.into_owned());
}
pub fn resolve(
&self,
endpoint_id: EndpointId,
) -> impl Stream<Item = Result<Result<Item, Error>, AddressLookupFailed>> + use<> {
let services = self.services.read().expect("poisoned");
if services.is_empty() {
AddressLookupStream::empty()
} else {
let streams = services
.iter()
.filter_map(|service| service.resolve(endpoint_id));
AddressLookupStream::new(streams)
}
}
}
struct AddressLookupStream {
streams: Option<MergeBounded<BoxStream<Result<Item, Error>>>>,
errors: Vec<Error>,
did_emit: bool,
closed: bool,
}
impl AddressLookupStream {
fn empty() -> Self {
Self {
streams: None,
errors: Vec::new(),
did_emit: false,
closed: false,
}
}
fn new(streams: impl Iterator<Item = BoxStream<Result<Item, Error>>>) -> Self {
Self {
streams: Some(MergeBounded::from_iter(streams)),
errors: Vec::new(),
did_emit: false,
closed: false,
}
}
}
impl Stream for AddressLookupStream {
type Item = Result<Result<Item, Error>, AddressLookupFailed>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if this.closed {
return Poll::Ready(None);
}
let mut inner = match this.streams.as_mut() {
Some(inner) => inner,
None => {
this.closed = true;
return Poll::Ready(Some(Err(e!(AddressLookupFailed::NoServiceConfigured))));
}
};
let item = match ready!(Pin::new(&mut inner).poll_next(cx)) {
Some(Ok(item)) => {
this.did_emit = true;
Some(Ok(Ok(item)))
}
Some(Err(error)) => {
debug!("address lookup error: {error:#}");
this.errors.push(error.clone());
Some(Ok(Err(error)))
}
None => {
this.closed = true;
if !this.did_emit {
let errors = std::mem::take(&mut this.errors);
Some(Err(e!(AddressLookupFailed::NoResults { errors })))
} else {
None
}
}
};
Poll::Ready(item)
}
}
#[cfg(all(test, with_crypto_provider))]
mod tests {
use std::{
collections::HashMap,
net::SocketAddr,
sync::{Arc, Mutex},
time::{Duration, SystemTime},
};
use iroh_base::{EndpointAddr, SecretKey, TransportAddr};
use n0_error::{AnyError, Result, StackResultExt};
use n0_future::{StreamExt, time};
use n0_tracing_test::traced_test;
use rand::{CryptoRng, RngExt, SeedableRng};
use tokio_util::task::AbortOnDropHandle;
use super::*;
use crate::{
Endpoint,
endpoint::{ConnectOptions, IdleTimeout, QuicTransportConfig, presets},
};
type InfoStore = HashMap<EndpointId, (EndpointData, u64)>;
#[derive(Debug, Clone, Default)]
struct TestAddressLookupShared {
endpoints: Arc<Mutex<InfoStore>>,
}
impl TestAddressLookupShared {
pub fn create_address_lookup(&self, endpoint_id: EndpointId) -> TestAddressLookup {
TestAddressLookup {
endpoint_id,
shared: self.clone(),
publish: true,
resolve_wrong: false,
delay: Duration::from_millis(200),
}
}
pub fn create_lying_address_lookup(&self, endpoint_id: EndpointId) -> TestAddressLookup {
TestAddressLookup {
endpoint_id,
shared: self.clone(),
publish: false,
resolve_wrong: true,
delay: Duration::from_millis(100),
}
}
}
#[derive(Debug)]
struct TestAddressLookup {
endpoint_id: EndpointId,
shared: TestAddressLookupShared,
publish: bool,
resolve_wrong: bool,
delay: Duration,
}
impl AddressLookup for TestAddressLookup {
fn publish(&self, data: &EndpointData) {
if !self.publish {
return;
}
let now = system_time_now();
self.shared
.endpoints
.lock()
.unwrap()
.insert(self.endpoint_id, (data.clone(), now));
}
fn resolve(&self, endpoint_id: EndpointId) -> Option<BoxStream<Result<Item, Error>>> {
let addr_info = if self.resolve_wrong {
let ts = system_time_now() - 100_000;
let port: u16 = rand::rng().random_range(10_000..20_000);
let addr: SocketAddr = format!("240.0.0.1:{port}").parse().unwrap();
let data = EndpointData::from_iter([TransportAddr::Ip(addr)]);
Some((data, ts))
} else {
self.shared
.endpoints
.lock()
.unwrap()
.get(&endpoint_id)
.cloned()
};
let stream = match addr_info {
Some((data, ts)) => {
let item = Item::new(
EndpointInfo::from_parts(endpoint_id, data),
"test-addr-lookup",
Some(ts),
);
let delay = self.delay;
let fut = async move {
time::sleep(delay).await;
tracing::debug!("resolve: {} = {item:?}", endpoint_id.fmt_short());
Ok(item)
};
n0_future::stream::once_future(fut).boxed()
}
None => n0_future::stream::empty().boxed(),
};
Some(stream)
}
}
#[derive(Debug, Clone)]
struct EmptyAddressLookup;
impl AddressLookup for EmptyAddressLookup {
fn publish(&self, _data: &EndpointData) {}
fn resolve(&self, _endpoint_id: EndpointId) -> Option<BoxStream<Result<Item, Error>>> {
Some(n0_future::stream::empty().boxed())
}
}
#[derive(Debug, Clone)]
struct FailingAddressLookup {
delay: Duration,
}
impl AddressLookup for FailingAddressLookup {
fn publish(&self, _data: &EndpointData) {}
fn resolve(&self, _endpoint_id: EndpointId) -> Option<BoxStream<Result<Item, Error>>> {
let delay = self.delay;
let fut = async move {
time::sleep(delay).await;
Err(Error::from_err(
"failing-test",
std::io::Error::other("simulated resolver failure"),
))
};
Some(n0_future::stream::once_future(fut).boxed())
}
}
const TEST_ALPN: &[u8] = b"n0/iroh/test";
#[tokio::test]
#[traced_test]
async fn address_lookup_simple_shared() -> Result {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
let eir_shared = TestAddressLookupShared::default();
let (ep1, _guard1) =
new_endpoint(&mut rng, |ep| eir_shared.create_address_lookup(ep.id())).await;
let (ep2, _guard2) =
new_endpoint(&mut rng, |ep| eir_shared.create_address_lookup(ep.id())).await;
let ep1_addr = EndpointAddr::new(ep1.id());
let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?;
Ok(())
}
#[tokio::test]
#[traced_test]
async fn address_lookup_simple_shared_with_arc() -> Result {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
let address_lookup_shared = TestAddressLookupShared::default();
let (ep1, _guard1) = new_endpoint(&mut rng, |ep| {
Arc::new(address_lookup_shared.create_address_lookup(ep.id()))
})
.await;
let (ep2, _guard2) = new_endpoint(&mut rng, |ep| {
Arc::new(address_lookup_shared.create_address_lookup(ep.id()))
})
.await;
let ep1_addr = EndpointAddr::new(ep1.id());
let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?;
Ok(())
}
#[tokio::test]
#[traced_test]
async fn address_lookup_combined_with_empty_and_right() -> Result {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
let address_lookup_shared = TestAddressLookupShared::default();
let (ep1, _guard1) = new_endpoint(&mut rng, |ep| {
address_lookup_shared.create_address_lookup(ep.id())
})
.await;
let (ep2, _guard2) = new_endpoint_add(&mut rng, |ep| {
let addr_lookup1 = EmptyAddressLookup;
let addr_lookup2 = address_lookup_shared.create_address_lookup(ep.id());
ep.address_lookup()
.expect("endpoint is still open")
.add(addr_lookup1);
ep.address_lookup()
.expect("endpoint is still open")
.add(addr_lookup2);
})
.await;
let ep1_addr = EndpointAddr::new(ep1.id());
assert_eq!(
ep2.address_lookup().expect("endpoint is still open").len(),
2
);
let _conn = ep2
.connect(ep1_addr, TEST_ALPN)
.await
.context("connecting")?;
Ok(())
}
#[tokio::test]
#[traced_test]
async fn address_lookup_combined_with_empty_and_wrong() -> Result {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
let address_lookup_shared = TestAddressLookupShared::default();
let (ep1, _guard1) = new_endpoint(&mut rng, |ep| {
address_lookup_shared.create_address_lookup(ep.id())
})
.await;
let (ep2, _guard2) = new_endpoint_add(&mut rng, |ep| {
let address_lookup1 = EmptyAddressLookup;
let address_lookup2 = address_lookup_shared.create_lying_address_lookup(ep.id());
let address_lookup3 = address_lookup_shared.create_address_lookup(ep.id());
let address_lookup = ep.address_lookup().unwrap();
address_lookup.add(address_lookup1);
address_lookup.add(address_lookup2);
address_lookup.add(address_lookup3);
})
.await;
let _conn = ep2.connect(ep1.id(), TEST_ALPN).await?;
Ok(())
}
#[tokio::test]
#[traced_test]
async fn address_lookup_succeeds_after_other_resolver_errors() -> Result {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
let address_lookup_shared = TestAddressLookupShared::default();
let (ep1, _guard1) = new_endpoint_add(&mut rng, |ep| {
ep.address_lookup()
.unwrap()
.add(address_lookup_shared.create_address_lookup(ep.id()));
})
.await;
let (ep2, _guard2) = new_endpoint_add(&mut rng, |ep| {
let failing = FailingAddressLookup {
delay: Duration::from_millis(50),
};
let working = address_lookup_shared.create_address_lookup(ep.id());
let address_lookup = ep.address_lookup().unwrap();
address_lookup.add(failing);
address_lookup.add(working);
})
.await;
let _conn = ep2
.connect(ep1.id(), TEST_ALPN)
.await
.context("connect after other resolver errored")?;
Ok(())
}
#[tokio::test]
#[traced_test]
async fn address_lookup_combined_wrong_only() -> Result {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
let address_lookup_shared = TestAddressLookupShared::default();
let (ep1, _guard1) = new_endpoint(&mut rng, |ep| {
address_lookup_shared.create_address_lookup(ep.id())
})
.await;
let (ep2, _guard2) = new_endpoint_add(&mut rng, |ep| {
let address_lookup1 = address_lookup_shared.create_lying_address_lookup(ep.id());
ep.address_lookup().unwrap().add(address_lookup1)
})
.await;
let cfg = QuicTransportConfig::builder()
.keep_alive_interval(Duration::from_secs(1))
.max_idle_timeout(Some(IdleTimeout::try_from(Duration::from_secs(3)).unwrap()))
.build();
let opts = ConnectOptions::new().with_transport_config(cfg);
let res = ep2
.connect_with_opts(ep1.id(), TEST_ALPN, opts)
.await? .await; assert!(res.is_err());
Ok(())
}
#[tokio::test]
#[traced_test]
async fn address_lookup_with_wrong_existing_addr() -> Result {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
let address_lookup_shared = TestAddressLookupShared::default();
let (ep1, _guard1) = new_endpoint(&mut rng, |ep| {
address_lookup_shared.create_address_lookup(ep.id())
})
.await;
let (ep2, _guard2) = new_endpoint(&mut rng, |ep| {
address_lookup_shared.create_address_lookup(ep.id())
})
.await;
let ep1_wrong_addr = EndpointAddr::from_parts(
ep1.id(),
[TransportAddr::Ip("240.0.0.1:1000".parse().unwrap())],
);
let _conn = ep2.connect(ep1_wrong_addr, TEST_ALPN).await?;
Ok(())
}
#[test]
fn concurrent_address_lookup_addr_filter() {
use iroh_base::RelayUrl;
#[derive(Debug, Clone, Default)]
struct RecordingLookup {
published: Arc<Mutex<Vec<EndpointData>>>,
}
impl AddressLookup for RecordingLookup {
fn publish(&self, data: &EndpointData) {
self.published.lock().unwrap().push(data.clone());
}
fn resolve(&self, _endpoint_id: EndpointId) -> Option<BoxStream<Result<Item, Error>>> {
None
}
}
let recorder = RecordingLookup::default();
let lookup = AddressLookupServices::default();
lookup.set_addr_filter(AddrFilter::relay_only());
lookup.add(recorder.clone());
let relay_url: RelayUrl = "https://relay.example.com".parse().unwrap();
let ip_addr: SocketAddr = "1.2.3.4:1234".parse().unwrap();
let data = EndpointData::from_iter([
TransportAddr::Relay(relay_url.clone()),
TransportAddr::Ip(ip_addr),
]);
lookup.publish(&data);
let published = recorder.published.lock().unwrap();
assert_eq!(published.len(), 1);
let addrs: Vec<_> = published[0].addrs().cloned().collect();
assert_eq!(addrs, vec![TransportAddr::Relay(relay_url)]);
assert!(
!addrs.contains(&TransportAddr::Ip(ip_addr)),
"IP address should have been filtered out"
);
}
async fn new_endpoint<R: CryptoRng, D: AddressLookup + 'static, F: FnOnce(&Endpoint) -> D>(
rng: &mut R,
create_address_lookup: F,
) -> (Endpoint, AbortOnDropHandle<Result<()>>) {
new_endpoint_add(rng, |ep| {
let address_lookup = create_address_lookup(ep);
ep.address_lookup()
.expect("endpoint is still open")
.add(address_lookup);
})
.await
}
async fn new_endpoint_add<R: CryptoRng, F: FnOnce(&Endpoint)>(
rng: &mut R,
add_address_lookup: F,
) -> (Endpoint, AbortOnDropHandle<Result<()>>) {
let secret = SecretKey::from_bytes(&rng.random());
let ep = Endpoint::builder(presets::Minimal)
.secret_key(secret)
.alpns(vec![TEST_ALPN.to_vec()])
.bind()
.await
.unwrap();
add_address_lookup(&ep);
let handle = tokio::spawn({
let ep = ep.clone();
async move {
let mut connections = Vec::new();
while let Some(accepting) = ep.accept().await.and_then(|inc| inc.accept().ok()) {
let conn = accepting.await.context("accepting")?;
connections.push(conn);
}
Ok::<_, AnyError>(())
}
});
(ep, AbortOnDropHandle::new(handle))
}
fn system_time_now() -> u64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("time drift")
.as_micros() as u64
}
}
#[cfg(test)]
mod test_dns_pkarr {
use iroh_base::{EndpointAddr, SecretKey, TransportAddr};
use iroh_relay::{
endpoint_info::UserData,
tls::{CaRootsConfig, default_provider},
};
use n0_error::{Result, StackResultExt};
use n0_future::time::Duration;
use n0_tracing_test::traced_test;
use rand::{RngExt, SeedableRng};
use crate::{
address_lookup::{EndpointData, PkarrPublisher},
dns::DnsResolver,
endpoint_info::EndpointInfo,
test_utils::{DnsPkarrServer, dns_server::run_dns_server, pkarr_dns_state::State},
};
const PUBLISH_TIMEOUT: Duration = Duration::from_secs(10);
#[tokio::test]
#[traced_test]
async fn dns_resolve() -> Result<()> {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
let origin = "testdns.example".to_string();
let state = State::new(origin.clone());
let (nameserver, _dns_drop_guard) = run_dns_server(state.clone())
.await
.context("Running DNS server")?;
let secret_key = SecretKey::from_bytes(&rng.random());
let endpoint_info = EndpointInfo::new(secret_key.public())
.with_relay_url("https://relay.example".parse().unwrap());
let signed_packet = endpoint_info.to_pkarr_signed_packet(&secret_key, 30)?;
state
.upsert(signed_packet)
.context("update and insert signed packet")?;
let resolver = DnsResolver::with_nameserver(nameserver);
let resolved = resolver
.lookup_endpoint_by_id(&endpoint_info.endpoint_id, &origin)
.await?;
assert_eq!(resolved, endpoint_info);
Ok(())
}
#[tokio::test]
#[traced_test]
async fn pkarr_publish_dns_resolve() -> Result<()> {
let origin = "testdns.example".to_string();
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
let dns_pkarr_server = DnsPkarrServer::run_with_origin(origin.clone())
.await
.context("DnsPkarrServer")?;
let secret_key = SecretKey::from_bytes(&rng.random());
let endpoint_id = secret_key.public();
let relay_url = Some(TransportAddr::Relay(
"https://relay.example".parse().unwrap(),
));
let tls_config = CaRootsConfig::insecure_skip_verify()
.client_config(default_provider())
.expect("infallible");
let resolver = DnsResolver::with_nameserver(dns_pkarr_server.nameserver);
let publisher = PkarrPublisher::builder(dns_pkarr_server.pkarr_url.clone())
.build(secret_key, tls_config);
let user_data: UserData = "foobar".parse().unwrap();
let data = EndpointData::from_iter(relay_url.clone()).with_user_data(user_data.clone());
publisher.update_endpoint_data(&data);
dns_pkarr_server
.on_endpoint(&endpoint_id, PUBLISH_TIMEOUT)
.await
.context("wait for on endpoint update")?;
let resolved = resolver
.lookup_endpoint_by_id(&endpoint_id, &origin)
.await?;
println!("resolved {resolved:?}");
let expected_addr = EndpointAddr::from_parts(endpoint_id, relay_url);
assert_eq!(resolved.to_endpoint_addr(), expected_addr);
assert_eq!(resolved.user_data(), Some(&user_data));
Ok(())
}
#[cfg(with_crypto_provider)]
const TEST_ALPN: &[u8] = b"TEST";
#[cfg(with_crypto_provider)]
#[tokio::test]
#[traced_test]
async fn pkarr_publish_dns_address_lookup() -> Result<()> {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
let dns_pkarr_server = DnsPkarrServer::run().await.context("DnsPkarrServer run")?;
let (relay_map, _relay_url, _relay_guard) = crate::test_utils::run_relay_server().await?;
let (ep1, _guard1) =
ep_with_address_lookup(&mut rng, &relay_map, &dns_pkarr_server).await?;
let (ep2, _guard2) =
ep_with_address_lookup(&mut rng, &relay_map, &dns_pkarr_server).await?;
dns_pkarr_server
.on_endpoint(&ep1.id(), PUBLISH_TIMEOUT)
.await
.context("wait for on endpoint update")?;
let _conn = ep2.connect(ep1.id(), TEST_ALPN).await?;
Ok(())
}
#[cfg(with_crypto_provider)]
async fn ep_with_address_lookup<R: rand::CryptoRng + ?Sized>(
rng: &mut R,
relay_map: &iroh_relay::RelayMap,
dns_pkarr_server: &DnsPkarrServer,
) -> Result<(
crate::Endpoint,
n0_future::task::AbortOnDropHandle<Result<()>>,
)> {
use n0_future::task::AbortOnDropHandle;
use crate::{Endpoint, RelayMode, endpoint::presets};
let secret_key = SecretKey::from_bytes(&rng.random());
let ep = Endpoint::builder(presets::Minimal)
.relay_mode(RelayMode::Custom(relay_map.clone()))
.ca_roots_config(CaRootsConfig::insecure_skip_verify())
.secret_key(secret_key.clone())
.alpns(vec![TEST_ALPN.to_vec()])
.preset(dns_pkarr_server.preset())
.bind()
.await?;
let handle = tokio::spawn({
let ep = ep.clone();
async move {
use n0_error::AnyError;
while let Some(accepting) = ep.accept().await.and_then(|inc| inc.accept().ok()) {
let _conn = accepting.await.context("accepting")?;
}
Ok::<_, AnyError>(())
}
});
Ok((ep, AbortOnDropHandle::new(handle)))
}
}