mainline/
async_dht.rs

1//! AsyncDht node.
2
3use std::{
4    net::SocketAddrV4,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use futures_lite::{Stream, StreamExt};
10
11use crate::{
12    common::{
13        hash_immutable, AnnouncePeerRequestArguments, FindNodeRequestArguments,
14        GetPeersRequestArguments, GetValueRequestArguments, Id, MutableItem, Node,
15        PutImmutableRequestArguments, PutMutableRequestArguments, PutRequestSpecific,
16    },
17    dht::{ActorMessage, Dht, PutMutableError, ResponseSender},
18    rpc::{GetRequestSpecific, Info, PutError, PutQueryError},
19};
20
21impl Dht {
22    /// Return an async version of the Dht client.
23    pub fn as_async(self) -> AsyncDht {
24        AsyncDht(self)
25    }
26}
27
28#[derive(Debug, Clone)]
29/// Async version of the Dht node.
30pub struct AsyncDht(Dht);
31
32impl AsyncDht {
33    /// Information and statistics about this [Dht] node.
34    pub async fn info(&self) -> Info {
35        let (tx, rx) = flume::bounded::<Info>(1);
36        self.send(ActorMessage::Info(tx));
37
38        rx.recv_async()
39            .await
40            .expect("actor thread unexpectedly shutdown")
41    }
42
43    /// Turn this node's routing table to a list of bootstrapping nodes.   
44    pub async fn to_bootstrap(&self) -> Vec<String> {
45        let (tx, rx) = flume::bounded::<Vec<String>>(1);
46        self.send(ActorMessage::ToBootstrap(tx));
47
48        rx.recv_async()
49            .await
50            .expect("actor thread unexpectedly shutdown")
51    }
52
53    // === Public Methods ===
54
55    /// Await until the bootstrapping query is done.
56    ///
57    /// Returns true if the bootstrapping was successful.
58    pub async fn bootstrapped(&self) -> bool {
59        let info = self.info().await;
60        let nodes = self.find_node(*info.id()).await;
61
62        !nodes.is_empty()
63    }
64
65    // === Find nodes ===
66
67    /// Returns the closest 20 [secure](Node::is_secure) nodes to a target [Id].
68    ///
69    /// Mostly useful to crawl the DHT.
70    ///
71    /// The returned nodes are claims by other nodes, they may be lies, or may have churned
72    /// since they were last seen, but haven't been pinged yet.
73    ///
74    /// You might need to ping them to confirm they exist, and responsive, or if you want to
75    /// learn more about them like the client they are using, or if they support a given BEP.
76    ///
77    /// If you are trying to find the closest nodes to a target with intent to [Self::put],
78    /// a request directly to these nodes (using `extra_nodes` parameter), then you should
79    /// use [Self::get_closest_nodes] instead.
80    pub async fn find_node(&self, target: Id) -> Box<[Node]> {
81        let (tx, rx) = flume::bounded::<Box<[Node]>>(1);
82        self.send(ActorMessage::Get(
83            GetRequestSpecific::FindNode(FindNodeRequestArguments { target }),
84            ResponseSender::ClosestNodes(tx),
85        ));
86
87        rx.recv_async()
88            .await
89            .expect("Query was dropped before sending a response, please open an issue.")
90    }
91
92    // === Peers ===
93
94    /// Get peers for a given infohash.
95    ///
96    /// Note: each node of the network will only return a _random_ subset (usually 20)
97    /// of the total peers it has for a given infohash, so if you are getting responses
98    /// from 20 nodes, you can expect up to 400 peers in total, but if there are more
99    /// announced peers on that infohash, you are likely to miss some, the logic here
100    /// for Bittorrent is that any peer will introduce you to more peers through "peer exchange"
101    /// so if you are implementing something different from Bittorrent, you might want
102    /// to implement your own logic for gossipping more peers after you discover the first ones.
103    pub fn get_peers(&self, info_hash: Id) -> GetStream<Vec<SocketAddrV4>> {
104        let (tx, rx) = flume::unbounded::<Vec<SocketAddrV4>>();
105        self.send(ActorMessage::Get(
106            GetRequestSpecific::GetPeers(GetPeersRequestArguments { info_hash }),
107            ResponseSender::Peers(tx),
108        ));
109
110        GetStream(rx.into_stream())
111    }
112
113    /// Announce a peer for a given infohash.
114    ///
115    /// The peer will be announced on this process IP.
116    /// If explicit port is passed, it will be used, otherwise the port will be implicitly
117    /// assumed by remote nodes to be the same ase port they received the request from.
118    pub async fn announce_peer(
119        &self,
120        info_hash: Id,
121        port: Option<u16>,
122    ) -> Result<Id, PutQueryError> {
123        let (port, implied_port) = match port {
124            Some(port) => (port, None),
125            None => (0, Some(true)),
126        };
127
128        self.put(
129            PutRequestSpecific::AnnouncePeer(AnnouncePeerRequestArguments {
130                info_hash,
131                port,
132                implied_port,
133            }),
134            None,
135        )
136        .await
137        .map_err(|error| match error {
138            PutError::Query(error) => error,
139            PutError::Concurrency(_) => {
140                unreachable!("should not receive a concurrency error from announce peer query")
141            }
142        })
143    }
144
145    // === Immutable data ===
146
147    /// Get an Immutable data by its sha1 hash.
148    pub async fn get_immutable(&self, target: Id) -> Option<Box<[u8]>> {
149        let (tx, rx) = flume::unbounded::<Box<[u8]>>();
150        self.send(ActorMessage::Get(
151            GetRequestSpecific::GetValue(GetValueRequestArguments {
152                target,
153                seq: None,
154                salt: None,
155            }),
156            ResponseSender::Immutable(tx),
157        ));
158
159        rx.recv_async().await.map(Some).unwrap_or(None)
160    }
161
162    /// Put an immutable data to the DHT.
163    pub async fn put_immutable(&self, value: &[u8]) -> Result<Id, PutQueryError> {
164        let target: Id = hash_immutable(value).into();
165
166        self.put(
167            PutRequestSpecific::PutImmutable(PutImmutableRequestArguments {
168                target,
169                v: value.into(),
170            }),
171            None,
172        )
173        .await
174        .map_err(|error| match error {
175            PutError::Query(error) => error,
176            PutError::Concurrency(_) => {
177                unreachable!("should not receive a concurrency error from put immutable query")
178            }
179        })
180    }
181
182    // === Mutable data ===
183
184    /// Get a mutable data by its `public_key` and optional `salt`.
185    ///
186    /// You can ask for items `more_recent_than` than a certain `seq`,
187    /// usually one that you already have seen before, similar to `If-Modified-Since` header in HTTP.
188    ///
189    /// # Order
190    ///
191    /// The order of [MutableItem]s returned by this stream is not guaranteed to
192    /// reflect their `seq` value. You should not assume that the later items are
193    /// more recent than earlier ones.
194    ///
195    /// Consider using [Self::get_mutable_most_recent] if that is what you need.
196    pub fn get_mutable(
197        &self,
198        public_key: &[u8; 32],
199        salt: Option<&[u8]>,
200        more_recent_than: Option<i64>,
201    ) -> GetStream<MutableItem> {
202        let salt = salt.map(|s| s.into());
203        let target = MutableItem::target_from_key(public_key, salt.as_deref());
204        let (tx, rx) = flume::unbounded::<MutableItem>();
205        self.send(ActorMessage::Get(
206            GetRequestSpecific::GetValue(GetValueRequestArguments {
207                target,
208                seq: more_recent_than,
209                salt,
210            }),
211            ResponseSender::Mutable(tx),
212        ));
213
214        GetStream(rx.into_stream())
215    }
216
217    /// Get the most recent [MutableItem] from the network.
218    pub async fn get_mutable_most_recent(
219        &self,
220        public_key: &[u8; 32],
221        salt: Option<&[u8]>,
222    ) -> Option<MutableItem> {
223        let mut most_recent: Option<MutableItem> = None;
224        let mut stream = self.get_mutable(public_key, salt, None);
225
226        while let Some(item) = stream.next().await {
227            if let Some(mr) = &most_recent {
228                if item.seq() == mr.seq && item.value() > &mr.value {
229                    most_recent = Some(item)
230                }
231            } else {
232                most_recent = Some(item);
233            }
234        }
235
236        most_recent
237    }
238
239    /// Put a mutable data to the DHT.
240    ///
241    /// # Lost Update Problem
242    ///
243    /// As mainline DHT is a distributed system, it is vulnerable to [Write–write conflict](https://en.wikipedia.org/wiki/Write-write_conflict).
244    ///
245    /// ## Read first
246    ///
247    /// To mitigate the risk of lost updates, you should call the [Self::get_mutable_most_recent] method
248    /// then start authoring the new [MutableItem] based on the most recent as in the following example:
249    ///
250    ///```rust
251    /// use mainline::{Dht, MutableItem, SigningKey, Testnet};
252    ///
253    /// let testnet = Testnet::new(3).unwrap();
254    /// let dht = Dht::builder().bootstrap(&testnet.bootstrap).build().unwrap().as_async();
255    ///
256    /// let signing_key = SigningKey::from_bytes(&[0; 32]);
257    /// let key = signing_key.verifying_key().to_bytes();
258    /// let salt = Some(b"salt".as_ref());
259    ///
260    /// futures::executor::block_on(async move {
261    ///     let (item, cas) = if let Some(most_recent) = dht.get_mutable_most_recent(&key, salt).await {
262    ///         // 1. Optionally Create a new value to take the most recent's value in consideration.
263    ///         let mut new_value = most_recent.value().to_vec();
264    ///         new_value.extend_from_slice(b" more data");
265    ///
266    ///         // 2. Increment the sequence number to be higher than the most recent's.
267    ///         let most_recent_seq = most_recent.seq();
268    ///         let new_seq = most_recent_seq + 1;
269    ///
270    ///         (
271    ///             MutableItem::new(signing_key, &new_value, new_seq, salt),
272    ///             // 3. Use the most recent [MutableItem::seq] as a `CAS`.
273    ///             Some(most_recent_seq)
274    ///         )
275    ///     } else {
276    ///         (MutableItem::new(signing_key, b"first value", 1, salt), None)
277    ///     };
278    ///
279    ///     dht.put_mutable(item, cas).await.unwrap();
280    /// });
281    /// ```
282    ///
283    /// ## Errors
284    ///
285    /// In addition to the [PutQueryError] common with all PUT queries, PUT mutable item
286    /// query has other [Concurrency errors][crate::rpc::ConcurrencyError], that try to detect write conflict
287    /// risks or obvious conflicts.
288    ///
289    /// If you are lucky to get one of these errors (which is not guaranteed), then you should
290    /// read the most recent item again, and repeat the steps in the previous example.
291    pub async fn put_mutable(
292        &self,
293        item: MutableItem,
294        cas: Option<i64>,
295    ) -> Result<Id, PutMutableError> {
296        let request = PutRequestSpecific::PutMutable(PutMutableRequestArguments::from(item, cas));
297
298        self.put(request, None).await.map_err(|error| match error {
299            PutError::Query(err) => PutMutableError::Query(err),
300            PutError::Concurrency(err) => PutMutableError::Concurrency(err),
301        })
302    }
303
304    // === Raw ===
305
306    /// Get closet nodes to a specific target, that support [BEP_0044](https://www.bittorrent.org/beps/bep_0044.html).
307    ///
308    /// Useful to [Self::put] a request to nodes further from the 20 closest nodes to the
309    /// [PutRequestSpecific::target]. Which itself is useful to circumvent [extreme vertical sybil attacks](https://github.com/pubky/mainline/blob/main/docs/censorship-resistance.md#extreme-vertical-sybil-attacks).
310    pub async fn get_closest_nodes(&self, target: Id) -> Box<[Node]> {
311        let (tx, rx) = flume::unbounded::<Box<[Node]>>();
312        self.send(ActorMessage::Get(
313            GetRequestSpecific::GetValue(GetValueRequestArguments {
314                target,
315                salt: None,
316                seq: None,
317            }),
318            ResponseSender::ClosestNodes(tx),
319        ));
320
321        rx.recv_async()
322            .await
323            .expect("Query was dropped before sending a response, please open an issue.")
324    }
325
326    /// Send a PUT request to the closest nodes, and optionally some extra nodes.
327    ///
328    /// This is useful to put data to regions of the DHT other than the closest nodes
329    /// to this request's [target][PutRequestSpecific::target].
330    ///
331    /// You can find nodes close to other regions of the network by calling
332    /// [Self::get_closest_nodes] with the target that you want to find the closest nodes to.
333    ///
334    /// Note: extra nodes need to have [Node::valid_token].
335    pub async fn put(
336        &self,
337        request: PutRequestSpecific,
338        extra_nodes: Option<Box<[Node]>>,
339    ) -> Result<Id, PutError> {
340        self.put_inner(request, extra_nodes)
341            .recv_async()
342            .await
343            .expect("Query was dropped before sending a response, please open an issue.")
344    }
345
346    // === Private Methods ===
347
348    pub(crate) fn put_inner(
349        &self,
350        request: PutRequestSpecific,
351        extra_nodes: Option<Box<[Node]>>,
352    ) -> flume::Receiver<Result<Id, PutError>> {
353        let (tx, rx) = flume::bounded::<Result<Id, PutError>>(1);
354        self.send(ActorMessage::Put(request, tx, extra_nodes));
355
356        rx
357    }
358
359    fn send(&self, message: ActorMessage) {
360        self.0.send(message)
361    }
362}
363
364/// A [Stream] of incoming peers, immutable or mutable values.
365pub struct GetStream<T: 'static>(flume::r#async::RecvStream<'static, T>);
366
367impl<T> Stream for GetStream<T> {
368    type Item = T;
369
370    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
371        let this = self.get_mut();
372        this.0.poll_next(cx)
373    }
374}
375
376#[cfg(test)]
377mod test {
378    use std::{str::FromStr, time::Duration};
379
380    use ed25519_dalek::SigningKey;
381    use futures::StreamExt;
382
383    use crate::{dht::Testnet, rpc::ConcurrencyError};
384
385    use super::*;
386
387    #[test]
388    fn announce_get_peer() {
389        async fn test() {
390            let testnet = Testnet::new(10).unwrap();
391
392            let a = Dht::builder()
393                .bootstrap(&testnet.bootstrap)
394                .build()
395                .unwrap()
396                .as_async();
397            let b = Dht::builder()
398                .bootstrap(&testnet.bootstrap)
399                .build()
400                .unwrap()
401                .as_async();
402
403            let info_hash = Id::random();
404
405            a.announce_peer(info_hash, Some(45555))
406                .await
407                .expect("failed to announce");
408
409            let peers = b.get_peers(info_hash).next().await.expect("No peers");
410
411            assert_eq!(peers.first().unwrap().port(), 45555);
412        }
413
414        futures::executor::block_on(test());
415    }
416
417    #[test]
418    fn put_get_immutable() {
419        async fn test() {
420            let testnet = Testnet::new(10).unwrap();
421
422            let a = Dht::builder()
423                .bootstrap(&testnet.bootstrap)
424                .build()
425                .unwrap()
426                .as_async();
427            let b = Dht::builder()
428                .bootstrap(&testnet.bootstrap)
429                .build()
430                .unwrap()
431                .as_async();
432
433            let value = b"Hello World!";
434            let expected_target = Id::from_str("e5f96f6f38320f0f33959cb4d3d656452117aadb").unwrap();
435
436            let target = a.put_immutable(value).await.unwrap();
437            assert_eq!(target, expected_target);
438
439            let response = b.get_immutable(target).await;
440            assert_eq!(response, Some(value.to_vec().into_boxed_slice()));
441        }
442
443        futures::executor::block_on(test());
444    }
445
446    #[test]
447    fn put_get_mutable() {
448        async fn test() {
449            let testnet = Testnet::new(10).unwrap();
450
451            let a = Dht::builder()
452                .bootstrap(&testnet.bootstrap)
453                .build()
454                .unwrap()
455                .as_async();
456            let b = Dht::builder()
457                .bootstrap(&testnet.bootstrap)
458                .build()
459                .unwrap()
460                .as_async();
461
462            let signer = SigningKey::from_bytes(&[
463                56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
464                228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
465            ]);
466
467            let seq = 1000;
468            let value = b"Hello World!";
469
470            let item = MutableItem::new(signer.clone(), value, seq, None);
471
472            a.put_mutable(item.clone(), None).await.unwrap();
473
474            let response = b
475                .get_mutable(signer.verifying_key().as_bytes(), None, None)
476                .next()
477                .await
478                .expect("No mutable values");
479
480            assert_eq!(&response, &item);
481        }
482
483        futures::executor::block_on(test());
484    }
485
486    #[test]
487    fn put_get_mutable_no_more_recent_value() {
488        async fn test() {
489            let testnet = Testnet::new(10).unwrap();
490
491            let a = Dht::builder()
492                .bootstrap(&testnet.bootstrap)
493                .build()
494                .unwrap()
495                .as_async();
496            let b = Dht::builder()
497                .bootstrap(&testnet.bootstrap)
498                .build()
499                .unwrap()
500                .as_async();
501
502            let signer = SigningKey::from_bytes(&[
503                56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
504                228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
505            ]);
506
507            let seq = 1000;
508            let value = b"Hello World!";
509
510            let item = MutableItem::new(signer.clone(), value, seq, None);
511
512            a.put_mutable(item.clone(), None).await.unwrap();
513
514            let response = b
515                .get_mutable(signer.verifying_key().as_bytes(), None, Some(seq))
516                .next()
517                .await;
518
519            assert!(&response.is_none());
520        }
521
522        futures::executor::block_on(test());
523    }
524
525    #[test]
526    fn repeated_put_query() {
527        async fn test() {
528            let testnet = Testnet::new(10).unwrap();
529
530            let a = Dht::builder()
531                .bootstrap(&testnet.bootstrap)
532                .build()
533                .unwrap()
534                .as_async();
535
536            let first = a.put_immutable(&[1, 2, 3]).await;
537            let second = a.put_immutable(&[1, 2, 3]).await;
538
539            assert_eq!(first.unwrap(), second.unwrap());
540        }
541
542        futures::executor::block_on(test());
543    }
544
545    #[test]
546    fn concurrent_get_mutable() {
547        async fn test() {
548            let testnet = Testnet::new(10).unwrap();
549
550            let a = Dht::builder()
551                .bootstrap(&testnet.bootstrap)
552                .build()
553                .unwrap()
554                .as_async();
555            let b = Dht::builder()
556                .bootstrap(&testnet.bootstrap)
557                .build()
558                .unwrap()
559                .as_async();
560
561            let signer = SigningKey::from_bytes(&[
562                56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
563                228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
564            ]);
565
566            let seq = 1000;
567            let value = b"Hello World!";
568
569            let item = MutableItem::new(signer.clone(), value, seq, None);
570
571            a.put_mutable(item.clone(), None).await.unwrap();
572
573            let _response_first = b
574                .get_mutable(signer.verifying_key().as_bytes(), None, None)
575                .next()
576                .await
577                .expect("No mutable values");
578
579            let response_second = b
580                .get_mutable(signer.verifying_key().as_bytes(), None, None)
581                .next()
582                .await
583                .expect("No mutable values");
584
585            assert_eq!(&response_second, &item);
586        }
587
588        futures::executor::block_on(test());
589    }
590
591    #[test]
592    fn concurrent_put_mutable_same() {
593        let testnet = Testnet::new(10).unwrap();
594
595        let dht = Dht::builder()
596            .bootstrap(&testnet.bootstrap)
597            .build()
598            .unwrap()
599            .as_async();
600
601        let signer = SigningKey::from_bytes(&[
602            56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
603            228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
604        ]);
605
606        let seq = 1000;
607        let value = b"Hello World!";
608
609        let item = MutableItem::new(signer.clone(), value, seq, None);
610
611        let mut handles = vec![];
612
613        for _ in 0..2 {
614            let dht = dht.clone();
615            let item = item.clone();
616
617            let handle = std::thread::spawn(move || {
618                futures::executor::block_on(async { dht.put_mutable(item, None).await.unwrap() });
619            });
620
621            handles.push(handle);
622        }
623
624        for handle in handles {
625            handle.join().unwrap();
626        }
627    }
628
629    #[test]
630    fn concurrent_put_mutable_different() {
631        let testnet = Testnet::new(10).unwrap();
632
633        let dht = Dht::builder()
634            .bootstrap(&testnet.bootstrap)
635            .build()
636            .unwrap()
637            .as_async();
638
639        let mut handles = vec![];
640
641        for i in 0..2 {
642            let dht = dht.clone();
643
644            let signer = SigningKey::from_bytes(&[
645                56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
646                228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
647            ]);
648
649            let seq = 1000;
650
651            let mut value = b"Hello World!".to_vec();
652            value.push(i);
653
654            let item = MutableItem::new(signer.clone(), &value, seq, None);
655
656            let handle = std::thread::spawn(move || {
657                futures::executor::block_on(async {
658                    let result = dht.put_mutable(item, None).await;
659                    if i == 0 {
660                        assert!(matches!(result, Ok(_)))
661                    } else {
662                        assert!(matches!(
663                            result,
664                            Err(PutMutableError::Concurrency(ConcurrencyError::ConflictRisk))
665                        ))
666                    }
667                })
668            });
669
670            handles.push(handle);
671        }
672
673        for handle in handles {
674            handle.join().unwrap();
675        }
676    }
677
678    #[test]
679    fn concurrent_put_mutable_different_with_cas() {
680        async fn test() {
681            let testnet = Testnet::new(10).unwrap();
682
683            let dht = Dht::builder()
684                .bootstrap(&testnet.bootstrap)
685                .build()
686                .unwrap()
687                .as_async();
688
689            let signer = SigningKey::from_bytes(&[
690                56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
691                228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
692            ]);
693            let value = b"Hello World!".to_vec();
694
695            // First
696            {
697                let item = MutableItem::new(signer.clone(), &value, 1000, None);
698
699                let (sender, _) = flume::bounded::<Result<Id, PutError>>(1);
700                let request =
701                    PutRequestSpecific::PutMutable(PutMutableRequestArguments::from(item, None));
702                dht.0
703                     .0
704                    .send(ActorMessage::Put(request, sender, None))
705                    .unwrap();
706            }
707
708            std::thread::sleep(Duration::from_millis(100));
709
710            // Second
711            {
712                let item = MutableItem::new(signer, &value, 1001, None);
713
714                let most_recent = dht.get_mutable_most_recent(item.key(), None).await;
715
716                if let Some(cas) = most_recent.map(|item| item.seq()) {
717                    dht.put_mutable(item, Some(cas)).await.unwrap();
718                } else {
719                    dht.put_mutable(item, None).await.unwrap();
720                }
721            }
722        }
723
724        futures::executor::block_on(test());
725    }
726
727    #[test]
728    fn conflict_301_cas() {
729        async fn test() {
730            let testnet = Testnet::new(10).unwrap();
731
732            let dht = Dht::builder()
733                .bootstrap(&testnet.bootstrap)
734                .build()
735                .unwrap()
736                .as_async();
737
738            let signer = SigningKey::from_bytes(&[
739                56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
740                228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
741            ]);
742            let value = b"Hello World!".to_vec();
743
744            dht.put_mutable(MutableItem::new(signer.clone(), &value, 1001, None), None)
745                .await
746                .unwrap();
747
748            assert!(matches!(
749                dht.put_mutable(MutableItem::new(signer, &value, 1002, None), Some(1000))
750                    .await,
751                Err(PutMutableError::Concurrency(ConcurrencyError::CasFailed))
752            ));
753        }
754
755        futures::executor::block_on(test());
756    }
757}