Skip to main content

n0_mainline/
dht.rs

1//! Dht node.
2
3use std::io;
4use std::net::{Ipv4Addr, SocketAddrV4};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8#[allow(unused_imports)]
9use ed25519_dalek::SigningKey;
10use futures_core::Stream;
11use tokio::sync::{mpsc, oneshot};
12
13#[allow(unused_imports)]
14use crate::{
15    Node, ServerSettings,
16    actor::{ActorMessage, Info, ResponseSender, config::Config},
17    common::{
18        AnnouncePeerRequestArguments, AnnounceSignedPeerRequestArguments, FindNodeRequestArguments,
19        GetPeersRequestArguments, GetValueRequestArguments, Id, MutableItem,
20        PutImmutableRequestArguments, PutMutableRequestArguments, PutRequestSpecific,
21        SignedAnnounce, hash_immutable,
22    },
23    core::{ConcurrencyError, PutError, PutQueryError, iterative_query::GetRequestSpecific},
24};
25
26mod testnet;
27
28pub use testnet::Testnet;
29
/// Capacity of the actor inbox channel.
///
/// Passed to `mpsc::channel` in [`Dht::new`], so it bounds how many
/// [`ActorMessage`]s can be queued for the actor at any one time.
const ACTOR_INBOX_CAPACITY: usize = 256;
32
#[n0_error::stack_error(derive)]
#[derive(Clone, Copy, PartialEq, Eq)]
/// Error returned when the DHT actor task has shut down.
///
/// It is produced when a message can no longer be sent to the actor's inbox,
/// or when the actor drops a reply channel without answering.
///
/// All [Dht] handles are still usable but every method will return this error
/// until the handles are dropped.
#[error("DHT actor task has shut down")]
pub struct ActorShutdown;
41
42impl From<ActorShutdown> for io::Error {
43    fn from(_: ActorShutdown) -> Self {
44        io::Error::other("DHT actor task has shut down")
45    }
46}
47
#[derive(Debug, Clone)]
/// Mainline Dht node.
///
/// A cloneable handle around the sending half of the actor's inbox channel;
/// every method talks to the actor by sending it an [ActorMessage].
pub struct Dht(pub(crate) mpsc::Sender<ActorMessage>);
51
#[derive(Debug, Default, Clone)]
/// A builder for the [Dht] node.
///
/// Wraps a [Config] that the setter methods mutate in place; [DhtBuilder::build]
/// clones that configuration, so one builder can create several nodes.
pub struct DhtBuilder(Config);
55
56impl DhtBuilder {
57    /// Set this node's server_mode.
58    pub fn server_mode(&mut self) -> &mut Self {
59        self.0.server_mode = true;
60
61        self
62    }
63
64    /// Set a custom settings for the node to use at server mode.
65    ///
66    /// Defaults to [ServerSettings::default]
67    pub fn server_settings(&mut self, server_settings: ServerSettings) -> &mut Self {
68        self.0.server_settings = server_settings;
69
70        self
71    }
72
73    /// Set bootstrapping nodes.
74    pub fn bootstrap<T: ToString>(&mut self, bootstrap: &[T]) -> &mut Self {
75        self.0.bootstrap = bootstrap.iter().map(|b| b.to_string()).collect();
76
77        self
78    }
79
80    /// Add more bootstrap nodes to default bootstrapping nodes.
81    ///
82    /// Useful when you want to augment the default bootstrapping nodes with
83    /// dynamic list of nodes you have seen in previous sessions.
84    pub fn extra_bootstrap<T: ToString>(&mut self, extra_bootstrap: &[T]) -> &mut Self {
85        for address in extra_bootstrap {
86            self.0.bootstrap.push(address.to_string());
87        }
88
89        self
90    }
91
92    /// Remove the existing bootstrapping nodes, usually to create the first node in a new network.
93    pub fn no_bootstrap(&mut self) -> &mut Self {
94        self.0.bootstrap = vec![];
95
96        self
97    }
98
99    /// Used to simulate a DHT that doesn't support `announce_signed_peers`
100    #[cfg(test)]
101    fn disable_signed_peers(&mut self) -> &mut Self {
102        self.0.disable_announce_signed_peers = true;
103
104        self
105    }
106
107    /// Set an explicit port to listen on.
108    pub fn port(&mut self, port: u16) -> &mut Self {
109        self.0.port = Some(port);
110
111        self
112    }
113
114    /// A known public IPv4 address for this node to generate
115    /// a secure node Id from according to [BEP_0042](https://www.bittorrent.org/beps/bep_0042.html)
116    ///
117    /// Defaults to depending on suggestions from responding nodes.
118    pub fn public_ip(&mut self, public_ip: Ipv4Addr) -> &mut Self {
119        self.0.public_ip = Some(public_ip);
120
121        self
122    }
123
124    /// Create a [Dht] node.
125    ///
126    /// Must be called from within a Tokio runtime context, since the UDP socket is
127    /// registered with the Tokio reactor during construction.
128    pub fn build(&self) -> io::Result<Dht> {
129        Dht::new(self.0.clone())
130    }
131}
132
133impl Dht {
134    /// Create a new Dht node.
135    ///
136    /// Binds the UDP socket synchronously, so any bind error is surfaced immediately.
137    /// Must be called from within a Tokio runtime context: the socket is registered
138    /// with the Tokio reactor here, and the actor loop is spawned as a background task.
139    pub fn new(config: Config) -> io::Result<Self> {
140        let actor = crate::actor::Actor::new(config)?;
141        let (sender, receiver) = mpsc::channel(ACTOR_INBOX_CAPACITY);
142        tokio::spawn(crate::actor::run(actor, receiver));
143        Ok(Dht(sender))
144    }
145
146    /// Returns a builder to edit settings before creating a Dht node.
147    pub fn builder() -> DhtBuilder {
148        DhtBuilder::default()
149    }
150
151    /// Create a new DHT client with default bootstrap nodes.
152    pub fn client() -> io::Result<Self> {
153        Dht::builder().build()
154    }
155
156    /// Create a new DHT node that is running in [Server mode][DhtBuilder::server_mode] as
157    /// soon as possible.
158    ///
159    /// You shouldn't use this option unless you are sure your
160    /// DHT node is publicly accessible (not firewalled) _AND_ will be long running,
161    /// and/or you are running your own local network for testing.
162    ///
163    /// If you are not sure, use [Self::client] and it will switch
164    /// to server mode when/if these two conditions are met.
165    pub fn server() -> io::Result<Self> {
166        Dht::builder().server_mode().build()
167    }
168
169    // === Getters ===
170
171    /// Information and statistics about this [Dht] node.
172    pub async fn info(&self) -> Result<Info, ActorShutdown> {
173        let (tx, rx) = oneshot::channel();
174        self.send(ActorMessage::Info(tx)).await?;
175
176        rx.await.map_err(|_| ActorShutdown)
177    }
178
179    /// Turn this node's routing table to a list of bootstrapping nodes.
180    pub async fn to_bootstrap(&self) -> Result<Vec<String>, ActorShutdown> {
181        let (tx, rx) = oneshot::channel();
182        self.send(ActorMessage::ToBootstrap(tx)).await?;
183
184        rx.await.map_err(|_| ActorShutdown)
185    }
186
187    // === Public Methods ===
188
189    /// Await until the bootstrapping query is done.
190    ///
191    /// Returns true if the bootstrapping was successful.
192    pub async fn bootstrapped(&self) -> Result<bool, ActorShutdown> {
193        let info = self.info().await?;
194        self.find_node(*info.id()).await?;
195
196        let info = self.info().await?;
197        Ok(info.routing_table_size() > 0)
198    }
199
200    // === Find nodes ===
201
202    /// Returns the closest 20 [secure](Node::is_secure) nodes to a target [Id].
203    ///
204    /// Mostly useful to crawl the DHT.
205    ///
206    /// The returned nodes are claims by other nodes, they may be lies, or may have churned
207    /// since they were last seen, but haven't been pinged yet.
208    ///
209    /// You might need to ping them to confirm they exist, and responsive, or if you want to
210    /// learn more about them like the client they are using, or if they support a given BEP.
211    ///
212    /// If you are trying to find the closest nodes to a target with intent to [Self::put],
213    /// a request directly to these nodes (using `extra_nodes` parameter), then you should
214    /// use [Self::get_closest_nodes] instead.
215    pub async fn find_node(&self, target: Id) -> Result<Box<[Node]>, ActorShutdown> {
216        let (tx, rx) = oneshot::channel();
217        self.send(ActorMessage::Get(
218            GetRequestSpecific::FindNode(FindNodeRequestArguments { target }),
219            ResponseSender::ClosestNodes(tx),
220        ))
221        .await?;
222
223        rx.await.map_err(|_| ActorShutdown)
224    }
225
226    // === Peers ===
227
228    /// Get peers for a given infohash.
229    ///
230    /// Note: each node of the network will only return a _random_ subset (usually 20)
231    /// of the total peers it has for a given infohash, so if you are getting responses
232    /// from 20 nodes, you can expect up to 400 peers in total, but if there are more
233    /// announced peers on that infohash, you are likely to miss some, the logic here
234    /// for Bittorrent is that any peer will introduce you to more peers through "peer exchange"
235    /// so if you are implementing something different from Bittorrent, you might want
236    /// to implement your own logic for gossipping more peers after you discover the first ones.
237    pub async fn get_peers(
238        &self,
239        info_hash: Id,
240    ) -> Result<GetStream<Vec<SocketAddrV4>>, ActorShutdown> {
241        let (tx, rx) = mpsc::unbounded_channel();
242        self.send(ActorMessage::Get(
243            GetRequestSpecific::GetPeers(GetPeersRequestArguments { info_hash }),
244            ResponseSender::Peers(tx),
245        ))
246        .await?;
247
248        Ok(GetStream(rx))
249    }
250
251    /// Announce a peer for a given infohash.
252    ///
253    /// The peer will be announced on this process IP.
254    /// If explicit port is passed, it will be used, otherwise the port will be implicitly
255    /// assumed by remote nodes to be the same ase port they received the request from.
256    pub async fn announce_peer(
257        &self,
258        info_hash: Id,
259        port: Option<u16>,
260    ) -> Result<Id, PutQueryError> {
261        let (port, implied_port) = match port {
262            Some(port) => (port, None),
263            None => (0, Some(true)),
264        };
265
266        self.put(
267            PutRequestSpecific::AnnouncePeer(AnnouncePeerRequestArguments {
268                info_hash,
269                port,
270                implied_port,
271            }),
272            None,
273        )
274        .await
275        .map_err(put_error_to_query_error)
276    }
277
278    // === Signed Peers ===
279
280    /// Announce a signed peer for a given infohash.
281    ///
282    /// ## Namespacing
283    /// It is important to distinguish your overlay network and any other differentiator like a
284    /// sub-network or geographical distribution, by namespacing your `info_hash`, to avoid getting
285    /// signed peers you can't or don't want to connect to.
286    ///
287    /// The easiest way for namespacing is to hash a concatenation of your original `info_hash`
288    /// with the name of your network and any other filters, then pass the first 20 bytes as the
289    /// `info_hash` to this method.
290    ///
291    /// Read [BEP_????](https://github.com/Nuhvi/mainline/blob/main/beps/bep_signed_peers.rst) for more information.
292    #[cfg(feature = "unstable_signed_peers")]
293    #[cfg_attr(n0_mainline_docsrs, doc(cfg(feature = "unstable_signed_peers")))]
294    pub async fn announce_signed_peer(
295        &self,
296        info_hash: Id,
297        signer: &SigningKey,
298    ) -> Result<Id, PutQueryError> {
299        let signed_announce = SignedAnnounce::new(signer, &info_hash);
300
301        self.put(
302            PutRequestSpecific::AnnounceSignedPeer(AnnounceSignedPeerRequestArguments {
303                info_hash,
304                k: *signed_announce.key(),
305                t: signed_announce.timestamp(),
306                sig: *signed_announce.signature(),
307            }),
308            None,
309        )
310        .await
311        .map_err(put_error_to_query_error)
312    }
313
314    /// Get peers verifiably announced for a given infohash by their public key.
315    ///
316    /// ## Namespacing
317    /// It is important to distinguish your overlay network and any other differentiator like a
318    /// sub-network or geographical distribution, by namespacing your `info_hash`, to avoid getting
319    /// signed peers you can't or don't want to connect to.
320    ///
321    /// The easiest way for namespacing is to hash a concatenation of your original `info_hash`
322    /// with the name of your network and any other filters, then pass the first 20 bytes as the
323    /// `info_hash` to this method.
324    ///
325    /// Note: each node of the network will only return a _random_ subset (usually 20)
326    /// of the total peers it has for a given infohash, so if you are getting responses
327    /// from 20 nodes, you can expect up to 400 peers in total, but if there are more
328    /// announced peers on that infohash, you are likely to miss some, the logic here
329    /// for Bittorrent is that any peer will introduce you to more peers through "peer exchange"
330    /// so if you are implementing something different from Bittorrent, you might want
331    /// to implement your own logic for gossipping more peers after you discover the first ones.
332    ///
333    /// Read [BEP_????](https://github.com/Nuhvi/mainline/blob/main/beps/bep_signed_peers.rst) for more information.
334    #[cfg(feature = "unstable_signed_peers")]
335    #[cfg_attr(n0_mainline_docsrs, doc(cfg(feature = "unstable_signed_peers")))]
336    pub async fn get_signed_peers(
337        &self,
338        info_hash: Id,
339    ) -> Result<GetStream<Vec<SignedAnnounce>>, ActorShutdown> {
340        let (tx, rx) = mpsc::unbounded_channel();
341        self.send(ActorMessage::Get(
342            GetRequestSpecific::GetSignedPeers(GetPeersRequestArguments { info_hash }),
343            ResponseSender::SignedPeers(tx),
344        ))
345        .await?;
346
347        Ok(GetStream(rx))
348    }
349
350    // === Immutable data ===
351
352    /// Get an Immutable data by its sha1 hash.
353    pub async fn get_immutable(&self, target: Id) -> Result<Option<Box<[u8]>>, ActorShutdown> {
354        let (tx, rx) = oneshot::channel();
355        self.send(ActorMessage::Get(
356            GetRequestSpecific::GetValue(GetValueRequestArguments {
357                target,
358                seq: None,
359                salt: None,
360            }),
361            ResponseSender::Immutable(Some(tx)),
362        ))
363        .await?;
364
365        // Sender dropped without sending → query completed without finding a value.
366        Ok(rx.await.ok())
367    }
368
369    /// Put an immutable data to the DHT.
370    pub async fn put_immutable(&self, value: &[u8]) -> Result<Id, PutQueryError> {
371        let target: Id = hash_immutable(value).into();
372
373        self.put(
374            PutRequestSpecific::PutImmutable(PutImmutableRequestArguments {
375                target,
376                v: value.into(),
377            }),
378            None,
379        )
380        .await
381        .map_err(put_error_to_query_error)
382    }
383
384    // === Mutable data ===
385
386    /// Get a mutable data by its `public_key` and optional `salt`.
387    ///
388    /// You can ask for items `more_recent_than` than a certain `seq`,
389    /// usually one that you already have seen before, similar to `If-Modified-Since` header in HTTP.
390    ///
391    /// # Order
392    ///
393    /// The order of [MutableItem]s returned by this stream is not guaranteed to
394    /// reflect their `seq` value. You should not assume that the later items are
395    /// more recent than earlier ones.
396    ///
397    /// Consider using [Self::get_mutable_most_recent] if that is what you need.
398    pub async fn get_mutable(
399        &self,
400        public_key: &[u8; 32],
401        salt: Option<&[u8]>,
402        more_recent_than: Option<i64>,
403    ) -> Result<GetStream<MutableItem>, ActorShutdown> {
404        let salt = salt.map(|s| s.into());
405        let target = MutableItem::target_from_key(public_key, salt.as_deref());
406        let (tx, rx) = mpsc::unbounded_channel();
407        self.send(ActorMessage::Get(
408            GetRequestSpecific::GetValue(GetValueRequestArguments {
409                target,
410                seq: more_recent_than,
411                salt,
412            }),
413            ResponseSender::Mutable(tx),
414        ))
415        .await?;
416
417        Ok(GetStream(rx))
418    }
419
420    /// Get the most recent [MutableItem] from the network.
421    pub async fn get_mutable_most_recent(
422        &self,
423        public_key: &[u8; 32],
424        salt: Option<&[u8]>,
425    ) -> Result<Option<MutableItem>, ActorShutdown> {
426        let mut most_recent: Option<MutableItem> = None;
427        let mut stream = self.get_mutable(public_key, salt, None).await?;
428
429        while let Some(item) = stream.0.recv().await {
430            if let Some(mr) = &most_recent {
431                if item.seq() == mr.seq && item.value() > &*mr.value {
432                    most_recent = Some(item)
433                }
434            } else {
435                most_recent = Some(item);
436            }
437        }
438
439        Ok(most_recent)
440    }
441
442    /// Put a mutable data to the DHT.
443    ///
444    /// # Lost Update Problem
445    ///
446    /// As mainline DHT is a distributed system, it is vulnerable to [Write–write conflict](https://en.wikipedia.org/wiki/Write-write_conflict).
447    ///
448    /// ## Read first
449    ///
450    /// To mitigate the risk of lost updates, you should call the [Self::get_mutable_most_recent] method
451    /// then start authoring the new [MutableItem] based on the most recent as in the following example:
452    ///
453    ///```rust,ignore
454    /// use dht::{Dht, MutableItem, SigningKey, Testnet};
455    ///
456    /// let testnet = Testnet::new(3).await.unwrap();
457    /// let dht = Dht::builder().bootstrap(&testnet.bootstrap).build().unwrap();
458    ///
459    /// let secret_key = SigningKey::from_bytes(&[0; 32]);
460    /// let key = secret_key.verifying_key().to_bytes();
461    /// let salt = Some(b"salt".as_ref());
462    ///
463    /// let (item, cas) = if let Some(most_recent) = dht .get_mutable_most_recent(&key, salt).await {
464    ///     // 1. Optionally Create a new value to take the most recent's value in consideration.
465    ///     let mut new_value = most_recent.value().to_vec();
466    ///     new_value.extend_from_slice(b" more data");
467    ///
468    ///     // 2. Increment the sequence number to be higher than the most recent's.
469    ///     let most_recent_seq = most_recent.seq();
470    ///     let new_seq = most_recent_seq + 1;
471    ///
472    ///     (
473    ///         MutableItem::new(&secret_key, &new_value, new_seq, salt),
474    ///         // 3. Use the most recent [MutableItem::seq] as a `CAS`.
475    ///         Some(most_recent_seq)
476    ///     )
477    /// } else {
478    ///     (MutableItem::new(&secret_key, b"first value", 1, salt), None)
479    /// };
480    ///
481    /// dht.put_mutable(item, cas).await.unwrap();
482    /// ```
483    ///
484    /// ## Errors
485    ///
486    /// In addition to the [PutQueryError] common with all PUT queries, PUT mutable item
487    /// query has other [Concurrency errors][ConcurrencyError], that try to detect write conflict
488    /// risks or obvious conflicts.
489    ///
490    /// If you are lucky to get one of these errors (which is not guaranteed), then you should
491    /// read the most recent item again, and repeat the steps in the previous example.
492    pub async fn put_mutable(
493        &self,
494        item: MutableItem,
495        cas: Option<i64>,
496    ) -> Result<Id, PutMutableError> {
497        let request = PutRequestSpecific::PutMutable(PutMutableRequestArguments::from(item, cas));
498
499        self.put(request, None).await.map_err(|error| match error {
500            PutError::Query(err) => PutMutableError::Query(err),
501            PutError::Concurrency(err) => PutMutableError::Concurrency(err),
502        })
503    }
504
505    // === Raw ===
506
507    /// Get closet nodes to a specific target, that support [BEP_0044](https://www.bittorrent.org/beps/bep_0044.html).
508    ///
509    /// Useful to [Self::put] a request to nodes further from the 20 closest nodes to the
510    /// [PutRequestSpecific::target]. Which itself is useful to circumvent [extreme vertical sybil attacks](https://github.com/nuhvi/mainline/blob/main/docs/censorship-resistance.md#extreme-vertical-sybil-attacks).
511    pub async fn get_closest_nodes(&self, target: Id) -> Result<Box<[Node]>, ActorShutdown> {
512        let (tx, rx) = oneshot::channel();
513        self.send(ActorMessage::Get(
514            GetRequestSpecific::GetValue(GetValueRequestArguments {
515                target,
516                salt: None,
517                seq: None,
518            }),
519            ResponseSender::ClosestNodes(tx),
520        ))
521        .await?;
522
523        rx.await.map_err(|_| ActorShutdown)
524    }
525
526    /// Send a PUT request to the closest nodes, and optionally some extra nodes.
527    ///
528    /// This is useful to put data to regions of the DHT other than the closest nodes
529    /// to this request's [target][PutRequestSpecific::target].
530    ///
531    /// You can find nodes close to other regions of the network by calling
532    /// [Self::get_closest_nodes] with the target that you want to find the closest nodes to.
533    ///
534    /// Note: extra nodes need to have [Node::valid_token].
535    pub async fn put(
536        &self,
537        request: PutRequestSpecific,
538        extra_nodes: Option<Box<[Node]>>,
539    ) -> Result<Id, PutError> {
540        let (tx, rx) = oneshot::channel();
541        self.send(ActorMessage::Put(request, tx, extra_nodes))
542            .await?;
543
544        rx.await.map_err(|_| ActorShutdown)?
545    }
546
547    // === Private Methods ===
548
549    async fn send(&self, message: ActorMessage) -> Result<(), ActorShutdown> {
550        self.0.send(message).await.map_err(|_| ActorShutdown)
551    }
552}
553
554fn put_error_to_query_error(error: PutError) -> PutQueryError {
555    match error {
556        PutError::Query(error) => error,
557        PutError::Concurrency(_) => {
558            unreachable!("should not receive a concurrency error from this query type")
559        }
560    }
561}
562
/// A [Stream] of incoming peers, immutable or mutable values.
///
/// Wraps the receiving half of the unbounded channel created by the `get_*`
/// methods on [Dht] that return a stream.
pub struct GetStream<T>(mpsc::UnboundedReceiver<T>);
565
566impl<T> Stream for GetStream<T> {
567    type Item = T;
568
569    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
570        let this = self.get_mut();
571        this.0.poll_recv(cx)
572    }
573}
574
#[n0_error::stack_error(derive, from_sources, std_sources)]
/// Put MutableItem errors.
///
/// Returned by [Dht::put_mutable], which maps each [PutError] variant onto the
/// variant of the same name here.
pub enum PutMutableError {
    #[error(transparent)]
    /// Common PutQuery errors
    Query(PutQueryError),

    #[error(transparent)]
    /// PutQuery for [crate::MutableItem] errors
    Concurrency(ConcurrencyError),
}
586
587impl From<ActorShutdown> for PutMutableError {
588    fn from(_: ActorShutdown) -> Self {
589        PutMutableError::Query(PutQueryError::Shutdown)
590    }
591}
592
593impl From<ActorShutdown> for PutQueryError {
594    fn from(_: ActorShutdown) -> Self {
595        PutQueryError::Shutdown
596    }
597}
598
599impl From<ActorShutdown> for PutError {
600    fn from(_: ActorShutdown) -> Self {
601        PutError::Query(PutQueryError::Shutdown)
602    }
603}
604
605#[cfg(test)]
606mod test {
607    use std::{str::FromStr, time::Duration};
608
609    use ed25519_dalek::SigningKey;
610    use futures::StreamExt;
611    #[allow(unused_imports)]
612    use rand::Rng;
613
614    use crate::core::ConcurrencyError;
615
616    use super::*;
617
618    #[tokio::test]
619    #[ignore = "hits the real mainline DHT; run with --ignored"]
620    async fn put_get_mutable_real_dht() {
621        let _ = tracing_subscriber::fmt()
622            .with_env_filter("debug")
623            .try_init();
624
625        let dht = Dht::client().unwrap();
626
627        let signer = SigningKey::from_bytes(&rand::random());
628        let key = *signer.verifying_key().as_bytes();
629        let value = b"hello from n0-mainline test";
630
631        let item = MutableItem::new(&signer, value, 1, None);
632
633        dht.put_mutable(item.clone(), None).await.unwrap();
634
635        let response = dht
636            .get_mutable(&key, None, None)
637            .await
638            .unwrap()
639            .next()
640            .await
641            .expect("should resolve mutable item from real DHT");
642
643        assert_eq!(response.value(), value.as_slice());
644        assert_eq!(response.seq(), 1);
645    }
646
647    #[tokio::test]
648    async fn bind_twice() {
649        let a = Dht::client().unwrap();
650        let result = Dht::builder()
651            .port(a.info().await.unwrap().local_addr().port())
652            .server_mode()
653            .build();
654
655        assert!(result.is_err());
656    }
657
658    #[tokio::test]
659    async fn announce_get_peer() {
660        let testnet = Testnet::new(10).await.unwrap();
661
662        let a = Dht::builder()
663            .bootstrap(&testnet.bootstrap)
664            .build()
665            .unwrap();
666        let b = Dht::builder()
667            .bootstrap(&testnet.bootstrap)
668            .build()
669            .unwrap();
670
671        let info_hash = Id::random();
672
673        a.announce_peer(info_hash, Some(45555))
674            .await
675            .expect("failed to announce");
676
677        let peers = b
678            .get_peers(info_hash)
679            .await
680            .unwrap()
681            .next()
682            .await
683            .expect("No peers");
684
685        assert_eq!(peers.first().unwrap().port(), 45555);
686    }
687
688    #[tokio::test]
689    async fn put_get_immutable() {
690        let testnet = Testnet::new(10).await.unwrap();
691
692        let a = Dht::builder()
693            .bootstrap(&testnet.bootstrap)
694            .build()
695            .unwrap();
696        let b = Dht::builder()
697            .bootstrap(&testnet.bootstrap)
698            .build()
699            .unwrap();
700
701        let value = b"Hello World!";
702        let expected_target = Id::from_str("e5f96f6f38320f0f33959cb4d3d656452117aadb").unwrap();
703
704        let target = a.put_immutable(value).await.unwrap();
705        assert_eq!(target, expected_target);
706
707        let response = b.get_immutable(target).await.unwrap().unwrap();
708
709        assert_eq!(response, value.to_vec().into_boxed_slice());
710    }
711
712    #[tokio::test]
713    async fn find_node_no_values() {
714        let client = Dht::builder().no_bootstrap().build().unwrap();
715
716        client.find_node(Id::random()).await.unwrap();
717    }
718
719    #[tokio::test]
720    async fn put_get_immutable_no_values() {
721        let client = Dht::builder().no_bootstrap().build().unwrap();
722
723        assert_eq!(client.get_immutable(Id::random()).await.unwrap(), None);
724    }
725
726    #[tokio::test]
727    async fn put_get_mutable() {
728        let testnet = Testnet::new(10).await.unwrap();
729
730        let a = Dht::builder()
731            .bootstrap(&testnet.bootstrap)
732            .build()
733            .unwrap();
734        let b = Dht::builder()
735            .bootstrap(&testnet.bootstrap)
736            .build()
737            .unwrap();
738
739        let signer = SigningKey::from_bytes(&[
740            56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
741            228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
742        ]);
743
744        let seq = 1000;
745        let value = b"Hello World!";
746
747        let item = MutableItem::new(&signer, value, seq, None);
748
749        a.put_mutable(item.clone(), None).await.unwrap();
750
751        let response = b
752            .get_mutable(signer.verifying_key().as_bytes(), None, None)
753            .await
754            .unwrap()
755            .next()
756            .await
757            .expect("No mutable values");
758
759        assert_eq!(&response, &item);
760    }
761
762    #[tokio::test]
763    async fn put_get_mutable_no_more_recent_value() {
764        let testnet = Testnet::new(10).await.unwrap();
765
766        let a = Dht::builder()
767            .bootstrap(&testnet.bootstrap)
768            .build()
769            .unwrap();
770        let b = Dht::builder()
771            .bootstrap(&testnet.bootstrap)
772            .build()
773            .unwrap();
774
775        let signer = SigningKey::from_bytes(&[
776            56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
777            228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
778        ]);
779
780        let seq = 1000;
781        let value = b"Hello World!";
782
783        let item = MutableItem::new(&signer, value, seq, None);
784
785        a.put_mutable(item.clone(), None).await.unwrap();
786
787        let response = b
788            .get_mutable(signer.verifying_key().as_bytes(), None, Some(seq))
789            .await
790            .unwrap()
791            .next()
792            .await;
793
794        assert!(&response.is_none());
795    }
796
797    #[tokio::test]
798    async fn repeated_put_query() {
799        let testnet = Testnet::new(10).await.unwrap();
800
801        let a = Dht::builder()
802            .bootstrap(&testnet.bootstrap)
803            .build()
804            .unwrap();
805
806        let id = a.put_immutable(&[1, 2, 3]).await.unwrap();
807
808        assert_eq!(a.put_immutable(&[1, 2, 3]).await.unwrap(), id);
809    }
810
811    #[tokio::test]
812    async fn concurrent_get_mutable() {
813        let testnet = Testnet::new(10).await.unwrap();
814
815        let a = Dht::builder()
816            .bootstrap(&testnet.bootstrap)
817            .build()
818            .unwrap();
819        let b = Dht::builder()
820            .bootstrap(&testnet.bootstrap)
821            .build()
822            .unwrap();
823
824        let signer = SigningKey::from_bytes(&[
825            56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
826            228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
827        ]);
828
829        let key = *signer.verifying_key().as_bytes();
830        let seq = 1000;
831        let value = b"Hello World!";
832
833        let item = MutableItem::new(&signer, value, seq, None);
834
835        a.put_mutable(item.clone(), None).await.unwrap();
836
837        let _response_first = b
838            .get_mutable(&key, None, None)
839            .await
840            .unwrap()
841            .next()
842            .await
843            .expect("No mutable values");
844
845        let response_second = b
846            .get_mutable(&key, None, None)
847            .await
848            .unwrap()
849            .next()
850            .await
851            .expect("No mutable values");
852
853        assert_eq!(&response_second, &item);
854    }
855
856    #[tokio::test]
857    async fn concurrent_put_mutable_different_with_cas() {
858        let testnet = Testnet::new(10).await.unwrap();
859
860        let client = Dht::builder()
861            .bootstrap(&testnet.bootstrap)
862            .build()
863            .unwrap();
864
865        let signer = SigningKey::from_bytes(&[
866            56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
867            228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
868        ]);
869
870        // First
871        {
872            let item = MutableItem::new(&signer, &[], 1000, None);
873
874            let (tx, _rx) = oneshot::channel();
875            let request =
876                PutRequestSpecific::PutMutable(PutMutableRequestArguments::from(item, None));
877            client
878                .0
879                .send(ActorMessage::Put(request, tx, None))
880                .await
881                .unwrap();
882        }
883
884        tokio::time::sleep(Duration::from_millis(100)).await;
885
886        // Second
887        {
888            let item = MutableItem::new(&signer, &[], 1001, None);
889
890            let most_recent = client
891                .get_mutable_most_recent(item.key(), None)
892                .await
893                .unwrap();
894
895            if let Some(cas) = most_recent.map(|item| item.seq()) {
896                client.put_mutable(item, Some(cas)).await.unwrap();
897            } else {
898                client.put_mutable(item, None).await.unwrap();
899            }
900        }
901    }
902
903    #[tokio::test]
904    async fn conflict_302_seq_less_than_current() {
905        let testnet = Testnet::new(10).await.unwrap();
906
907        let client = Dht::builder()
908            .bootstrap(&testnet.bootstrap)
909            .build()
910            .unwrap();
911
912        let signer = SigningKey::from_bytes(&[
913            56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
914            228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
915        ]);
916
917        client
918            .put_mutable(MutableItem::new(&signer, &[], 1001, None), None)
919            .await
920            .unwrap();
921
922        assert!(matches!(
923            client
924                .put_mutable(MutableItem::new(&signer, &[], 1000, None), None)
925                .await,
926            Err(PutMutableError::Concurrency(
927                ConcurrencyError::NotMostRecent
928            ))
929        ));
930    }
931
932    #[tokio::test]
933    async fn conflict_301_cas() {
934        let testnet = Testnet::new(10).await.unwrap();
935
936        let client = Dht::builder()
937            .bootstrap(&testnet.bootstrap)
938            .build()
939            .unwrap();
940
941        let signer = SigningKey::from_bytes(&[
942            56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
943            228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
944        ]);
945
946        client
947            .put_mutable(MutableItem::new(&signer, &[], 1001, None), None)
948            .await
949            .unwrap();
950
951        assert!(matches!(
952            client
953                .put_mutable(MutableItem::new(&signer, &[], 1002, None), Some(1000))
954                .await,
955            Err(PutMutableError::Concurrency(ConcurrencyError::CasFailed))
956        ));
957    }
958
959    #[tokio::test]
960    async fn populate_bootstrapping_node_routing_table() {
961        let size = 3;
962
963        let testnet = Testnet::new(size).await.unwrap();
964
965        for n in &testnet.nodes {
966            assert_eq!(n.to_bootstrap().await.unwrap().len(), size - 1);
967        }
968    }
969
970    #[tokio::test]
971    async fn bootstrap_with_one_node() {
972        let testnet = Testnet::new(1).await.unwrap();
973
974        let client = Dht::builder()
975            .bootstrap(&testnet.bootstrap)
976            .build()
977            .unwrap();
978
979        assert!(client.bootstrapped().await.unwrap());
980    }
981
/// Announces three signed peers from one client and checks that a second
/// client retrieves exactly those three keys for the same info hash.
#[cfg(feature = "unstable_signed_peers")]
#[tokio::test]
async fn announce_signed_peers_at_full_adoption() {
    let testnet = Testnet::new(10).await.unwrap();

    let announcer = Dht::builder()
        .bootstrap(&testnet.bootstrap)
        .build()
        .unwrap();
    let reader = Dht::builder()
        .bootstrap(&testnet.bootstrap)
        .build()
        .unwrap();

    let info_hash = Id::random();

    // Three signers, each built from freshly generated random secret bytes.
    let signers = (0..3)
        .map(|_| {
            let mut secret_key = [0; 32];
            rand::rng().fill_bytes(&mut secret_key);
            SigningKey::from_bytes(&secret_key)
        })
        .collect::<Vec<_>>();

    // Sorted so the comparison below does not depend on response order.
    let mut expected_keys: Vec<_> = signers
        .iter()
        .map(|signer| signer.verifying_key().as_bytes().to_vec())
        .collect();
    expected_keys.sort();

    for signer in &signers {
        announcer
            .announce_signed_peer(info_hash, signer)
            .await
            .expect("failed to announce");
    }

    let mut stream = reader.get_signed_peers(info_hash).await.unwrap();
    let peers = stream.next().await.expect("No peers");

    let mut keys: Vec<_> = peers.iter().map(|peer| peer.key().to_vec()).collect();
    keys.sort();

    assert_eq!(keys, expected_keys);
}
1032
/// Exercises signed peer announcements against a testnet created without
/// signed peers support, first with a client that has signed peers disabled,
/// then through a small newer testnet bootstrapped from the legacy one.
#[cfg(feature = "unstable_signed_peers")]
#[tokio::test]
async fn announce_signed_peers_at_low_adoption() {
    let testnet_legacy = Testnet::new_without_signed_peers(10).await.unwrap();

    // Three signers, each built from freshly generated random secret bytes.
    let signers = (0..3)
        .map(|_| {
            let mut secret_key = [0; 32];
            rand::rng().fill_bytes(&mut secret_key);
            SigningKey::from_bytes(&secret_key)
        })
        .collect::<Vec<_>>();

    // Sorted so the final comparison does not depend on response order.
    let mut expected_keys: Vec<_> = signers
        .iter()
        .map(|signer| signer.verifying_key().as_bytes().to_vec())
        .collect();
    expected_keys.sort();

    let info_hash = Id::random();

    // confirm that our code disables `signed_announce_peers` for older versions:
    // announcing must fail and no signed peers may be returned.
    {
        let legacy_client = Dht::builder()
            .bootstrap(&testnet_legacy.bootstrap)
            .disable_signed_peers()
            .build()
            .unwrap();

        let announced = legacy_client
            .announce_signed_peer(info_hash, &signers[0])
            .await;
        assert!(announced.is_err());

        let first_batch = legacy_client
            .get_signed_peers(info_hash)
            .await
            .unwrap()
            .next()
            .await;
        assert_eq!(first_batch, None);
    }

    // Three newer nodes join via the legacy bootstrap nodes; clients that
    // bootstrap from those newer nodes announce and then read the peers back.
    {
        let testnet_new = Testnet::new_with_bootstrap(3, &testnet_legacy.bootstrap)
            .await
            .unwrap();

        let new_bootstrap = testnet_new.bootstrap;

        let announcer = Dht::builder().bootstrap(&new_bootstrap).build().unwrap();
        let reader = Dht::builder().bootstrap(&new_bootstrap).build().unwrap();

        for signer in &signers {
            announcer
                .announce_signed_peer(info_hash, signer)
                .await
                .expect("failed to announce");
        }

        let mut stream = reader.get_signed_peers(info_hash).await.unwrap();
        let peers = stream.next().await.expect("No peers");

        let mut keys: Vec<_> = peers.iter().map(|peer| peer.key().to_vec()).collect();
        keys.sort();

        assert_eq!(keys, expected_keys);
    }
}
1103}