use std::sync::Arc;
use anyhow::{anyhow, bail, Result};
use futures_util::stream::BoxStream;
use pkarr::SignedPacket;
use tokio::{
task::JoinHandle,
time::{Duration, Instant},
};
use tracing::{debug, error_span, info, warn, Instrument};
use url::Url;
use watchable::{Watchable, Watcher};
use crate::{
discovery::{Discovery, DiscoveryItem},
dns::node_info::NodeInfo,
key::SecretKey,
AddrInfo, Endpoint, NodeId,
};
/// URL of the pkarr relay selected by the `n0_dns` constructors in regular
/// (non-test, non-`test-utils`) builds.
pub const N0_DNS_PKARR_RELAY_PROD: &str = "https://dns.iroh.link/pkarr";
/// URL of the pkarr relay selected by the `n0_dns` constructors when compiled
/// for tests or with the `test-utils` feature.
pub const N0_DNS_PKARR_RELAY_STAGING: &str = "https://staging-dns.iroh.link/pkarr";
/// Default TTL handed to `NodeInfo::to_pkarr_signed_packet` when publishing.
// NOTE(review): the unit is presumably seconds (DNS-style TTL) — confirm against `NodeInfo`.
pub const DEFAULT_PKARR_TTL: u32 = 30;
/// Default interval between republishes after a successful publish: five minutes.
pub const DEFAULT_REPUBLISH_INTERVAL: Duration = Duration::from_secs(60 * 5);
/// Handle to a background task that publishes this node's info to a pkarr relay.
///
/// Cloning is cheap: all clones share the same watchable and the same task
/// handle. The `Drop` implementation aborts the task once the last clone is
/// dropped.
#[derive(derive_more::Debug, Clone)]
pub struct PkarrPublisher {
// Public identity of this node, derived from the secret key at construction.
node_id: NodeId,
// Latest node info to publish; `None` until `update_addr_info` is first called.
watchable: Watchable<Option<NodeInfo>>,
// Handle of the spawned publisher task, shared between clones.
join_handle: Arc<JoinHandle<()>>,
}
impl PkarrPublisher {
pub fn new(secret_key: SecretKey, pkarr_relay: Url) -> Self {
Self::with_options(
secret_key,
pkarr_relay,
DEFAULT_PKARR_TTL,
DEFAULT_REPUBLISH_INTERVAL,
)
}
pub fn with_options(
secret_key: SecretKey,
pkarr_relay: Url,
ttl: u32,
republish_interval: std::time::Duration,
) -> Self {
debug!("creating pkarr publisher that publishes to {pkarr_relay}");
let node_id = secret_key.public();
let pkarr_client = PkarrRelayClient::new(pkarr_relay);
let watchable = Watchable::default();
let service = PublisherService {
ttl,
watcher: watchable.watch(),
secret_key,
pkarr_client,
republish_interval,
};
let join_handle = tokio::task::spawn(
service
.run()
.instrument(error_span!("pkarr_publish", me=%node_id.fmt_short())),
);
Self {
watchable,
node_id,
join_handle: Arc::new(join_handle),
}
}
pub fn n0_dns(secret_key: SecretKey) -> Self {
#[cfg(not(any(test, feature = "test-utils")))]
let pkarr_relay = N0_DNS_PKARR_RELAY_PROD;
#[cfg(any(test, feature = "test-utils"))]
let pkarr_relay = N0_DNS_PKARR_RELAY_STAGING;
let pkarr_relay: Url = pkarr_relay.parse().expect("url is valid");
Self::new(secret_key, pkarr_relay)
}
pub fn update_addr_info(&self, info: &AddrInfo) {
let (relay_url, direct_addresses) = if let Some(relay_url) = info.relay_url.as_ref() {
(Some(relay_url.clone().into()), Default::default())
} else {
(None, info.direct_addresses.clone())
};
let info = NodeInfo::new(self.node_id, relay_url, direct_addresses);
self.watchable.update(Some(info)).ok();
}
}
impl Discovery for PkarrPublisher {
fn publish(&self, info: &AddrInfo) {
self.update_addr_info(info);
}
}
impl Drop for PkarrPublisher {
fn drop(&mut self) {
if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
handle.abort();
}
}
}
/// State of the background task that publishes node info to a pkarr relay.
///
/// Built in `PkarrPublisher::with_options` and consumed by `run`.
#[derive(derive_more::Debug, Clone)]
struct PublisherService {
// Key used to sign published packets; Debug output shows a fixed placeholder instead of the key.
#[debug("SecretKey")]
secret_key: SecretKey,
// HTTP client for the relay; Debug output shows a fixed placeholder.
#[debug("PkarrClient")]
pkarr_client: PkarrRelayClient,
// Receives the node info set through `PkarrPublisher::update_addr_info`.
watcher: Watcher<Option<NodeInfo>>,
// TTL passed to `NodeInfo::to_pkarr_signed_packet`.
ttl: u32,
// Delay before republishing after a successful publish.
republish_interval: Duration,
}
impl PublisherService {
    /// Runs the publish loop until the watchable side is disconnected.
    ///
    /// Each iteration publishes the current node info, if there is one, and
    /// then waits for either a change of the node info or the republish timer.
    /// After a success the timer is set to `republish_interval`. After a
    /// failure it is set to one second per consecutive failure.
    async fn run(self) {
        let mut failed_attempts = 0;
        // Starts out effectively disarmed; it is armed after the first publish attempt.
        let republish = tokio::time::sleep(Duration::MAX);
        tokio::pin!(republish);
        loop {
            if let Some(info) = self.watcher.get() {
                let delay = match self.publish_current(info).await {
                    Ok(()) => {
                        failed_attempts = 0;
                        self.republish_interval
                    }
                    Err(err) => {
                        warn!(?err, url = %self.pkarr_client.pkarr_relay_url , "Failed to publish to pkarr");
                        failed_attempts += 1;
                        // Retry delay grows linearly with the number of consecutive failures.
                        Duration::from_secs(failed_attempts)
                    }
                };
                republish.as_mut().reset(Instant::now() + delay);
            }
            tokio::select! {
                changed = self.watcher.watch_async() => {
                    if changed.is_err() {
                        // The watchable is gone; nothing more will ever be published.
                        break;
                    }
                    debug!("Publish node info to pkarr (info changed)");
                }
                _ = &mut republish => {
                    debug!("Publish node info to pkarr (interval elapsed)");
                }
            }
        }
    }

    /// Signs `info` with the service's secret key and TTL and sends it to the relay.
    ///
    /// # Errors
    ///
    /// Returns an error if building the signed packet fails or the relay
    /// client fails to publish it.
    async fn publish_current(&self, info: NodeInfo) -> Result<()> {
        let relay_url = info.relay_url.as_ref().map(|s| s.as_str());
        info!(?relay_url, "Publish node info to pkarr");
        let packet = info.to_pkarr_signed_packet(&self.secret_key, self.ttl)?;
        self.pkarr_client.publish(&packet).await?;
        Ok(())
    }
}
/// Resolves node info by fetching signed packets from a pkarr relay.
#[derive(derive_more::Debug, Clone)]
pub struct PkarrResolver {
// HTTP client bound to the relay that lookups are sent to.
pkarr_client: PkarrRelayClient,
}
impl PkarrResolver {
pub fn new(pkarr_relay: Url) -> Self {
Self {
pkarr_client: PkarrRelayClient::new(pkarr_relay),
}
}
pub fn n0_dns() -> Self {
#[cfg(not(any(test, feature = "test-utils")))]
let pkarr_relay = N0_DNS_PKARR_RELAY_PROD;
#[cfg(any(test, feature = "test-utils"))]
let pkarr_relay = N0_DNS_PKARR_RELAY_STAGING;
let pkarr_relay: Url = pkarr_relay.parse().expect("url is valid");
Self::new(pkarr_relay)
}
}
impl Discovery for PkarrResolver {
    /// Looks up `node_id` on the relay and yields the outcome as a one-item stream.
    ///
    /// Always returns `Some`. The single stream item is either a
    /// [`DiscoveryItem`] with provenance `"pkarr"` and no `last_updated`
    /// value, or the error from the relay request or packet decoding. The
    /// endpoint argument is unused.
    fn resolve(
        &self,
        _ep: Endpoint,
        node_id: NodeId,
    ) -> Option<BoxStream<'static, Result<DiscoveryItem>>> {
        // Clone the client so the future owns everything it needs (`'static`).
        let client = self.pkarr_client.clone();
        let lookup = async move {
            let packet = client.resolve(node_id).await?;
            let node_info = NodeInfo::from_pkarr_signed_packet(&packet)?;
            let item = DiscoveryItem {
                provenance: "pkarr",
                last_updated: None,
                addr_info: node_info.into(),
            };
            Ok(item)
        };
        let single_item = futures_lite::stream::once_future(lookup);
        Some(Box::pin(single_item))
    }
}
/// HTTP client for resolving and publishing signed packets via a pkarr relay.
#[derive(Debug, Clone)]
pub struct PkarrRelayClient {
// HTTP client used for all requests to the relay.
http_client: reqwest::Client,
// Base URL of the relay; the z-base-32 public key is appended as a path segment per request.
pkarr_relay_url: Url,
}
impl PkarrRelayClient {
    /// Creates a client for the relay at `pkarr_relay_url` with a new `reqwest::Client`.
    pub fn new(pkarr_relay_url: Url) -> Self {
        Self {
            http_client: reqwest::Client::new(),
            pkarr_relay_url,
        }
    }

    /// Fetches the signed packet published for `node_id` from the relay.
    ///
    /// Sends a `GET` request to `<relay url>/<z-base-32 public key>`.
    ///
    /// # Errors
    ///
    /// Returns an error if the node id cannot be converted to a pkarr public
    /// key, the relay URL cannot take path segments, the request fails, the
    /// response status is not a success, or the payload is not a valid signed
    /// packet for that key.
    pub async fn resolve(&self, node_id: NodeId) -> anyhow::Result<SignedPacket> {
        let public_key = pkarr::PublicKey::try_from(node_id.as_bytes())?;
        let url = self.record_url(&public_key, "resolve")?;
        let response = self.http_client.get(url).send().await?;
        let status = response.status();
        if !status.is_success() {
            bail!("Resolve request failed with status {}", status);
        }
        let payload = response.bytes().await?;
        Ok(SignedPacket::from_relay_payload(&public_key, &payload)?)
    }

    /// Uploads `signed_packet` to the relay.
    ///
    /// Sends a `PUT` request with the relay payload of the packet to
    /// `<relay url>/<z-base-32 public key>`.
    ///
    /// # Errors
    ///
    /// Returns an error if the relay URL cannot take path segments, the
    /// request fails, or the response status is not a success.
    pub async fn publish(&self, signed_packet: &SignedPacket) -> anyhow::Result<()> {
        let url = self.record_url(&signed_packet.public_key(), "publish")?;
        let response = self
            .http_client
            .put(url)
            .body(signed_packet.to_relay_payload())
            .send()
            .await?;
        let status = response.status();
        if !status.is_success() {
            bail!("Publish request failed with status {}", status);
        }
        Ok(())
    }

    /// Builds the per-key URL by appending the z-base-32 key to the relay URL.
    ///
    /// `action` is only used in the error message ("resolve" or "publish").
    fn record_url(&self, public_key: &pkarr::PublicKey, action: &str) -> anyhow::Result<Url> {
        let mut url = self.pkarr_relay_url.clone();
        url.path_segments_mut()
            .map_err(|_| anyhow!("Failed to {}: Invalid relay URL", action))?
            // A relay URL with a trailing slash ends in an empty segment; drop
            // it so the result is `/pkarr/<key>` rather than `/pkarr//<key>`.
            .pop_if_empty()
            .push(&public_key.to_z32());
        Ok(url)
    }
}