1macro_rules! cross_debug {
4 ($($arg:tt)*) => {
5 #[cfg(target_arch = "wasm32")]
6 log::debug!($($arg)*);
7 #[cfg(not(target_arch = "wasm32"))]
8 tracing::debug!($($arg)*);
9 };
10}
11
12pub mod cache;
13
14#[cfg(not(wasm_browser))]
15pub mod blocking;
16pub mod builder;
17#[cfg(all(dht, relays))]
18mod futures;
19#[cfg(relays)]
20mod relays;
21
22#[cfg(all(test, not(wasm_browser)))]
23mod tests;
24#[cfg(all(test, wasm_browser))]
25mod tests_web;
26
27#[cfg(all(dht, relays))]
28use futures::{publish_both_networks, select_stream};
29use futures_lite::{Stream, StreamExt};
30use ntimestamp::Timestamp;
31use std::future::Future;
32use std::pin::Pin;
33use std::sync::Arc;
34use std::{hash::Hash, num::NonZeroUsize};
35
36#[cfg(dht)]
37use crate::mainline::{self, errors::PutMutableError, Dht};
38
39use builder::{ClientBuilder, Config};
40
41#[cfg(relays)]
42use crate::client::relays::RelaysClient;
43use crate::{Cache, CacheKey, InMemoryCache};
44use crate::{PublicKey, SignedPacket};
45
46#[derive(Debug)]
47pub(crate) struct Inner {
48 minimum_ttl: u32,
49 maximum_ttl: u32,
50 cache: Option<Arc<dyn Cache>>,
51 #[cfg(dht)]
52 dht: Option<Dht>,
53 #[cfg(relays)]
54 relays: Option<RelaysClient>,
55 #[cfg(feature = "endpoints")]
56 pub(crate) max_recursion_depth: u8,
57}
58
59#[derive(Clone, Debug)]
62pub struct Client(pub(crate) Arc<Inner>);
63
64impl Client {
65 pub(crate) fn new(config: Config) -> Result<Client, BuildError> {
66 cross_debug!("Starting Pkarr Client {:?}", config);
67
68 #[cfg(dht)]
69 let dht = if let Some(ref builder) = config.dht {
70 Some(builder.build().map_err(BuildError::DhtBuildError)?)
71 } else {
72 None
73 };
74 #[cfg(not(dht))]
75 let dht: Option<()> = None;
76
77 Self::new_with_dht(config, dht)
78 }
79
80 #[cfg(dht)]
81 pub(crate) async fn new_async(config: Config) -> Result<Client, BuildError> {
82 cross_debug!("Starting Pkarr Client {:?}", config);
83
84 let dht = if let Some(b) = &config.dht {
85 Some(
86 b.build_async()
87 .await
88 .map_err(BuildError::DhtBuildError)?
89 .as_sync()
90 .clone(),
91 )
92 } else {
93 None
94 };
95
96 Self::new_with_dht(config, dht)
97 }
98
99 fn new_with_dht(
100 config: Config,
101 #[cfg(dht)] dht: Option<Dht>, #[cfg(not(dht))] dht: Option<()>,
103 ) -> Result<Client, BuildError> {
104 let cache = if config.cache_size == 0 {
105 None
106 } else {
107 let cache = config.cache.clone();
108
109 if let Some(cache) = cache {
110 if cache.capacity() == 0 {
111 None
112 } else {
113 Some(cache)
114 }
115 } else {
116 Some(
117 cache.unwrap_or(Arc::new(InMemoryCache::new(
118 NonZeroUsize::new(config.cache_size)
119 .expect("if cache size is zero cache should be disabled."),
120 ))),
121 )
122 }
123 };
124
125 #[cfg(relays)]
126 let relays = if let Some(ref relays) = config.relays {
127 if relays.is_empty() {
128 return Err(BuildError::EmptyListOfRelays);
129 }
130
131 let relays_client =
132 RelaysClient::new(relays.clone().into_boxed_slice(), config.request_timeout);
133
134 Some(relays_client)
135 } else {
136 None
137 };
138 #[cfg(not(relays))]
139 let relays: Option<()> = None;
140
141 if dht.is_none() && relays.is_none() {
142 return Err(BuildError::NoNetwork);
143 }
144
145 let client = Client(Arc::new(Inner {
146 minimum_ttl: config.minimum_ttl,
147 maximum_ttl: config.maximum_ttl,
148 cache,
149 #[cfg(dht)]
150 dht,
151 #[cfg(relays)]
152 relays,
153 #[cfg(feature = "endpoints")]
154 max_recursion_depth: config.max_recursion_depth,
155 }));
156
157 Ok(client)
158 }
159
160 pub fn builder() -> ClientBuilder {
165 ClientBuilder::default()
166 }
167
168 pub fn cache(&self) -> Option<&dyn Cache> {
172 self.0.cache.as_deref()
173 }
174
175 #[cfg(dht)]
181 pub fn dht(&self) -> Option<mainline::Dht> {
182 self.0.dht.as_ref().cloned()
183 }
184
185 pub async fn publish(
267 &self,
268 signed_packet: &SignedPacket,
269 cas: Option<Timestamp>,
270 ) -> Result<(), PublishError> {
271 async_compat_if_necessary(self.publish_inner(signed_packet, cas)).await
272 }
273
274 pub async fn resolve(&self, public_key: &PublicKey) -> Option<SignedPacket> {
283 async_compat_if_necessary(self.resolve_inner(public_key)).await
284 }
285
286 pub async fn resolve_most_recent(&self, public_key: &PublicKey) -> Option<SignedPacket> {
294 async_compat_if_necessary(async move {
295 let cache_key: CacheKey = public_key.as_ref().into();
296
297 let cache = self.0.cache.clone().unwrap_or(Arc::new(InMemoryCache::new(
298 1.try_into().expect("infallible"),
299 )));
300
301 let mut stream = self.resolve_stream(
302 public_key.clone(),
303 Some(cache.clone()),
304 cache_key,
305 cache.get(&cache_key).map(|s| s.timestamp()),
306 );
307 while stream.next().await.is_some() {}
308
309 cache.get(&public_key.into())
310 })
311 .await
312 }
313
314 async fn publish_inner(
317 &self,
318 signed_packet: &SignedPacket,
319 cas: Option<Timestamp>,
320 ) -> Result<(), PublishError> {
321 let cache_key: CacheKey = signed_packet.public_key().into();
322
323 if let Some(cached) = self
325 .cache()
326 .as_ref()
327 .and_then(|cache| cache.get(&cache_key))
328 {
329 if cached.more_recent_than(signed_packet) {
330 return Err(ConcurrencyError::NotMostRecent)?;
331 } else if let Some(cas) = cas {
332 if cached.timestamp() != cas {
333 return Err(ConcurrencyError::CasFailed)?;
334 }
335 }
336 }
337
338 if let Some(cache) = self.cache() {
339 cache.put(&cache_key, signed_packet);
340 }
341
342 self.select_publish_future(signed_packet, cas).await
343 }
344
345 async fn select_publish_future(
347 &self,
348 signed_packet: &SignedPacket,
349 cas: Option<Timestamp>,
350 ) -> Result<(), PublishError> {
351 #[cfg(dht)]
353 let dht_future = {
354 let signed_packet = signed_packet.clone();
355 self.dht().map(|node| async move {
356 node.as_async()
357 .put_mutable((&signed_packet).into(), cas.map(|t| t.as_u64() as i64))
358 .await
359 .map(|_| Ok(()))?
360 })
361 };
362
363 #[cfg(relays)]
364 let relays_future = {
365 let signed_packet = signed_packet.clone();
366 self.0
367 .relays
368 .clone()
369 .map(|relays| async move { relays.publish(&signed_packet, cas).await })
370 };
371
372 #[cfg(all(dht, not(relays)))]
373 return dht_future.expect("infallible").await;
374
375 #[cfg(all(relays, not(dht)))]
376 return relays_future.expect("infallible").await;
377
378 #[cfg(all(dht, relays))]
379 return if let Some(dht_future) = dht_future {
380 if let Some(relays_future) = relays_future {
381 let result = publish_both_networks(dht_future, relays_future).await;
382
383 self.0
384 .relays
385 .as_ref()
386 .expect("infallible")
387 .cancel_publish(&signed_packet.public_key());
388
389 result
390 } else {
391 dht_future.await
392 }
393 } else {
394 relays_future.expect("infallible").await
395 };
396 }
397
398 pub(crate) async fn resolve_inner(&self, public_key: &PublicKey) -> Option<SignedPacket> {
399 let public_key = public_key.clone();
400
401 let cache_key: CacheKey = public_key.as_ref().into();
402
403 let cached_packet = self
404 .cache()
405 .as_ref()
406 .and_then(|cache| cache.get(&cache_key));
407
408 let mut stream = self.resolve_stream(
410 public_key.clone(),
411 self.0.cache.clone(),
412 cache_key,
413 cached_packet.as_ref().map(|s| s.timestamp()),
414 );
415
416 if let Some(cached_packet) = cached_packet {
417 if cached_packet.is_expired(self.0.minimum_ttl, self.0.maximum_ttl) {
418 #[cfg(not(wasm_browser))]
419 tokio::spawn(async move { while stream.next().await.is_some() {} });
420 #[cfg(wasm_browser)]
421 wasm_bindgen_futures::spawn_local(
422 async move { while stream.next().await.is_some() {} },
423 );
424 }
425
426 cross_debug!(
427 "responding with cached packet even if expired. public_key: {}",
428 &public_key
429 );
430
431 self.cache().expect("infallible").get(&cache_key)
432 } else {
433 let first = stream.next().await;
435
436 if let Some(cache) = self.cache() {
437 cache.get(&cache_key)
438 } else {
439 first
440 }
441 }
442 }
443
444 #[cfg(wasm_browser)]
445 fn resolve_stream(
446 &self,
447 public_key: PublicKey,
448 cache: Option<Arc<dyn Cache>>,
449 cache_key: CacheKey,
450 more_recent_than: Option<Timestamp>,
451 ) -> Pin<Box<dyn Stream<Item = SignedPacket>>> {
452 let stream = self
453 .0
454 .relays
455 .as_ref()
456 .expect("infallible")
457 .resolve_futures(&public_key, more_recent_than)
458 .filter_map(|opt| opt)
459 .filter_map(move |signed_packet| {
460 filter_incoming_signed_packet(&public_key, cache.clone(), &cache_key, signed_packet)
461 });
462
463 Box::pin(stream)
464 }
465
466 #[cfg(not(wasm_browser))]
467 fn resolve_stream(
469 &self,
470 public_key: PublicKey,
471 cache: Option<Arc<dyn Cache>>,
472 cache_key: CacheKey,
473 more_recent_than: Option<Timestamp>,
474 ) -> Pin<Box<dyn Stream<Item = SignedPacket> + Send>> {
475 self.merged_resolve_stream(&public_key, more_recent_than)
476 .filter_map(move |signed_packet| {
477 filter_incoming_signed_packet(&public_key, cache.clone(), &cache_key, signed_packet)
478 })
479 .boxed()
480 }
481
482 #[cfg(not(wasm_browser))]
483 fn merged_resolve_stream(
485 &self,
486 public_key: &PublicKey,
487 more_recent_than: Option<Timestamp>,
488 ) -> Pin<Box<dyn Stream<Item = SignedPacket> + Send>> {
489 #[cfg(dht)]
490 let dht_stream = match self.dht() {
491 Some(node) => map_dht_stream(node.as_async().get_mutable(
492 public_key.as_bytes(),
493 None,
494 more_recent_than.map(|t| t.as_u64() as i64),
495 )),
496 None => None,
497 };
498
499 #[cfg(relays)]
500 let relays_stream = self
501 .0
502 .relays
503 .as_ref()
504 .map(|relays| relays.resolve(public_key, more_recent_than));
505
506 #[cfg(all(dht, not(relays)))]
507 return dht_stream.expect("infallible");
508
509 #[cfg(all(relays, not(dht)))]
510 return relays_stream.expect("infallible");
511
512 #[cfg(all(dht, relays))]
513 match (dht_stream, relays_stream) {
514 (Some(s), None) | (None, Some(s)) => s,
515 (Some(a), Some(b)) => Box::pin(select_stream(a, b)),
516 (None, None) => unreachable!("should not create a client with no network"),
517 }
518 }
519}
520
521fn filter_incoming_signed_packet(
522 public_key: &PublicKey,
523 cache: Option<Arc<dyn Cache>>,
524 cache_key: &CacheKey,
525 signed_packet: SignedPacket,
526) -> Option<SignedPacket> {
527 let new_packet: Option<SignedPacket> = if let Some(cached) = cache
528 .clone()
529 .and_then(|cache| cache.clone().get_read_only(cache_key))
530 {
531 if signed_packet.more_recent_than(&cached) {
532 cross_debug!("Received more recent packet than in cache. public_key: {public_key}",);
533
534 Some(signed_packet)
535 } else {
536 None
537 }
538 } else {
539 cross_debug!("Received new packet after cache miss. public_key: {public_key}");
540
541 Some(signed_packet)
542 };
543
544 if let Some(packet) = new_packet {
545 if let Some(cache) = &cache {
546 cache.put(cache_key, &packet)
547 };
548
549 Some(packet)
550 } else {
551 None
552 }
553}
554
555#[cfg(dht)]
556fn map_dht_stream(
557 stream: mainline::async_dht::GetStream<mainline::MutableItem>,
558) -> Option<Pin<Box<dyn Stream<Item = SignedPacket> + Send>>> {
559 Some(
560 stream
561 .filter_map(
562 move |mutable_item| match SignedPacket::try_from(mutable_item) {
563 Ok(signed_packet) => Some(signed_packet),
564 Err(error) => {
565 cross_debug!("Got an invalid signed packet from the DHT. Error: {error}");
566 None
567 }
568 },
569 )
570 .boxed(),
571 )
572}
573
574#[derive(thiserror::Error, Debug)]
575pub enum BuildError {
577 #[error("Client configured without Mainline node or relays.")]
578 NoNetwork,
580
581 #[error("Failed to build the Dht client {0}")]
582 DhtBuildError(std::io::Error),
584
585 #[error("Passed an empty list of relays")]
586 EmptyListOfRelays,
588}
589
590#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq, Hash)]
591pub enum PublishError {
593 #[error(transparent)]
594 Query(#[from] QueryError),
596
597 #[error(transparent)]
598 Concurrency(#[from] ConcurrencyError),
602
603 #[error("All relays responded with unexpected responses, check debug logs.")]
605 UnexpectedResponses,
607}
608
609#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq, Hash)]
610pub enum QueryError {
612 #[error("Publish query timed out with no responses neither success or errors.")]
614 Timeout,
615
616 #[error("Publishing SignedPacket to Mainline failed.")]
617 NoClosestNodes,
619
620 #[error("Publishing SignedPacket to Mainline failed code: {0}, description: {1}.")]
621 DhtErrorResponse(i32, String),
623
624 #[error("Most relays responded with bad request")]
625 BadRequest,
627}
628
629#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq, Hash)]
630pub enum ConcurrencyError {
632 #[error("A different SignedPacket is being concurrently published for the same PublicKey.")]
633 ConflictRisk,
637
638 #[error("Found a more recent SignedPacket in the client's cache")]
639 NotMostRecent,
643
644 #[error("Compare and swap failed; there is a more recent SignedPacket than the one seen before publishing")]
645 CasFailed,
649}
650
651#[cfg(dht)]
652impl From<PutMutableError> for PublishError {
653 fn from(value: PutMutableError) -> Self {
654 match value {
655 PutMutableError::Query(error) => PublishError::Query(match error {
656 mainline::errors::PutQueryError::Timeout => QueryError::Timeout,
657 mainline::errors::PutQueryError::NoClosestNodes => QueryError::NoClosestNodes,
658 mainline::errors::PutQueryError::ErrorResponse(error) => {
659 QueryError::DhtErrorResponse(error.code, error.description)
660 }
661 }),
662 PutMutableError::Concurrency(error) => PublishError::Concurrency(match error {
663 mainline::errors::ConcurrencyError::ConflictRisk => ConcurrencyError::ConflictRisk,
664 mainline::errors::ConcurrencyError::NotMostRecent => {
665 ConcurrencyError::NotMostRecent
666 }
667 mainline::errors::ConcurrencyError::CasFailed => ConcurrencyError::CasFailed,
668 }),
669 }
670 }
671}
672
673async fn async_compat_if_necessary<T, O>(fut: T) -> O
674where
675 T: Future<Output = O>,
676{
677 #[cfg(not(wasm_browser))]
678 {
679 if tokio::runtime::Handle::try_current().is_err() {
680 return async_compat::Compat::new(fut).await;
681 }
682 }
683
684 fut.await
685}