1use std::io;
4use std::net::{Ipv4Addr, SocketAddrV4};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8#[allow(unused_imports)]
9use ed25519_dalek::SigningKey;
10use futures_core::Stream;
11use tokio::sync::{mpsc, oneshot};
12
13#[allow(unused_imports)]
14use crate::{
15 Node, ServerSettings,
16 actor::{ActorMessage, Info, ResponseSender, config::Config},
17 common::{
18 AnnouncePeerRequestArguments, AnnounceSignedPeerRequestArguments, FindNodeRequestArguments,
19 GetPeersRequestArguments, GetValueRequestArguments, Id, MutableItem,
20 PutImmutableRequestArguments, PutMutableRequestArguments, PutRequestSpecific,
21 SignedAnnounce, hash_immutable,
22 },
23 core::{ConcurrencyError, PutError, PutQueryError, iterative_query::GetRequestSpecific},
24};
25
26mod testnet;
27
28pub use testnet::Testnet;
29
/// Capacity of the bounded channel that carries messages from [`Dht`] handles
/// to the actor task; senders wait once this many messages are queued.
const ACTOR_INBOX_CAPACITY: usize = 256;
32
33#[n0_error::stack_error(derive)]
34#[derive(Clone, Copy, PartialEq, Eq)]
35#[error("DHT actor task has shut down")]
40pub struct ActorShutdown;
41
42impl From<ActorShutdown> for io::Error {
43 fn from(_: ActorShutdown) -> Self {
44 io::Error::other("DHT actor task has shut down")
45 }
46}
47
48#[derive(Debug, Clone)]
49pub struct Dht(pub(crate) mpsc::Sender<ActorMessage>);
51
52#[derive(Debug, Default, Clone)]
53pub struct DhtBuilder(Config);
55
56impl DhtBuilder {
57 pub fn server_mode(&mut self) -> &mut Self {
59 self.0.server_mode = true;
60
61 self
62 }
63
64 pub fn server_settings(&mut self, server_settings: ServerSettings) -> &mut Self {
68 self.0.server_settings = server_settings;
69
70 self
71 }
72
73 pub fn bootstrap<T: ToString>(&mut self, bootstrap: &[T]) -> &mut Self {
75 self.0.bootstrap = bootstrap.iter().map(|b| b.to_string()).collect();
76
77 self
78 }
79
80 pub fn extra_bootstrap<T: ToString>(&mut self, extra_bootstrap: &[T]) -> &mut Self {
85 for address in extra_bootstrap {
86 self.0.bootstrap.push(address.to_string());
87 }
88
89 self
90 }
91
92 pub fn no_bootstrap(&mut self) -> &mut Self {
94 self.0.bootstrap = vec![];
95
96 self
97 }
98
99 #[cfg(test)]
101 fn disable_signed_peers(&mut self) -> &mut Self {
102 self.0.disable_announce_signed_peers = true;
103
104 self
105 }
106
107 pub fn port(&mut self, port: u16) -> &mut Self {
109 self.0.port = Some(port);
110
111 self
112 }
113
114 pub fn public_ip(&mut self, public_ip: Ipv4Addr) -> &mut Self {
119 self.0.public_ip = Some(public_ip);
120
121 self
122 }
123
124 pub fn build(&self) -> io::Result<Dht> {
129 Dht::new(self.0.clone())
130 }
131}
132
133impl Dht {
134 pub fn new(config: Config) -> io::Result<Self> {
140 let actor = crate::actor::Actor::new(config)?;
141 let (sender, receiver) = mpsc::channel(ACTOR_INBOX_CAPACITY);
142 tokio::spawn(crate::actor::run(actor, receiver));
143 Ok(Dht(sender))
144 }
145
146 pub fn builder() -> DhtBuilder {
148 DhtBuilder::default()
149 }
150
151 pub fn client() -> io::Result<Self> {
153 Dht::builder().build()
154 }
155
156 pub fn server() -> io::Result<Self> {
166 Dht::builder().server_mode().build()
167 }
168
169 pub async fn info(&self) -> Result<Info, ActorShutdown> {
173 let (tx, rx) = oneshot::channel();
174 self.send(ActorMessage::Info(tx)).await?;
175
176 rx.await.map_err(|_| ActorShutdown)
177 }
178
179 pub async fn to_bootstrap(&self) -> Result<Vec<String>, ActorShutdown> {
181 let (tx, rx) = oneshot::channel();
182 self.send(ActorMessage::ToBootstrap(tx)).await?;
183
184 rx.await.map_err(|_| ActorShutdown)
185 }
186
187 pub async fn bootstrapped(&self) -> Result<bool, ActorShutdown> {
193 let info = self.info().await?;
194 self.find_node(*info.id()).await?;
195
196 let info = self.info().await?;
197 Ok(info.routing_table_size() > 0)
198 }
199
200 pub async fn find_node(&self, target: Id) -> Result<Box<[Node]>, ActorShutdown> {
216 let (tx, rx) = oneshot::channel();
217 self.send(ActorMessage::Get(
218 GetRequestSpecific::FindNode(FindNodeRequestArguments { target }),
219 ResponseSender::ClosestNodes(tx),
220 ))
221 .await?;
222
223 rx.await.map_err(|_| ActorShutdown)
224 }
225
226 pub async fn get_peers(
238 &self,
239 info_hash: Id,
240 ) -> Result<GetStream<Vec<SocketAddrV4>>, ActorShutdown> {
241 let (tx, rx) = mpsc::unbounded_channel();
242 self.send(ActorMessage::Get(
243 GetRequestSpecific::GetPeers(GetPeersRequestArguments { info_hash }),
244 ResponseSender::Peers(tx),
245 ))
246 .await?;
247
248 Ok(GetStream(rx))
249 }
250
251 pub async fn announce_peer(
257 &self,
258 info_hash: Id,
259 port: Option<u16>,
260 ) -> Result<Id, PutQueryError> {
261 let (port, implied_port) = match port {
262 Some(port) => (port, None),
263 None => (0, Some(true)),
264 };
265
266 self.put(
267 PutRequestSpecific::AnnouncePeer(AnnouncePeerRequestArguments {
268 info_hash,
269 port,
270 implied_port,
271 }),
272 None,
273 )
274 .await
275 .map_err(put_error_to_query_error)
276 }
277
278 #[cfg(feature = "unstable_signed_peers")]
293 #[cfg_attr(n0_mainline_docsrs, doc(cfg(feature = "unstable_signed_peers")))]
294 pub async fn announce_signed_peer(
295 &self,
296 info_hash: Id,
297 signer: &SigningKey,
298 ) -> Result<Id, PutQueryError> {
299 let signed_announce = SignedAnnounce::new(signer, &info_hash);
300
301 self.put(
302 PutRequestSpecific::AnnounceSignedPeer(AnnounceSignedPeerRequestArguments {
303 info_hash,
304 k: *signed_announce.key(),
305 t: signed_announce.timestamp(),
306 sig: *signed_announce.signature(),
307 }),
308 None,
309 )
310 .await
311 .map_err(put_error_to_query_error)
312 }
313
314 #[cfg(feature = "unstable_signed_peers")]
335 #[cfg_attr(n0_mainline_docsrs, doc(cfg(feature = "unstable_signed_peers")))]
336 pub async fn get_signed_peers(
337 &self,
338 info_hash: Id,
339 ) -> Result<GetStream<Vec<SignedAnnounce>>, ActorShutdown> {
340 let (tx, rx) = mpsc::unbounded_channel();
341 self.send(ActorMessage::Get(
342 GetRequestSpecific::GetSignedPeers(GetPeersRequestArguments { info_hash }),
343 ResponseSender::SignedPeers(tx),
344 ))
345 .await?;
346
347 Ok(GetStream(rx))
348 }
349
350 pub async fn get_immutable(&self, target: Id) -> Result<Option<Box<[u8]>>, ActorShutdown> {
354 let (tx, rx) = oneshot::channel();
355 self.send(ActorMessage::Get(
356 GetRequestSpecific::GetValue(GetValueRequestArguments {
357 target,
358 seq: None,
359 salt: None,
360 }),
361 ResponseSender::Immutable(Some(tx)),
362 ))
363 .await?;
364
365 Ok(rx.await.ok())
367 }
368
369 pub async fn put_immutable(&self, value: &[u8]) -> Result<Id, PutQueryError> {
371 let target: Id = hash_immutable(value).into();
372
373 self.put(
374 PutRequestSpecific::PutImmutable(PutImmutableRequestArguments {
375 target,
376 v: value.into(),
377 }),
378 None,
379 )
380 .await
381 .map_err(put_error_to_query_error)
382 }
383
384 pub async fn get_mutable(
399 &self,
400 public_key: &[u8; 32],
401 salt: Option<&[u8]>,
402 more_recent_than: Option<i64>,
403 ) -> Result<GetStream<MutableItem>, ActorShutdown> {
404 let salt = salt.map(|s| s.into());
405 let target = MutableItem::target_from_key(public_key, salt.as_deref());
406 let (tx, rx) = mpsc::unbounded_channel();
407 self.send(ActorMessage::Get(
408 GetRequestSpecific::GetValue(GetValueRequestArguments {
409 target,
410 seq: more_recent_than,
411 salt,
412 }),
413 ResponseSender::Mutable(tx),
414 ))
415 .await?;
416
417 Ok(GetStream(rx))
418 }
419
420 pub async fn get_mutable_most_recent(
422 &self,
423 public_key: &[u8; 32],
424 salt: Option<&[u8]>,
425 ) -> Result<Option<MutableItem>, ActorShutdown> {
426 let mut most_recent: Option<MutableItem> = None;
427 let mut stream = self.get_mutable(public_key, salt, None).await?;
428
429 while let Some(item) = stream.0.recv().await {
430 if let Some(mr) = &most_recent {
431 if item.seq() == mr.seq && item.value() > &*mr.value {
432 most_recent = Some(item)
433 }
434 } else {
435 most_recent = Some(item);
436 }
437 }
438
439 Ok(most_recent)
440 }
441
442 pub async fn put_mutable(
493 &self,
494 item: MutableItem,
495 cas: Option<i64>,
496 ) -> Result<Id, PutMutableError> {
497 let request = PutRequestSpecific::PutMutable(PutMutableRequestArguments::from(item, cas));
498
499 self.put(request, None).await.map_err(|error| match error {
500 PutError::Query(err) => PutMutableError::Query(err),
501 PutError::Concurrency(err) => PutMutableError::Concurrency(err),
502 })
503 }
504
505 pub async fn get_closest_nodes(&self, target: Id) -> Result<Box<[Node]>, ActorShutdown> {
512 let (tx, rx) = oneshot::channel();
513 self.send(ActorMessage::Get(
514 GetRequestSpecific::GetValue(GetValueRequestArguments {
515 target,
516 salt: None,
517 seq: None,
518 }),
519 ResponseSender::ClosestNodes(tx),
520 ))
521 .await?;
522
523 rx.await.map_err(|_| ActorShutdown)
524 }
525
526 pub async fn put(
536 &self,
537 request: PutRequestSpecific,
538 extra_nodes: Option<Box<[Node]>>,
539 ) -> Result<Id, PutError> {
540 let (tx, rx) = oneshot::channel();
541 self.send(ActorMessage::Put(request, tx, extra_nodes))
542 .await?;
543
544 rx.await.map_err(|_| ActorShutdown)?
545 }
546
547 async fn send(&self, message: ActorMessage) -> Result<(), ActorShutdown> {
550 self.0.send(message).await.map_err(|_| ActorShutdown)
551 }
552}
553
554fn put_error_to_query_error(error: PutError) -> PutQueryError {
555 match error {
556 PutError::Query(error) => error,
557 PutError::Concurrency(_) => {
558 unreachable!("should not receive a concurrency error from this query type")
559 }
560 }
561}
562
563pub struct GetStream<T>(mpsc::UnboundedReceiver<T>);
565
566impl<T> Stream for GetStream<T> {
567 type Item = T;
568
569 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
570 let this = self.get_mut();
571 this.0.poll_recv(cx)
572 }
573}
574
575#[n0_error::stack_error(derive, from_sources, std_sources)]
576pub enum PutMutableError {
578 #[error(transparent)]
579 Query(PutQueryError),
581
582 #[error(transparent)]
583 Concurrency(ConcurrencyError),
585}
586
587impl From<ActorShutdown> for PutMutableError {
588 fn from(_: ActorShutdown) -> Self {
589 PutMutableError::Query(PutQueryError::Shutdown)
590 }
591}
592
593impl From<ActorShutdown> for PutQueryError {
594 fn from(_: ActorShutdown) -> Self {
595 PutQueryError::Shutdown
596 }
597}
598
599impl From<ActorShutdown> for PutError {
600 fn from(_: ActorShutdown) -> Self {
601 PutError::Query(PutQueryError::Shutdown)
602 }
603}
604
#[cfg(test)]
mod test {
    use std::{str::FromStr, time::Duration};

    use ed25519_dalek::SigningKey;
    use futures::StreamExt;
    #[allow(unused_imports)]
    use rand::Rng;

    use crate::core::ConcurrencyError;

    use super::*;

    /// Secret key shared by the mutable-item tests so they sign with a
    /// reproducible identity.
    const FIXED_SECRET_KEY: [u8; 32] = [
        56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7, 228,
        127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
    ];

    /// Signing key derived from [`FIXED_SECRET_KEY`].
    fn fixed_signer() -> SigningKey {
        SigningKey::from_bytes(&FIXED_SECRET_KEY)
    }

    /// Builds a node that bootstraps from `testnet`.
    fn client_for(testnet: &Testnet) -> Dht {
        Dht::builder()
            .bootstrap(&testnet.bootstrap)
            .build()
            .unwrap()
    }

    /// Generates three signing keys from random secret bytes.
    #[cfg(feature = "unstable_signed_peers")]
    fn random_signers() -> Vec<SigningKey> {
        (0..3)
            .map(|_| {
                let mut secret_key = [0; 32];
                rand::rng().fill_bytes(&mut secret_key);
                SigningKey::from_bytes(&secret_key)
            })
            .collect()
    }

    /// Sorted public keys of `signers`, as byte vectors.
    #[cfg(feature = "unstable_signed_peers")]
    fn sorted_public_keys(signers: &[SigningKey]) -> Vec<Vec<u8>> {
        let mut keys: Vec<_> = signers
            .iter()
            .map(|signer| signer.verifying_key().as_bytes().to_vec())
            .collect();
        keys.sort();
        keys
    }

    /// Sorted keys carried by the received signed announces, as byte vectors.
    #[cfg(feature = "unstable_signed_peers")]
    fn sorted_announce_keys(peers: &[SignedAnnounce]) -> Vec<Vec<u8>> {
        let mut keys: Vec<_> = peers
            .iter()
            .map(|announce| announce.key().to_vec())
            .collect();
        keys.sort();
        keys
    }

    /// Round-trips a mutable item through the public DHT.
    #[tokio::test]
    #[ignore = "hits the real mainline DHT; run with --ignored"]
    async fn put_get_mutable_real_dht() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter("debug")
            .try_init();

        let dht = Dht::client().unwrap();

        let signer = SigningKey::from_bytes(&rand::random());
        let public_key = *signer.verifying_key().as_bytes();
        let value = b"hello from n0-mainline test";

        let item = MutableItem::new(&signer, value, 1, None);
        dht.put_mutable(item, None).await.unwrap();

        let mut stream = dht.get_mutable(&public_key, None, None).await.unwrap();
        let response = stream
            .next()
            .await
            .expect("should resolve mutable item from real DHT");

        assert_eq!(response.value(), value.as_slice());
        assert_eq!(response.seq(), 1);
    }

    /// Building a second node on a port that is already in use must fail.
    #[tokio::test]
    async fn bind_twice() {
        let first = Dht::client().unwrap();
        let taken_port = first.info().await.unwrap().local_addr().port();

        let second = Dht::builder().port(taken_port).server_mode().build();

        assert!(second.is_err());
    }

    /// A peer announced by one node is found by another.
    #[tokio::test]
    async fn announce_get_peer() {
        let testnet = Testnet::new(10).await.unwrap();
        let announcer = client_for(&testnet);
        let seeker = client_for(&testnet);

        let info_hash = Id::random();

        announcer
            .announce_peer(info_hash, Some(45555))
            .await
            .expect("failed to announce");

        let mut stream = seeker.get_peers(info_hash).await.unwrap();
        let peers = stream.next().await.expect("No peers");

        assert_eq!(peers.first().unwrap().port(), 45555);
    }

    /// An immutable value stored by one node is read back by another.
    #[tokio::test]
    async fn put_get_immutable() {
        let testnet = Testnet::new(10).await.unwrap();
        let writer = client_for(&testnet);
        let reader = client_for(&testnet);

        let value = b"Hello World!";
        let expected_target = Id::from_str("e5f96f6f38320f0f33959cb4d3d656452117aadb").unwrap();

        let target = writer.put_immutable(value).await.unwrap();
        assert_eq!(target, expected_target);

        let stored = reader.get_immutable(target).await.unwrap();
        let response = stored.unwrap();

        assert_eq!(response, value.to_vec().into_boxed_slice());
    }

    /// `find_node` completes on a node without bootstrap nodes.
    #[tokio::test]
    async fn find_node_no_values() {
        let client = Dht::builder().no_bootstrap().build().unwrap();

        client.find_node(Id::random()).await.unwrap();
    }

    /// `get_immutable` yields `None` on a node without bootstrap nodes.
    #[tokio::test]
    async fn put_get_immutable_no_values() {
        let client = Dht::builder().no_bootstrap().build().unwrap();

        let found = client.get_immutable(Id::random()).await.unwrap();

        assert_eq!(found, None);
    }

    /// A mutable item stored by one node is read back by another.
    #[tokio::test]
    async fn put_get_mutable() {
        let testnet = Testnet::new(10).await.unwrap();
        let writer = client_for(&testnet);
        let reader = client_for(&testnet);

        let signer = fixed_signer();
        let item = MutableItem::new(&signer, b"Hello World!", 1000, None);

        writer.put_mutable(item.clone(), None).await.unwrap();

        let mut stream = reader
            .get_mutable(signer.verifying_key().as_bytes(), None, None)
            .await
            .unwrap();
        let response = stream.next().await.expect("No mutable values");

        assert_eq!(&response, &item);
    }

    /// Asking for items more recent than the stored sequence number yields
    /// nothing.
    #[tokio::test]
    async fn put_get_mutable_no_more_recent_value() {
        let testnet = Testnet::new(10).await.unwrap();
        let writer = client_for(&testnet);
        let reader = client_for(&testnet);

        let signer = fixed_signer();
        let seq = 1000;
        let item = MutableItem::new(&signer, b"Hello World!", seq, None);

        writer.put_mutable(item, None).await.unwrap();

        let mut stream = reader
            .get_mutable(signer.verifying_key().as_bytes(), None, Some(seq))
            .await
            .unwrap();
        let response = stream.next().await;

        assert!(response.is_none());
    }

    /// Storing the same immutable value twice returns the same id.
    #[tokio::test]
    async fn repeated_put_query() {
        let testnet = Testnet::new(10).await.unwrap();
        let client = client_for(&testnet);

        let first_id = client.put_immutable(&[1, 2, 3]).await.unwrap();
        let second_id = client.put_immutable(&[1, 2, 3]).await.unwrap();

        assert_eq!(second_id, first_id);
    }

    /// Two successive get queries for the same item both resolve.
    #[tokio::test]
    async fn concurrent_get_mutable() {
        let testnet = Testnet::new(10).await.unwrap();
        let writer = client_for(&testnet);
        let reader = client_for(&testnet);

        let signer = fixed_signer();
        let public_key = *signer.verifying_key().as_bytes();
        let item = MutableItem::new(&signer, b"Hello World!", 1000, None);

        writer.put_mutable(item.clone(), None).await.unwrap();

        let mut first_stream = reader.get_mutable(&public_key, None, None).await.unwrap();
        let _response_first = first_stream.next().await.expect("No mutable values");

        let mut second_stream = reader.get_mutable(&public_key, None, None).await.unwrap();
        let response_second = second_stream.next().await.expect("No mutable values");

        assert_eq!(&response_second, &item);
    }

    /// A put issued with the most recent sequence number as `cas` succeeds
    /// while an earlier put for the same key was sent without being awaited.
    #[tokio::test]
    async fn concurrent_put_mutable_different_with_cas() {
        let testnet = Testnet::new(10).await.unwrap();
        let client = client_for(&testnet);
        let signer = fixed_signer();

        {
            // Send the first put straight to the actor and do not wait for
            // its result.
            let first = MutableItem::new(&signer, &[], 1000, None);
            let request =
                PutRequestSpecific::PutMutable(PutMutableRequestArguments::from(first, None));

            let (tx, _rx) = oneshot::channel();
            let message = ActorMessage::Put(request, tx, None);
            client.0.send(message).await.unwrap();
        }

        tokio::time::sleep(Duration::from_millis(100)).await;

        {
            let second = MutableItem::new(&signer, &[], 1001, None);

            let most_recent = client
                .get_mutable_most_recent(second.key(), None)
                .await
                .unwrap();
            let cas = most_recent.map(|current| current.seq());

            client.put_mutable(second, cas).await.unwrap();
        }
    }

    /// A put with a lower sequence number than the stored item is rejected.
    #[tokio::test]
    async fn conflict_302_seq_less_than_current() {
        let testnet = Testnet::new(10).await.unwrap();
        let client = client_for(&testnet);
        let signer = fixed_signer();

        let newer = MutableItem::new(&signer, &[], 1001, None);
        client.put_mutable(newer, None).await.unwrap();

        let older = MutableItem::new(&signer, &[], 1000, None);
        let outcome = client.put_mutable(older, None).await;

        assert!(matches!(
            outcome,
            Err(PutMutableError::Concurrency(
                ConcurrencyError::NotMostRecent
            ))
        ));
    }

    /// A put whose `cas` does not match the stored sequence number is
    /// rejected.
    #[tokio::test]
    async fn conflict_301_cas() {
        let testnet = Testnet::new(10).await.unwrap();
        let client = client_for(&testnet);
        let signer = fixed_signer();

        let stored = MutableItem::new(&signer, &[], 1001, None);
        client.put_mutable(stored, None).await.unwrap();

        let update = MutableItem::new(&signer, &[], 1002, None);
        let outcome = client.put_mutable(update, Some(1000)).await;

        assert!(matches!(
            outcome,
            Err(PutMutableError::Concurrency(ConcurrencyError::CasFailed))
        ));
    }

    /// Every testnet node reports all the other nodes in `to_bootstrap`.
    #[tokio::test]
    async fn populate_bootstrapping_node_routing_table() {
        let size = 3;
        let testnet = Testnet::new(size).await.unwrap();

        for node in &testnet.nodes {
            let known = node.to_bootstrap().await.unwrap();
            assert_eq!(known.len(), size - 1);
        }
    }

    /// A client bootstraps against a single-node testnet.
    #[tokio::test]
    async fn bootstrap_with_one_node() {
        let testnet = Testnet::new(1).await.unwrap();
        let client = client_for(&testnet);

        let bootstrapped = client.bootstrapped().await.unwrap();

        assert!(bootstrapped);
    }

    /// Signed announces are all returned when every node supports them.
    #[cfg(feature = "unstable_signed_peers")]
    #[tokio::test]
    async fn announce_signed_peers_at_full_adoption() {
        let testnet = Testnet::new(10).await.unwrap();
        let announcer = client_for(&testnet);
        let seeker = client_for(&testnet);

        let info_hash = Id::random();

        let signers = random_signers();
        let expected_keys = sorted_public_keys(&signers);

        for signer in signers {
            announcer
                .announce_signed_peer(info_hash, &signer)
                .await
                .expect("failed to announce");
        }

        let mut stream = seeker.get_signed_peers(info_hash).await.unwrap();
        let peers = stream.next().await.expect("No peers");

        assert_eq!(sorted_announce_keys(&peers), expected_keys);
    }

    /// Signed announces fail on a legacy-only network and work once a few
    /// supporting nodes join it.
    #[cfg(feature = "unstable_signed_peers")]
    #[tokio::test]
    async fn announce_signed_peers_at_low_adoption() {
        let testnet_legacy = Testnet::new_without_signed_peers(10).await.unwrap();

        let signers = random_signers();
        let expected_keys = sorted_public_keys(&signers);

        let info_hash = Id::random();

        {
            // Legacy nodes only: announcing fails and nothing can be found.
            let legacy_client = Dht::builder()
                .bootstrap(&testnet_legacy.bootstrap)
                .disable_signed_peers()
                .build()
                .unwrap();

            let announced = legacy_client
                .announce_signed_peer(info_hash, &signers[0])
                .await;
            assert!(announced.is_err());

            let mut stream = legacy_client.get_signed_peers(info_hash).await.unwrap();
            assert_eq!(stream.next().await, None);
        }

        {
            // Three new nodes join, bootstrapping from the legacy network.
            let testnet_new = Testnet::new_with_bootstrap(3, &testnet_legacy.bootstrap)
                .await
                .unwrap();

            let announcer = client_for(&testnet_new);
            let seeker = client_for(&testnet_new);

            for signer in &signers {
                announcer
                    .announce_signed_peer(info_hash, signer)
                    .await
                    .expect("failed to announce");
            }

            let mut stream = seeker.get_signed_peers(info_hash).await.unwrap();
            let peers = stream.next().await.expect("No peers");

            assert_eq!(sorted_announce_keys(&peers), expected_keys);
        }
    }
}