1mod actors;
210mod channels;
211mod config;
212mod metrics;
213mod network;
214mod types;
215
216use thiserror::Error;
217
218#[derive(Error, Debug)]
220pub enum Error {
221 #[error("message too large: {0}")]
222 MessageTooLarge(usize),
223 #[error("network closed")]
224 NetworkClosed,
225}
226
227pub use actors::tracker::Oracle;
228pub use channels::{Receiver, Sender};
229pub use config::{Bootstrapper, Config};
230pub use network::Network;
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235 use crate::{
236 authenticated::{
237 discovery::actors::router::{Actor as RouterActor, Config as RouterConfig},
238 relay::Relay,
239 },
240 Blocker, Ingress, Manager, Provider, Receiver, Recipients, Sender,
241 };
242 use commonware_cryptography::{ed25519, Signer as _};
243 use commonware_macros::{select, select_loop, test_group, test_traced};
244 use commonware_runtime::{
245 count_running_tasks, deterministic, tokio, BufferPooler, Clock, Handle, IoBuf, Metrics,
246 Network as RNetwork, Quota, Resolver, Runner, Spawner,
247 };
248 use commonware_utils::{channel::mpsc, hostname, ordered::Set, TryCollect, NZU32};
249 use rand_core::{CryptoRngCore, RngCore};
250 use std::{
251 collections::HashSet,
252 net::{IpAddr, Ipv4Addr, SocketAddr},
253 time::Duration,
254 };
255
256 #[derive(Copy, Clone)]
257 enum Mode {
258 All,
259 Some,
260 One,
261 }
262
263 const MAX_MESSAGE_SIZE: u32 = 1_024 * 1_024; const DEFAULT_MESSAGE_BACKLOG: usize = 128;
265
266 fn assert_no_rate_limiting(context: &impl Metrics) {
275 let metrics = context.encode();
276 assert!(
277 !metrics.contains("messages_rate_limited_total{"),
278 "no messages should be rate limited: {metrics}"
279 );
280 }
281
282 async fn run_network(
287 context: impl Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
288 max_message_size: u32,
289 base_port: u16,
290 n: usize,
291 mode: Mode,
292 ) {
293 let mut peers = Vec::new();
295 for i in 0..n {
296 peers.push(ed25519::PrivateKey::from_seed(i as u64));
297 }
298 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
299
300 let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
302 for (i, peer) in peers.iter().enumerate() {
303 let context = context.with_label(&format!("peer_{i}"));
305
306 let port = base_port + i as u16;
308
309 let mut bootstrappers = Vec::new();
311 if i > 0 {
312 bootstrappers.push((
313 addresses[0].clone(),
314 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
315 ));
316 }
317
318 let signer = peer.clone();
320 let config = Config::test(
321 signer.clone(),
322 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
323 bootstrappers,
324 max_message_size,
325 );
326 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
327
328 oracle.track(0, addresses.clone().try_into().unwrap()).await;
330
331 let (mut sender, mut receiver) =
333 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
334
335 network.start();
337
338 context.with_label("agent").spawn({
340 let complete_sender = complete_sender.clone();
341 let addresses = addresses.clone();
342 move |context| async move {
343 let receiver = context.with_label("receiver").spawn(move |_| async move {
345 let mut received = HashSet::new();
347 while received.len() < n - 1 {
348 let (sender, message) = receiver.recv().await.unwrap();
350 assert_eq!(message, sender.as_ref());
351
352 received.insert(sender);
354 }
355 complete_sender.send(()).await.unwrap();
356
357 loop {
359 receiver.recv().await.unwrap();
360 }
361 });
362
363 let msg = signer.public_key();
365 let sender = context
366 .with_label("sender")
367 .spawn(move |context| async move {
368 let mut recipients = addresses.clone();
370 recipients.remove(i);
371 recipients.sort();
372
373 loop {
375 match mode {
376 Mode::One => {
377 for recipient in &recipients {
378 loop {
380 let sent = sender
381 .send(
382 Recipients::One(recipient.clone()),
383 msg.as_ref().to_vec(),
384 true,
385 )
386 .await
387 .unwrap();
388 if sent.len() != 1 {
389 context.sleep(Duration::from_millis(100)).await;
390 continue;
391 }
392 assert_eq!(&sent[0], recipient);
393 break;
394 }
395 }
396 }
397 Mode::Some | Mode::All => {
398 loop {
400 let mut sent = sender
401 .send(
402 match mode {
403 Mode::Some => {
404 Recipients::Some(recipients.clone())
405 }
406 Mode::All => Recipients::All,
407 _ => unreachable!(),
408 },
409 msg.as_ref().to_vec(),
410 true,
411 )
412 .await
413 .unwrap();
414 if sent.len() != recipients.len() {
415 context.sleep(Duration::from_millis(100)).await;
416 continue;
417 }
418
419 sent.sort();
421 assert_eq!(sent, recipients);
422 break;
423 }
424 }
425 };
426
427 context.sleep(Duration::from_secs(10)).await;
429 }
430 });
431
432 select! {
434 receiver = receiver => {
435 panic!("receiver exited: {receiver:?}");
436 },
437 sender = sender => {
438 panic!("sender exited: {sender:?}");
439 },
440 }
441 }
442 });
443 }
444
445 for _ in 0..n {
447 complete_receiver.recv().await.unwrap();
448 }
449
450 assert_no_rate_limiting(&context);
452 }
453
454 fn run_deterministic_test(seed: u64, mode: Mode) {
455 const NUM_PEERS: usize = 25;
457 const BASE_PORT: u16 = 3000;
458
459 let executor = deterministic::Runner::seeded(seed);
461 let state = executor.start(|context| async move {
462 run_network(
463 context.clone(),
464 MAX_MESSAGE_SIZE,
465 BASE_PORT,
466 NUM_PEERS,
467 mode,
468 )
469 .await;
470 context.auditor().state()
471 });
472
473 let executor = deterministic::Runner::seeded(seed);
475 let state2 = executor.start(|context| async move {
476 run_network(
477 context.clone(),
478 MAX_MESSAGE_SIZE,
479 BASE_PORT,
480 NUM_PEERS,
481 mode,
482 )
483 .await;
484 context.auditor().state()
485 });
486 assert_eq!(state, state2);
487 }
488
489 #[test_group("slow")]
490 #[test_traced]
491 fn test_determinism_one() {
492 for i in 0..10 {
493 run_deterministic_test(i, Mode::One);
494 }
495 }
496
497 #[test_group("slow")]
498 #[test_traced]
499 fn test_determinism_some() {
500 for i in 0..10 {
501 run_deterministic_test(i, Mode::Some);
502 }
503 }
504
505 #[test_group("slow")]
506 #[test_traced]
507 fn test_determinism_all() {
508 for i in 0..10 {
509 run_deterministic_test(i, Mode::All);
510 }
511 }
512
513 #[test_traced]
514 fn test_tokio_connectivity() {
515 let executor = tokio::Runner::default();
516 executor.start(|context| async move {
517 let base_port = 3000;
518 let n = 10;
519 run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
520 });
521 }
522
523 #[test_traced]
524 fn test_multi_index_oracle() {
525 let base_port = 3000;
527 let n: usize = 100;
528
529 let executor = deterministic::Runner::default();
531 executor.start(|context| async move {
532 let mut peers = Vec::new();
534 for i in 0..n {
535 peers.push(ed25519::PrivateKey::from_seed(i as u64));
536 }
537 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
538
539 let mut waiters = Vec::new();
541 for (i, peer) in peers.iter().enumerate() {
542 let context = context.with_label(&format!("peer_{i}"));
544
545 let port = base_port + i as u16;
547
548 let mut bootstrappers = Vec::new();
550 if i > 0 {
551 bootstrappers.push((
552 addresses[0].clone(),
553 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
554 ));
555 }
556
557 let signer = peer.clone();
559 let config = Config::test(
560 signer.clone(),
561 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
562 bootstrappers,
563 1_024 * 1_024, );
565 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
566
567 oracle
569 .track(0, [addresses[0].clone()].try_into().unwrap())
570 .await;
571 oracle
572 .track(
573 1,
574 [addresses[1].clone(), addresses[2].clone()]
575 .try_into()
576 .unwrap(),
577 )
578 .await;
579 oracle
580 .track(2, addresses.iter().skip(2).cloned().try_collect().unwrap())
581 .await;
582
583 let (mut sender, mut receiver) =
585 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
586
587 network.start();
589
590 let handler = context
592 .with_label("agent")
593 .spawn(move |context| async move {
594 if i == 0 {
595 let msg = signer.public_key();
597 loop {
598 if sender
599 .send(Recipients::All, msg.as_ref().to_vec(), true)
600 .await
601 .unwrap()
602 .len()
603 == n - 1
604 {
605 break;
606 }
607
608 context.sleep(Duration::from_millis(100)).await;
610 }
611 } else {
612 let (sender, message) = receiver.recv().await.unwrap();
614 assert_eq!(message, sender.as_ref());
615 }
616 });
617
618 waiters.push(handler);
620 }
621
622 for waiter in waiters.into_iter().rev() {
624 waiter.await.unwrap();
625 }
626
627 assert_no_rate_limiting(&context);
629 });
630 }
631
632 #[test_traced]
633 fn test_message_too_large() {
634 let base_port = 3000;
636 let n: usize = 2;
637
638 let executor = deterministic::Runner::seeded(0);
640 executor.start(|mut context| async move {
641 let mut peers = Vec::new();
643 for i in 0..n {
644 peers.push(ed25519::PrivateKey::from_seed(i as u64));
645 }
646 let addresses: Set<_> = peers.iter().map(|p| p.public_key()).try_collect().unwrap();
647
648 let signer = peers[0].clone();
650 let config = Config::test(
651 signer.clone(),
652 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
653 Vec::new(),
654 1_024 * 1_024, );
656 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
657
658 oracle.track(0, addresses.clone()).await;
660
661 let (mut sender, _) =
663 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
664
665 network.start();
667
668 let mut msg = vec![0u8; 10 * 1024 * 1024]; context.fill_bytes(&mut msg[..]);
671
672 let recipient = Recipients::One(addresses[1].clone());
674 let result = sender.send(recipient, msg, true).await;
675 assert!(matches!(result, Err(Error::MessageTooLarge(_))));
676 });
677 }
678
679 #[test_traced]
680 fn test_rate_limiting() {
681 let base_port = 3000;
683 let n: usize = 2;
684
685 let executor = deterministic::Runner::seeded(0);
687 executor.start(|context| async move {
688 let mut peers = Vec::new();
690 for i in 0..n {
691 peers.push(ed25519::PrivateKey::from_seed(i as u64));
692 }
693 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
694 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
695 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
696
697 let signer0 = peers[0].clone();
699 let config0 = Config::test(
700 signer0.clone(),
701 socket0,
702 vec![(peers[1].public_key(), socket1.into())],
703 1_024 * 1_024, );
705 let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
706 oracle0
707 .track(0, addresses.clone().try_into().unwrap())
708 .await;
709 let (mut sender0, _receiver0) =
710 network0.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
711 network0.start();
712
713 let signer1 = peers[1].clone();
715 let config1 = Config::test(
716 signer1.clone(),
717 socket1,
718 vec![(peers[0].public_key(), socket0.into())],
719 1_024 * 1_024, );
721 let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
722 oracle1
723 .track(0, addresses.clone().try_into().unwrap())
724 .await;
725 let (_sender1, _receiver1) =
726 network1.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
727 network1.start();
728
729 let msg = vec![0u8; 1024]; loop {
732 let sent = sender0
734 .send(Recipients::One(addresses[1].clone()), msg.clone(), true)
735 .await
736 .unwrap();
737 if !sent.is_empty() {
738 break;
739 }
740
741 context.sleep(Duration::from_mins(1)).await
744 }
745
746 let sent = sender0
750 .send(Recipients::One(addresses[1].clone()), msg, true)
751 .await
752 .unwrap();
753 assert!(sent.is_empty());
754
755 for _ in 0..10 {
757 assert_no_rate_limiting(&context);
758 context.sleep(Duration::from_millis(100)).await;
759 }
760 });
761 }
762
763 #[test_traced]
764 fn test_unordered_peer_sets() {
765 let (n, base_port) = (10, 3000);
766 let executor = deterministic::Runner::default();
767 executor.start(|context| async move {
768 let mut peers_and_sks = Vec::new();
770 for i in 0..n {
771 let sk = ed25519::PrivateKey::from_seed(i as u64);
772 let pk = sk.public_key();
773 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
774 peers_and_sks.push((sk, pk, addr));
775 }
776 let peer0 = peers_and_sks[0].clone();
777 let config = Config::test(
778 peer0.0,
779 peer0.2,
780 vec![(peer0.1.clone(), peer0.2.into())],
781 1_024 * 1_024,
782 );
783 let (network, mut oracle) = Network::new(context.with_label("network"), config);
784 network.start();
785
786 let mut subscription = oracle.subscribe().await;
788
789 let set10: Set<_> = peers_and_sks
791 .iter()
792 .take(2)
793 .map(|(_, pk, _)| pk.clone())
794 .try_collect()
795 .unwrap();
796 oracle.track(10, set10.clone()).await;
797 let (id, new, all) = subscription.recv().await.unwrap();
798 assert_eq!(id, 10);
799 assert_eq!(new, set10);
800 assert_eq!(all, set10);
801
802 let set9: Set<_> = peers_and_sks
804 .iter()
805 .skip(2)
806 .map(|(_, pk, _)| pk.clone())
807 .try_collect()
808 .unwrap();
809 oracle.track(9, set9.clone()).await;
810
811 let set11: Set<_> = peers_and_sks
813 .iter()
814 .skip(4)
815 .map(|(_, pk, _)| pk.clone())
816 .try_collect()
817 .unwrap();
818 oracle.track(11, set11.clone()).await;
819 let (id, new, all) = subscription.recv().await.unwrap();
820 assert_eq!(id, 11);
821 assert_eq!(new, set11);
822 let all_keys: Set<_> = set10
823 .into_iter()
824 .chain(set11.into_iter())
825 .try_collect()
826 .unwrap();
827 assert_eq!(all, all_keys);
828 });
829 }
830
831 #[test_traced]
832 fn test_graceful_shutdown() {
833 let base_port = 3000;
834 let n: usize = 5;
835
836 let executor = deterministic::Runner::default();
837 executor.start(|context| async move {
838 let mut peers = Vec::new();
840 for i in 0..n {
841 peers.push(ed25519::PrivateKey::from_seed(i as u64));
842 }
843 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
844
845 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
847 for (i, peer) in peers.iter().enumerate() {
848 let peer_context = context.with_label(&format!("peer_{i}"));
849 let port = base_port + i as u16;
850
851 let mut bootstrappers = Vec::new();
853 if i > 0 {
854 bootstrappers.push((
855 addresses[0].clone(),
856 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
857 ));
858 }
859
860 let signer = peer.clone();
861 let config = Config::test(
862 signer.clone(),
863 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
864 bootstrappers,
865 1_024 * 1_024, );
867 let (mut network, mut oracle) =
868 Network::new(peer_context.with_label("network"), config);
869
870 oracle.track(0, addresses.clone().try_into().unwrap()).await;
872
873 let (mut sender, mut receiver) =
874 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
875 network.start();
876
877 peer_context.with_label("agent").spawn({
878 let complete_sender = complete_sender.clone();
879 move |context| async move {
880 let expected_connections = if i == 0 { n - 1 } else { 1 };
882
883 let msg = signer.public_key();
885 loop {
886 let sent = sender
887 .send(Recipients::All, msg.as_ref().to_vec(), true)
888 .await
889 .unwrap();
890 if sent.len() >= expected_connections {
891 break;
892 }
893 context.sleep(Duration::from_millis(100)).await;
894 }
895
896 complete_sender.send(()).await.unwrap();
898
899 select_loop! {
901 context,
902 on_stopped => {},
903 Ok(_) = receiver.recv() else break => {},
904 }
905 }
906 });
907 }
908
909 for _ in 0..n {
911 complete_receiver.recv().await.unwrap();
912 }
913
914 let metrics_before = context.encode();
916 let is_running = |name: &str| -> bool {
917 metrics_before.lines().any(|line| {
918 line.starts_with("runtime_tasks_running{")
919 && line.contains(&format!("name=\"{name}\""))
920 && line.contains("kind=\"Task\"")
921 && line.trim_end().ends_with(" 1")
922 })
923 };
924 for i in 0..n {
925 let prefix = format!("peer_{i}_network");
926 assert!(
927 is_running(&format!("{prefix}_tracker")),
928 "peer_{i} tracker should be running"
929 );
930 assert!(
931 is_running(&format!("{prefix}_router")),
932 "peer_{i} router should be running"
933 );
934 assert!(
935 is_running(&format!("{prefix}_spawner")),
936 "peer_{i} spawner should be running"
937 );
938 assert!(
939 is_running(&format!("{prefix}_listener")),
940 "peer_{i} listener should be running"
941 );
942 assert!(
943 is_running(&format!("{prefix}_dialer")),
944 "peer_{i} dialer should be running"
945 );
946 }
947
948 let shutdown_context = context.clone();
951 context.with_label("shutdown").spawn(move |_| async move {
952 let result = shutdown_context.stop(0, Some(Duration::from_secs(5))).await;
954
955 assert!(
957 result.is_ok(),
958 "graceful shutdown should complete: {result:?}"
959 );
960 });
961
962 context.stopped().await.unwrap();
964
965 context.sleep(Duration::from_millis(100)).await;
967
968 let metrics_after = context.encode();
970 let is_stopped = |name: &str| -> bool {
971 metrics_after.lines().any(|line| {
972 line.starts_with("runtime_tasks_running{")
973 && line.contains(&format!("name=\"{name}\""))
974 && line.contains("kind=\"Task\"")
975 && line.trim_end().ends_with(" 0")
976 })
977 };
978 for i in 0..n {
979 let prefix = format!("peer_{i}_network");
980 assert!(
981 is_stopped(&format!("{prefix}_tracker")),
982 "peer_{i} tracker should be stopped"
983 );
984 assert!(
985 is_stopped(&format!("{prefix}_router")),
986 "peer_{i} router should be stopped"
987 );
988 assert!(
989 is_stopped(&format!("{prefix}_spawner")),
990 "peer_{i} spawner should be stopped"
991 );
992 assert!(
993 is_stopped(&format!("{prefix}_listener")),
994 "peer_{i} listener should be stopped"
995 );
996 assert!(
997 is_stopped(&format!("{prefix}_dialer")),
998 "peer_{i} dialer should be stopped"
999 );
1000 }
1001 });
1002 }
1003
1004 #[test_traced]
1005 fn test_subscription_includes_self_when_registered() {
1006 let base_port = 3000;
1007 let executor = deterministic::Runner::default();
1008 executor.start(|context| async move {
1009 let self_sk = ed25519::PrivateKey::from_seed(0);
1011 let self_pk = self_sk.public_key();
1012 let self_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1013
1014 let other_pk = ed25519::PrivateKey::from_seed(1).public_key();
1015
1016 let config = Config::test(
1018 self_sk,
1019 self_addr,
1020 vec![], 1_024 * 1_024,
1022 );
1023 let (network, mut oracle) = Network::new(context.with_label("network"), config);
1024 network.start();
1025
1026 let mut subscription = oracle.subscribe().await;
1028
1029 let peer_set: Set<_> = [other_pk.clone()].try_into().unwrap();
1031 oracle.track(1, peer_set.clone()).await;
1032
1033 let (id, new, all) = subscription.recv().await.unwrap();
1035 assert_eq!(id, 1);
1036 assert_eq!(new.len(), 1);
1037 assert_eq!(all.len(), 1);
1038
1039 assert!(
1041 new.position(&self_pk).is_none(),
1042 "new set should not include self"
1043 );
1044 assert!(
1045 new.position(&other_pk).is_some(),
1046 "new set should include other"
1047 );
1048
1049 assert!(
1051 all.position(&self_pk).is_none(),
1052 "tracked peers should not include self"
1053 );
1054 assert!(
1055 all.position(&other_pk).is_some(),
1056 "tracked peers should include other"
1057 );
1058
1059 let peer_set: Set<_> = [self_pk.clone(), other_pk.clone()].try_into().unwrap();
1061 oracle.track(2, peer_set.clone()).await;
1062
1063 let (id, new, all) = subscription.recv().await.unwrap();
1065 assert_eq!(id, 2);
1066 assert_eq!(new.len(), 2);
1067 assert_eq!(all.len(), 2);
1068
1069 assert!(
1071 new.position(&self_pk).is_some(),
1072 "new set should include self"
1073 );
1074 assert!(
1075 new.position(&other_pk).is_some(),
1076 "new set should include other"
1077 );
1078
1079 assert!(
1081 all.position(&self_pk).is_some(),
1082 "tracked peers should include self"
1083 );
1084 assert!(
1085 all.position(&other_pk).is_some(),
1086 "tracked peers should include other"
1087 );
1088 });
1089 }
1090
1091 #[test_traced]
1092 fn test_dns_bootstrapper_resolution() {
1093 let base_port = 3000;
1094 let n: usize = 3;
1095
1096 let executor = deterministic::Runner::default();
1097 executor.start(|context| async move {
1098 let mut peers = Vec::new();
1100 for i in 0..n {
1101 peers.push(ed25519::PrivateKey::from_seed(i as u64));
1102 }
1103 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
1104
1105 let bootstrapper_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1107 context.resolver_register("boot.local", Some(vec![bootstrapper_ip]));
1108
1109 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1111 for (i, peer) in peers.iter().enumerate() {
1112 let context = context.with_label(&format!("peer_{i}"));
1113 let port = base_port + i as u16;
1114
1115 let bootstrappers = if i > 0 {
1117 vec![(
1118 addresses[0].clone(),
1119 Ingress::Dns {
1120 host: hostname!("boot.local"),
1121 port: base_port,
1122 },
1123 )]
1124 } else {
1125 vec![]
1126 };
1127
1128 let signer = peer.clone();
1130 let config = Config::test(
1131 signer.clone(),
1132 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
1133 bootstrappers,
1134 1_024 * 1_024,
1135 );
1136 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
1137
1138 oracle.track(0, addresses.clone().try_into().unwrap()).await;
1140
1141 let (mut sender, mut receiver) =
1143 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1144
1145 network.start();
1146
1147 context.with_label("agent").spawn({
1149 let complete_sender = complete_sender.clone();
1150 let addresses = addresses.clone();
1151 move |context| async move {
1152 let receiver = context.with_label("receiver").spawn(move |_| async move {
1154 let mut received = HashSet::new();
1155 while received.len() < n - 1 {
1156 let (sender, message) = receiver.recv().await.unwrap();
1157 assert_eq!(message, sender.as_ref());
1158 received.insert(sender);
1159 }
1160 complete_sender.send(()).await.unwrap();
1161
1162 loop {
1163 receiver.recv().await.unwrap();
1164 }
1165 });
1166
1167 let msg = signer.public_key();
1169 let sender =
1170 context
1171 .with_label("sender")
1172 .spawn(move |context| async move {
1173 loop {
1174 let mut recipients = addresses.clone();
1175 recipients.remove(i);
1176 recipients.sort();
1177
1178 loop {
1179 let mut sent = sender
1180 .send(Recipients::All, msg.as_ref().to_vec(), true)
1181 .await
1182 .unwrap();
1183 if sent.len() != recipients.len() {
1184 context.sleep(Duration::from_millis(100)).await;
1185 continue;
1186 }
1187 sent.sort();
1188 assert_eq!(sent, recipients);
1189 break;
1190 }
1191
1192 context.sleep(Duration::from_secs(10)).await;
1193 }
1194 });
1195
1196 select! {
1197 receiver = receiver => {
1198 panic!("receiver exited: {receiver:?}")
1199 },
1200 sender = sender => {
1201 panic!("sender exited: {sender:?}")
1202 },
1203 }
1204 }
1205 });
1206 }
1207
1208 for _ in 0..n {
1210 complete_receiver.recv().await.unwrap();
1211 }
1212
1213 assert_no_rate_limiting(&context);
1214 });
1215 }
1216
1217 #[test_traced]
1218 fn test_dns_resolution_failure_then_success() {
1219 let base_port = 3100;
1220
1221 let executor = deterministic::Runner::default();
1222 executor.start(|context| async move {
1223 let peer0 = ed25519::PrivateKey::from_seed(0);
1225 let peer1 = ed25519::PrivateKey::from_seed(1);
1226 let addresses = vec![peer0.public_key(), peer1.public_key()];
1227
1228 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1229 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1230
1231 let config0 = Config::test(peer0.clone(), socket0, vec![], 1_024 * 1_024);
1235 let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
1236 oracle0
1237 .track(0, addresses.clone().try_into().unwrap())
1238 .await;
1239 let (mut sender0, mut receiver0) =
1240 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1241 network0.start();
1242
1243 let config1 = Config::test(
1245 peer1.clone(),
1246 socket1,
1247 vec![(
1248 peer0.public_key(),
1249 Ingress::Dns {
1250 host: hostname!("boot.local"),
1251 port: base_port,
1252 },
1253 )],
1254 1_024 * 1_024,
1255 );
1256 let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
1257 oracle1
1258 .track(0, addresses.clone().try_into().unwrap())
1259 .await;
1260 let (mut sender1, mut receiver1) =
1261 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1262 network1.start();
1263
1264 context.sleep(Duration::from_secs(2)).await;
1266
1267 let sent = sender0
1269 .send(Recipients::One(peer1.public_key()), b"test", true)
1270 .await
1271 .unwrap();
1272 assert!(sent.is_empty(), "should not be connected yet");
1273
1274 context.resolver_register("boot.local", Some(vec![IpAddr::V4(Ipv4Addr::LOCALHOST)]));
1276
1277 let pk0 = peer0.public_key();
1279 let pk1 = peer1.public_key();
1280 let msg0 = pk0.to_vec();
1281 let msg1 = pk1.to_vec();
1282
1283 let (done_sender, mut done_receiver) = mpsc::channel::<()>(2);
1285 let done0 = done_sender.clone();
1286 let pk1_clone = pk1.clone();
1287 context.with_label("recv0").spawn(move |_| async move {
1288 let (sender, message) = receiver0.recv().await.unwrap();
1289 assert_eq!(sender, pk1_clone);
1290 assert_eq!(message, msg1.as_slice());
1291 done0.send(()).await.unwrap();
1292 });
1293 let done1 = done_sender.clone();
1294 let pk0_clone = pk0.clone();
1295 context.with_label("recv1").spawn(move |_| async move {
1296 let (sender, message) = receiver1.recv().await.unwrap();
1297 assert_eq!(sender, pk0_clone);
1298 assert_eq!(message, msg0.as_slice());
1299 done1.send(()).await.unwrap();
1300 });
1301
1302 context.with_label("sender").spawn({
1304 let pk0 = pk0.clone();
1305 let pk1 = pk1.clone();
1306 move |context| async move {
1307 loop {
1308 let sent0 = sender0
1309 .send(Recipients::One(pk1.clone()), pk0.as_ref().to_vec(), true)
1310 .await
1311 .unwrap();
1312 let sent1 = sender1
1313 .send(Recipients::One(pk0.clone()), pk1.as_ref().to_vec(), true)
1314 .await
1315 .unwrap();
1316 if !sent0.is_empty() && !sent1.is_empty() {
1317 break;
1318 }
1319 context.sleep(Duration::from_millis(100)).await;
1320 }
1321 }
1322 });
1323
1324 done_receiver.recv().await.unwrap();
1326 done_receiver.recv().await.unwrap();
1327 });
1328 }
1329
1330 fn run_dns_connectivity(seed: u64) -> String {
1332 let base_port = 3400;
1333 let n: usize = 3;
1334
1335 let executor = deterministic::Runner::seeded(seed);
1336 executor.start(|context| async move {
1337 let mut peers = Vec::new();
1339 for i in 0..n {
1340 peers.push(ed25519::PrivateKey::from_seed(i as u64));
1341 }
1342 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
1343
1344 let bootstrapper_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1346 context.resolver_register("boot.local", Some(vec![bootstrapper_ip]));
1347
1348 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1350 for (i, peer) in peers.iter().enumerate() {
1351 let context = context.with_label(&format!("peer_{i}"));
1352 let port = base_port + i as u16;
1353
1354 let bootstrappers = if i > 0 {
1356 vec![(
1357 addresses[0].clone(),
1358 Ingress::Dns {
1359 host: hostname!("boot.local"),
1360 port: base_port,
1361 },
1362 )]
1363 } else {
1364 vec![]
1365 };
1366
1367 let signer = peer.clone();
1368 let config = Config::test(
1369 signer.clone(),
1370 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
1371 bootstrappers,
1372 1_024 * 1_024,
1373 );
1374 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
1375 oracle.track(0, addresses.clone().try_into().unwrap()).await;
1376 let (mut sender, mut receiver) =
1377 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1378 network.start();
1379
1380 context.with_label("agent").spawn({
1381 let complete_sender = complete_sender.clone();
1382 let addresses = addresses.clone();
1383 move |context| async move {
1384 let receiver = context.with_label("receiver").spawn(move |_| async move {
1385 let mut received = HashSet::new();
1386 while received.len() < n - 1 {
1387 let (sender, message) = receiver.recv().await.unwrap();
1388 assert_eq!(message, sender.as_ref());
1389 received.insert(sender);
1390 }
1391 complete_sender.send(()).await.unwrap();
1392 loop {
1393 receiver.recv().await.unwrap();
1394 }
1395 });
1396
1397 let msg = signer.public_key();
1398 let sender =
1399 context
1400 .with_label("sender")
1401 .spawn(move |context| async move {
1402 loop {
1403 let mut recipients = addresses.clone();
1404 recipients.remove(i);
1405 recipients.sort();
1406
1407 loop {
1408 let mut sent = sender
1409 .send(Recipients::All, msg.as_ref().to_vec(), true)
1410 .await
1411 .unwrap();
1412 if sent.len() != recipients.len() {
1413 context.sleep(Duration::from_millis(100)).await;
1414 continue;
1415 }
1416 sent.sort();
1417 assert_eq!(sent, recipients);
1418 break;
1419 }
1420
1421 context.sleep(Duration::from_secs(10)).await;
1422 }
1423 });
1424
1425 select! {
1426 receiver = receiver => {
1427 panic!("receiver exited: {receiver:?}")
1428 },
1429 sender = sender => {
1430 panic!("sender exited: {sender:?}")
1431 },
1432 }
1433 }
1434 });
1435 }
1436
1437 for _ in 0..n {
1438 complete_receiver.recv().await.unwrap();
1439 }
1440
1441 context.auditor().state()
1442 })
1443 }
1444
1445 #[test_traced]
1446 fn test_dns_resolution_determinism() {
1447 let state1 = run_dns_connectivity(42);
1449 let state2 = run_dns_connectivity(42);
1450 assert_eq!(state1, state2, "DNS resolution should be deterministic");
1451 }
1452
1453 #[test_traced]
1454 fn test_dns_resolving_to_private_ip_not_dialed() {
1455 let base_port = 3300;
1458 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1459 executor.start(|context| async move {
1460 let peer0 = ed25519::PrivateKey::from_seed(0);
1461 let peer1 = ed25519::PrivateKey::from_seed(1);
1462
1463 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1464 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1465
1466 context.resolver_register("boot.local".to_string(), Some(vec![socket0.ip()]));
1468
1469 let addresses: Vec<_> = vec![peer0.public_key(), peer1.public_key()];
1470
1471 let mut config0 = Config::test(peer0.clone(), socket0, vec![], 1_024 * 1_024);
1473 config0.allow_private_ips = true;
1474 let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
1475 oracle0
1476 .track(0, addresses.clone().try_into().unwrap())
1477 .await;
1478 let (_sender0, mut receiver0) =
1479 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1480 network0.start();
1481
1482 let bootstrappers = vec![(
1484 peer0.public_key(),
1485 Ingress::Dns {
1486 host: hostname!("boot.local"),
1487 port: base_port,
1488 },
1489 )];
1490 let mut config1 = Config::test(peer1.clone(), socket1, bootstrappers, 1_024 * 1_024);
1491 config1.allow_private_ips = false; let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
1493 oracle1
1494 .track(0, addresses.clone().try_into().unwrap())
1495 .await;
1496 let (mut sender1, _receiver1) =
1497 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1498 network1.start();
1499
1500 context.sleep(Duration::from_secs(5)).await;
1502
1503 let sent = sender1
1505 .send(Recipients::All, peer1.public_key().as_ref().to_vec(), true)
1506 .await
1507 .unwrap();
1508 assert!(
1509 sent.is_empty(),
1510 "peer 1 should not have connected to peer 0 (private IP)"
1511 );
1512
1513 select! {
1515 msg = receiver0.recv() => {
1516 panic!("peer 0 should not have received any message, got: {msg:?}");
1517 },
1518 _ = context.sleep(Duration::from_secs(1)) => {
1519 },
1521 }
1522 });
1523 }
1524
1525 #[test_traced]
1526 fn test_dns_mixed_ips_connectivity() {
1527 for seed in 0..25 {
1533 let base_port = 3400;
1534
1535 let cfg = deterministic::Config::default()
1536 .with_seed(seed)
1537 .with_timeout(Some(Duration::from_secs(120)));
1538 let executor = deterministic::Runner::new(cfg);
1539 executor.start(|context| async move {
1540 let peer0 = ed25519::PrivateKey::from_seed(0);
1541 let peer1 = ed25519::PrivateKey::from_seed(1);
1542
1543 let good_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1544 let socket0 = SocketAddr::new(good_ip, base_port);
1545 let socket1 = SocketAddr::new(good_ip, base_port + 1);
1546
1547 let mut all_ips0: Vec<IpAddr> = (1..=3)
1549 .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 100 + i)))
1550 .collect();
1551 all_ips0.push(good_ip);
1552 context.resolver_register("peer-0.local", Some(all_ips0));
1553
1554 let mut all_ips1: Vec<IpAddr> = (1..=3)
1555 .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 110 + i)))
1556 .collect();
1557 all_ips1.push(good_ip);
1558 context.resolver_register("peer-1.local", Some(all_ips1));
1559
1560 let addresses: Vec<_> = vec![peer0.public_key(), peer1.public_key()];
1561
1562 let bootstrappers0 = vec![(
1564 peer1.public_key(),
1565 Ingress::Dns {
1566 host: hostname!("peer-1.local"),
1567 port: base_port + 1,
1568 },
1569 )];
1570 let config0 = Config::test(peer0.clone(), socket0, bootstrappers0, 1_024 * 1_024);
1571 let (mut network0, mut oracle0) =
1572 Network::new(context.with_label("peer_0"), config0);
1573 oracle0
1574 .track(0, addresses.clone().try_into().unwrap())
1575 .await;
1576 let (_sender0, mut receiver0) =
1577 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1578 network0.start();
1579
1580 let bootstrappers1 = vec![(
1582 peer0.public_key(),
1583 Ingress::Dns {
1584 host: hostname!("peer-0.local"),
1585 port: base_port,
1586 },
1587 )];
1588 let config1 = Config::test(peer1.clone(), socket1, bootstrappers1, 1_024 * 1_024);
1589 let (mut network1, mut oracle1) =
1590 Network::new(context.with_label("peer_1"), config1);
1591 oracle1
1592 .track(0, addresses.clone().try_into().unwrap())
1593 .await;
1594 let (mut sender1, _receiver1) =
1595 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1596 network1.start();
1597
1598 let pk0 = peer0.public_key();
1600 loop {
1601 let sent = sender1
1602 .send(
1603 Recipients::One(pk0.clone()),
1604 peer1.public_key().as_ref().to_vec(),
1605 true,
1606 )
1607 .await
1608 .unwrap();
1609 if !sent.is_empty() {
1610 break;
1611 }
1612 context.sleep(Duration::from_millis(100)).await;
1613 }
1614
1615 let (sender, msg) = receiver0.recv().await.unwrap();
1617 assert_eq!(sender, peer1.public_key());
1618 assert_eq!(msg, peer1.public_key().as_ref());
1619 });
1620 }
1621 }
1622
1623 #[test_traced]
1624 fn test_many_peer_restart_with_new_address() {
1625 let base_port = 7500;
1626 let n = 5;
1627
1628 let executor = deterministic::Runner::default();
1629 executor.start(|context| async move {
1630 let peers: Vec<_> = (0..n)
1632 .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1633 .collect();
1634 let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1635
1636 let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1638 let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1639 let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();
1640
1641 let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1643
1644 for (i, peer) in peers.iter().enumerate() {
1646 let peer_context = context.with_label(&format!("peer_{i}"));
1647
1648 let mut bootstrappers = Vec::new();
1650 if i > 0 {
1651 bootstrappers.push((
1652 addresses[0].clone(),
1653 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1654 ));
1655 }
1656
1657 let config = Config::test(
1658 peer.clone(),
1659 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1660 bootstrappers,
1661 MAX_MESSAGE_SIZE,
1662 );
1663 let (mut network, mut oracle) =
1664 Network::new(peer_context.with_label("network"), config);
1665
1666 oracle.track(0, addresses.clone().try_into().unwrap()).await;
1668
1669 let (sender, receiver) =
1670 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1671 senders[i] = Some(sender);
1672 receivers[i] = Some(receiver);
1673
1674 let handle = network.start();
1675 handles[i] = Some(handle);
1676 }
1677
1678 for (i, sender) in senders.iter_mut().enumerate() {
1680 let sender = sender.as_mut().unwrap();
1681 loop {
1682 let sent = sender
1683 .send(
1684 Recipients::All,
1685 peers[i].public_key().as_ref().to_vec(),
1686 true,
1687 )
1688 .await
1689 .unwrap();
1690 if sent.len() == n - 1 {
1691 break;
1692 }
1693 context.sleep(Duration::from_millis(100)).await;
1694 }
1695 }
1696
1697 for receiver in receivers.iter_mut() {
1699 let receiver = receiver.as_mut().unwrap();
1700 let mut received = HashSet::new();
1701 while received.len() < n - 1 {
1702 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1703 assert_eq!(message, sender.as_ref());
1704 received.insert(sender);
1705 }
1706 }
1707
1708 let mut restart_counter = 0u16;
1710 for round in 0..3 {
1711 for restart_peer_idx in 1..n {
1712 restart_counter += 1;
1714 let new_port = base_port + 100 + restart_counter;
1715 ports[restart_peer_idx] = new_port;
1716
1717 if let Some(handle) = handles[restart_peer_idx].take() {
1719 handle.abort();
1720 }
1721 senders[restart_peer_idx] = None;
1722 receivers[restart_peer_idx] = None;
1723
1724 let peer_context =
1726 context.with_label(&format!("peer_{restart_peer_idx}_round_{round}"));
1727 let bootstrappers = vec![(
1728 addresses[0].clone(),
1729 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1730 )];
1731 let config = Config::test(
1732 peers[restart_peer_idx].clone(),
1733 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port),
1734 bootstrappers,
1735 MAX_MESSAGE_SIZE,
1736 );
1737 let (mut network, mut oracle) =
1738 Network::new(peer_context.with_label("network"), config);
1739
1740 oracle.track(0, addresses.clone().try_into().unwrap()).await;
1742
1743 let (sender, receiver) = network.register(
1744 0,
1745 Quota::per_second(NZU32!(100)),
1746 DEFAULT_MESSAGE_BACKLOG,
1747 );
1748 senders[restart_peer_idx] = Some(sender);
1749 receivers[restart_peer_idx] = Some(receiver);
1750
1751 let handle = network.start();
1752 handles[restart_peer_idx] = Some(handle);
1753
1754 let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
1756 loop {
1757 let sent = restarted_sender
1758 .send(
1759 Recipients::All,
1760 peers[restart_peer_idx].public_key().as_ref().to_vec(),
1761 true,
1762 )
1763 .await
1764 .unwrap();
1765 if sent.len() == n - 1 {
1766 break;
1767 }
1768 context.sleep(Duration::from_millis(100)).await;
1769 }
1770
1771 for i in 0..n {
1773 if i == restart_peer_idx {
1774 continue;
1775 }
1776 let sender = senders[i].as_mut().unwrap();
1777 loop {
1778 let sent = sender
1779 .send(
1780 Recipients::One(addresses[restart_peer_idx].clone()),
1781 peers[i].public_key().as_ref().to_vec(),
1782 true,
1783 )
1784 .await
1785 .unwrap();
1786 if sent.len() == 1 {
1787 break;
1788 }
1789 context.sleep(Duration::from_millis(100)).await;
1790 }
1791 }
1792
1793 let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
1795 let mut received = HashSet::new();
1796 while received.len() < n - 1 {
1797 let (sender, message): (ed25519::PublicKey, _) =
1798 restarted_receiver.recv().await.unwrap();
1799 assert_eq!(message, sender.as_ref());
1800 received.insert(sender);
1801 }
1802 }
1803 }
1804
1805 assert_no_rate_limiting(&context);
1806 });
1807 }
1808
1809 #[test_traced]
1810 fn test_simultaneous_peer_restart() {
1811 let base_port = 7700;
1812 let n = 5;
1813
1814 let executor = deterministic::Runner::default();
1815 executor.start(|context| async move {
1816 let peers: Vec<_> = (0..n)
1818 .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1819 .collect();
1820 let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1821
1822 let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1824
1825 let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1827 let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1828 let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();
1829
1830 for (i, peer) in peers.iter().enumerate() {
1832 let peer_context = context.with_label(&format!("peer_{i}"));
1833
1834 let mut bootstrappers = Vec::new();
1836 if i > 0 {
1837 bootstrappers.push((
1838 addresses[0].clone(),
1839 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1840 ));
1841 }
1842
1843 let config = Config::test(
1844 peer.clone(),
1845 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1846 bootstrappers,
1847 MAX_MESSAGE_SIZE,
1848 );
1849 let (mut network, mut oracle) =
1850 Network::new(peer_context.with_label("network"), config);
1851
1852 oracle.track(0, addresses.clone().try_into().unwrap()).await;
1854
1855 let (sender, receiver) =
1856 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1857 senders[i] = Some(sender);
1858 receivers[i] = Some(receiver);
1859
1860 let handle = network.start();
1861 handles[i] = Some(handle);
1862 }
1863
1864 for (i, sender) in senders.iter_mut().enumerate() {
1866 let sender = sender.as_mut().unwrap();
1867 loop {
1868 let sent = sender
1869 .send(
1870 Recipients::All,
1871 peers[i].public_key().as_ref().to_vec(),
1872 true,
1873 )
1874 .await
1875 .unwrap();
1876 if sent.len() == n - 1 {
1877 break;
1878 }
1879 context.sleep(Duration::from_millis(100)).await;
1880 }
1881 }
1882
1883 for receiver in receivers.iter_mut() {
1885 let receiver = receiver.as_mut().unwrap();
1886 let mut received = HashSet::new();
1887 while received.len() < n - 1 {
1888 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1889 assert_eq!(message, sender.as_ref());
1890 received.insert(sender);
1891 }
1892 }
1893
1894 let restart_peers: Vec<usize> = (1..n).collect();
1899 for &idx in &restart_peers {
1900 if let Some(handle) = handles[idx].take() {
1901 handle.abort();
1902 }
1903 senders[idx] = None;
1904 receivers[idx] = None;
1905 ports[idx] = base_port + 100 + idx as u16;
1907 }
1908
1909 context.sleep(Duration::from_secs(2)).await;
1911
1912 for &idx in &restart_peers {
1914 let peer_context = context.with_label(&format!("peer_{idx}_restarted"));
1915 let bootstrappers = vec![(
1916 addresses[0].clone(),
1917 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1918 )];
1919 let config = Config::test(
1920 peers[idx].clone(),
1921 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[idx]),
1922 bootstrappers,
1923 MAX_MESSAGE_SIZE,
1924 );
1925 let (mut network, mut oracle) =
1926 Network::new(peer_context.with_label("network"), config);
1927
1928 oracle.track(0, addresses.clone().try_into().unwrap()).await;
1930
1931 let (sender, receiver) =
1932 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1933 senders[idx] = Some(sender);
1934 receivers[idx] = Some(receiver);
1935
1936 let handle = network.start();
1937 handles[idx] = Some(handle);
1938 }
1939
1940 for (i, sender) in senders.iter_mut().enumerate() {
1942 let sender = sender.as_mut().unwrap();
1943 loop {
1944 let sent = sender
1945 .send(
1946 Recipients::All,
1947 peers[i].public_key().as_ref().to_vec(),
1948 true,
1949 )
1950 .await
1951 .unwrap();
1952 if sent.len() == n - 1 {
1953 break;
1954 }
1955 context.sleep(Duration::from_millis(100)).await;
1956 }
1957 }
1958
1959 for receiver in receivers.iter_mut() {
1961 let receiver = receiver.as_mut().unwrap();
1962 let mut received = HashSet::new();
1963 while received.len() < n - 1 {
1964 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1965 assert_eq!(message, sender.as_ref());
1966 received.insert(sender);
1967 }
1968 }
1969
1970 assert_no_rate_limiting(&context);
1971 });
1972 }
1973 #[test_traced]
1974 fn test_peer_restart_with_new_address_must_dial() {
1975 let base_port = 3600;
1976 let n: usize = 5;
1977
1978 let executor = deterministic::Runner::default();
1979 executor.start(|context| async move {
1980 let peers: Vec<_> = (0..n)
1982 .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1983 .collect();
1984 let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1985
1986 let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1988 let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1989 let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();
1990
1991 let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1993
1994 let wrong_ip = IpAddr::V4(Ipv4Addr::new(10, 255, 255, 1)); let wrong_address_peer_idx = 2;
2000
2001 for (i, peer) in peers.iter().enumerate() {
2002 let peer_context = context.with_label(&format!("peer_{i}"));
2003 let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]);
2004
2005 let dialable_addr: Ingress = if i == wrong_address_peer_idx {
2007 SocketAddr::new(wrong_ip, ports[i]).into()
2008 } else {
2009 listen_addr.into()
2010 };
2011
2012 let mut bootstrappers = Vec::new();
2014 if i > 0 {
2015 bootstrappers.push((
2016 addresses[0].clone(),
2017 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
2018 ));
2019 }
2020
2021 let mut config =
2022 Config::test(peer.clone(), listen_addr, bootstrappers, 1_024 * 1_024);
2023 config.dialable = dialable_addr;
2024
2025 let (mut network, mut oracle) =
2026 Network::new(peer_context.with_label("network"), config);
2027
2028 oracle.track(0, addresses.clone().try_into().unwrap()).await;
2029
2030 let (sender, receiver) =
2031 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2032 senders[i] = Some(sender);
2033 receivers[i] = Some(receiver);
2034
2035 let handle = network.start();
2036 handles[i] = Some(handle);
2037 }
2038
2039 for (i, sender) in senders.iter_mut().enumerate() {
2041 let sender = sender.as_mut().unwrap();
2042 loop {
2043 let sent = sender
2044 .send(
2045 Recipients::All,
2046 peers[i].public_key().as_ref().to_vec(),
2047 true,
2048 )
2049 .await
2050 .unwrap();
2051 if sent.len() == n - 1 {
2052 break;
2053 }
2054 context.sleep(Duration::from_millis(100)).await;
2055 }
2056 }
2057
2058 for receiver in receivers.iter_mut() {
2060 let receiver = receiver.as_mut().unwrap();
2061 let mut received = HashSet::new();
2062 while received.len() < n - 1 {
2063 let (sender, message) = receiver.recv().await.unwrap();
2064 assert_eq!(message, sender.as_ref());
2065 received.insert(sender);
2066 }
2067 }
2068
2069 let restart_peer_idx = 1;
2075 let new_port = base_port + 100;
2076 ports[restart_peer_idx] = new_port;
2077
2078 if let Some(handle) = handles[restart_peer_idx].take() {
2080 handle.abort();
2081 }
2082 senders[restart_peer_idx] = None;
2083 receivers[restart_peer_idx] = None;
2084
2085 let peer_context = context.with_label(&format!("peer_{restart_peer_idx}_restarted"));
2087 let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port);
2088 let bootstrappers = vec![(
2089 addresses[0].clone(),
2090 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
2091 )];
2092
2093 let config = Config::test(
2094 peers[restart_peer_idx].clone(),
2095 listen_addr,
2096 bootstrappers,
2097 1_024 * 1_024,
2098 );
2099
2100 let (mut network, mut oracle) =
2101 Network::new(peer_context.with_label("network"), config);
2102
2103 oracle.track(0, addresses.clone().try_into().unwrap()).await;
2104
2105 let (sender, receiver) =
2106 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2107 senders[restart_peer_idx] = Some(sender);
2108 receivers[restart_peer_idx] = Some(receiver);
2109
2110 let handle = network.start();
2111 handles[restart_peer_idx] = Some(handle);
2112
2113 let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
2119 loop {
2120 let sent = restarted_sender
2121 .send(
2122 Recipients::All,
2123 peers[restart_peer_idx].public_key().as_ref().to_vec(),
2124 true,
2125 )
2126 .await
2127 .unwrap();
2128 if sent.len() == n - 1 {
2129 break;
2130 }
2131 context.sleep(Duration::from_millis(100)).await;
2132 }
2133
2134 for i in 0..n {
2136 if i == restart_peer_idx {
2137 continue;
2138 }
2139 let sender = senders[i].as_mut().unwrap();
2140 loop {
2141 let sent = sender
2142 .send(
2143 Recipients::One(addresses[restart_peer_idx].clone()),
2144 peers[i].public_key().as_ref().to_vec(),
2145 true,
2146 )
2147 .await
2148 .unwrap();
2149 if sent.len() == 1 {
2150 break;
2151 }
2152 context.sleep(Duration::from_millis(100)).await;
2153 }
2154 }
2155
2156 let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
2158 let mut received = HashSet::new();
2159 while received.len() < n - 1 {
2160 let (sender, message) = restarted_receiver.recv().await.unwrap();
2161 assert_eq!(message, sender.as_ref());
2162 received.insert(sender);
2163 }
2164
2165 assert_no_rate_limiting(&context);
2166 });
2167 }
2168
2169 #[test_traced]
2170 fn test_operations_after_shutdown_do_not_panic() {
2171 let executor = deterministic::Runner::default();
2172 executor.start(|context| async move {
2173 let peer = ed25519::PrivateKey::from_seed(0);
2174 let address = peer.public_key();
2175
2176 let peer_context = context.with_label("peer");
2177 let config = Config::test(
2178 peer.clone(),
2179 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000),
2180 vec![],
2181 MAX_MESSAGE_SIZE,
2182 );
2183 let (mut network, mut oracle) =
2184 Network::new(peer_context.with_label("network"), config);
2185
2186 let (mut sender, _receiver) =
2188 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2189 let peers: Set<ed25519::PublicKey> = vec![address.clone()].try_into().unwrap();
2190 oracle.track(0, peers.clone()).await;
2191
2192 let handle = network.start();
2194 handle.abort();
2195
2196 context.sleep(Duration::from_millis(100)).await;
2198
2199 oracle.track(1, peers.clone()).await;
2201 let _ = oracle.peer_set(0).await;
2202 let _ = oracle.subscribe().await;
2203 oracle.block(address.clone()).await;
2204
2205 let sent = sender
2207 .send(Recipients::All, address.as_ref().to_vec(), true)
2208 .await
2209 .unwrap();
2210 assert!(sent.is_empty());
2211 });
2212 }
2213
2214 fn clean_shutdown(seed: u64) {
2215 let cfg = deterministic::Config::default()
2216 .with_seed(seed)
2217 .with_timeout(Some(Duration::from_secs(30)));
2218 let executor = deterministic::Runner::new(cfg);
2219 executor.start(|context| async move {
2220 let peer = ed25519::PrivateKey::from_seed(0);
2221
2222 let peer_context = context.with_label("peer");
2223 let config = Config::test(
2224 peer.clone(),
2225 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000),
2226 vec![],
2227 MAX_MESSAGE_SIZE,
2228 );
2229 let (mut network, mut oracle) =
2230 Network::new(peer_context.with_label("network"), config);
2231
2232 let (_, _) =
2234 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2235 let peers: Set<ed25519::PublicKey> = vec![peer.public_key()].try_into().unwrap();
2236 oracle.track(0, peers).await;
2237
2238 let handle = network.start();
2240
2241 context.sleep(Duration::from_millis(100)).await;
2243
2244 let running_before = count_running_tasks(&context, "peer_network");
2246 assert!(
2247 running_before > 0,
2248 "at least one network task should be running"
2249 );
2250
2251 handle.abort();
2253 let _ = handle.await;
2254
2255 context.sleep(Duration::from_millis(100)).await;
2257
2258 let running_after = count_running_tasks(&context, "peer_network");
2260 assert_eq!(
2261 running_after, 0,
2262 "all network tasks should be stopped, but {running_after} still running"
2263 );
2264 });
2265 }
2266
2267 #[test_traced]
2268 fn test_clean_shutdown() {
2269 for seed in 0..25 {
2270 clean_shutdown(seed);
2271 }
2272 }
2273
2274 #[test]
2275 fn test_broadcast_slow_peer_no_blocking() {
2276 let executor = deterministic::Runner::timed(Duration::from_secs(5));
2277 executor.start(|context| async move {
2278 let cfg = RouterConfig { mailbox_size: 10 };
2280 let (router, mut mailbox, messenger) =
2281 RouterActor::<_, ed25519::PublicKey>::new(context.clone(), cfg);
2282
2283 let channels = channels::Channels::new(messenger.clone(), MAX_MESSAGE_SIZE);
2285 let _handle = router.start(channels);
2286
2287 let slow_peer = ed25519::PrivateKey::from_seed(0).public_key();
2290 let (slow_low, _slow_low_rx) = mpsc::channel(10);
2291 let (slow_high, _slow_high_rx) = mpsc::channel(10);
2292 assert!(
2293 mailbox
2294 .ready(slow_peer.clone(), Relay::new(slow_low, slow_high))
2295 .await
2296 .is_some(),
2297 "Failed to register slow peer"
2298 );
2299
2300 let fast_peer = ed25519::PrivateKey::from_seed(1).public_key();
2302 let (fast_low, mut fast_receiver) = mpsc::channel(100);
2303 let (fast_high, _fast_high_rx) = mpsc::channel(100);
2304 assert!(
2305 mailbox
2306 .ready(fast_peer.clone(), Relay::new(fast_low, fast_high))
2307 .await
2308 .is_some(),
2309 "Failed to register fast peer"
2310 );
2311
2312 let message = IoBuf::from(vec![0u8; 100]);
2313 let mut messenger = messenger;
2314
2315 for i in 0..10 {
2317 let sent = messenger
2318 .content(Recipients::All, 0, message.clone().into(), false)
2319 .await;
2320 assert_eq!(sent.len(), 2, "Broadcast {i} should reach both peers");
2321 }
2322
2323 let sent = messenger
2325 .content(Recipients::All, 0, message.into(), false)
2326 .await;
2327 assert!(
2328 sent.contains(&fast_peer),
2329 "Fast peer should receive message"
2330 );
2331
2332 for _ in 0..11 {
2334 assert!(fast_receiver.try_recv().is_ok());
2335 }
2336 });
2337 }
2338}