use std::sync::{Arc, Mutex};
use iroh_base::{EndpointId, SecretKey};
use iroh_dns::pkarr::{SignedPacket, SignedPacketVerifyError, Timestamp};
use mainline::{Dht, DhtBuilder, MutableItem};
use n0_future::{
boxed::BoxStream,
stream::StreamExt,
task::{self, AbortOnDropHandle},
time::{self, Duration},
};
use crate::{
Endpoint,
address_lookup::{
AddrFilter, AddressLookup, AddressLookupBuilder, AddressLookupBuilderError, EndpointData,
Error as AddressLookupError, Item as AddressLookupItem, pkarr::DEFAULT_PKARR_TTL,
},
endpoint_info::EndpointInfo,
};
/// Default pause between two publish attempts of the same record: one hour.
///
/// [`Builder::default`] uses this as the initial `republish_delay`.
const REPUBLISH_DELAY: Duration = Duration::from_secs(60 * 60);
/// Converts a pkarr [`SignedPacket`] into a mainline [`MutableItem`].
///
/// The packet's public key, signature and encoded payload are carried over as they are, and the
/// packet timestamp in microseconds becomes the item's sequence number. The item is built with
/// `new_signed_unchecked`, so this function performs no validation of its own.
fn signed_packet_to_mutable_item(packet: &SignedPacket) -> MutableItem {
    let key = *packet.public_key().as_bytes();
    let signature = packet.signature().to_bytes();
    let value = packet.encoded_packet();
    // NOTE(review): `as` wraps for timestamps above `i64::MAX` microseconds.
    let seq = packet.timestamp().as_micros() as i64;
    MutableItem::new_signed_unchecked(key, signature, value, seq, None)
}
/// Converts a mainline [`MutableItem`] back into a pkarr [`SignedPacket`].
///
/// The item's sequence number is interpreted as the packet timestamp in microseconds.
///
/// # Errors
///
/// Returns whatever error `SignedPacket::from_parts_unchecked` reports for the given parts.
fn mutable_item_to_signed_packet(
    item: &MutableItem,
) -> Result<SignedPacket, SignedPacketVerifyError> {
    let key = item.key();
    let signature = item.signature();
    // NOTE(review): a negative sequence number wraps to a very large timestamp here.
    let timestamp = Timestamp::from_micros(item.seq() as u64);
    let value = item.value();
    SignedPacket::from_parts_unchecked(key, signature, timestamp, value)
}
/// Address lookup that publishes to and resolves from a mainline [`Dht`].
///
/// This is a handle around shared state held in an [`Arc`]; cloning it only clones the `Arc`.
#[derive(Debug, Clone)]
pub struct DhtAddressLookup(Arc<Inner>);
/// Shared state behind a [`DhtAddressLookup`] handle.
#[derive(derive_more::Debug)]
struct Inner {
    /// DHT client used for both resolving and publishing.
    dht: Dht,
    /// Handle of the most recently spawned publish loop.
    ///
    /// Storing a new handle drops the previous one (presumably aborting that task, going by
    /// the `AbortOnDropHandle` name).
    task: Mutex<Option<AbortOnDropHandle<()>>>,
    /// Key used to sign published records; when `None`, `publish` does nothing.
    secret_key: Option<SecretKey>,
    /// TTL handed to `to_pkarr_signed_packet` when building the record to publish.
    ttl: u32,
    /// Pause between two publish attempts inside the publish loop.
    republish_delay: Duration,
    /// Filter applied to the endpoint data before it is published.
    filter: AddrFilter,
}
impl Inner {
    /// Fetches the most recent mutable item stored under `public_key` and decodes it.
    ///
    /// Returns `Some(Ok(item))` when an item was found and could be decoded into endpoint
    /// info. Returns `None` when nothing was found or when either decoding step failed; those
    /// cases are only logged. This function never produces `Some(Err(_))`.
    async fn resolve_dht(
        &self,
        public_key: EndpointId,
    ) -> Option<Result<AddressLookupItem, AddressLookupError>> {
        tracing::info!("resolving {} from DHT", public_key.to_z32());

        let lookup = self
            .dht
            .clone()
            .as_async()
            .get_mutable_most_recent(public_key.as_bytes(), None)
            .await;
        let Some(item) = lookup else {
            tracing::debug!("no signed packet found");
            return None;
        };

        // Step 1: mutable item -> signed packet.
        let signed_packet = match mutable_item_to_signed_packet(&item) {
            Ok(packet) => packet,
            Err(err) => {
                tracing::debug!("failed to parse mutable item as signed packet: {err}");
                return None;
            }
        };

        // Step 2: signed packet -> endpoint info.
        let Ok(endpoint_info) = EndpointInfo::from_pkarr_signed_packet(&signed_packet) else {
            tracing::debug!("failed to parse signed packet as endpoint info");
            return None;
        };

        tracing::info!("discovered endpoint info {:?}", endpoint_info);
        Some(Ok(AddressLookupItem::new(endpoint_info, "pkarr", None)))
    }
}
/// Builder for [`DhtAddressLookup`].
#[derive(Debug)]
pub struct Builder {
    /// DHT builder to construct the client from; `build` falls back to the default builder
    /// when this is `None`.
    dht_builder: Option<DhtBuilder>,
    /// Key used to sign published records.
    secret_key: Option<SecretKey>,
    /// TTL for published records; `build` falls back to `DEFAULT_PKARR_TTL` when `None`.
    ttl: Option<u32>,
    /// Pause between two publish attempts.
    republish_delay: Duration,
    /// When `false`, `build` discards the secret key so that nothing is ever published.
    enable_publish: bool,
    /// Filter applied to the endpoint data before publishing.
    addr_filter: AddrFilter,
}
impl Default for Builder {
    /// Creates a builder with publishing enabled, the [`REPUBLISH_DELAY`] republish interval,
    /// the `AddrFilter::relay_only()` filter, and no DHT builder, secret key or TTL set.
    fn default() -> Self {
        let addr_filter = AddrFilter::relay_only();
        Self {
            addr_filter,
            enable_publish: true,
            republish_delay: REPUBLISH_DELAY,
            ttl: None,
            secret_key: None,
            dht_builder: None,
        }
    }
}
impl Builder {
pub fn dht_builder(mut self, builder: DhtBuilder) -> Self {
self.dht_builder = Some(builder);
self
}
pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
self.secret_key = Some(secret_key);
self
}
pub fn ttl(mut self, ttl: u32) -> Self {
self.ttl = Some(ttl);
self
}
pub fn republish_delay(mut self, republish_delay: Duration) -> Self {
self.republish_delay = republish_delay;
self
}
pub fn no_publish(mut self) -> Self {
self.enable_publish = false;
self
}
pub fn addr_filter(mut self, filter: AddrFilter) -> Self {
self.addr_filter = filter;
self
}
pub fn build(self) -> Result<DhtAddressLookup, AddressLookupBuilderError> {
let dht_builder = self.dht_builder.unwrap_or_default();
let dht = dht_builder
.build()
.map_err(|e| AddressLookupBuilderError::from_err("pkarr-dht", e))?;
let ttl = self.ttl.unwrap_or(DEFAULT_PKARR_TTL);
let secret_key = self.secret_key.filter(|_| self.enable_publish);
Ok(DhtAddressLookup(Arc::new(Inner {
dht,
ttl,
secret_key,
republish_delay: self.republish_delay,
task: Default::default(),
filter: self.addr_filter,
})))
}
}
impl AddressLookupBuilder for Builder {
    /// Builds the lookup for `endpoint`, signing with the endpoint's own secret key.
    ///
    /// Any secret key set on the builder before is replaced by the endpoint's key.
    fn into_address_lookup(
        self,
        endpoint: &Endpoint,
    ) -> Result<impl AddressLookup, AddressLookupBuilderError> {
        let endpoint_key = endpoint.secret_key().clone();
        self.secret_key(endpoint_key).build()
    }
}
impl DhtAddressLookup {
    /// Creates a [`Builder`] with default settings.
    pub fn builder() -> Builder {
        Builder::default()
    }

    /// Publishes `signed_packet` to the DHT over and over, sleeping `republish_delay` between
    /// attempts.
    ///
    /// This future never completes on its own; it runs until the task driving it is stopped.
    /// Publish failures are logged and do not end the loop.
    async fn publish_loop(self, signed_packet: SignedPacket) {
        let public_key = signed_packet.public_key();
        let z32 = public_key.to_z32();
        let item = signed_packet_to_mutable_item(&signed_packet);

        // Sequence number of the record currently stored on the DHT, if any. It is handed to
        // the first put only; `take()` leaves `None` behind for every later republish.
        let mut cas = self
            .0
            .dht
            .clone()
            .as_async()
            .get_mutable_most_recent(public_key.as_bytes(), None)
            .await
            .map(|current| current.seq());

        loop {
            let outcome = self
                .0
                .dht
                .clone()
                .as_async()
                .put_mutable(item.clone(), cas.take())
                .await;
            if let Err(e) = outcome {
                tracing::warn!("pkarr publish error: {}", e);
            } else {
                tracing::debug!("pkarr publish success. published under {z32}");
            }
            time::sleep(self.0.republish_delay).await;
        }
    }
}
impl AddressLookup for DhtAddressLookup {
fn publish(&self, data: &EndpointData) {
let Some(keypair) = &self.0.secret_key else {
tracing::debug!("no keypair set, not publishing");
return;
};
let data = data.apply_filter(&self.0.filter).into_owned();
if !data.has_addrs() {
tracing::debug!("no relay url or direct addresses in endpoint data, not publishing");
return;
}
tracing::debug!("publishing {data:?}");
let info = EndpointInfo::from_parts(keypair.public(), data);
let Ok(signed_packet) = info.to_pkarr_signed_packet(keypair, self.0.ttl) else {
tracing::warn!("failed to create signed packet");
return;
};
let this = self.clone();
let curr = task::spawn(this.publish_loop(signed_packet));
let mut task = self.0.task.lock().expect("poisoned");
*task = Some(AbortOnDropHandle::new(curr));
}
fn resolve(
&self,
endpoint_id: EndpointId,
) -> Option<BoxStream<Result<AddressLookupItem, AddressLookupError>>> {
let z32 = endpoint_id.to_z32();
tracing::info!("resolving {} as {}", endpoint_id, z32);
let address_lookup = self.0.clone();
let stream =
n0_future::stream::once_future(
async move { address_lookup.resolve_dht(endpoint_id).await },
)
.filter_map(|x| x)
.boxed();
Some(stream)
}
}
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use iroh_base::{RelayUrl, TransportAddr};
    use mainline::Testnet;
    use n0_error::{Result, StdResultExt};
    use n0_tracing_test::traced_test;
    use url::Url;

    use super::*;

    /// Publishes a relay URL to a local three-node testnet, then polls `resolve` until the
    /// URL shows up again or 30 seconds have passed.
    #[tokio::test]
    #[ignore = "flaky"]
    #[traced_test]
    async fn dht_address_lookup_smoke() -> Result {
        let secret = SecretKey::generate();
        let testnet = Testnet::new_async(3).await.anyerr()?;

        let mut dht_builder = DhtBuilder::default();
        dht_builder.bootstrap(&testnet.bootstrap);

        let address_lookup = DhtAddressLookup::builder()
            .secret_key(secret.clone())
            .dht_builder(dht_builder)
            .addr_filter(AddrFilter::unfiltered())
            .build()?;

        let relay_url: RelayUrl = Url::parse("https://example.com").anyerr()?.into();
        let data = EndpointData::from_iter([TransportAddr::Relay(relay_url.clone())]);
        address_lookup.publish(&data);

        // Publishing happens in the background, so poll until the record becomes visible.
        let poll_until_found = async move {
            loop {
                tokio::time::sleep(Duration::from_millis(200)).await;

                let resolved = address_lookup
                    .resolve(secret.public())
                    .unwrap()
                    .collect::<Vec<_>>()
                    .await;

                let mut seen = BTreeSet::new();
                for item in resolved.into_iter().flatten() {
                    for url in item.relay_urls() {
                        seen.insert(url.clone());
                    }
                }

                if seen.contains(&relay_url) {
                    break;
                }
            }
        };

        tokio::time::timeout(Duration::from_secs(30), poll_until_found)
            .await
            .expect("timeout, relay_url not found on DHT");
        Ok(())
    }
}