1use std::{
4 net::SocketAddrV4,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use futures_lite::{Stream, StreamExt};
10
11use crate::{
12 common::{
13 hash_immutable, AnnouncePeerRequestArguments, FindNodeRequestArguments,
14 GetPeersRequestArguments, GetValueRequestArguments, Id, MutableItem, Node,
15 PutImmutableRequestArguments, PutMutableRequestArguments, PutRequestSpecific,
16 },
17 dht::{ActorMessage, Dht, PutMutableError, ResponseSender},
18 rpc::{GetRequestSpecific, Info, PutError, PutQueryError},
19};
20
21impl Dht {
22 pub fn as_async(self) -> AsyncDht {
24 AsyncDht(self)
25 }
26}
27
28#[derive(Debug, Clone)]
29pub struct AsyncDht(Dht);
31
32impl AsyncDht {
33 pub async fn info(&self) -> Info {
35 let (tx, rx) = flume::bounded::<Info>(1);
36 self.send(ActorMessage::Info(tx));
37
38 rx.recv_async()
39 .await
40 .expect("actor thread unexpectedly shutdown")
41 }
42
43 pub async fn to_bootstrap(&self) -> Vec<String> {
45 let (tx, rx) = flume::bounded::<Vec<String>>(1);
46 self.send(ActorMessage::ToBootstrap(tx));
47
48 rx.recv_async()
49 .await
50 .expect("actor thread unexpectedly shutdown")
51 }
52
53 pub async fn bootstrapped(&self) -> bool {
59 let info = self.info().await;
60 let nodes = self.find_node(*info.id()).await;
61
62 !nodes.is_empty()
63 }
64
65 pub async fn find_node(&self, target: Id) -> Box<[Node]> {
81 let (tx, rx) = flume::bounded::<Box<[Node]>>(1);
82 self.send(ActorMessage::Get(
83 GetRequestSpecific::FindNode(FindNodeRequestArguments { target }),
84 ResponseSender::ClosestNodes(tx),
85 ));
86
87 rx.recv_async()
88 .await
89 .expect("Query was dropped before sending a response, please open an issue.")
90 }
91
92 pub fn get_peers(&self, info_hash: Id) -> GetStream<Vec<SocketAddrV4>> {
104 let (tx, rx) = flume::unbounded::<Vec<SocketAddrV4>>();
105 self.send(ActorMessage::Get(
106 GetRequestSpecific::GetPeers(GetPeersRequestArguments { info_hash }),
107 ResponseSender::Peers(tx),
108 ));
109
110 GetStream(rx.into_stream())
111 }
112
113 pub async fn announce_peer(
119 &self,
120 info_hash: Id,
121 port: Option<u16>,
122 ) -> Result<Id, PutQueryError> {
123 let (port, implied_port) = match port {
124 Some(port) => (port, None),
125 None => (0, Some(true)),
126 };
127
128 self.put(
129 PutRequestSpecific::AnnouncePeer(AnnouncePeerRequestArguments {
130 info_hash,
131 port,
132 implied_port,
133 }),
134 None,
135 )
136 .await
137 .map_err(|error| match error {
138 PutError::Query(error) => error,
139 PutError::Concurrency(_) => {
140 unreachable!("should not receive a concurrency error from announce peer query")
141 }
142 })
143 }
144
145 pub async fn get_immutable(&self, target: Id) -> Option<Box<[u8]>> {
149 let (tx, rx) = flume::unbounded::<Box<[u8]>>();
150 self.send(ActorMessage::Get(
151 GetRequestSpecific::GetValue(GetValueRequestArguments {
152 target,
153 seq: None,
154 salt: None,
155 }),
156 ResponseSender::Immutable(tx),
157 ));
158
159 rx.recv_async().await.map(Some).unwrap_or(None)
160 }
161
162 pub async fn put_immutable(&self, value: &[u8]) -> Result<Id, PutQueryError> {
164 let target: Id = hash_immutable(value).into();
165
166 self.put(
167 PutRequestSpecific::PutImmutable(PutImmutableRequestArguments {
168 target,
169 v: value.into(),
170 }),
171 None,
172 )
173 .await
174 .map_err(|error| match error {
175 PutError::Query(error) => error,
176 PutError::Concurrency(_) => {
177 unreachable!("should not receive a concurrency error from put immutable query")
178 }
179 })
180 }
181
182 pub fn get_mutable(
197 &self,
198 public_key: &[u8; 32],
199 salt: Option<&[u8]>,
200 more_recent_than: Option<i64>,
201 ) -> GetStream<MutableItem> {
202 let salt = salt.map(|s| s.into());
203 let target = MutableItem::target_from_key(public_key, salt.as_deref());
204 let (tx, rx) = flume::unbounded::<MutableItem>();
205 self.send(ActorMessage::Get(
206 GetRequestSpecific::GetValue(GetValueRequestArguments {
207 target,
208 seq: more_recent_than,
209 salt,
210 }),
211 ResponseSender::Mutable(tx),
212 ));
213
214 GetStream(rx.into_stream())
215 }
216
217 pub async fn get_mutable_most_recent(
219 &self,
220 public_key: &[u8; 32],
221 salt: Option<&[u8]>,
222 ) -> Option<MutableItem> {
223 let mut most_recent: Option<MutableItem> = None;
224 let mut stream = self.get_mutable(public_key, salt, None);
225
226 while let Some(item) = stream.next().await {
227 if let Some(mr) = &most_recent {
228 if item.seq() == mr.seq && item.value() > &mr.value {
229 most_recent = Some(item)
230 }
231 } else {
232 most_recent = Some(item);
233 }
234 }
235
236 most_recent
237 }
238
239 pub async fn put_mutable(
292 &self,
293 item: MutableItem,
294 cas: Option<i64>,
295 ) -> Result<Id, PutMutableError> {
296 let request = PutRequestSpecific::PutMutable(PutMutableRequestArguments::from(item, cas));
297
298 self.put(request, None).await.map_err(|error| match error {
299 PutError::Query(err) => PutMutableError::Query(err),
300 PutError::Concurrency(err) => PutMutableError::Concurrency(err),
301 })
302 }
303
304 pub async fn get_closest_nodes(&self, target: Id) -> Box<[Node]> {
311 let (tx, rx) = flume::unbounded::<Box<[Node]>>();
312 self.send(ActorMessage::Get(
313 GetRequestSpecific::GetValue(GetValueRequestArguments {
314 target,
315 salt: None,
316 seq: None,
317 }),
318 ResponseSender::ClosestNodes(tx),
319 ));
320
321 rx.recv_async()
322 .await
323 .expect("Query was dropped before sending a response, please open an issue.")
324 }
325
326 pub async fn put(
336 &self,
337 request: PutRequestSpecific,
338 extra_nodes: Option<Box<[Node]>>,
339 ) -> Result<Id, PutError> {
340 self.put_inner(request, extra_nodes)
341 .recv_async()
342 .await
343 .expect("Query was dropped before sending a response, please open an issue.")
344 }
345
346 pub(crate) fn put_inner(
349 &self,
350 request: PutRequestSpecific,
351 extra_nodes: Option<Box<[Node]>>,
352 ) -> flume::Receiver<Result<Id, PutError>> {
353 let (tx, rx) = flume::bounded::<Result<Id, PutError>>(1);
354 self.send(ActorMessage::Put(request, tx, extra_nodes));
355
356 rx
357 }
358
359 fn send(&self, message: ActorMessage) {
360 self.0.send(message)
361 }
362}
363
364pub struct GetStream<T: 'static>(flume::r#async::RecvStream<'static, T>);
366
367impl<T> Stream for GetStream<T> {
368 type Item = T;
369
370 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
371 let this = self.get_mut();
372 this.0.poll_next(cx)
373 }
374}
375
376#[cfg(test)]
377mod test {
378 use std::{str::FromStr, time::Duration};
379
380 use ed25519_dalek::SigningKey;
381 use futures::StreamExt;
382
383 use crate::{dht::Testnet, rpc::ConcurrencyError};
384
385 use super::*;
386
387 #[test]
388 fn announce_get_peer() {
389 async fn test() {
390 let testnet = Testnet::new(10).unwrap();
391
392 let a = Dht::builder()
393 .bootstrap(&testnet.bootstrap)
394 .build()
395 .unwrap()
396 .as_async();
397 let b = Dht::builder()
398 .bootstrap(&testnet.bootstrap)
399 .build()
400 .unwrap()
401 .as_async();
402
403 let info_hash = Id::random();
404
405 a.announce_peer(info_hash, Some(45555))
406 .await
407 .expect("failed to announce");
408
409 let peers = b.get_peers(info_hash).next().await.expect("No peers");
410
411 assert_eq!(peers.first().unwrap().port(), 45555);
412 }
413
414 futures::executor::block_on(test());
415 }
416
417 #[test]
418 fn put_get_immutable() {
419 async fn test() {
420 let testnet = Testnet::new(10).unwrap();
421
422 let a = Dht::builder()
423 .bootstrap(&testnet.bootstrap)
424 .build()
425 .unwrap()
426 .as_async();
427 let b = Dht::builder()
428 .bootstrap(&testnet.bootstrap)
429 .build()
430 .unwrap()
431 .as_async();
432
433 let value = b"Hello World!";
434 let expected_target = Id::from_str("e5f96f6f38320f0f33959cb4d3d656452117aadb").unwrap();
435
436 let target = a.put_immutable(value).await.unwrap();
437 assert_eq!(target, expected_target);
438
439 let response = b.get_immutable(target).await;
440 assert_eq!(response, Some(value.to_vec().into_boxed_slice()));
441 }
442
443 futures::executor::block_on(test());
444 }
445
446 #[test]
447 fn put_get_mutable() {
448 async fn test() {
449 let testnet = Testnet::new(10).unwrap();
450
451 let a = Dht::builder()
452 .bootstrap(&testnet.bootstrap)
453 .build()
454 .unwrap()
455 .as_async();
456 let b = Dht::builder()
457 .bootstrap(&testnet.bootstrap)
458 .build()
459 .unwrap()
460 .as_async();
461
462 let signer = SigningKey::from_bytes(&[
463 56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
464 228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
465 ]);
466
467 let seq = 1000;
468 let value = b"Hello World!";
469
470 let item = MutableItem::new(signer.clone(), value, seq, None);
471
472 a.put_mutable(item.clone(), None).await.unwrap();
473
474 let response = b
475 .get_mutable(signer.verifying_key().as_bytes(), None, None)
476 .next()
477 .await
478 .expect("No mutable values");
479
480 assert_eq!(&response, &item);
481 }
482
483 futures::executor::block_on(test());
484 }
485
486 #[test]
487 fn put_get_mutable_no_more_recent_value() {
488 async fn test() {
489 let testnet = Testnet::new(10).unwrap();
490
491 let a = Dht::builder()
492 .bootstrap(&testnet.bootstrap)
493 .build()
494 .unwrap()
495 .as_async();
496 let b = Dht::builder()
497 .bootstrap(&testnet.bootstrap)
498 .build()
499 .unwrap()
500 .as_async();
501
502 let signer = SigningKey::from_bytes(&[
503 56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
504 228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
505 ]);
506
507 let seq = 1000;
508 let value = b"Hello World!";
509
510 let item = MutableItem::new(signer.clone(), value, seq, None);
511
512 a.put_mutable(item.clone(), None).await.unwrap();
513
514 let response = b
515 .get_mutable(signer.verifying_key().as_bytes(), None, Some(seq))
516 .next()
517 .await;
518
519 assert!(&response.is_none());
520 }
521
522 futures::executor::block_on(test());
523 }
524
525 #[test]
526 fn repeated_put_query() {
527 async fn test() {
528 let testnet = Testnet::new(10).unwrap();
529
530 let a = Dht::builder()
531 .bootstrap(&testnet.bootstrap)
532 .build()
533 .unwrap()
534 .as_async();
535
536 let first = a.put_immutable(&[1, 2, 3]).await;
537 let second = a.put_immutable(&[1, 2, 3]).await;
538
539 assert_eq!(first.unwrap(), second.unwrap());
540 }
541
542 futures::executor::block_on(test());
543 }
544
545 #[test]
546 fn concurrent_get_mutable() {
547 async fn test() {
548 let testnet = Testnet::new(10).unwrap();
549
550 let a = Dht::builder()
551 .bootstrap(&testnet.bootstrap)
552 .build()
553 .unwrap()
554 .as_async();
555 let b = Dht::builder()
556 .bootstrap(&testnet.bootstrap)
557 .build()
558 .unwrap()
559 .as_async();
560
561 let signer = SigningKey::from_bytes(&[
562 56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
563 228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
564 ]);
565
566 let seq = 1000;
567 let value = b"Hello World!";
568
569 let item = MutableItem::new(signer.clone(), value, seq, None);
570
571 a.put_mutable(item.clone(), None).await.unwrap();
572
573 let _response_first = b
574 .get_mutable(signer.verifying_key().as_bytes(), None, None)
575 .next()
576 .await
577 .expect("No mutable values");
578
579 let response_second = b
580 .get_mutable(signer.verifying_key().as_bytes(), None, None)
581 .next()
582 .await
583 .expect("No mutable values");
584
585 assert_eq!(&response_second, &item);
586 }
587
588 futures::executor::block_on(test());
589 }
590
591 #[test]
592 fn concurrent_put_mutable_same() {
593 let testnet = Testnet::new(10).unwrap();
594
595 let dht = Dht::builder()
596 .bootstrap(&testnet.bootstrap)
597 .build()
598 .unwrap()
599 .as_async();
600
601 let signer = SigningKey::from_bytes(&[
602 56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
603 228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
604 ]);
605
606 let seq = 1000;
607 let value = b"Hello World!";
608
609 let item = MutableItem::new(signer.clone(), value, seq, None);
610
611 let mut handles = vec![];
612
613 for _ in 0..2 {
614 let dht = dht.clone();
615 let item = item.clone();
616
617 let handle = std::thread::spawn(move || {
618 futures::executor::block_on(async { dht.put_mutable(item, None).await.unwrap() });
619 });
620
621 handles.push(handle);
622 }
623
624 for handle in handles {
625 handle.join().unwrap();
626 }
627 }
628
629 #[test]
630 fn concurrent_put_mutable_different() {
631 let testnet = Testnet::new(10).unwrap();
632
633 let dht = Dht::builder()
634 .bootstrap(&testnet.bootstrap)
635 .build()
636 .unwrap()
637 .as_async();
638
639 let mut handles = vec![];
640
641 for i in 0..2 {
642 let dht = dht.clone();
643
644 let signer = SigningKey::from_bytes(&[
645 56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
646 228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
647 ]);
648
649 let seq = 1000;
650
651 let mut value = b"Hello World!".to_vec();
652 value.push(i);
653
654 let item = MutableItem::new(signer.clone(), &value, seq, None);
655
656 let handle = std::thread::spawn(move || {
657 futures::executor::block_on(async {
658 let result = dht.put_mutable(item, None).await;
659 if i == 0 {
660 assert!(matches!(result, Ok(_)))
661 } else {
662 assert!(matches!(
663 result,
664 Err(PutMutableError::Concurrency(ConcurrencyError::ConflictRisk))
665 ))
666 }
667 })
668 });
669
670 handles.push(handle);
671 }
672
673 for handle in handles {
674 handle.join().unwrap();
675 }
676 }
677
678 #[test]
679 fn concurrent_put_mutable_different_with_cas() {
680 async fn test() {
681 let testnet = Testnet::new(10).unwrap();
682
683 let dht = Dht::builder()
684 .bootstrap(&testnet.bootstrap)
685 .build()
686 .unwrap()
687 .as_async();
688
689 let signer = SigningKey::from_bytes(&[
690 56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
691 228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
692 ]);
693 let value = b"Hello World!".to_vec();
694
695 {
697 let item = MutableItem::new(signer.clone(), &value, 1000, None);
698
699 let (sender, _) = flume::bounded::<Result<Id, PutError>>(1);
700 let request =
701 PutRequestSpecific::PutMutable(PutMutableRequestArguments::from(item, None));
702 dht.0
703 .0
704 .send(ActorMessage::Put(request, sender, None))
705 .unwrap();
706 }
707
708 std::thread::sleep(Duration::from_millis(100));
709
710 {
712 let item = MutableItem::new(signer, &value, 1001, None);
713
714 let most_recent = dht.get_mutable_most_recent(item.key(), None).await;
715
716 if let Some(cas) = most_recent.map(|item| item.seq()) {
717 dht.put_mutable(item, Some(cas)).await.unwrap();
718 } else {
719 dht.put_mutable(item, None).await.unwrap();
720 }
721 }
722 }
723
724 futures::executor::block_on(test());
725 }
726
727 #[test]
728 fn conflict_301_cas() {
729 async fn test() {
730 let testnet = Testnet::new(10).unwrap();
731
732 let dht = Dht::builder()
733 .bootstrap(&testnet.bootstrap)
734 .build()
735 .unwrap()
736 .as_async();
737
738 let signer = SigningKey::from_bytes(&[
739 56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
740 228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
741 ]);
742 let value = b"Hello World!".to_vec();
743
744 dht.put_mutable(MutableItem::new(signer.clone(), &value, 1001, None), None)
745 .await
746 .unwrap();
747
748 assert!(matches!(
749 dht.put_mutable(MutableItem::new(signer, &value, 1002, None), Some(1000))
750 .await,
751 Err(PutMutableError::Concurrency(ConcurrencyError::CasFailed))
752 ));
753 }
754
755 futures::executor::block_on(test());
756 }
757}