1mod actors;
154mod channels;
155mod config;
156mod metrics;
157mod network;
158mod types;
159
160use thiserror::Error;
161
162#[derive(Error, Debug)]
164pub enum Error {
165 #[error("message too large: {0}")]
166 MessageTooLarge(usize),
167 #[error("network closed")]
168 NetworkClosed,
169}
170
171pub use actors::tracker::Oracle;
172pub use channels::{Receiver, Sender};
173pub use config::Config;
174pub use network::Network;
175
176#[cfg(test)]
177mod tests {
178 use super::*;
179 use crate::{Address, Blocker, Ingress, Manager, Receiver, Recipients, Sender};
180 use commonware_cryptography::{ed25519, Signer as _};
181 use commonware_macros::{select, test_group, test_traced};
182 use commonware_runtime::{
183 count_running_tasks, deterministic, tokio, Clock, Metrics, Network as RNetwork, Quota,
184 Resolver, Runner, Spawner,
185 };
186 use commonware_utils::{
187 hostname,
188 ordered::{Map, Set},
189 Hostname, TryCollect, NZU32,
190 };
191 use futures::{channel::mpsc, SinkExt, StreamExt};
192 use rand_core::{CryptoRngCore, RngCore};
193 use std::{
194 collections::HashSet,
195 net::{IpAddr, Ipv4Addr, SocketAddr},
196 time::Duration,
197 };
198
199 #[derive(Copy, Clone)]
200 enum Mode {
201 All,
202 Some,
203 One,
204 }
205
206 const MAX_MESSAGE_SIZE: u32 = 1_024 * 1_024; const DEFAULT_MESSAGE_BACKLOG: usize = 128;
208
209 fn assert_no_rate_limiting(context: &impl Metrics) {
218 let metrics = context.encode();
219 assert!(
220 !metrics.contains("messages_rate_limited_total{"),
221 "no messages should be rate limited: {metrics}"
222 );
223 }
224
225 async fn run_network(
230 context: impl Spawner + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
231 max_message_size: u32,
232 base_port: u16,
233 n: usize,
234 mode: Mode,
235 ) {
236 let mut peers_and_sks = Vec::new();
238 for i in 0..n {
239 let private_key = ed25519::PrivateKey::from_seed(i as u64);
240 let public_key = private_key.public_key();
241 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
242 peers_and_sks.push((private_key, public_key, address));
243 }
244 let peers = peers_and_sks
245 .iter()
246 .map(|(_, pub_key, addr)| (pub_key.clone(), (*addr).into()))
247 .collect::<Vec<_>>();
248
249 let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
251 for (i, (private_key, public_key, address)) in peers_and_sks.iter().enumerate() {
252 let public_key = public_key.clone();
253
254 let context = context.with_label(&format!("peer_{i}"));
256
257 let config = Config::test(private_key.clone(), *address, max_message_size);
259 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
260
261 oracle.update(0, peers.clone().try_into().unwrap()).await;
263
264 let (mut sender, mut receiver) =
266 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
267
268 network.start();
270
271 let mut public_keys = peers
273 .iter()
274 .filter_map(|(pk, _)| {
275 if pk != &public_key {
276 Some(pk.clone())
277 } else {
278 None
279 }
280 })
281 .collect::<Vec<_>>();
282 public_keys.sort();
283 context.with_label("agent").spawn({
284 let mut complete_sender = complete_sender.clone();
285 let peers = peers.clone();
286 move |context| async move {
287 let receiver = context.with_label("receiver").spawn(move |_| async move {
289 let mut received = HashSet::new();
291 while received.len() < n - 1 {
292 let (sender, message) = receiver.recv().await.unwrap();
294 assert_eq!(sender.as_ref(), message.as_ref());
295
296 received.insert(sender);
298 }
299 complete_sender.send(()).await.unwrap();
300
301 loop {
303 receiver.recv().await.unwrap();
304 }
305 });
306
307 let sender = context
309 .with_label("sender")
310 .spawn(move |context| async move {
311 loop {
313 match mode {
314 Mode::One => {
315 for (j, (pub_key, _)) in peers.iter().enumerate() {
316 if i == j {
318 continue;
319 }
320
321 loop {
323 let sent = sender
324 .send(
325 Recipients::One(pub_key.clone()),
326 public_key.as_ref(),
327 true,
328 )
329 .await
330 .unwrap();
331 if sent.len() != 1 {
332 context.sleep(Duration::from_millis(100)).await;
333 continue;
334 }
335 assert_eq!(&sent[0], pub_key);
336 break;
337 }
338 }
339 }
340 Mode::Some => {
341 let mut recipients = peers.clone();
343 recipients.remove(i);
344 recipients.sort();
345
346 loop {
348 let mut sent = sender
349 .send(
350 Recipients::Some(public_keys.clone()),
351 public_key.as_ref(),
352 true,
353 )
354 .await
355 .unwrap();
356 if sent.len() != n - 1 {
357 context.sleep(Duration::from_millis(100)).await;
358 continue;
359 }
360
361 sent.sort();
363 assert_eq!(sent, public_keys);
364 break;
365 }
366 }
367 Mode::All => {
368 let mut recipients = peers.clone();
370 recipients.remove(i);
371 recipients.sort();
372
373 loop {
375 let mut sent = sender
376 .send(Recipients::All, public_key.as_ref(), true)
377 .await
378 .unwrap();
379 if sent.len() != n - 1 {
380 context.sleep(Duration::from_millis(100)).await;
381 continue;
382 }
383
384 sent.sort();
386 assert_eq!(sent, public_keys);
387 break;
388 }
389 }
390 };
391
392 context.sleep(Duration::from_secs(10)).await;
394 }
395 });
396
397 select! {
399 receiver = receiver => {
400 panic!("receiver exited: {receiver:?}");
401 },
402 sender = sender => {
403 panic!("sender exited: {sender:?}");
404 },
405 }
406 }
407 });
408 }
409
410 for _ in 0..n {
412 complete_receiver.next().await.unwrap();
413 }
414
415 assert_no_rate_limiting(&context);
417 }
418
419 fn run_deterministic_test(seed: u64, mode: Mode) {
420 const NUM_PEERS: usize = 25;
422 const BASE_PORT: u16 = 3000;
423
424 let executor = deterministic::Runner::seeded(seed);
426 let state = executor.start(|context| async move {
427 run_network(
428 context.clone(),
429 MAX_MESSAGE_SIZE,
430 BASE_PORT,
431 NUM_PEERS,
432 mode,
433 )
434 .await;
435 context.auditor().state()
436 });
437
438 let executor = deterministic::Runner::seeded(seed);
440 let state2 = executor.start(|context| async move {
441 run_network(
442 context.clone(),
443 MAX_MESSAGE_SIZE,
444 BASE_PORT,
445 NUM_PEERS,
446 mode,
447 )
448 .await;
449 context.auditor().state()
450 });
451 assert_eq!(state, state2);
452 }
453
454 #[test_group("slow")]
455 #[test_traced]
456 fn test_determinism_one() {
457 for i in 0..10 {
458 run_deterministic_test(i, Mode::One);
459 }
460 }
461
462 #[test_group("slow")]
463 #[test_traced]
464 fn test_determinism_some() {
465 for i in 0..10 {
466 run_deterministic_test(i, Mode::Some);
467 }
468 }
469
470 #[test_group("slow")]
471 #[test_traced]
472 fn test_determinism_all() {
473 for i in 0..10 {
474 run_deterministic_test(i, Mode::All);
475 }
476 }
477
478 #[test_traced]
479 fn test_tokio_connectivity() {
480 let executor = tokio::Runner::default();
481 executor.start(|context| async move {
482 let base_port = 4000;
483 let n = 10;
484 run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
485 });
486 }
487
488 #[test_traced]
489 fn test_multi_index_oracle() {
490 let base_port = 3000;
492 let n: usize = 10;
493
494 let executor = deterministic::Runner::default();
496 executor.start(|context| async move {
497 let mut peers_and_sks = Vec::new();
499 for i in 0..n {
500 let sk = ed25519::PrivateKey::from_seed(i as u64);
501 let pk = sk.public_key();
502 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
503 peers_and_sks.push((sk, pk, addr));
504 }
505 let peers = peers_and_sks
506 .iter()
507 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
508 .collect::<Vec<_>>();
509
510 let mut waiters = Vec::new();
512 for (i, (peer_sk, peer_pk, peer_addr)) in peers_and_sks.iter().enumerate() {
513 let context = context.with_label(&format!("peer_{i}"));
515
516 let config = Config::test(
518 peer_sk.clone(),
519 *peer_addr,
520 1_024 * 1_024, );
522 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
523
524 oracle
526 .update(0, [peers[0].clone()].try_into().unwrap())
527 .await;
528 oracle
529 .update(1, [peers[1].clone(), peers[2].clone()].try_into().unwrap())
530 .await;
531 oracle
532 .update(2, peers.iter().skip(2).cloned().try_collect().unwrap())
533 .await;
534
535 let (mut sender, mut receiver) =
537 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
538
539 network.start();
541
542 let msg = peer_pk.clone();
544 let handler = context
545 .with_label("agent")
546 .spawn(move |context| async move {
547 if i == 0 {
548 loop {
550 if sender
551 .send(Recipients::All, msg.as_ref(), true)
552 .await
553 .unwrap()
554 .len()
555 == n - 1
556 {
557 break;
558 }
559
560 context.sleep(Duration::from_millis(100)).await;
562 }
563 } else {
564 let (sender, message) = receiver.recv().await.unwrap();
566 assert_eq!(sender.as_ref(), message.as_ref());
567 }
568 });
569
570 waiters.push(handler);
572 }
573
574 for waiter in waiters.into_iter().rev() {
576 waiter.await.unwrap();
577 }
578
579 assert_no_rate_limiting(&context);
581 });
582 }
583
584 #[test_traced]
585 fn test_message_too_large() {
586 let base_port = 3000;
588 let n: usize = 2;
589
590 let executor = deterministic::Runner::seeded(0);
592 executor.start(|mut context| async move {
593 let mut peers_and_sks = Vec::new();
595 for i in 0..n {
596 let peer_sk = ed25519::PrivateKey::from_seed(i as u64);
597 let peer_pk = peer_sk.public_key();
598 let peer_addr =
599 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
600 peers_and_sks.push((peer_sk, peer_pk, peer_addr));
601 }
602 let peers: Map<_, _> = peers_and_sks
603 .iter()
604 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
605 .try_collect()
606 .unwrap();
607
608 let (sk, _, addr) = peers_and_sks[0].clone();
610 let config = Config::test(
611 sk,
612 addr,
613 1_024 * 1_024, );
615 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
616
617 oracle.update(0, peers.clone()).await;
619
620 let (mut sender, _) =
622 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
623
624 network.start();
626
627 let mut msg = vec![0u8; 10 * 1024 * 1024]; context.fill_bytes(&mut msg[..]);
630
631 let recipient = Recipients::One(peers[1].clone());
633 let result = sender.send(recipient, &msg[..], true).await;
634 assert!(matches!(result, Err(Error::MessageTooLarge(_))));
635 });
636 }
637
638 #[test_traced]
639 fn test_rate_limiting() {
640 let base_port = 3000;
642 let n: usize = 2;
643
644 let executor = deterministic::Runner::seeded(0);
646 executor.start(|context| async move {
647 let mut peers_and_sks = Vec::new();
649 for i in 0..n {
650 let sk = ed25519::PrivateKey::from_seed(i as u64);
651 let pk = sk.public_key();
652 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
653 peers_and_sks.push((sk, pk, addr));
654 }
655 let peers: Map<_, _> = peers_and_sks
656 .iter()
657 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
658 .try_collect()
659 .unwrap();
660 let (sk0, _, addr0) = peers_and_sks[0].clone();
661 let (sk1, pk1, addr1) = peers_and_sks[1].clone();
662
663 let config0 = Config::test(sk0, addr0, 1_024 * 1_024); let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
666 oracle0.update(0, peers.clone()).await;
667 let (mut sender0, _receiver0) =
668 network0.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
669 network0.start();
670
671 let config1 = Config::test(sk1, addr1, 1_024 * 1_024); let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
674 oracle1.update(0, peers.clone()).await;
675 let (_sender1, _receiver1) =
676 network1.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
677 network1.start();
678
679 let msg = vec![0u8; 1024]; loop {
682 let sent = sender0
684 .send(Recipients::One(pk1.clone()), &msg[..], true)
685 .await
686 .unwrap();
687 if !sent.is_empty() {
688 break;
689 }
690
691 context.sleep(Duration::from_mins(1)).await
694 }
695
696 let sent = sender0
700 .send(Recipients::One(pk1), &msg[..], true)
701 .await
702 .unwrap();
703 assert!(sent.is_empty());
704
705 for _ in 0..10 {
707 assert_no_rate_limiting(&context);
708 context.sleep(Duration::from_millis(100)).await;
709 }
710 });
711 }
712
713 #[test_traced]
714 fn test_unordered_peer_sets() {
715 let (n, base_port) = (10, 3000);
716 let executor = deterministic::Runner::default();
717 executor.start(|context| async move {
718 let mut peers_and_sks = Vec::new();
720 for i in 0..n {
721 let sk = ed25519::PrivateKey::from_seed(i as u64);
722 let pk = sk.public_key();
723 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
724 peers_and_sks.push((sk, pk, addr));
725 }
726 let peer0 = peers_and_sks[0].clone();
727 let config = Config::test(peer0.0, peer0.2, 1_024 * 1_024);
728 let (network, mut oracle) = Network::new(context.with_label("network"), config);
729 network.start();
730
731 let mut subscription = oracle.subscribe().await;
733
734 let set10: Map<_, _> = peers_and_sks
736 .iter()
737 .take(2)
738 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
739 .try_collect()
740 .unwrap();
741 oracle.update(10, set10.clone()).await;
742 let (id, new, all) = subscription.next().await.unwrap();
743 assert_eq!(id, 10);
744 assert_eq!(&new, set10.keys());
745 assert_eq!(&all, set10.keys());
746
747 let set9: Map<_, _> = peers_and_sks
749 .iter()
750 .skip(2)
751 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
752 .try_collect()
753 .unwrap();
754 oracle.update(9, set9.clone()).await;
755
756 let set11: Map<_, _> = peers_and_sks
758 .iter()
759 .skip(4)
760 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
761 .try_collect()
762 .unwrap();
763 oracle.update(11, set11.clone()).await;
764 let (id, new, all) = subscription.next().await.unwrap();
765 assert_eq!(id, 11);
766 assert_eq!(&new, set11.keys());
767 let all_keys: Set<_> = set10
768 .into_keys()
769 .into_iter()
770 .chain(set11.into_keys().into_iter())
771 .try_collect()
772 .unwrap();
773 assert_eq!(all, all_keys);
774 });
775 }
776
777 #[test_traced]
778 fn test_graceful_shutdown() {
779 let base_port = 3000;
780 let n: usize = 5;
781
782 let executor = deterministic::Runner::default();
783 executor.start(|context| async move {
784 let mut peers_and_sks = Vec::new();
786 for i in 0..n {
787 let sk = ed25519::PrivateKey::from_seed(i as u64);
788 let pk = sk.public_key();
789 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
790 peers_and_sks.push((sk, pk, addr));
791 }
792 let peers: Map<_, _> = peers_and_sks
793 .iter()
794 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
795 .try_collect()
796 .unwrap();
797
798 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
800 for (i, (sk, pk, addr)) in peers_and_sks.iter().enumerate() {
801 let peer_context = context.with_label(&format!("peer_{i}"));
802 let config = Config::test(sk.clone(), *addr, 1_024 * 1_024);
803 let (mut network, mut oracle) =
804 Network::new(peer_context.with_label("network"), config);
805
806 oracle.update(0, peers.clone()).await;
808
809 let (mut sender, mut receiver) =
810 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
811 network.start();
812
813 peer_context.with_label("agent").spawn({
814 let mut complete_sender = complete_sender.clone();
815 let pk = pk.clone();
816 move |context| async move {
817 let expected_connections = if i == 0 { n - 1 } else { 1 };
819
820 loop {
822 let sent = sender
823 .send(Recipients::All, pk.as_ref(), true)
824 .await
825 .unwrap();
826 if sent.len() >= expected_connections {
827 break;
828 }
829 context.sleep(Duration::from_millis(100)).await;
830 }
831
832 complete_sender.send(()).await.unwrap();
834
835 loop {
837 select! {
838 result = receiver.recv() => {
839 if result.is_err() {
840 break;
842 }
843 },
844 _ = context.stopped() => {
845 break;
847 }
848 }
849 }
850 }
851 });
852 }
853
854 for _ in 0..n {
856 complete_receiver.next().await.unwrap();
857 }
858
859 let metrics_before = context.encode();
861 let is_running = |name: &str| -> bool {
862 metrics_before.lines().any(|line| {
863 line.starts_with("runtime_tasks_running{")
864 && line.contains(&format!("name=\"{name}\""))
865 && line.contains("kind=\"Task\"")
866 && line.trim_end().ends_with(" 1")
867 })
868 };
869 for i in 0..n {
870 let prefix = format!("peer_{i}_network");
871 assert!(
872 is_running(&format!("{prefix}_tracker")),
873 "peer_{i} tracker should be running"
874 );
875 assert!(
876 is_running(&format!("{prefix}_router")),
877 "peer_{i} router should be running"
878 );
879 assert!(
880 is_running(&format!("{prefix}_spawner")),
881 "peer_{i} spawner should be running"
882 );
883 assert!(
884 is_running(&format!("{prefix}_listener")),
885 "peer_{i} listener should be running"
886 );
887 assert!(
888 is_running(&format!("{prefix}_dialer")),
889 "peer_{i} dialer should be running"
890 );
891 }
892
893 let shutdown_context = context.clone();
895 context.with_label("shutdown").spawn(move |_| async move {
896 let result = shutdown_context.stop(0, Some(Duration::from_secs(5))).await;
898
899 assert!(
901 result.is_ok(),
902 "graceful shutdown should complete: {result:?}"
903 );
904 });
905
906 context.stopped().await.unwrap();
908
909 context.sleep(Duration::from_millis(100)).await;
911
912 let metrics_after = context.encode();
914 let is_stopped = |name: &str| -> bool {
915 metrics_after.lines().any(|line| {
916 line.starts_with("runtime_tasks_running{")
917 && line.contains(&format!("name=\"{name}\""))
918 && line.contains("kind=\"Task\"")
919 && line.trim_end().ends_with(" 0")
920 })
921 };
922 for i in 0..n {
923 let prefix = format!("peer_{i}_network");
924 assert!(
925 is_stopped(&format!("{prefix}_tracker")),
926 "peer_{i} tracker should be stopped"
927 );
928 assert!(
929 is_stopped(&format!("{prefix}_router")),
930 "peer_{i} router should be stopped"
931 );
932 assert!(
933 is_stopped(&format!("{prefix}_spawner")),
934 "peer_{i} spawner should be stopped"
935 );
936 assert!(
937 is_stopped(&format!("{prefix}_listener")),
938 "peer_{i} listener should be stopped"
939 );
940 assert!(
941 is_stopped(&format!("{prefix}_dialer")),
942 "peer_{i} dialer should be stopped"
943 );
944 }
945 });
946 }
947
948 #[test_traced]
949 fn test_subscription_includes_self_when_registered() {
950 let base_port = 3000;
951 let executor = deterministic::Runner::default();
952 executor.start(|context| async move {
953 let self_sk = ed25519::PrivateKey::from_seed(0);
955 let self_pk = self_sk.public_key();
956 let self_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
957
958 let other_pk = ed25519::PrivateKey::from_seed(1).public_key();
959 let other_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
960
961 let config = Config::test(self_sk, self_addr, 1_024 * 1_024);
963 let (network, mut oracle) = Network::new(context.with_label("network"), config);
964 network.start();
965
966 let mut subscription = oracle.subscribe().await;
968
969 let peer_set: Map<_, _> = [(other_pk.clone(), other_addr.into())].try_into().unwrap();
971 oracle.update(1, peer_set.clone()).await;
972
973 let (id, new, all) = subscription.next().await.unwrap();
975 assert_eq!(id, 1);
976 assert_eq!(new.len(), 1);
977 assert_eq!(all.len(), 1);
978
979 assert!(
981 new.position(&self_pk).is_none(),
982 "new set should not include self"
983 );
984 assert!(
985 new.position(&other_pk).is_some(),
986 "new set should include other"
987 );
988
989 assert!(
991 all.position(&self_pk).is_none(),
992 "tracked peers should not include self"
993 );
994 assert!(
995 all.position(&other_pk).is_some(),
996 "tracked peers should include other"
997 );
998
999 let peer_set: Map<_, _> = [
1001 (self_pk.clone(), self_addr.into()),
1002 (other_pk.clone(), other_addr.into()),
1003 ]
1004 .try_into()
1005 .unwrap();
1006 oracle.update(2, peer_set.clone()).await;
1007
1008 let (id, new, all) = subscription.next().await.unwrap();
1010 assert_eq!(id, 2);
1011 assert_eq!(new.len(), 2);
1012 assert_eq!(all.len(), 2);
1013
1014 assert!(
1016 new.position(&self_pk).is_some(),
1017 "new set should include self"
1018 );
1019 assert!(
1020 new.position(&other_pk).is_some(),
1021 "new set should include other"
1022 );
1023
1024 assert!(
1026 all.position(&self_pk).is_some(),
1027 "tracked peers should include self"
1028 );
1029 assert!(
1030 all.position(&other_pk).is_some(),
1031 "tracked peers should include other"
1032 );
1033 });
1034 }
1035
1036 #[test_traced]
1037 fn test_dns_peer_addresses() {
1038 let base_port = 3200;
1039 let n: usize = 3;
1040
1041 let executor = deterministic::Runner::default();
1042 executor.start(|context| async move {
1043 let mut peers_and_sks = Vec::new();
1045 for i in 0..n {
1046 let private_key = ed25519::PrivateKey::from_seed(i as u64);
1047 let public_key = private_key.public_key();
1048 let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
1049 let host_str = format!("peer-{i}.local");
1050 let host = Hostname::new(&host_str).unwrap();
1051 peers_and_sks.push((private_key, public_key, socket, host_str, host));
1052 }
1053
1054 for (_, _, socket, host_str, _) in &peers_and_sks {
1056 context.resolver_register(host_str.clone(), Some(vec![socket.ip()]));
1057 }
1058
1059 let peers: Vec<(_, Address)> = peers_and_sks
1061 .iter()
1062 .map(|(_, pk, socket, _, host)| {
1063 (
1064 pk.clone(),
1065 Address::Asymmetric {
1066 ingress: Ingress::Dns {
1067 host: host.clone(),
1068 port: socket.port(),
1069 },
1070 egress: *socket,
1071 },
1072 )
1073 })
1074 .collect();
1075
1076 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1078 for (i, (private_key, public_key, socket, _, _)) in peers_and_sks.iter().enumerate() {
1079 let context = context.with_label(&format!("peer_{i}"));
1080
1081 let config = Config::test(private_key.clone(), *socket, 1_024 * 1_024);
1083 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
1084
1085 oracle.update(0, peers.clone().try_into().unwrap()).await;
1087
1088 let (mut sender, mut receiver) =
1090 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1091
1092 network.start();
1093
1094 let pk = public_key.clone();
1096 context.with_label("agent").spawn({
1097 let mut complete_sender = complete_sender.clone();
1098 let peers = peers.clone();
1099 move |context| async move {
1100 let receiver = context.with_label("receiver").spawn(move |_| async move {
1102 let mut received = HashSet::new();
1103 while received.len() < n - 1 {
1104 let (sender, message) = receiver.recv().await.unwrap();
1105 assert_eq!(sender.as_ref(), message.as_ref());
1106 received.insert(sender);
1107 }
1108 complete_sender.send(()).await.unwrap();
1109
1110 loop {
1111 receiver.recv().await.unwrap();
1112 }
1113 });
1114
1115 let sender_task =
1117 context
1118 .with_label("sender")
1119 .spawn(move |context| async move {
1120 loop {
1121 let mut recipients: Vec<_> = peers
1122 .iter()
1123 .filter(|(p, _)| p != &pk)
1124 .map(|(p, _)| p.clone())
1125 .collect();
1126 recipients.sort();
1127
1128 loop {
1129 let mut sent = sender
1130 .send(Recipients::All, pk.as_ref(), true)
1131 .await
1132 .unwrap();
1133 if sent.len() != n - 1 {
1134 context.sleep(Duration::from_millis(100)).await;
1135 continue;
1136 }
1137 sent.sort();
1138 assert_eq!(sent, recipients);
1139 break;
1140 }
1141
1142 context.sleep(Duration::from_secs(10)).await;
1143 }
1144 });
1145
1146 select! {
1147 receiver = receiver => { panic!("receiver exited: {receiver:?}") },
1148 sender = sender_task => { panic!("sender exited: {sender:?}") },
1149 }
1150 }
1151 });
1152 }
1153
1154 for _ in 0..n {
1156 complete_receiver.next().await.unwrap();
1157 }
1158
1159 assert_no_rate_limiting(&context);
1160 });
1161 }
1162
1163 #[test_traced]
1164 fn test_mixed_socket_and_dns_addresses() {
1165 let base_port = 3300;
1166 let n: usize = 4;
1167
1168 let executor = deterministic::Runner::default();
1169 executor.start(|context| async move {
1170 let mut peers_and_sks = Vec::new();
1172 for i in 0..n {
1173 let private_key = ed25519::PrivateKey::from_seed(i as u64);
1174 let public_key = private_key.public_key();
1175 let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
1176 peers_and_sks.push((private_key, public_key, socket));
1177 }
1178
1179 for (i, (_, _, socket)) in peers_and_sks.iter().enumerate().skip(2) {
1181 context.resolver_register(format!("peer-{i}.local"), Some(vec![socket.ip()]));
1182 }
1183
1184 let peers: Vec<(_, Address)> = peers_and_sks
1186 .iter()
1187 .enumerate()
1188 .map(|(i, (_, pk, socket))| {
1189 let addr = if i < 2 {
1190 Address::Symmetric(*socket)
1192 } else {
1193 Address::Asymmetric {
1195 ingress: Ingress::Dns {
1196 host: hostname!(&format!("peer-{i}.local")),
1197 port: socket.port(),
1198 },
1199 egress: *socket,
1200 }
1201 };
1202 (pk.clone(), addr)
1203 })
1204 .collect();
1205
1206 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1208 for (i, (private_key, public_key, socket)) in peers_and_sks.iter().enumerate() {
1209 let context = context.with_label(&format!("peer_{i}"));
1210
1211 let config = Config::test(private_key.clone(), *socket, 1_024 * 1_024);
1213 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
1214
1215 oracle.update(0, peers.clone().try_into().unwrap()).await;
1217
1218 let (mut sender, mut receiver) =
1220 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1221
1222 network.start();
1223
1224 let pk = public_key.clone();
1226 context.with_label("agent").spawn({
1227 let mut complete_sender = complete_sender.clone();
1228 let peers = peers.clone();
1229 move |context| async move {
1230 let receiver = context.with_label("receiver").spawn(move |_| async move {
1232 let mut received = HashSet::new();
1233 while received.len() < n - 1 {
1234 let (sender, message) = receiver.recv().await.unwrap();
1235 assert_eq!(sender.as_ref(), message.as_ref());
1236 received.insert(sender);
1237 }
1238 complete_sender.send(()).await.unwrap();
1239
1240 loop {
1241 receiver.recv().await.unwrap();
1242 }
1243 });
1244
1245 let sender_task =
1247 context
1248 .with_label("sender")
1249 .spawn(move |context| async move {
1250 loop {
1251 let mut recipients: Vec<_> = peers
1252 .iter()
1253 .filter(|(p, _)| p != &pk)
1254 .map(|(p, _)| p.clone())
1255 .collect();
1256 recipients.sort();
1257
1258 loop {
1259 let mut sent = sender
1260 .send(Recipients::All, pk.as_ref(), true)
1261 .await
1262 .unwrap();
1263 if sent.len() != n - 1 {
1264 context.sleep(Duration::from_millis(100)).await;
1265 continue;
1266 }
1267 sent.sort();
1268 assert_eq!(sent, recipients);
1269 break;
1270 }
1271
1272 context.sleep(Duration::from_secs(10)).await;
1273 }
1274 });
1275
1276 select! {
1277 receiver = receiver => { panic!("receiver exited: {receiver:?}") },
1278 sender = sender_task => { panic!("sender exited: {sender:?}") },
1279 }
1280 }
1281 });
1282 }
1283
1284 for _ in 0..n {
1286 complete_receiver.next().await.unwrap();
1287 }
1288
1289 assert_no_rate_limiting(&context);
1290 });
1291 }
1292
1293 #[test_traced]
1294 fn test_dns_resolving_to_private_ip_not_dialed() {
1295 let base_port = 4400;
1298 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1299 executor.start(|context| async move {
1300 let peer0 = ed25519::PrivateKey::from_seed(0);
1301 let peer1 = ed25519::PrivateKey::from_seed(1);
1302
1303 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1304 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1305
1306 context.resolver_register("peer-0.local".to_string(), Some(vec![socket0.ip()]));
1308
1309 let mut config0 = Config::test(peer0.clone(), socket0, 1_024 * 1_024);
1311 config0.allow_private_ips = true;
1312 let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
1313
1314 let peers0: Vec<(_, Address)> = vec![
1316 (peer0.public_key(), Address::Symmetric(socket0)),
1317 (peer1.public_key(), Address::Symmetric(socket1)),
1318 ];
1319 oracle0.update(0, peers0.try_into().unwrap()).await;
1320
1321 let (_sender0, mut receiver0) =
1322 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1323 network0.start();
1324
1325 let mut config1 = Config::test(peer1.clone(), socket1, 1_024 * 1_024);
1327 config1.allow_private_ips = false; let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
1329
1330 let peers1: Vec<(_, Address)> = vec![
1332 (
1333 peer0.public_key(),
1334 Address::Asymmetric {
1335 ingress: Ingress::Dns {
1336 host: hostname!("peer-0.local"),
1337 port: socket0.port(),
1338 },
1339 egress: socket0,
1340 },
1341 ),
1342 (peer1.public_key(), Address::Symmetric(socket1)),
1343 ];
1344 oracle1.update(0, peers1.try_into().unwrap()).await;
1345
1346 let (mut sender1, _receiver1) =
1347 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1348 network1.start();
1349
1350 context.sleep(Duration::from_secs(5)).await;
1352
1353 let sent = sender1
1355 .send(Recipients::All, peer1.public_key().as_ref(), true)
1356 .await
1357 .unwrap();
1358 assert!(
1359 sent.is_empty(),
1360 "peer 1 should not have connected to peer 0 (private IP)"
1361 );
1362
1363 select! {
1365 msg = receiver0.recv() => {
1366 panic!("peer 0 should not have received any message, got: {msg:?}");
1367 },
1368 _ = context.sleep(Duration::from_secs(1)) => {
1369 }
1371 }
1372 });
1373 }
1374
1375 #[test_traced]
1376 fn test_dns_mixed_ips_connectivity() {
1377 for seed in 0..25 {
1383 let base_port = 3500;
1384
1385 let cfg = deterministic::Config::default()
1386 .with_seed(seed)
1387 .with_timeout(Some(Duration::from_secs(120)));
1388 let executor = deterministic::Runner::new(cfg);
1389 executor.start(|context| async move {
1390 let peer0 = ed25519::PrivateKey::from_seed(0);
1391 let peer1 = ed25519::PrivateKey::from_seed(1);
1392
1393 let good_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1394 let socket0 = SocketAddr::new(good_ip, base_port);
1395 let socket1 = SocketAddr::new(good_ip, base_port + 1);
1396
1397 let mut all_ips0: Vec<IpAddr> = (1..=3)
1399 .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 100 + i)))
1400 .collect();
1401 all_ips0.push(good_ip);
1402 context.resolver_register("peer-0.local", Some(all_ips0));
1403
1404 let mut all_ips1: Vec<IpAddr> = (1..=3)
1405 .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 110 + i)))
1406 .collect();
1407 all_ips1.push(good_ip);
1408 context.resolver_register("peer-1.local", Some(all_ips1));
1409
1410 let peers: Vec<(_, Address)> = vec![
1412 (
1413 peer0.public_key(),
1414 Address::Asymmetric {
1415 ingress: Ingress::Dns {
1416 host: hostname!("peer-0.local"),
1417 port: base_port,
1418 },
1419 egress: socket0,
1420 },
1421 ),
1422 (
1423 peer1.public_key(),
1424 Address::Asymmetric {
1425 ingress: Ingress::Dns {
1426 host: hostname!("peer-1.local"),
1427 port: base_port + 1,
1428 },
1429 egress: socket1,
1430 },
1431 ),
1432 ];
1433
1434 let config0 = Config::test(peer0.clone(), socket0, 1_024 * 1_024);
1436 let (mut network0, mut oracle0) =
1437 Network::new(context.with_label("peer_0"), config0);
1438 oracle0.update(0, peers.clone().try_into().unwrap()).await;
1439 let (_sender0, mut receiver0) =
1440 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1441 network0.start();
1442
1443 let config1 = Config::test(peer1.clone(), socket1, 1_024 * 1_024);
1445 let (mut network1, mut oracle1) =
1446 Network::new(context.with_label("peer_1"), config1);
1447 oracle1.update(0, peers.clone().try_into().unwrap()).await;
1448 let (mut sender1, _receiver1) =
1449 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1450 network1.start();
1451
1452 let pk0 = peer0.public_key();
1454 loop {
1455 let sent = sender1
1456 .send(
1457 Recipients::One(pk0.clone()),
1458 peer1.public_key().as_ref(),
1459 true,
1460 )
1461 .await
1462 .unwrap();
1463 if !sent.is_empty() {
1464 break;
1465 }
1466 context.sleep(Duration::from_millis(100)).await;
1467 }
1468
1469 let (sender, msg) = receiver0.recv().await.unwrap();
1471 assert_eq!(sender, peer1.public_key());
1472 assert_eq!(msg.as_ref(), peer1.public_key().as_ref());
1473 });
1474 }
1475 }
1476
1477 #[test_traced]
1478 fn test_many_peer_restart_with_new_address() {
1479 let base_port = 9500;
1480 let n = 5;
1481
1482 let executor = deterministic::Runner::default();
1483 executor.start(|context| async move {
1484 let peers: Vec<_> = (0..n)
1486 .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1487 .collect();
1488 let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1489
1490 let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1492 let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1493 let mut oracles: Vec<Option<Oracle<_>>> = (0..n).map(|_| None).collect();
1494 let mut handles: Vec<Option<commonware_runtime::Handle<()>>> =
1495 (0..n).map(|_| None).collect();
1496
1497 let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1499
1500 let peer_set: Vec<(_, Address)> = addresses
1502 .iter()
1503 .enumerate()
1504 .map(|(i, pk)| {
1505 (
1506 pk.clone(),
1507 Address::Symmetric(SocketAddr::new(
1508 IpAddr::V4(Ipv4Addr::LOCALHOST),
1509 ports[i],
1510 )),
1511 )
1512 })
1513 .collect();
1514
1515 for (i, peer) in peers.iter().enumerate() {
1517 let peer_context = context.with_label(&format!("peer_{i}"));
1518
1519 let config = Config::test(
1520 peer.clone(),
1521 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1522 MAX_MESSAGE_SIZE,
1523 );
1524 let (mut network, mut oracle) =
1525 Network::new(peer_context.with_label("network"), config);
1526
1527 oracle.update(0, peer_set.clone().try_into().unwrap()).await;
1529
1530 let (sender, receiver) =
1531 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1532 senders[i] = Some(sender);
1533 receivers[i] = Some(receiver);
1534 oracles[i] = Some(oracle);
1535
1536 let handle = network.start();
1537 handles[i] = Some(handle);
1538 }
1539
1540 for (i, sender) in senders.iter_mut().enumerate() {
1542 let sender = sender.as_mut().unwrap();
1543 loop {
1544 let sent = sender
1545 .send(Recipients::All, peers[i].public_key().as_ref(), true)
1546 .await
1547 .unwrap();
1548 if sent.len() == n - 1 {
1549 break;
1550 }
1551 context.sleep(Duration::from_millis(100)).await;
1552 }
1553 }
1554
1555 for receiver in receivers.iter_mut() {
1557 let receiver = receiver.as_mut().unwrap();
1558 let mut received = HashSet::new();
1559 while received.len() < n - 1 {
1560 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1561 assert_eq!(sender.as_ref(), message.as_ref());
1562 received.insert(sender);
1563 }
1564 }
1565
1566 let mut restart_counter = 0u16;
1568 for round in 0..3 {
1569 for restart_peer_idx in 1..n {
1570 restart_counter += 1;
1572 let new_port = base_port + 100 + restart_counter;
1573 ports[restart_peer_idx] = new_port;
1574
1575 if let Some(handle) = handles[restart_peer_idx].take() {
1577 handle.abort();
1578 }
1579 senders[restart_peer_idx] = None;
1580 receivers[restart_peer_idx] = None;
1581 oracles[restart_peer_idx] = None;
1582
1583 let updated_peer_set: Vec<(_, Address)> = addresses
1585 .iter()
1586 .enumerate()
1587 .map(|(i, pk)| {
1588 (
1589 pk.clone(),
1590 Address::Symmetric(SocketAddr::new(
1591 IpAddr::V4(Ipv4Addr::LOCALHOST),
1592 ports[i],
1593 )),
1594 )
1595 })
1596 .collect();
1597
1598 for oracle in oracles.iter_mut().flatten() {
1600 oracle
1601 .update(
1602 (round * (n - 1) + restart_peer_idx) as u64,
1603 updated_peer_set.clone().try_into().unwrap(),
1604 )
1605 .await;
1606 }
1607
1608 let peer_context =
1610 context.with_label(&format!("peer_{restart_peer_idx}_round_{round}"));
1611 let config = Config::test(
1612 peers[restart_peer_idx].clone(),
1613 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port),
1614 MAX_MESSAGE_SIZE,
1615 );
1616 let (mut network, mut oracle) =
1617 Network::new(peer_context.with_label("network"), config);
1618
1619 oracle
1620 .update(
1621 (round * (n - 1) + restart_peer_idx) as u64,
1622 updated_peer_set.clone().try_into().unwrap(),
1623 )
1624 .await;
1625
1626 let (sender, receiver) = network.register(
1627 0,
1628 Quota::per_second(NZU32!(100)),
1629 DEFAULT_MESSAGE_BACKLOG,
1630 );
1631 senders[restart_peer_idx] = Some(sender);
1632 receivers[restart_peer_idx] = Some(receiver);
1633 oracles[restart_peer_idx] = Some(oracle);
1634
1635 let handle = network.start();
1636 handles[restart_peer_idx] = Some(handle);
1637
1638 let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
1640 loop {
1641 let sent = restarted_sender
1642 .send(
1643 Recipients::All,
1644 peers[restart_peer_idx].public_key().as_ref(),
1645 true,
1646 )
1647 .await
1648 .unwrap();
1649 if sent.len() == n - 1 {
1650 break;
1651 }
1652 context.sleep(Duration::from_millis(100)).await;
1653 }
1654
1655 for i in 0..n {
1657 if i == restart_peer_idx {
1658 continue;
1659 }
1660 let sender = senders[i].as_mut().unwrap();
1661 loop {
1662 let sent = sender
1663 .send(
1664 Recipients::One(addresses[restart_peer_idx].clone()),
1665 peers[i].public_key().as_ref(),
1666 true,
1667 )
1668 .await
1669 .unwrap();
1670 if sent.len() == 1 {
1671 break;
1672 }
1673 context.sleep(Duration::from_millis(100)).await;
1674 }
1675 }
1676
1677 let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
1679 let mut received = HashSet::new();
1680 while received.len() < n - 1 {
1681 let (sender, message): (ed25519::PublicKey, _) =
1682 restarted_receiver.recv().await.unwrap();
1683 assert_eq!(sender.as_ref(), message.as_ref());
1684 received.insert(sender);
1685 }
1686 }
1687 }
1688
1689 assert_no_rate_limiting(&context);
1690 });
1691 }
1692
1693 #[test_traced]
1694 fn test_simultaneous_peer_restart() {
1695 let base_port = 9700;
1696 let n = 5;
1697
1698 let executor = deterministic::Runner::default();
1699 executor.start(|context| async move {
1700 let peers: Vec<_> = (0..n)
1702 .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1703 .collect();
1704 let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1705
1706 let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1708
1709 let peer_set: Vec<(_, Address)> = addresses
1711 .iter()
1712 .enumerate()
1713 .map(|(i, pk)| {
1714 (
1715 pk.clone(),
1716 Address::Symmetric(SocketAddr::new(
1717 IpAddr::V4(Ipv4Addr::LOCALHOST),
1718 ports[i],
1719 )),
1720 )
1721 })
1722 .collect();
1723
1724 let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1726 let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1727 let mut oracles: Vec<Option<Oracle<_>>> = (0..n).map(|_| None).collect();
1728 let mut handles: Vec<Option<commonware_runtime::Handle<()>>> =
1729 (0..n).map(|_| None).collect();
1730
1731 for (i, peer) in peers.iter().enumerate() {
1733 let peer_context = context.with_label(&format!("peer_{i}"));
1734
1735 let config = Config::test(
1736 peer.clone(),
1737 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1738 MAX_MESSAGE_SIZE,
1739 );
1740 let (mut network, mut oracle) =
1741 Network::new(peer_context.with_label("network"), config);
1742
1743 oracle.update(0, peer_set.clone().try_into().unwrap()).await;
1745
1746 let (sender, receiver) =
1747 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1748 senders[i] = Some(sender);
1749 receivers[i] = Some(receiver);
1750 oracles[i] = Some(oracle);
1751
1752 let handle = network.start();
1753 handles[i] = Some(handle);
1754 }
1755
1756 for (i, sender) in senders.iter_mut().enumerate() {
1758 let sender = sender.as_mut().unwrap();
1759 loop {
1760 let sent = sender
1761 .send(Recipients::All, peers[i].public_key().as_ref(), true)
1762 .await
1763 .unwrap();
1764 if sent.len() == n - 1 {
1765 break;
1766 }
1767 context.sleep(Duration::from_millis(100)).await;
1768 }
1769 }
1770
1771 for receiver in receivers.iter_mut() {
1773 let receiver = receiver.as_mut().unwrap();
1774 let mut received = HashSet::new();
1775 while received.len() < n - 1 {
1776 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1777 assert_eq!(sender.as_ref(), message.as_ref());
1778 received.insert(sender);
1779 }
1780 }
1781
1782 let restart_peers: Vec<usize> = (1..n).collect();
1787 for &idx in &restart_peers {
1788 if let Some(handle) = handles[idx].take() {
1789 handle.abort();
1790 }
1791 senders[idx] = None;
1792 receivers[idx] = None;
1793 oracles[idx] = None;
1794 ports[idx] = base_port + 100 + idx as u16;
1795 }
1796
1797 context.sleep(Duration::from_secs(2)).await;
1799
1800 let updated_peer_set: Vec<(_, Address)> = addresses
1802 .iter()
1803 .enumerate()
1804 .map(|(i, pk)| {
1805 (
1806 pk.clone(),
1807 Address::Symmetric(SocketAddr::new(
1808 IpAddr::V4(Ipv4Addr::LOCALHOST),
1809 ports[i],
1810 )),
1811 )
1812 })
1813 .collect();
1814
1815 oracles[0]
1817 .as_mut()
1818 .unwrap()
1819 .update(1, updated_peer_set.clone().try_into().unwrap())
1820 .await;
1821
1822 for &idx in &restart_peers {
1824 let peer_context = context.with_label(&format!("peer_{idx}_restarted"));
1825 let config = Config::test(
1826 peers[idx].clone(),
1827 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[idx]),
1828 MAX_MESSAGE_SIZE,
1829 );
1830 let (mut network, mut oracle) =
1831 Network::new(peer_context.with_label("network"), config);
1832
1833 oracle
1834 .update(1, updated_peer_set.clone().try_into().unwrap())
1835 .await;
1836
1837 let (sender, receiver) =
1838 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1839 senders[idx] = Some(sender);
1840 receivers[idx] = Some(receiver);
1841 oracles[idx] = Some(oracle);
1842
1843 let handle = network.start();
1844 handles[idx] = Some(handle);
1845 }
1846
1847 for (i, sender) in senders.iter_mut().enumerate() {
1849 let sender = sender.as_mut().unwrap();
1850 loop {
1851 let sent = sender
1852 .send(Recipients::All, peers[i].public_key().as_ref(), true)
1853 .await
1854 .unwrap();
1855 if sent.len() == n - 1 {
1856 break;
1857 }
1858 context.sleep(Duration::from_millis(100)).await;
1859 }
1860 }
1861
1862 for receiver in receivers.iter_mut() {
1864 let receiver = receiver.as_mut().unwrap();
1865 let mut received = HashSet::new();
1866 while received.len() < n - 1 {
1867 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1868 assert_eq!(sender.as_ref(), message.as_ref());
1869 received.insert(sender);
1870 }
1871 }
1872
1873 assert_no_rate_limiting(&context);
1874 });
1875 }
1876 #[test_traced]
1877 fn test_operations_after_shutdown_do_not_panic() {
1878 let executor = deterministic::Runner::default();
1879 executor.start(|context| async move {
1880 let peer = ed25519::PrivateKey::from_seed(0);
1881 let address = peer.public_key();
1882
1883 let peer_context = context.with_label("peer");
1884 let config = Config::test(
1885 peer.clone(),
1886 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200),
1887 MAX_MESSAGE_SIZE,
1888 );
1889 let (mut network, mut oracle) =
1890 Network::new(peer_context.with_label("network"), config);
1891
1892 let (mut sender, _receiver) =
1894 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1895 let peer_addr =
1896 Address::Symmetric(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200));
1897 let peers: Map<ed25519::PublicKey, Address> =
1898 vec![(address.clone(), peer_addr)].try_into().unwrap();
1899 oracle.update(0, peers.clone()).await;
1900
1901 let handle = network.start();
1903 handle.abort();
1904
1905 context.sleep(Duration::from_millis(100)).await;
1907
1908 oracle.update(1, peers.clone()).await;
1910 let _ = oracle.peer_set(0).await;
1911 let _ = oracle.subscribe().await;
1912 oracle.block(address.clone()).await;
1913
1914 let sent = sender
1916 .send(Recipients::All, address.as_ref(), true)
1917 .await
1918 .unwrap();
1919 assert!(sent.is_empty());
1920 });
1921 }
1922
1923 fn clean_shutdown(seed: u64) {
1924 let cfg = deterministic::Config::default()
1925 .with_seed(seed)
1926 .with_timeout(Some(Duration::from_secs(30)));
1927 let executor = deterministic::Runner::new(cfg);
1928 executor.start(|context| async move {
1929 let peer = ed25519::PrivateKey::from_seed(0);
1930
1931 let peer_context = context.with_label("peer");
1932 let config = Config::test(
1933 peer.clone(),
1934 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200),
1935 MAX_MESSAGE_SIZE,
1936 );
1937 let (mut network, mut oracle) =
1938 Network::new(peer_context.with_label("network"), config);
1939
1940 let (_, _) =
1942 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1943 let peer_addr =
1944 Address::Symmetric(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200));
1945 let peers: Map<ed25519::PublicKey, Address> =
1946 vec![(peer.public_key(), peer_addr)].try_into().unwrap();
1947 oracle.update(0, peers).await;
1948
1949 let handle = network.start();
1951
1952 context.sleep(Duration::from_millis(100)).await;
1954
1955 let running_before = count_running_tasks(&context, "peer_network");
1957 assert!(
1958 running_before > 0,
1959 "at least one network task should be running"
1960 );
1961
1962 handle.abort();
1964 let _ = handle.await;
1965
1966 context.sleep(Duration::from_millis(100)).await;
1968
1969 let running_after = count_running_tasks(&context, "peer_network");
1971 assert_eq!(
1972 running_after, 0,
1973 "all network tasks should be stopped, but {running_after} still running"
1974 );
1975 });
1976 }
1977
1978 #[test_traced]
1979 fn test_clean_shutdown() {
1980 for seed in 0..25 {
1981 clean_shutdown(seed);
1982 }
1983 }
1984}