iroh_mainline_address_lookup/
lib.rs1use std::sync::{Arc, Mutex};
9
10use iroh::{
11 Endpoint,
12 address_lookup::{
13 AddrFilter, AddressLookup, AddressLookupBuilder, AddressLookupBuilderError, EndpointData,
14 Error as AddressLookupError, Item as AddressLookupItem,
15 },
16 endpoint_info::EndpointInfo,
17};
18use iroh_base::{EndpointId, SecretKey};
19use iroh_dns::pkarr::{SignedPacket, SignedPacketVerifyError, Timestamp};
20use n0_future::{
21 boxed::BoxStream,
22 stream::StreamExt,
23 task::{self, AbortOnDropHandle},
24 time::{self, Duration},
25};
26use n0_mainline::{Dht, DhtBuilder, MutableItem};
/// TTL used for published pkarr packets when the [`Builder`] is not given an
/// explicit one via [`Builder::ttl`].
///
/// The value is handed to `EndpointInfo::to_pkarr_signed_packet`; presumably it is
/// expressed in seconds — TODO confirm against the pkarr/DNS record documentation.
const DEFAULT_PKARR_TTL: u32 = 30;

/// Default pause between two consecutive publishes of the same packet (one hour).
///
/// Used as the initial value of `Builder::republish_delay`.
const REPUBLISH_DELAY: Duration = Duration::from_secs(60 * 60);

/// Short pause (50 ms) taken by the publish task before its first DHT put when the
/// routing table is already populated.
///
/// Every call to `publish` replaces the previously stored task handle, so a task
/// that is superseded within this window is dropped before it has sent anything
/// (assuming dropping an `AbortOnDropHandle` aborts the task, as its name suggests).
const PUBLISH_DEBOUNCE_DELAY: Duration = Duration::from_millis(50);
42
43fn signed_packet_to_mutable_item(packet: &SignedPacket) -> MutableItem {
45 MutableItem::new_signed_unchecked(
46 *packet.public_key().as_bytes(),
47 packet.signature().to_bytes(),
48 packet.encoded_packet(),
49 packet.timestamp().as_micros() as i64,
50 None,
51 )
52}
53
54fn mutable_item_to_signed_packet(
56 item: &MutableItem,
57) -> Result<SignedPacket, SignedPacketVerifyError> {
58 SignedPacket::from_parts_unchecked(
59 item.key(),
60 item.signature(),
61 Timestamp::from_micros(item.seq() as u64),
62 item.value(),
63 )
64}
65
/// Address lookup service that publishes to and resolves from the mainline DHT.
///
/// This is a thin handle around a shared, reference-counted [`Inner`]; cloning it
/// only clones the `Arc`. Construct one through [`DhtAddressLookup::builder`].
#[derive(Debug, Clone)]
pub struct DhtAddressLookup(Arc<Inner>);
86
/// Shared state behind a [`DhtAddressLookup`].
#[derive(derive_more::Debug)]
struct Inner {
    /// DHT client used for both lookups (`get_mutable_most_recent`) and publishing
    /// (`put_mutable`).
    dht: Dht,
    /// Handle of the currently running publish task, if any.
    ///
    /// Replaced on every `publish` call; the previous handle is dropped at that point.
    task: Mutex<Option<AbortOnDropHandle<()>>>,
    /// Key used to sign published packets.
    ///
    /// `None` when no key was configured or publishing was disabled, in which case
    /// `publish` is a no-op.
    secret_key: Option<SecretKey>,
    /// TTL passed to `EndpointInfo::to_pkarr_signed_packet` when publishing.
    ttl: u32,
    /// Time the publish task sleeps between two puts of the same packet.
    republish_delay: Duration,
    /// Filter applied to the endpoint data before it is published.
    filter: AddrFilter,
}
106
107impl Inner {
108 async fn resolve_dht(
109 &self,
110 public_key: EndpointId,
111 ) -> Option<Result<AddressLookupItem, AddressLookupError>> {
112 tracing::info!("resolving {} from DHT", public_key.to_z32());
113
114 let maybe_item = self
115 .dht
116 .get_mutable_most_recent(public_key.as_bytes(), None)
117 .await
118 .ok()
119 .flatten();
120 match maybe_item {
121 Some(item) => {
122 let signed_packet = match mutable_item_to_signed_packet(&item) {
123 Ok(packet) => packet,
124 Err(err) => {
125 tracing::debug!("failed to parse mutable item as signed packet: {err}");
126 return None;
127 }
128 };
129 match EndpointInfo::from_pkarr_signed_packet(&signed_packet) {
130 Ok(endpoint_info) => {
131 tracing::info!("discovered endpoint info {:?}", endpoint_info);
132 Some(Ok(AddressLookupItem::new(endpoint_info, "pkarr", None)))
133 }
134 Err(_err) => {
135 tracing::debug!("failed to parse signed packet as endpoint info");
136 None
137 }
138 }
139 }
140 None => {
141 tracing::debug!("no signed packet found");
142 None
143 }
144 }
145 }
146}
147
/// Builder for [`DhtAddressLookup`].
///
/// Obtain one with [`DhtAddressLookup::builder`] or [`Builder::default`].
#[derive(Debug)]
pub struct Builder {
    /// Custom DHT builder; a default `DhtBuilder` is used when `None`.
    dht_builder: Option<DhtBuilder>,
    /// Key used to sign published packets; without it nothing is published.
    secret_key: Option<SecretKey>,
    /// TTL for published packets; falls back to `DEFAULT_PKARR_TTL` when `None`.
    ttl: Option<u32>,
    /// Time between two republishes of the same packet.
    republish_delay: Duration,
    /// When `false`, the secret key is discarded in `build`, which disables publishing.
    enable_publish: bool,
    /// Filter applied to the endpoint data before publishing.
    addr_filter: AddrFilter,
}
160
161impl Default for Builder {
162 fn default() -> Self {
163 Self {
164 dht_builder: None,
165 secret_key: None,
166 ttl: None,
167 republish_delay: REPUBLISH_DELAY,
168 enable_publish: true,
169 addr_filter: AddrFilter::relay_only(),
170 }
171 }
172}
173
174impl Builder {
175 pub fn dht_builder(mut self, builder: DhtBuilder) -> Self {
177 self.dht_builder = Some(builder);
178 self
179 }
180
181 pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
185 self.secret_key = Some(secret_key);
186 self
187 }
188
189 pub fn ttl(mut self, ttl: u32) -> Self {
191 self.ttl = Some(ttl);
192 self
193 }
194
195 pub fn republish_delay(mut self, republish_delay: Duration) -> Self {
197 self.republish_delay = republish_delay;
198 self
199 }
200
201 pub fn no_publish(mut self) -> Self {
203 self.enable_publish = false;
204 self
205 }
206
207 pub fn addr_filter(mut self, filter: AddrFilter) -> Self {
218 self.addr_filter = filter;
219 self
220 }
221
222 pub fn build(self) -> Result<DhtAddressLookup, AddressLookupBuilderError> {
227 let dht_builder = self.dht_builder.unwrap_or_default();
228 let dht = dht_builder
229 .build()
230 .map_err(|e| AddressLookupBuilderError::from_err("pkarr-dht", e))?;
231 let ttl = self.ttl.unwrap_or(DEFAULT_PKARR_TTL);
232 let secret_key = self.secret_key.filter(|_| self.enable_publish);
233
234 Ok(DhtAddressLookup(Arc::new(Inner {
235 dht,
236 ttl,
237 secret_key,
238 republish_delay: self.republish_delay,
239 task: Default::default(),
240 filter: self.addr_filter,
241 })))
242 }
243}
244
245impl AddressLookupBuilder for Builder {
246 fn into_address_lookup(
247 self,
248 endpoint: &Endpoint,
249 ) -> Result<impl AddressLookup, AddressLookupBuilderError> {
250 self.secret_key(endpoint.secret_key().clone()).build()
251 }
252}
253
254impl DhtAddressLookup {
255 pub fn builder() -> Builder {
257 Builder::default()
258 }
259
260 async fn publish_loop(self, signed_packet: SignedPacket) {
266 let this = self;
267 let public_key = signed_packet.public_key();
268 let z32 = public_key.to_z32();
269 let item = signed_packet_to_mutable_item(&signed_packet);
270 let Ok(info) = this.0.dht.info().await else {
271 tracing::error!("failed to read dht info; stopping publish task");
272 return;
273 };
274 if info.routing_table_size() == 0 {
275 let Ok(bootstrapped) = this.0.dht.bootstrapped().await else {
276 tracing::error!("dht bootstrap probe failed; stopping publish task");
277 return;
278 };
279 if !bootstrapped {
280 tracing::warn!("dht bootstrap probe returned not ready");
281 }
282 } else {
283 time::sleep(PUBLISH_DEBOUNCE_DELAY).await;
292 }
293
294 loop {
295 let res = this.0.dht.put_mutable(item.clone(), None).await;
296 match res {
297 Ok(_) => {
298 tracing::debug!("pkarr publish success. published under {z32}");
299 }
300 Err(e) => {
301 tracing::warn!("pkarr publish error: {}", e);
308 }
309 }
310 time::sleep(this.0.republish_delay).await;
311 }
312 }
313}
314
315impl AddressLookup for DhtAddressLookup {
316 fn publish(&self, data: &EndpointData) {
317 let Some(keypair) = &self.0.secret_key else {
318 tracing::debug!("no keypair set, not publishing");
319 return;
320 };
321
322 let data = data.apply_filter(&self.0.filter).into_owned();
324
325 if !data.has_addrs() {
326 tracing::debug!("no relay url or direct addresses in endpoint data, not publishing");
327 return;
328 }
329
330 tracing::debug!("publishing {data:?}");
331 let info = EndpointInfo::from_parts(keypair.public(), data);
332 let Ok(signed_packet) = info.to_pkarr_signed_packet(keypair, self.0.ttl) else {
333 tracing::warn!("failed to create signed packet");
334 return;
335 };
336 let this = self.clone();
337 let curr = task::spawn(this.publish_loop(signed_packet));
338 let mut task = self.0.task.lock().expect("poisoned");
339 *task = Some(AbortOnDropHandle::new(curr));
340 }
341
342 fn resolve(
343 &self,
344 endpoint_id: EndpointId,
345 ) -> Option<BoxStream<Result<AddressLookupItem, AddressLookupError>>> {
346 let z32 = endpoint_id.to_z32();
347 tracing::info!("resolving {} as {}", endpoint_id, z32);
348 let address_lookup = self.0.clone();
349 let stream =
350 n0_future::stream::once_future(
351 async move { address_lookup.resolve_dht(endpoint_id).await },
352 )
353 .filter_map(|x| x)
354 .boxed();
355 Some(stream)
356 }
357}
358
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use iroh_base::{RelayUrl, TransportAddr};
    use n0_error::{Result, StdResultExt};
    use n0_mainline::Testnet;
    use n0_tracing_test::traced_test;
    use url::Url;

    use super::*;

    /// Publishes a relay URL on a local three-node testnet and polls the lookup
    /// service until the URL can be resolved again, or 30 seconds have passed.
    #[tokio::test]
    #[ignore = "flaky"]
    #[traced_test]
    async fn dht_address_lookup_smoke() -> Result {
        let secret = SecretKey::generate();
        let testnet = Testnet::new(3).await.anyerr()?;

        let mut dht_builder = DhtBuilder::default();
        dht_builder.bootstrap(&testnet.bootstrap);

        let address_lookup = DhtAddressLookup::builder()
            .secret_key(secret.clone())
            .dht_builder(dht_builder)
            .addr_filter(AddrFilter::unfiltered())
            .build()?;

        let relay_url: RelayUrl = Url::parse("https://example.com").anyerr()?.into();
        let published = EndpointData::from_iter([TransportAddr::Relay(relay_url.clone())]);
        address_lookup.publish(&published);

        // Poll every 200 ms until the published relay URL shows up in a lookup.
        let poll_until_found = async move {
            loop {
                tokio::time::sleep(Duration::from_millis(200)).await;

                let stream = address_lookup.resolve(secret.public()).unwrap();
                let results: Vec<_> = stream.collect().await;

                let mut seen = BTreeSet::new();
                for item in results.into_iter().flatten() {
                    for url in item.relay_urls() {
                        seen.insert(url.clone());
                    }
                }

                if seen.contains(&relay_url) {
                    break;
                }
            }
        };

        tokio::time::timeout(Duration::from_secs(30), poll_until_found)
            .await
            .expect("timeout, relay_url not found on DHT");
        Ok(())
    }
}