Skip to main content

pkarr_client/
client.rs

1//! Pkarr client for publishing and resolving [SignedPacket]s over [mainline] and/or [Relays](https://pkarr.org/relays).
2
/// Emits a debug-level log line on every supported target.
///
/// On `wasm32` the arguments are forwarded to `log::debug!`; on every other
/// architecture they are forwarded to `tracing::debug!`. The arguments are
/// passed through untouched, so any format string accepted by those macros works.
macro_rules! cross_debug {
    ($($fmt_args:tt)*) => {
        #[cfg(target_arch = "wasm32")]
        log::debug!($($fmt_args)*);
        #[cfg(not(target_arch = "wasm32"))]
        tracing::debug!($($fmt_args)*);
    };
}
11
12pub mod cache;
13
14#[cfg(not(wasm_browser))]
15pub mod blocking;
16pub mod builder;
17#[cfg(all(dht, relays))]
18mod futures;
19#[cfg(relays)]
20mod relays;
21
22#[cfg(all(test, not(wasm_browser)))]
23mod tests;
24#[cfg(all(test, wasm_browser))]
25mod tests_web;
26
27#[cfg(all(dht, relays))]
28use futures::{publish_both_networks, select_stream};
29use futures_lite::{Stream, StreamExt};
30use ntimestamp::Timestamp;
31use std::future::Future;
32use std::pin::Pin;
33use std::sync::Arc;
34use std::{hash::Hash, num::NonZeroUsize};
35
36#[cfg(dht)]
37use crate::mainline::{self, errors::PutMutableError, Dht};
38
39use builder::{ClientBuilder, Config};
40
41#[cfg(relays)]
42use crate::client::relays::RelaysClient;
43use crate::{Cache, CacheKey, InMemoryCache};
44use crate::{PublicKey, SignedPacket};
45
/// Shared state behind a [Client]; every clone of the client points at the same `Inner`.
#[derive(Debug)]
pub(crate) struct Inner {
    /// Lower TTL bound passed to `SignedPacket::is_expired` when deciding
    /// whether a cached packet needs a background refresh.
    minimum_ttl: u32,
    /// Upper TTL bound passed to `SignedPacket::is_expired`.
    maximum_ttl: u32,
    /// Packet cache, or `None` when caching was disabled at build time.
    cache: Option<Arc<dyn Cache>>,
    /// Mainline Dht node, if the client was configured with one.
    #[cfg(dht)]
    dht: Option<Dht>,
    /// Relays client, if the client was configured with relays.
    #[cfg(relays)]
    relays: Option<RelaysClient>,
    /// Copied from the builder config; only present with the `endpoints` feature.
    // NOTE(review): presumably bounds recursive endpoint resolution — confirm in the endpoints module.
    #[cfg(feature = "endpoints")]
    pub(crate) max_recursion_depth: u8,
}
58
/// Pkarr client for publishing and resolving [SignedPacket]s over
/// [mainline] Dht and/or [Relays](https://pkarr.org/relays).
///
/// The client is a thin handle around an [Arc], so cloning it is cheap and
/// all clones share the same cache and network clients.
#[derive(Clone, Debug)]
pub struct Client(pub(crate) Arc<Inner>);
63
64impl Client {
65    pub(crate) fn new(config: Config) -> Result<Client, BuildError> {
66        cross_debug!("Starting Pkarr Client {:?}", config);
67
68        #[cfg(dht)]
69        let dht = if let Some(ref builder) = config.dht {
70            Some(builder.build().map_err(BuildError::DhtBuildError)?)
71        } else {
72            None
73        };
74        #[cfg(not(dht))]
75        let dht: Option<()> = None;
76
77        Self::new_with_dht(config, dht)
78    }
79
80    #[cfg(dht)]
81    pub(crate) async fn new_async(config: Config) -> Result<Client, BuildError> {
82        cross_debug!("Starting Pkarr Client {:?}", config);
83
84        let dht = if let Some(b) = &config.dht {
85            Some(
86                b.build_async()
87                    .await
88                    .map_err(BuildError::DhtBuildError)?
89                    .as_sync()
90                    .clone(),
91            )
92        } else {
93            None
94        };
95
96        Self::new_with_dht(config, dht)
97    }
98
99    fn new_with_dht(
100        config: Config,
101        #[cfg(dht)] dht: Option<Dht>, // replace with actual type
102        #[cfg(not(dht))] dht: Option<()>,
103    ) -> Result<Client, BuildError> {
104        let cache = if config.cache_size == 0 {
105            None
106        } else {
107            let cache = config.cache.clone();
108
109            if let Some(cache) = cache {
110                if cache.capacity() == 0 {
111                    None
112                } else {
113                    Some(cache)
114                }
115            } else {
116                Some(
117                    cache.unwrap_or(Arc::new(InMemoryCache::new(
118                        NonZeroUsize::new(config.cache_size)
119                            .expect("if cache size is zero cache should be disabled."),
120                    ))),
121                )
122            }
123        };
124
125        #[cfg(relays)]
126        let relays = if let Some(ref relays) = config.relays {
127            if relays.is_empty() {
128                return Err(BuildError::EmptyListOfRelays);
129            }
130
131            let relays_client =
132                RelaysClient::new(relays.clone().into_boxed_slice(), config.request_timeout);
133
134            Some(relays_client)
135        } else {
136            None
137        };
138        #[cfg(not(relays))]
139        let relays: Option<()> = None;
140
141        if dht.is_none() && relays.is_none() {
142            return Err(BuildError::NoNetwork);
143        }
144
145        let client = Client(Arc::new(Inner {
146            minimum_ttl: config.minimum_ttl,
147            maximum_ttl: config.maximum_ttl,
148            cache,
149            #[cfg(dht)]
150            dht,
151            #[cfg(relays)]
152            relays,
153            #[cfg(feature = "endpoints")]
154            max_recursion_depth: config.max_recursion_depth,
155        }));
156
157        Ok(client)
158    }
159
160    /// Returns a builder to edit config before creating Client.
161    ///
162    /// You can use [ClientBuilder::no_default_network] to start from a clean slate and
163    /// decide which networks to use.
164    pub fn builder() -> ClientBuilder {
165        ClientBuilder::default()
166    }
167
168    // === Getters ===
169
170    /// Returns a reference to the internal cache.
171    pub fn cache(&self) -> Option<&dyn Cache> {
172        self.0.cache.as_deref()
173    }
174
175    /// Returns a reference to the internal [mainline::Dht] node.
176    ///
177    /// Gives you access to methods like [mainline::Dht::info],
178    /// [mainline::Dht::bootstrapped], and [mainline::Dht::to_bootstrap]
179    /// among the rest of the API.
180    #[cfg(dht)]
181    pub fn dht(&self) -> Option<mainline::Dht> {
182        self.0.dht.as_ref().cloned()
183    }
184
185    // === Publish ===
186
187    /// Publishes a [SignedPacket] to the [mainline] Dht and or [Relays](https://pkarr.org/relays).
188    ///
189    /// # Lost Update Problem
190    ///
191    /// Mainline DHT and remote relays form a distributed network, and like all distributed networks,
192    /// it is vulnerable to [Write–write conflict](https://en.wikipedia.org/wiki/Write-write_conflict).
193    ///
194    /// ## Read first
195    ///
196    /// To mitigate the risk of lost updates, you should call the [Self::resolve_most_recent] method
197    /// then start authoring the new [SignedPacket] based on the most recent as in the following example:
198    ///
199    ///```rust
200    /// use pkarr::{Client, SignedPacket, Keypair};
201    /// // For local testing
202    /// use pkarr::mainline::Testnet;
203    ///
204    /// #[tokio::main]
205    /// async fn run() -> anyhow::Result<()> {
206    ///     let testnet = Testnet::new_async(3).await?;
207    ///     let client = Client::builder()
208    ///         // Disable the default network settings (builtin relays and mainline bootstrap nodes).
209    ///         .no_default_network()
210    ///         .bootstrap(&testnet.bootstrap)
211    ///         .build()?;
212    ///
213    ///     let keypair = Keypair::random();
214    ///
215    ///     let (signed_packet, cas) = if let Some(most_recent) = client
216    ///         .resolve_most_recent(&keypair.public_key()).await
217    ///     {
218    ///
219    ///         let mut builder = SignedPacket::builder();
220    ///
221    ///         // 1. Optionally inherit all or some of the existing records.
222    ///         for record in most_recent.all_resource_records() {
223    ///             let name = record.name.to_string();
224    ///
225    ///             if name != "foo" && name != "sercert" {
226    ///                 builder = builder.record(record.clone());
227    ///             }
228    ///         };
229    ///
230    ///         // 2. Optionally add more new records.
231    ///         let signed_packet = builder
232    ///             .txt("foo".try_into()?, "bar".try_into()?, 30)
233    ///             .a("secret".try_into()?, 42.into(), 30)
234    ///             .sign(&keypair)?;
235    ///
236    ///         (
237    ///             signed_packet,
238    ///             // 3. Use the most recent [SignedPacket::timestamp] as a `CAS`.
239    ///             Some(most_recent.timestamp())
240    ///         )
241    ///     } else {
242    ///         (
243    ///             SignedPacket::builder()
244    ///                 .txt("foo".try_into()?, "bar".try_into()?, 30)
245    ///                 .a("secret".try_into()?, 42.into(), 30)
246    ///                 .sign(&keypair)?,
247    ///             None
248    ///         )
249    ///     };
250    ///
251    ///     client.publish(&signed_packet, cas).await?;
252    ///
253    ///     Ok(())
254    /// }
255    /// ```
256    ///
257    /// ## Errors
258    ///
259    /// This method may return on of these errors:
260    ///
261    /// 1. [QueryError]: when the query fails, and you need to retry or debug the network.
262    /// 2. [ConcurrencyError]: when an write conflict (or the risk of it) is detedcted.
263    ///
264    /// If you get a [ConcurrencyError]; you should resolver the most recent packet again,
265    /// and repeat the steps in the previous example.
266    pub async fn publish(
267        &self,
268        signed_packet: &SignedPacket,
269        cas: Option<Timestamp>,
270    ) -> Result<(), PublishError> {
271        async_compat_if_necessary(self.publish_inner(signed_packet, cas)).await
272    }
273
274    // === Resolve ===
275
276    /// Returns a [SignedPacket] from the cache even if it is expired.
277    /// If there is no packet in the cache, or if the cached packet is expired,
278    /// it will make a DHT query in a background query and caches any more recent packets it receives.
279    ///
280    /// If you want to get the most recent version of a [SignedPacket],
281    /// you should use [Self::resolve_most_recent].
282    pub async fn resolve(&self, public_key: &PublicKey) -> Option<SignedPacket> {
283        async_compat_if_necessary(self.resolve_inner(public_key)).await
284    }
285
286    /// Returns the most recent [SignedPacket] found after querying all
287    /// [mainline] Dht nodes and or [Relays](https:://pkarr.org/relays).
288    ///
289    /// Useful if you want to read the most recent packet before publishing
290    /// a new packet.
291    ///
292    /// This is a best effort, and doesn't guarantee consistency.
293    pub async fn resolve_most_recent(&self, public_key: &PublicKey) -> Option<SignedPacket> {
294        async_compat_if_necessary(async move {
295            let cache_key: CacheKey = public_key.as_ref().into();
296
297            let cache = self.0.cache.clone().unwrap_or(Arc::new(InMemoryCache::new(
298                1.try_into().expect("infallible"),
299            )));
300
301            let mut stream = self.resolve_stream(
302                public_key.clone(),
303                Some(cache.clone()),
304                cache_key,
305                cache.get(&cache_key).map(|s| s.timestamp()),
306            );
307            while stream.next().await.is_some() {}
308
309            cache.get(&public_key.into())
310        })
311        .await
312    }
313
314    // === Private Methods ===
315
316    async fn publish_inner(
317        &self,
318        signed_packet: &SignedPacket,
319        cas: Option<Timestamp>,
320    ) -> Result<(), PublishError> {
321        let cache_key: CacheKey = signed_packet.public_key().into();
322
323        // Check conflict
324        if let Some(cached) = self
325            .cache()
326            .as_ref()
327            .and_then(|cache| cache.get(&cache_key))
328        {
329            if cached.more_recent_than(signed_packet) {
330                return Err(ConcurrencyError::NotMostRecent)?;
331            } else if let Some(cas) = cas {
332                if cached.timestamp() != cas {
333                    return Err(ConcurrencyError::CasFailed)?;
334                }
335            }
336        }
337
338        if let Some(cache) = self.cache() {
339            cache.put(&cache_key, signed_packet);
340        }
341
342        self.select_publish_future(signed_packet, cas).await
343    }
344
345    /// Returns the first result from either the DHT or the Relays client or both.
346    async fn select_publish_future(
347        &self,
348        signed_packet: &SignedPacket,
349        cas: Option<Timestamp>,
350    ) -> Result<(), PublishError> {
351        // Handle DHT and Relay futures based on feature flags and target family
352        #[cfg(dht)]
353        let dht_future = {
354            let signed_packet = signed_packet.clone();
355            self.dht().map(|node| async move {
356                node.as_async()
357                    .put_mutable((&signed_packet).into(), cas.map(|t| t.as_u64() as i64))
358                    .await
359                    .map(|_| Ok(()))?
360            })
361        };
362
363        #[cfg(relays)]
364        let relays_future = {
365            let signed_packet = signed_packet.clone();
366            self.0
367                .relays
368                .clone()
369                .map(|relays| async move { relays.publish(&signed_packet, cas).await })
370        };
371
372        #[cfg(all(dht, not(relays)))]
373        return dht_future.expect("infallible").await;
374
375        #[cfg(all(relays, not(dht)))]
376        return relays_future.expect("infallible").await;
377
378        #[cfg(all(dht, relays))]
379        return if let Some(dht_future) = dht_future {
380            if let Some(relays_future) = relays_future {
381                let result = publish_both_networks(dht_future, relays_future).await;
382
383                self.0
384                    .relays
385                    .as_ref()
386                    .expect("infallible")
387                    .cancel_publish(&signed_packet.public_key());
388
389                result
390            } else {
391                dht_future.await
392            }
393        } else {
394            relays_future.expect("infallible").await
395        };
396    }
397
398    pub(crate) async fn resolve_inner(&self, public_key: &PublicKey) -> Option<SignedPacket> {
399        let public_key = public_key.clone();
400
401        let cache_key: CacheKey = public_key.as_ref().into();
402
403        let cached_packet = self
404            .cache()
405            .as_ref()
406            .and_then(|cache| cache.get(&cache_key));
407
408        // Stream is a future, so it won't run until we await or spawn it.
409        let mut stream = self.resolve_stream(
410            public_key.clone(),
411            self.0.cache.clone(),
412            cache_key,
413            cached_packet.as_ref().map(|s| s.timestamp()),
414        );
415
416        if let Some(cached_packet) = cached_packet {
417            if cached_packet.is_expired(self.0.minimum_ttl, self.0.maximum_ttl) {
418                #[cfg(not(wasm_browser))]
419                tokio::spawn(async move { while stream.next().await.is_some() {} });
420                #[cfg(wasm_browser)]
421                wasm_bindgen_futures::spawn_local(
422                    async move { while stream.next().await.is_some() {} },
423                );
424            }
425
426            cross_debug!(
427                "responding with cached packet even if expired. public_key: {}",
428                &public_key
429            );
430
431            self.cache().expect("infallible").get(&cache_key)
432        } else {
433            // Wait for the earliest positive response.
434            let first = stream.next().await;
435
436            if let Some(cache) = self.cache() {
437                cache.get(&cache_key)
438            } else {
439                first
440            }
441        }
442    }
443
444    #[cfg(wasm_browser)]
445    fn resolve_stream(
446        &self,
447        public_key: PublicKey,
448        cache: Option<Arc<dyn Cache>>,
449        cache_key: CacheKey,
450        more_recent_than: Option<Timestamp>,
451    ) -> Pin<Box<dyn Stream<Item = SignedPacket>>> {
452        let stream = self
453            .0
454            .relays
455            .as_ref()
456            .expect("infallible")
457            .resolve_futures(&public_key, more_recent_than)
458            .filter_map(|opt| opt)
459            .filter_map(move |signed_packet| {
460                filter_incoming_signed_packet(&public_key, cache.clone(), &cache_key, signed_packet)
461            });
462
463        Box::pin(stream)
464    }
465
466    #[cfg(not(wasm_browser))]
467    /// Returns a [Stream] of incoming [SignedPacket]s.
468    fn resolve_stream(
469        &self,
470        public_key: PublicKey,
471        cache: Option<Arc<dyn Cache>>,
472        cache_key: CacheKey,
473        more_recent_than: Option<Timestamp>,
474    ) -> Pin<Box<dyn Stream<Item = SignedPacket> + Send>> {
475        self.merged_resolve_stream(&public_key, more_recent_than)
476            .filter_map(move |signed_packet| {
477                filter_incoming_signed_packet(&public_key, cache.clone(), &cache_key, signed_packet)
478            })
479            .boxed()
480    }
481
482    #[cfg(not(wasm_browser))]
483    /// Returns a Stream from both the DHT and Relays client.
484    fn merged_resolve_stream(
485        &self,
486        public_key: &PublicKey,
487        more_recent_than: Option<Timestamp>,
488    ) -> Pin<Box<dyn Stream<Item = SignedPacket> + Send>> {
489        #[cfg(dht)]
490        let dht_stream = match self.dht() {
491            Some(node) => map_dht_stream(node.as_async().get_mutable(
492                public_key.as_bytes(),
493                None,
494                more_recent_than.map(|t| t.as_u64() as i64),
495            )),
496            None => None,
497        };
498
499        #[cfg(relays)]
500        let relays_stream = self
501            .0
502            .relays
503            .as_ref()
504            .map(|relays| relays.resolve(public_key, more_recent_than));
505
506        #[cfg(all(dht, not(relays)))]
507        return dht_stream.expect("infallible");
508
509        #[cfg(all(relays, not(dht)))]
510        return relays_stream.expect("infallible");
511
512        #[cfg(all(dht, relays))]
513        match (dht_stream, relays_stream) {
514            (Some(s), None) | (None, Some(s)) => s,
515            (Some(a), Some(b)) => Box::pin(select_stream(a, b)),
516            (None, None) => unreachable!("should not create a client with no network"),
517        }
518    }
519}
520
521fn filter_incoming_signed_packet(
522    public_key: &PublicKey,
523    cache: Option<Arc<dyn Cache>>,
524    cache_key: &CacheKey,
525    signed_packet: SignedPacket,
526) -> Option<SignedPacket> {
527    let new_packet: Option<SignedPacket> = if let Some(cached) = cache
528        .clone()
529        .and_then(|cache| cache.clone().get_read_only(cache_key))
530    {
531        if signed_packet.more_recent_than(&cached) {
532            cross_debug!("Received more recent packet than in cache. public_key: {public_key}",);
533
534            Some(signed_packet)
535        } else {
536            None
537        }
538    } else {
539        cross_debug!("Received new packet after cache miss. public_key: {public_key}");
540
541        Some(signed_packet)
542    };
543
544    if let Some(packet) = new_packet {
545        if let Some(cache) = &cache {
546            cache.put(cache_key, &packet)
547        };
548
549        Some(packet)
550    } else {
551        None
552    }
553}
554
/// Converts a Dht stream of mutable items into a boxed stream of [SignedPacket]s.
///
/// Items that fail to convert are logged at debug level and dropped.
/// The result is always `Some`.
#[cfg(dht)]
fn map_dht_stream(
    stream: mainline::async_dht::GetStream<mainline::MutableItem>,
) -> Option<Pin<Box<dyn Stream<Item = SignedPacket> + Send>>> {
    let packets = stream.filter_map(|mutable_item| {
        SignedPacket::try_from(mutable_item)
            .map_err(|error| {
                cross_debug!("Got an invalid signed packet from the DHT. Error: {error}");
            })
            .ok()
    });

    Some(packets.boxed())
}
573
#[derive(thiserror::Error, Debug)]
/// Errors occurring during building a [Client]
pub enum BuildError {
    #[error("Client configured without Mainline node or relays.")]
    /// Client configured without Mainline node or relays, so it has no
    /// network to publish to or resolve from.
    NoNetwork,

    #[error("Failed to build the Dht client {0}")]
    /// Failed to build the Dht client; wraps the underlying I/O error.
    DhtBuildError(std::io::Error),

    #[error("Passed an empty list of relays")]
    /// Relays were configured, but the list of relays was empty.
    EmptyListOfRelays,
}
589
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq, Hash)]
/// Errors occurring during publishing a [SignedPacket]
pub enum PublishError {
    #[error(transparent)]
    /// Errors that require either a retry or debugging the network condition.
    Query(#[from] QueryError),

    #[error(transparent)]
    /// A write conflict, or the risk of one, was detected; see [ConcurrencyError]
    /// for the specific cases.
    ///
    /// This risks a lost update, you should resolve most recent [SignedPacket] before publishing again.
    Concurrency(#[from] ConcurrencyError),

    // === Relays only errors ===
    #[error("All relays responded with unexpected responses, check debug logs.")]
    /// All relays responded with unexpected responses, check debug logs.
    UnexpectedResponses,
}
608
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq, Hash)]
/// Errors that require either a retry or debugging the network condition.
pub enum QueryError {
    /// Publish query timed out with no responses, neither success nor errors, from Dht or relays.
    #[error("Publish query timed out with no responses neither success or errors.")]
    Timeout,

    #[error("Publishing SignedPacket to Mainline failed.")]
    /// Publishing SignedPacket to Mainline failed.
    ///
    /// Mapped from mainline's `PutQueryError::NoClosestNodes`.
    NoClosestNodes,

    #[error("Publishing SignedPacket to Mainline failed code: {0}, description: {1}.")]
    /// Publishing SignedPacket to Mainline failed, received an error response.
    ///
    /// Carries the error code and the description from that response.
    DhtErrorResponse(i32, String),

    #[error("Most relays responded with bad request")]
    /// Most relays responded with bad request
    BadRequest,
}
628
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq, Hash)]
/// Errors that require resolving the most recent [SignedPacket] before publishing.
pub enum ConcurrencyError {
    #[error("A different SignedPacket is being concurrently published for the same PublicKey.")]
    /// A different [SignedPacket] is being concurrently published for the same [PublicKey].
    ///
    /// This risks a lost update, you should resolve most recent [SignedPacket] before publishing again.
    ConflictRisk,

    #[error("Found a more recent SignedPacket in the client's cache")]
    /// Found a more recent SignedPacket in the client's cache
    ///
    /// This risks a lost update, you should resolve most recent [SignedPacket] before publishing again.
    NotMostRecent,

    #[error("Compare and swap failed; there is a more recent SignedPacket than the one seen before publishing")]
    /// Compare and swap failed; there is a more recent SignedPacket than the one seen before publishing
    ///
    /// This risks a lost update, you should resolve most recent [SignedPacket] before publishing again.
    CasFailed,
}
650
/// Translates mainline's put errors into this crate's [PublishError],
/// mapping each query and concurrency variant to its local counterpart.
#[cfg(dht)]
impl From<PutMutableError> for PublishError {
    fn from(value: PutMutableError) -> Self {
        use mainline::errors::ConcurrencyError as DhtConcurrency;
        use mainline::errors::PutQueryError as DhtQuery;

        match value {
            PutMutableError::Query(DhtQuery::Timeout) => QueryError::Timeout.into(),
            PutMutableError::Query(DhtQuery::NoClosestNodes) => QueryError::NoClosestNodes.into(),
            PutMutableError::Query(DhtQuery::ErrorResponse(error)) => {
                QueryError::DhtErrorResponse(error.code, error.description).into()
            }
            PutMutableError::Concurrency(DhtConcurrency::ConflictRisk) => {
                ConcurrencyError::ConflictRisk.into()
            }
            PutMutableError::Concurrency(DhtConcurrency::NotMostRecent) => {
                ConcurrencyError::NotMostRecent.into()
            }
            PutMutableError::Concurrency(DhtConcurrency::CasFailed) => {
                ConcurrencyError::CasFailed.into()
            }
        }
    }
}
672
673async fn async_compat_if_necessary<T, O>(fut: T) -> O
674where
675    T: Future<Output = O>,
676{
677    #[cfg(not(wasm_browser))]
678    {
679        if tokio::runtime::Handle::try_current().is_err() {
680            return async_compat::Compat::new(fut).await;
681        }
682    }
683
684    fut.await
685}