1mod actors;
210mod channels;
211mod config;
212mod metrics;
213mod network;
214mod types;
215
216use thiserror::Error;
217
218#[derive(Error, Debug)]
220pub enum Error {
221 #[error("message too large: {0}")]
222 MessageTooLarge(usize),
223 #[error("network closed")]
224 NetworkClosed,
225}
226
227pub use actors::tracker::Oracle;
228pub use channels::{Receiver, Sender};
229pub use config::{Bootstrapper, Config};
230pub use network::Network;
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235 use crate::{
236 authenticated::{
237 discovery::actors::router::{Actor as RouterActor, Config as RouterConfig},
238 relay::Relay,
239 },
240 Blocker, Ingress, Manager, Receiver, Recipients, Sender,
241 };
242 use bytes::Bytes;
243 use commonware_cryptography::{ed25519, Signer as _};
244 use commonware_macros::{select, select_loop, test_group, test_traced};
245 use commonware_runtime::{
246 count_running_tasks, deterministic, tokio, Clock, Handle, Metrics, Network as RNetwork,
247 Quota, Resolver, Runner, Spawner,
248 };
249 use commonware_utils::{hostname, ordered::Set, TryCollect, NZU32};
250 use futures::{channel::mpsc, SinkExt, StreamExt};
251 use rand_core::{CryptoRngCore, RngCore};
252 use std::{
253 collections::HashSet,
254 net::{IpAddr, Ipv4Addr, SocketAddr},
255 time::Duration,
256 };
257
258 #[derive(Copy, Clone)]
259 enum Mode {
260 All,
261 Some,
262 One,
263 }
264
265 const MAX_MESSAGE_SIZE: u32 = 1_024 * 1_024; const DEFAULT_MESSAGE_BACKLOG: usize = 128;
267
268 fn assert_no_rate_limiting(context: &impl Metrics) {
277 let metrics = context.encode();
278 assert!(
279 !metrics.contains("messages_rate_limited_total{"),
280 "no messages should be rate limited: {metrics}"
281 );
282 }
283
284 async fn run_network(
289 context: impl Spawner + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
290 max_message_size: u32,
291 base_port: u16,
292 n: usize,
293 mode: Mode,
294 ) {
295 let mut peers = Vec::new();
297 for i in 0..n {
298 peers.push(ed25519::PrivateKey::from_seed(i as u64));
299 }
300 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
301
302 let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
304 for (i, peer) in peers.iter().enumerate() {
305 let context = context.with_label(&format!("peer_{i}"));
307
308 let port = base_port + i as u16;
310
311 let mut bootstrappers = Vec::new();
313 if i > 0 {
314 bootstrappers.push((
315 addresses[0].clone(),
316 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
317 ));
318 }
319
320 let signer = peer.clone();
322 let config = Config::test(
323 signer.clone(),
324 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
325 bootstrappers,
326 max_message_size,
327 );
328 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
329
330 oracle
332 .update(0, addresses.clone().try_into().unwrap())
333 .await;
334
335 let (mut sender, mut receiver) =
337 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
338
339 network.start();
341
342 context.with_label("agent").spawn({
344 let mut complete_sender = complete_sender.clone();
345 let addresses = addresses.clone();
346 move |context| async move {
347 let receiver = context.with_label("receiver").spawn(move |_| async move {
349 let mut received = HashSet::new();
351 while received.len() < n - 1 {
352 let (sender, message) = receiver.recv().await.unwrap();
354 assert_eq!(sender.as_ref(), message.as_ref());
355
356 received.insert(sender);
358 }
359 complete_sender.send(()).await.unwrap();
360
361 loop {
363 receiver.recv().await.unwrap();
364 }
365 });
366
367 let msg = signer.public_key();
369 let sender = context
370 .with_label("sender")
371 .spawn(move |context| async move {
372 loop {
374 match mode {
375 Mode::One => {
376 for (j, recipient) in addresses.iter().enumerate() {
377 if i == j {
379 continue;
380 }
381
382 loop {
384 let sent = sender
385 .send(
386 Recipients::One(recipient.clone()),
387 msg.as_ref(),
388 true,
389 )
390 .await
391 .unwrap();
392 if sent.len() != 1 {
393 context.sleep(Duration::from_millis(100)).await;
394 continue;
395 }
396 assert_eq!(&sent[0], recipient);
397 break;
398 }
399 }
400 }
401 Mode::Some => {
402 let mut recipients = addresses.clone();
404 recipients.remove(i);
405 recipients.sort();
406
407 loop {
409 let mut sent = sender
410 .send(
411 Recipients::Some(recipients.clone()),
412 msg.as_ref(),
413 true,
414 )
415 .await
416 .unwrap();
417 if sent.len() != n - 1 {
418 context.sleep(Duration::from_millis(100)).await;
419 continue;
420 }
421
422 sent.sort();
424 assert_eq!(sent, recipients);
425 break;
426 }
427 }
428 Mode::All => {
429 let mut recipients = addresses.clone();
431 recipients.remove(i);
432 recipients.sort();
433
434 loop {
436 let mut sent = sender
437 .send(Recipients::All, msg.as_ref(), true)
438 .await
439 .unwrap();
440 if sent.len() != n - 1 {
441 context.sleep(Duration::from_millis(100)).await;
442 continue;
443 }
444
445 sent.sort();
447 assert_eq!(sent, recipients);
448 break;
449 }
450 }
451 };
452
453 context.sleep(Duration::from_secs(10)).await;
455 }
456 });
457
458 select! {
460 receiver = receiver => {
461 panic!("receiver exited: {receiver:?}");
462 },
463 sender = sender => {
464 panic!("sender exited: {sender:?}");
465 },
466 }
467 }
468 });
469 }
470
471 for _ in 0..n {
473 complete_receiver.next().await.unwrap();
474 }
475
476 assert_no_rate_limiting(&context);
478 }
479
480 fn run_deterministic_test(seed: u64, mode: Mode) {
481 const NUM_PEERS: usize = 25;
483 const BASE_PORT: u16 = 3000;
484
485 let executor = deterministic::Runner::seeded(seed);
487 let state = executor.start(|context| async move {
488 run_network(
489 context.clone(),
490 MAX_MESSAGE_SIZE,
491 BASE_PORT,
492 NUM_PEERS,
493 mode,
494 )
495 .await;
496 context.auditor().state()
497 });
498
499 let executor = deterministic::Runner::seeded(seed);
501 let state2 = executor.start(|context| async move {
502 run_network(
503 context.clone(),
504 MAX_MESSAGE_SIZE,
505 BASE_PORT,
506 NUM_PEERS,
507 mode,
508 )
509 .await;
510 context.auditor().state()
511 });
512 assert_eq!(state, state2);
513 }
514
515 #[test_group("slow")]
516 #[test_traced]
517 fn test_determinism_one() {
518 for i in 0..10 {
519 run_deterministic_test(i, Mode::One);
520 }
521 }
522
523 #[test_group("slow")]
524 #[test_traced]
525 fn test_determinism_some() {
526 for i in 0..10 {
527 run_deterministic_test(i, Mode::Some);
528 }
529 }
530
531 #[test_group("slow")]
532 #[test_traced]
533 fn test_determinism_all() {
534 for i in 0..10 {
535 run_deterministic_test(i, Mode::All);
536 }
537 }
538
539 #[test_traced]
540 fn test_tokio_connectivity() {
541 let executor = tokio::Runner::default();
542 executor.start(|context| async move {
543 let base_port = 3000;
544 let n = 10;
545 run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
546 });
547 }
548
549 #[test_traced]
550 fn test_multi_index_oracle() {
551 let base_port = 3000;
553 let n: usize = 100;
554
555 let executor = deterministic::Runner::default();
557 executor.start(|context| async move {
558 let mut peers = Vec::new();
560 for i in 0..n {
561 peers.push(ed25519::PrivateKey::from_seed(i as u64));
562 }
563 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
564
565 let mut waiters = Vec::new();
567 for (i, peer) in peers.iter().enumerate() {
568 let context = context.with_label(&format!("peer_{i}"));
570
571 let port = base_port + i as u16;
573
574 let mut bootstrappers = Vec::new();
576 if i > 0 {
577 bootstrappers.push((
578 addresses[0].clone(),
579 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
580 ));
581 }
582
583 let signer = peer.clone();
585 let config = Config::test(
586 signer.clone(),
587 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
588 bootstrappers,
589 1_024 * 1_024, );
591 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
592
593 oracle
595 .update(0, [addresses[0].clone()].try_into().unwrap())
596 .await;
597 oracle
598 .update(
599 1,
600 [addresses[1].clone(), addresses[2].clone()]
601 .try_into()
602 .unwrap(),
603 )
604 .await;
605 oracle
606 .update(2, addresses.iter().skip(2).cloned().try_collect().unwrap())
607 .await;
608
609 let (mut sender, mut receiver) =
611 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
612
613 network.start();
615
616 let handler = context
618 .with_label("agent")
619 .spawn(move |context| async move {
620 if i == 0 {
621 let msg = signer.public_key();
623 loop {
624 if sender
625 .send(Recipients::All, msg.as_ref(), true)
626 .await
627 .unwrap()
628 .len()
629 == n - 1
630 {
631 break;
632 }
633
634 context.sleep(Duration::from_millis(100)).await;
636 }
637 } else {
638 let (sender, message) = receiver.recv().await.unwrap();
640 assert_eq!(sender.as_ref(), message.as_ref());
641 }
642 });
643
644 waiters.push(handler);
646 }
647
648 for waiter in waiters.into_iter().rev() {
650 waiter.await.unwrap();
651 }
652
653 assert_no_rate_limiting(&context);
655 });
656 }
657
658 #[test_traced]
659 fn test_message_too_large() {
660 let base_port = 3000;
662 let n: usize = 2;
663
664 let executor = deterministic::Runner::seeded(0);
666 executor.start(|mut context| async move {
667 let mut peers = Vec::new();
669 for i in 0..n {
670 peers.push(ed25519::PrivateKey::from_seed(i as u64));
671 }
672 let addresses: Set<_> = peers.iter().map(|p| p.public_key()).try_collect().unwrap();
673
674 let signer = peers[0].clone();
676 let config = Config::test(
677 signer.clone(),
678 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
679 Vec::new(),
680 1_024 * 1_024, );
682 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
683
684 oracle.update(0, addresses.clone()).await;
686
687 let (mut sender, _) =
689 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
690
691 network.start();
693
694 let mut msg = vec![0u8; 10 * 1024 * 1024]; context.fill_bytes(&mut msg[..]);
697
698 let recipient = Recipients::One(addresses[1].clone());
700 let result = sender.send(recipient, &msg[..], true).await;
701 assert!(matches!(result, Err(Error::MessageTooLarge(_))));
702 });
703 }
704
705 #[test_traced]
706 fn test_rate_limiting() {
707 let base_port = 3000;
709 let n: usize = 2;
710
711 let executor = deterministic::Runner::seeded(0);
713 executor.start(|context| async move {
714 let mut peers = Vec::new();
716 for i in 0..n {
717 peers.push(ed25519::PrivateKey::from_seed(i as u64));
718 }
719 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
720 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
721 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
722
723 let signer0 = peers[0].clone();
725 let config0 = Config::test(
726 signer0.clone(),
727 socket0,
728 vec![(peers[1].public_key(), socket1.into())],
729 1_024 * 1_024, );
731 let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
732 oracle0
733 .update(0, addresses.clone().try_into().unwrap())
734 .await;
735 let (mut sender0, _receiver0) =
736 network0.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
737 network0.start();
738
739 let signer1 = peers[1].clone();
741 let config1 = Config::test(
742 signer1.clone(),
743 socket1,
744 vec![(peers[0].public_key(), socket0.into())],
745 1_024 * 1_024, );
747 let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
748 oracle1
749 .update(0, addresses.clone().try_into().unwrap())
750 .await;
751 let (_sender1, _receiver1) =
752 network1.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
753 network1.start();
754
755 let msg = vec![0u8; 1024]; loop {
758 let sent = sender0
760 .send(Recipients::One(addresses[1].clone()), &msg[..], true)
761 .await
762 .unwrap();
763 if !sent.is_empty() {
764 break;
765 }
766
767 context.sleep(Duration::from_mins(1)).await
770 }
771
772 let sent = sender0
776 .send(Recipients::One(addresses[1].clone()), &msg[..], true)
777 .await
778 .unwrap();
779 assert!(sent.is_empty());
780
781 for _ in 0..10 {
783 assert_no_rate_limiting(&context);
784 context.sleep(Duration::from_millis(100)).await;
785 }
786 });
787 }
788
789 #[test_traced]
790 fn test_unordered_peer_sets() {
791 let (n, base_port) = (10, 3000);
792 let executor = deterministic::Runner::default();
793 executor.start(|context| async move {
794 let mut peers_and_sks = Vec::new();
796 for i in 0..n {
797 let sk = ed25519::PrivateKey::from_seed(i as u64);
798 let pk = sk.public_key();
799 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
800 peers_and_sks.push((sk, pk, addr));
801 }
802 let peer0 = peers_and_sks[0].clone();
803 let config = Config::test(
804 peer0.0,
805 peer0.2,
806 vec![(peer0.1.clone(), peer0.2.into())],
807 1_024 * 1_024,
808 );
809 let (network, mut oracle) = Network::new(context.with_label("network"), config);
810 network.start();
811
812 let mut subscription = oracle.subscribe().await;
814
815 let set10: Set<_> = peers_and_sks
817 .iter()
818 .take(2)
819 .map(|(_, pk, _)| pk.clone())
820 .try_collect()
821 .unwrap();
822 oracle.update(10, set10.clone()).await;
823 let (id, new, all) = subscription.next().await.unwrap();
824 assert_eq!(id, 10);
825 assert_eq!(new, set10);
826 assert_eq!(all, set10);
827
828 let set9: Set<_> = peers_and_sks
830 .iter()
831 .skip(2)
832 .map(|(_, pk, _)| pk.clone())
833 .try_collect()
834 .unwrap();
835 oracle.update(9, set9.clone()).await;
836
837 let set11: Set<_> = peers_and_sks
839 .iter()
840 .skip(4)
841 .map(|(_, pk, _)| pk.clone())
842 .try_collect()
843 .unwrap();
844 oracle.update(11, set11.clone()).await;
845 let (id, new, all) = subscription.next().await.unwrap();
846 assert_eq!(id, 11);
847 assert_eq!(new, set11);
848 let all_keys: Set<_> = set10
849 .into_iter()
850 .chain(set11.into_iter())
851 .try_collect()
852 .unwrap();
853 assert_eq!(all, all_keys);
854 });
855 }
856
857 #[test_traced]
858 fn test_graceful_shutdown() {
859 let base_port = 3000;
860 let n: usize = 5;
861
862 let executor = deterministic::Runner::default();
863 executor.start(|context| async move {
864 let mut peers = Vec::new();
866 for i in 0..n {
867 peers.push(ed25519::PrivateKey::from_seed(i as u64));
868 }
869 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
870
871 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
873 for (i, peer) in peers.iter().enumerate() {
874 let peer_context = context.with_label(&format!("peer_{i}"));
875 let port = base_port + i as u16;
876
877 let mut bootstrappers = Vec::new();
879 if i > 0 {
880 bootstrappers.push((
881 addresses[0].clone(),
882 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
883 ));
884 }
885
886 let signer = peer.clone();
887 let config = Config::test(
888 signer.clone(),
889 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
890 bootstrappers,
891 1_024 * 1_024, );
893 let (mut network, mut oracle) =
894 Network::new(peer_context.with_label("network"), config);
895
896 oracle
898 .update(0, addresses.clone().try_into().unwrap())
899 .await;
900
901 let (mut sender, mut receiver) =
902 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
903 network.start();
904
905 peer_context.with_label("agent").spawn({
906 let mut complete_sender = complete_sender.clone();
907 move |context| async move {
908 let expected_connections = if i == 0 { n - 1 } else { 1 };
910
911 let msg = signer.public_key();
913 loop {
914 let sent = sender
915 .send(Recipients::All, msg.as_ref(), true)
916 .await
917 .unwrap();
918 if sent.len() >= expected_connections {
919 break;
920 }
921 context.sleep(Duration::from_millis(100)).await;
922 }
923
924 complete_sender.send(()).await.unwrap();
926
927 select_loop! {
929 context,
930 on_stopped => {},
931 result = receiver.recv() => {
932 if result.is_err() {
933 break;
935 }
936 }
937 }
938 }
939 });
940 }
941
942 for _ in 0..n {
944 complete_receiver.next().await.unwrap();
945 }
946
947 let metrics_before = context.encode();
949 let is_running = |name: &str| -> bool {
950 metrics_before.lines().any(|line| {
951 line.starts_with("runtime_tasks_running{")
952 && line.contains(&format!("name=\"{name}\""))
953 && line.contains("kind=\"Task\"")
954 && line.trim_end().ends_with(" 1")
955 })
956 };
957 for i in 0..n {
958 let prefix = format!("peer_{i}_network");
959 assert!(
960 is_running(&format!("{prefix}_tracker")),
961 "peer_{i} tracker should be running"
962 );
963 assert!(
964 is_running(&format!("{prefix}_router")),
965 "peer_{i} router should be running"
966 );
967 assert!(
968 is_running(&format!("{prefix}_spawner")),
969 "peer_{i} spawner should be running"
970 );
971 assert!(
972 is_running(&format!("{prefix}_listener")),
973 "peer_{i} listener should be running"
974 );
975 assert!(
976 is_running(&format!("{prefix}_dialer")),
977 "peer_{i} dialer should be running"
978 );
979 }
980
981 let shutdown_context = context.clone();
984 context.with_label("shutdown").spawn(move |_| async move {
985 let result = shutdown_context.stop(0, Some(Duration::from_secs(5))).await;
987
988 assert!(
990 result.is_ok(),
991 "graceful shutdown should complete: {result:?}"
992 );
993 });
994
995 context.stopped().await.unwrap();
997
998 context.sleep(Duration::from_millis(100)).await;
1000
1001 let metrics_after = context.encode();
1003 let is_stopped = |name: &str| -> bool {
1004 metrics_after.lines().any(|line| {
1005 line.starts_with("runtime_tasks_running{")
1006 && line.contains(&format!("name=\"{name}\""))
1007 && line.contains("kind=\"Task\"")
1008 && line.trim_end().ends_with(" 0")
1009 })
1010 };
1011 for i in 0..n {
1012 let prefix = format!("peer_{i}_network");
1013 assert!(
1014 is_stopped(&format!("{prefix}_tracker")),
1015 "peer_{i} tracker should be stopped"
1016 );
1017 assert!(
1018 is_stopped(&format!("{prefix}_router")),
1019 "peer_{i} router should be stopped"
1020 );
1021 assert!(
1022 is_stopped(&format!("{prefix}_spawner")),
1023 "peer_{i} spawner should be stopped"
1024 );
1025 assert!(
1026 is_stopped(&format!("{prefix}_listener")),
1027 "peer_{i} listener should be stopped"
1028 );
1029 assert!(
1030 is_stopped(&format!("{prefix}_dialer")),
1031 "peer_{i} dialer should be stopped"
1032 );
1033 }
1034 });
1035 }
1036
1037 #[test_traced]
1038 fn test_subscription_includes_self_when_registered() {
1039 let base_port = 3000;
1040 let executor = deterministic::Runner::default();
1041 executor.start(|context| async move {
1042 let self_sk = ed25519::PrivateKey::from_seed(0);
1044 let self_pk = self_sk.public_key();
1045 let self_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1046
1047 let other_pk = ed25519::PrivateKey::from_seed(1).public_key();
1048
1049 let config = Config::test(
1051 self_sk,
1052 self_addr,
1053 vec![], 1_024 * 1_024,
1055 );
1056 let (network, mut oracle) = Network::new(context.with_label("network"), config);
1057 network.start();
1058
1059 let mut subscription = oracle.subscribe().await;
1061
1062 let peer_set: Set<_> = [other_pk.clone()].try_into().unwrap();
1064 oracle.update(1, peer_set.clone()).await;
1065
1066 let (id, new, all) = subscription.next().await.unwrap();
1068 assert_eq!(id, 1);
1069 assert_eq!(new.len(), 1);
1070 assert_eq!(all.len(), 1);
1071
1072 assert!(
1074 new.position(&self_pk).is_none(),
1075 "new set should not include self"
1076 );
1077 assert!(
1078 new.position(&other_pk).is_some(),
1079 "new set should include other"
1080 );
1081
1082 assert!(
1084 all.position(&self_pk).is_none(),
1085 "tracked peers should not include self"
1086 );
1087 assert!(
1088 all.position(&other_pk).is_some(),
1089 "tracked peers should include other"
1090 );
1091
1092 let peer_set: Set<_> = [self_pk.clone(), other_pk.clone()].try_into().unwrap();
1094 oracle.update(2, peer_set.clone()).await;
1095
1096 let (id, new, all) = subscription.next().await.unwrap();
1098 assert_eq!(id, 2);
1099 assert_eq!(new.len(), 2);
1100 assert_eq!(all.len(), 2);
1101
1102 assert!(
1104 new.position(&self_pk).is_some(),
1105 "new set should include self"
1106 );
1107 assert!(
1108 new.position(&other_pk).is_some(),
1109 "new set should include other"
1110 );
1111
1112 assert!(
1114 all.position(&self_pk).is_some(),
1115 "tracked peers should include self"
1116 );
1117 assert!(
1118 all.position(&other_pk).is_some(),
1119 "tracked peers should include other"
1120 );
1121 });
1122 }
1123
1124 #[test_traced]
1125 fn test_dns_bootstrapper_resolution() {
1126 let base_port = 3000;
1127 let n: usize = 3;
1128
1129 let executor = deterministic::Runner::default();
1130 executor.start(|context| async move {
1131 let mut peers = Vec::new();
1133 for i in 0..n {
1134 peers.push(ed25519::PrivateKey::from_seed(i as u64));
1135 }
1136 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
1137
1138 let bootstrapper_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1140 context.resolver_register("boot.local", Some(vec![bootstrapper_ip]));
1141
1142 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1144 for (i, peer) in peers.iter().enumerate() {
1145 let context = context.with_label(&format!("peer_{i}"));
1146 let port = base_port + i as u16;
1147
1148 let bootstrappers = if i > 0 {
1150 vec![(
1151 addresses[0].clone(),
1152 Ingress::Dns {
1153 host: hostname!("boot.local"),
1154 port: base_port,
1155 },
1156 )]
1157 } else {
1158 vec![]
1159 };
1160
1161 let signer = peer.clone();
1163 let config = Config::test(
1164 signer.clone(),
1165 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
1166 bootstrappers,
1167 1_024 * 1_024,
1168 );
1169 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
1170
1171 oracle
1173 .update(0, addresses.clone().try_into().unwrap())
1174 .await;
1175
1176 let (mut sender, mut receiver) =
1178 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1179
1180 network.start();
1181
1182 context.with_label("agent").spawn({
1184 let mut complete_sender = complete_sender.clone();
1185 let addresses = addresses.clone();
1186 move |context| async move {
1187 let receiver = context.with_label("receiver").spawn(move |_| async move {
1189 let mut received = HashSet::new();
1190 while received.len() < n - 1 {
1191 let (sender, message) = receiver.recv().await.unwrap();
1192 assert_eq!(sender.as_ref(), message.as_ref());
1193 received.insert(sender);
1194 }
1195 complete_sender.send(()).await.unwrap();
1196
1197 loop {
1198 receiver.recv().await.unwrap();
1199 }
1200 });
1201
1202 let msg = signer.public_key();
1204 let sender =
1205 context
1206 .with_label("sender")
1207 .spawn(move |context| async move {
1208 loop {
1209 let mut recipients = addresses.clone();
1210 recipients.remove(i);
1211 recipients.sort();
1212
1213 loop {
1214 let mut sent = sender
1215 .send(Recipients::All, msg.as_ref(), true)
1216 .await
1217 .unwrap();
1218 if sent.len() != n - 1 {
1219 context.sleep(Duration::from_millis(100)).await;
1220 continue;
1221 }
1222 sent.sort();
1223 assert_eq!(sent, recipients);
1224 break;
1225 }
1226
1227 context.sleep(Duration::from_secs(10)).await;
1228 }
1229 });
1230
1231 select! {
1232 receiver = receiver => { panic!("receiver exited: {receiver:?}") },
1233 sender = sender => { panic!("sender exited: {sender:?}") },
1234 }
1235 }
1236 });
1237 }
1238
1239 for _ in 0..n {
1241 complete_receiver.next().await.unwrap();
1242 }
1243
1244 assert_no_rate_limiting(&context);
1245 });
1246 }
1247
1248 #[test_traced]
1249 fn test_dns_resolution_failure_then_success() {
1250 let base_port = 3100;
1251
1252 let executor = deterministic::Runner::default();
1253 executor.start(|context| async move {
1254 let peer0 = ed25519::PrivateKey::from_seed(0);
1256 let peer1 = ed25519::PrivateKey::from_seed(1);
1257 let addresses = vec![peer0.public_key(), peer1.public_key()];
1258
1259 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1260 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1261
1262 let config0 = Config::test(peer0.clone(), socket0, vec![], 1_024 * 1_024);
1266 let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
1267 oracle0
1268 .update(0, addresses.clone().try_into().unwrap())
1269 .await;
1270 let (mut sender0, mut receiver0) =
1271 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1272 network0.start();
1273
1274 let config1 = Config::test(
1276 peer1.clone(),
1277 socket1,
1278 vec![(
1279 peer0.public_key(),
1280 Ingress::Dns {
1281 host: hostname!("boot.local"),
1282 port: base_port,
1283 },
1284 )],
1285 1_024 * 1_024,
1286 );
1287 let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
1288 oracle1
1289 .update(0, addresses.clone().try_into().unwrap())
1290 .await;
1291 let (mut sender1, mut receiver1) =
1292 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1293 network1.start();
1294
1295 context.sleep(Duration::from_secs(2)).await;
1297
1298 let sent = sender0
1300 .send(Recipients::One(peer1.public_key()), b"test".as_ref(), true)
1301 .await
1302 .unwrap();
1303 assert!(sent.is_empty(), "should not be connected yet");
1304
1305 context.resolver_register("boot.local", Some(vec![IpAddr::V4(Ipv4Addr::LOCALHOST)]));
1307
1308 let pk0 = peer0.public_key();
1310 let pk1 = peer1.public_key();
1311 let msg0 = pk0.to_vec();
1312 let msg1 = pk1.to_vec();
1313
1314 let (done_sender, mut done_receiver) = mpsc::channel::<()>(2);
1316 let mut done0 = done_sender.clone();
1317 let pk1_clone = pk1.clone();
1318 context.with_label("recv0").spawn(move |_| async move {
1319 let (sender, message) = receiver0.recv().await.unwrap();
1320 assert_eq!(sender, pk1_clone);
1321 assert_eq!(message.as_ref(), msg1.as_slice());
1322 done0.send(()).await.unwrap();
1323 });
1324 let mut done1 = done_sender.clone();
1325 let pk0_clone = pk0.clone();
1326 context.with_label("recv1").spawn(move |_| async move {
1327 let (sender, message) = receiver1.recv().await.unwrap();
1328 assert_eq!(sender, pk0_clone);
1329 assert_eq!(message.as_ref(), msg0.as_slice());
1330 done1.send(()).await.unwrap();
1331 });
1332
1333 context.with_label("sender").spawn({
1335 let pk0 = pk0.clone();
1336 let pk1 = pk1.clone();
1337 move |context| async move {
1338 loop {
1339 let sent0 = sender0
1340 .send(Recipients::One(pk1.clone()), pk0.as_ref(), true)
1341 .await
1342 .unwrap();
1343 let sent1 = sender1
1344 .send(Recipients::One(pk0.clone()), pk1.as_ref(), true)
1345 .await
1346 .unwrap();
1347 if !sent0.is_empty() && !sent1.is_empty() {
1348 break;
1349 }
1350 context.sleep(Duration::from_millis(100)).await;
1351 }
1352 }
1353 });
1354
1355 done_receiver.next().await.unwrap();
1357 done_receiver.next().await.unwrap();
1358 });
1359 }
1360
1361 fn run_dns_connectivity(seed: u64) -> String {
1363 let base_port = 3400;
1364 let n: usize = 3;
1365
1366 let executor = deterministic::Runner::seeded(seed);
1367 executor.start(|context| async move {
1368 let mut peers = Vec::new();
1370 for i in 0..n {
1371 peers.push(ed25519::PrivateKey::from_seed(i as u64));
1372 }
1373 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
1374
1375 let bootstrapper_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1377 context.resolver_register("boot.local", Some(vec![bootstrapper_ip]));
1378
1379 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1381 for (i, peer) in peers.iter().enumerate() {
1382 let context = context.with_label(&format!("peer_{i}"));
1383 let port = base_port + i as u16;
1384
1385 let bootstrappers = if i > 0 {
1387 vec![(
1388 addresses[0].clone(),
1389 Ingress::Dns {
1390 host: hostname!("boot.local"),
1391 port: base_port,
1392 },
1393 )]
1394 } else {
1395 vec![]
1396 };
1397
1398 let signer = peer.clone();
1399 let config = Config::test(
1400 signer.clone(),
1401 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
1402 bootstrappers,
1403 1_024 * 1_024,
1404 );
1405 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
1406 oracle
1407 .update(0, addresses.clone().try_into().unwrap())
1408 .await;
1409 let (mut sender, mut receiver) =
1410 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1411 network.start();
1412
1413 context.with_label("agent").spawn({
1414 let mut complete_sender = complete_sender.clone();
1415 let addresses = addresses.clone();
1416 move |context| async move {
1417 let receiver = context.with_label("receiver").spawn(move |_| async move {
1418 let mut received = HashSet::new();
1419 while received.len() < n - 1 {
1420 let (sender, message) = receiver.recv().await.unwrap();
1421 assert_eq!(sender.as_ref(), message.as_ref());
1422 received.insert(sender);
1423 }
1424 complete_sender.send(()).await.unwrap();
1425 loop {
1426 receiver.recv().await.unwrap();
1427 }
1428 });
1429
1430 let msg = signer.public_key();
1431 let sender =
1432 context
1433 .with_label("sender")
1434 .spawn(move |context| async move {
1435 loop {
1436 let mut recipients = addresses.clone();
1437 recipients.remove(i);
1438 recipients.sort();
1439
1440 loop {
1441 let mut sent = sender
1442 .send(Recipients::All, msg.as_ref(), true)
1443 .await
1444 .unwrap();
1445 if sent.len() != n - 1 {
1446 context.sleep(Duration::from_millis(100)).await;
1447 continue;
1448 }
1449 sent.sort();
1450 assert_eq!(sent, recipients);
1451 break;
1452 }
1453
1454 context.sleep(Duration::from_secs(10)).await;
1455 }
1456 });
1457
1458 select! {
1459 receiver = receiver => { panic!("receiver exited: {receiver:?}") },
1460 sender = sender => { panic!("sender exited: {sender:?}") },
1461 }
1462 }
1463 });
1464 }
1465
1466 for _ in 0..n {
1467 complete_receiver.next().await.unwrap();
1468 }
1469
1470 context.auditor().state()
1471 })
1472 }
1473
1474 #[test_traced]
1475 fn test_dns_resolution_determinism() {
1476 let state1 = run_dns_connectivity(42);
1478 let state2 = run_dns_connectivity(42);
1479 assert_eq!(state1, state2, "DNS resolution should be deterministic");
1480 }
1481
1482 #[test_traced]
1483 fn test_dns_resolving_to_private_ip_not_dialed() {
1484 let base_port = 3300;
1487 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1488 executor.start(|context| async move {
1489 let peer0 = ed25519::PrivateKey::from_seed(0);
1490 let peer1 = ed25519::PrivateKey::from_seed(1);
1491
1492 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1493 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1494
1495 context.resolver_register("boot.local".to_string(), Some(vec![socket0.ip()]));
1497
1498 let addresses: Vec<_> = vec![peer0.public_key(), peer1.public_key()];
1499
1500 let mut config0 = Config::test(peer0.clone(), socket0, vec![], 1_024 * 1_024);
1502 config0.allow_private_ips = true;
1503 let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
1504 oracle0
1505 .update(0, addresses.clone().try_into().unwrap())
1506 .await;
1507 let (_sender0, mut receiver0) =
1508 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1509 network0.start();
1510
1511 let bootstrappers = vec![(
1513 peer0.public_key(),
1514 Ingress::Dns {
1515 host: hostname!("boot.local"),
1516 port: base_port,
1517 },
1518 )];
1519 let mut config1 = Config::test(peer1.clone(), socket1, bootstrappers, 1_024 * 1_024);
1520 config1.allow_private_ips = false; let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
1522 oracle1
1523 .update(0, addresses.clone().try_into().unwrap())
1524 .await;
1525 let (mut sender1, _receiver1) =
1526 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1527 network1.start();
1528
1529 context.sleep(Duration::from_secs(5)).await;
1531
1532 let sent = sender1
1534 .send(Recipients::All, peer1.public_key().as_ref(), true)
1535 .await
1536 .unwrap();
1537 assert!(
1538 sent.is_empty(),
1539 "peer 1 should not have connected to peer 0 (private IP)"
1540 );
1541
1542 select! {
1544 msg = receiver0.recv() => {
1545 panic!("peer 0 should not have received any message, got: {msg:?}");
1546 },
1547 _ = context.sleep(Duration::from_secs(1)) => {
1548 }
1550 }
1551 });
1552 }
1553
1554 #[test_traced]
1555 fn test_dns_mixed_ips_connectivity() {
1556 for seed in 0..25 {
1562 let base_port = 3400;
1563
1564 let cfg = deterministic::Config::default()
1565 .with_seed(seed)
1566 .with_timeout(Some(Duration::from_secs(120)));
1567 let executor = deterministic::Runner::new(cfg);
1568 executor.start(|context| async move {
1569 let peer0 = ed25519::PrivateKey::from_seed(0);
1570 let peer1 = ed25519::PrivateKey::from_seed(1);
1571
1572 let good_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1573 let socket0 = SocketAddr::new(good_ip, base_port);
1574 let socket1 = SocketAddr::new(good_ip, base_port + 1);
1575
1576 let mut all_ips0: Vec<IpAddr> = (1..=3)
1578 .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 100 + i)))
1579 .collect();
1580 all_ips0.push(good_ip);
1581 context.resolver_register("peer-0.local", Some(all_ips0));
1582
1583 let mut all_ips1: Vec<IpAddr> = (1..=3)
1584 .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 110 + i)))
1585 .collect();
1586 all_ips1.push(good_ip);
1587 context.resolver_register("peer-1.local", Some(all_ips1));
1588
1589 let addresses: Vec<_> = vec![peer0.public_key(), peer1.public_key()];
1590
1591 let bootstrappers0 = vec![(
1593 peer1.public_key(),
1594 Ingress::Dns {
1595 host: hostname!("peer-1.local"),
1596 port: base_port + 1,
1597 },
1598 )];
1599 let config0 = Config::test(peer0.clone(), socket0, bootstrappers0, 1_024 * 1_024);
1600 let (mut network0, mut oracle0) =
1601 Network::new(context.with_label("peer_0"), config0);
1602 oracle0
1603 .update(0, addresses.clone().try_into().unwrap())
1604 .await;
1605 let (_sender0, mut receiver0) =
1606 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1607 network0.start();
1608
1609 let bootstrappers1 = vec![(
1611 peer0.public_key(),
1612 Ingress::Dns {
1613 host: hostname!("peer-0.local"),
1614 port: base_port,
1615 },
1616 )];
1617 let config1 = Config::test(peer1.clone(), socket1, bootstrappers1, 1_024 * 1_024);
1618 let (mut network1, mut oracle1) =
1619 Network::new(context.with_label("peer_1"), config1);
1620 oracle1
1621 .update(0, addresses.clone().try_into().unwrap())
1622 .await;
1623 let (mut sender1, _receiver1) =
1624 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1625 network1.start();
1626
1627 let pk0 = peer0.public_key();
1629 loop {
1630 let sent = sender1
1631 .send(
1632 Recipients::One(pk0.clone()),
1633 peer1.public_key().as_ref(),
1634 true,
1635 )
1636 .await
1637 .unwrap();
1638 if !sent.is_empty() {
1639 break;
1640 }
1641 context.sleep(Duration::from_millis(100)).await;
1642 }
1643
1644 let (sender, msg) = receiver0.recv().await.unwrap();
1646 assert_eq!(sender, peer1.public_key());
1647 assert_eq!(msg.as_ref(), peer1.public_key().as_ref());
1648 });
1649 }
1650 }
1651
1652 #[test_traced]
1653 fn test_many_peer_restart_with_new_address() {
1654 let base_port = 7500;
1655 let n = 5;
1656
1657 let executor = deterministic::Runner::default();
1658 executor.start(|context| async move {
1659 let peers: Vec<_> = (0..n)
1661 .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1662 .collect();
1663 let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1664
1665 let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1667 let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1668 let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();
1669
1670 let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1672
1673 for (i, peer) in peers.iter().enumerate() {
1675 let peer_context = context.with_label(&format!("peer_{i}"));
1676
1677 let mut bootstrappers = Vec::new();
1679 if i > 0 {
1680 bootstrappers.push((
1681 addresses[0].clone(),
1682 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1683 ));
1684 }
1685
1686 let config = Config::test(
1687 peer.clone(),
1688 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1689 bootstrappers,
1690 MAX_MESSAGE_SIZE,
1691 );
1692 let (mut network, mut oracle) =
1693 Network::new(peer_context.with_label("network"), config);
1694
1695 oracle
1697 .update(0, addresses.clone().try_into().unwrap())
1698 .await;
1699
1700 let (sender, receiver) =
1701 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1702 senders[i] = Some(sender);
1703 receivers[i] = Some(receiver);
1704
1705 let handle = network.start();
1706 handles[i] = Some(handle);
1707 }
1708
1709 for (i, sender) in senders.iter_mut().enumerate() {
1711 let sender = sender.as_mut().unwrap();
1712 loop {
1713 let sent = sender
1714 .send(Recipients::All, peers[i].public_key().as_ref(), true)
1715 .await
1716 .unwrap();
1717 if sent.len() == n - 1 {
1718 break;
1719 }
1720 context.sleep(Duration::from_millis(100)).await;
1721 }
1722 }
1723
1724 for receiver in receivers.iter_mut() {
1726 let receiver = receiver.as_mut().unwrap();
1727 let mut received = HashSet::new();
1728 while received.len() < n - 1 {
1729 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1730 assert_eq!(sender.as_ref(), message.as_ref());
1731 received.insert(sender);
1732 }
1733 }
1734
1735 let mut restart_counter = 0u16;
1737 for round in 0..3 {
1738 for restart_peer_idx in 1..n {
1739 restart_counter += 1;
1741 let new_port = base_port + 100 + restart_counter;
1742 ports[restart_peer_idx] = new_port;
1743
1744 if let Some(handle) = handles[restart_peer_idx].take() {
1746 handle.abort();
1747 }
1748 senders[restart_peer_idx] = None;
1749 receivers[restart_peer_idx] = None;
1750
1751 let peer_context =
1753 context.with_label(&format!("peer_{restart_peer_idx}_round_{round}"));
1754 let bootstrappers = vec![(
1755 addresses[0].clone(),
1756 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1757 )];
1758 let config = Config::test(
1759 peers[restart_peer_idx].clone(),
1760 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port),
1761 bootstrappers,
1762 MAX_MESSAGE_SIZE,
1763 );
1764 let (mut network, mut oracle) =
1765 Network::new(peer_context.with_label("network"), config);
1766
1767 oracle
1769 .update(0, addresses.clone().try_into().unwrap())
1770 .await;
1771
1772 let (sender, receiver) = network.register(
1773 0,
1774 Quota::per_second(NZU32!(100)),
1775 DEFAULT_MESSAGE_BACKLOG,
1776 );
1777 senders[restart_peer_idx] = Some(sender);
1778 receivers[restart_peer_idx] = Some(receiver);
1779
1780 let handle = network.start();
1781 handles[restart_peer_idx] = Some(handle);
1782
1783 let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
1785 loop {
1786 let sent = restarted_sender
1787 .send(
1788 Recipients::All,
1789 peers[restart_peer_idx].public_key().as_ref(),
1790 true,
1791 )
1792 .await
1793 .unwrap();
1794 if sent.len() == n - 1 {
1795 break;
1796 }
1797 context.sleep(Duration::from_millis(100)).await;
1798 }
1799
1800 for i in 0..n {
1802 if i == restart_peer_idx {
1803 continue;
1804 }
1805 let sender = senders[i].as_mut().unwrap();
1806 loop {
1807 let sent = sender
1808 .send(
1809 Recipients::One(addresses[restart_peer_idx].clone()),
1810 peers[i].public_key().as_ref(),
1811 true,
1812 )
1813 .await
1814 .unwrap();
1815 if sent.len() == 1 {
1816 break;
1817 }
1818 context.sleep(Duration::from_millis(100)).await;
1819 }
1820 }
1821
1822 let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
1824 let mut received = HashSet::new();
1825 while received.len() < n - 1 {
1826 let (sender, message): (ed25519::PublicKey, _) =
1827 restarted_receiver.recv().await.unwrap();
1828 assert_eq!(sender.as_ref(), message.as_ref());
1829 received.insert(sender);
1830 }
1831 }
1832 }
1833
1834 assert_no_rate_limiting(&context);
1835 });
1836 }
1837
1838 #[test_traced]
1839 fn test_simultaneous_peer_restart() {
1840 let base_port = 7700;
1841 let n = 5;
1842
1843 let executor = deterministic::Runner::default();
1844 executor.start(|context| async move {
1845 let peers: Vec<_> = (0..n)
1847 .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1848 .collect();
1849 let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1850
1851 let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1853
1854 let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1856 let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1857 let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();
1858
1859 for (i, peer) in peers.iter().enumerate() {
1861 let peer_context = context.with_label(&format!("peer_{i}"));
1862
1863 let mut bootstrappers = Vec::new();
1865 if i > 0 {
1866 bootstrappers.push((
1867 addresses[0].clone(),
1868 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1869 ));
1870 }
1871
1872 let config = Config::test(
1873 peer.clone(),
1874 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1875 bootstrappers,
1876 MAX_MESSAGE_SIZE,
1877 );
1878 let (mut network, mut oracle) =
1879 Network::new(peer_context.with_label("network"), config);
1880
1881 oracle
1883 .update(0, addresses.clone().try_into().unwrap())
1884 .await;
1885
1886 let (sender, receiver) =
1887 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1888 senders[i] = Some(sender);
1889 receivers[i] = Some(receiver);
1890
1891 let handle = network.start();
1892 handles[i] = Some(handle);
1893 }
1894
1895 for (i, sender) in senders.iter_mut().enumerate() {
1897 let sender = sender.as_mut().unwrap();
1898 loop {
1899 let sent = sender
1900 .send(Recipients::All, peers[i].public_key().as_ref(), true)
1901 .await
1902 .unwrap();
1903 if sent.len() == n - 1 {
1904 break;
1905 }
1906 context.sleep(Duration::from_millis(100)).await;
1907 }
1908 }
1909
1910 for receiver in receivers.iter_mut() {
1912 let receiver = receiver.as_mut().unwrap();
1913 let mut received = HashSet::new();
1914 while received.len() < n - 1 {
1915 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1916 assert_eq!(sender.as_ref(), message.as_ref());
1917 received.insert(sender);
1918 }
1919 }
1920
1921 let restart_peers: Vec<usize> = (1..n).collect();
1926 for &idx in &restart_peers {
1927 if let Some(handle) = handles[idx].take() {
1928 handle.abort();
1929 }
1930 senders[idx] = None;
1931 receivers[idx] = None;
1932 ports[idx] = base_port + 100 + idx as u16;
1934 }
1935
1936 context.sleep(Duration::from_secs(2)).await;
1938
1939 for &idx in &restart_peers {
1941 let peer_context = context.with_label(&format!("peer_{idx}_restarted"));
1942 let bootstrappers = vec![(
1943 addresses[0].clone(),
1944 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1945 )];
1946 let config = Config::test(
1947 peers[idx].clone(),
1948 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[idx]),
1949 bootstrappers,
1950 MAX_MESSAGE_SIZE,
1951 );
1952 let (mut network, mut oracle) =
1953 Network::new(peer_context.with_label("network"), config);
1954
1955 oracle
1957 .update(0, addresses.clone().try_into().unwrap())
1958 .await;
1959
1960 let (sender, receiver) =
1961 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1962 senders[idx] = Some(sender);
1963 receivers[idx] = Some(receiver);
1964
1965 let handle = network.start();
1966 handles[idx] = Some(handle);
1967 }
1968
1969 for (i, sender) in senders.iter_mut().enumerate() {
1971 let sender = sender.as_mut().unwrap();
1972 loop {
1973 let sent = sender
1974 .send(Recipients::All, peers[i].public_key().as_ref(), true)
1975 .await
1976 .unwrap();
1977 if sent.len() == n - 1 {
1978 break;
1979 }
1980 context.sleep(Duration::from_millis(100)).await;
1981 }
1982 }
1983
1984 for receiver in receivers.iter_mut() {
1986 let receiver = receiver.as_mut().unwrap();
1987 let mut received = HashSet::new();
1988 while received.len() < n - 1 {
1989 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1990 assert_eq!(sender.as_ref(), message.as_ref());
1991 received.insert(sender);
1992 }
1993 }
1994
1995 assert_no_rate_limiting(&context);
1996 });
1997 }
1998 #[test_traced]
1999 fn test_peer_restart_with_new_address_must_dial() {
2000 let base_port = 3600;
2001 let n: usize = 5;
2002
2003 let executor = deterministic::Runner::default();
2004 executor.start(|context| async move {
2005 let peers: Vec<_> = (0..n)
2007 .map(|i| ed25519::PrivateKey::from_seed(i as u64))
2008 .collect();
2009 let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
2010
2011 let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
2013 let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
2014 let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();
2015
2016 let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
2018
2019 let wrong_ip = IpAddr::V4(Ipv4Addr::new(10, 255, 255, 1)); let wrong_address_peer_idx = 2;
2025
2026 for (i, peer) in peers.iter().enumerate() {
2027 let peer_context = context.with_label(&format!("peer_{i}"));
2028 let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]);
2029
2030 let dialable_addr: Ingress = if i == wrong_address_peer_idx {
2032 SocketAddr::new(wrong_ip, ports[i]).into()
2033 } else {
2034 listen_addr.into()
2035 };
2036
2037 let mut bootstrappers = Vec::new();
2039 if i > 0 {
2040 bootstrappers.push((
2041 addresses[0].clone(),
2042 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
2043 ));
2044 }
2045
2046 let mut config =
2047 Config::test(peer.clone(), listen_addr, bootstrappers, 1_024 * 1_024);
2048 config.dialable = dialable_addr;
2049
2050 let (mut network, mut oracle) =
2051 Network::new(peer_context.with_label("network"), config);
2052
2053 oracle
2054 .update(0, addresses.clone().try_into().unwrap())
2055 .await;
2056
2057 let (sender, receiver) =
2058 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2059 senders[i] = Some(sender);
2060 receivers[i] = Some(receiver);
2061
2062 let handle = network.start();
2063 handles[i] = Some(handle);
2064 }
2065
2066 for (i, sender) in senders.iter_mut().enumerate() {
2068 let sender = sender.as_mut().unwrap();
2069 loop {
2070 let sent = sender
2071 .send(Recipients::All, peers[i].public_key().as_ref(), true)
2072 .await
2073 .unwrap();
2074 if sent.len() == n - 1 {
2075 break;
2076 }
2077 context.sleep(Duration::from_millis(100)).await;
2078 }
2079 }
2080
2081 for receiver in receivers.iter_mut() {
2083 let receiver = receiver.as_mut().unwrap();
2084 let mut received = HashSet::new();
2085 while received.len() < n - 1 {
2086 let (sender, message) = receiver.recv().await.unwrap();
2087 assert_eq!(sender.as_ref(), message.as_ref());
2088 received.insert(sender);
2089 }
2090 }
2091
2092 let restart_peer_idx = 1;
2098 let new_port = base_port + 100;
2099 ports[restart_peer_idx] = new_port;
2100
2101 if let Some(handle) = handles[restart_peer_idx].take() {
2103 handle.abort();
2104 }
2105 senders[restart_peer_idx] = None;
2106 receivers[restart_peer_idx] = None;
2107
2108 let peer_context = context.with_label(&format!("peer_{restart_peer_idx}_restarted"));
2110 let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port);
2111 let bootstrappers = vec![(
2112 addresses[0].clone(),
2113 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
2114 )];
2115
2116 let config = Config::test(
2117 peers[restart_peer_idx].clone(),
2118 listen_addr,
2119 bootstrappers,
2120 1_024 * 1_024,
2121 );
2122
2123 let (mut network, mut oracle) =
2124 Network::new(peer_context.with_label("network"), config);
2125
2126 oracle
2127 .update(0, addresses.clone().try_into().unwrap())
2128 .await;
2129
2130 let (sender, receiver) =
2131 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2132 senders[restart_peer_idx] = Some(sender);
2133 receivers[restart_peer_idx] = Some(receiver);
2134
2135 let handle = network.start();
2136 handles[restart_peer_idx] = Some(handle);
2137
2138 let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
2144 loop {
2145 let sent = restarted_sender
2146 .send(
2147 Recipients::All,
2148 peers[restart_peer_idx].public_key().as_ref(),
2149 true,
2150 )
2151 .await
2152 .unwrap();
2153 if sent.len() == n - 1 {
2154 break;
2155 }
2156 context.sleep(Duration::from_millis(100)).await;
2157 }
2158
2159 for i in 0..n {
2161 if i == restart_peer_idx {
2162 continue;
2163 }
2164 let sender = senders[i].as_mut().unwrap();
2165 loop {
2166 let sent = sender
2167 .send(
2168 Recipients::One(addresses[restart_peer_idx].clone()),
2169 peers[i].public_key().as_ref(),
2170 true,
2171 )
2172 .await
2173 .unwrap();
2174 if sent.len() == 1 {
2175 break;
2176 }
2177 context.sleep(Duration::from_millis(100)).await;
2178 }
2179 }
2180
2181 let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
2183 let mut received = HashSet::new();
2184 while received.len() < n - 1 {
2185 let (sender, message) = restarted_receiver.recv().await.unwrap();
2186 assert_eq!(sender.as_ref(), message.as_ref());
2187 received.insert(sender);
2188 }
2189
2190 assert_no_rate_limiting(&context);
2191 });
2192 }
2193
2194 #[test_traced]
2195 fn test_operations_after_shutdown_do_not_panic() {
2196 let executor = deterministic::Runner::default();
2197 executor.start(|context| async move {
2198 let peer = ed25519::PrivateKey::from_seed(0);
2199 let address = peer.public_key();
2200
2201 let peer_context = context.with_label("peer");
2202 let config = Config::test(
2203 peer.clone(),
2204 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000),
2205 vec![],
2206 MAX_MESSAGE_SIZE,
2207 );
2208 let (mut network, mut oracle) =
2209 Network::new(peer_context.with_label("network"), config);
2210
2211 let (mut sender, _receiver) =
2213 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2214 let peers: Set<ed25519::PublicKey> = vec![address.clone()].try_into().unwrap();
2215 oracle.update(0, peers.clone()).await;
2216
2217 let handle = network.start();
2219 handle.abort();
2220
2221 context.sleep(Duration::from_millis(100)).await;
2223
2224 oracle.update(1, peers.clone()).await;
2226 let _ = oracle.peer_set(0).await;
2227 let _ = oracle.subscribe().await;
2228 oracle.block(address.clone()).await;
2229
2230 let sent = sender
2232 .send(Recipients::All, address.as_ref(), true)
2233 .await
2234 .unwrap();
2235 assert!(sent.is_empty());
2236 });
2237 }
2238
2239 fn clean_shutdown(seed: u64) {
2240 let cfg = deterministic::Config::default()
2241 .with_seed(seed)
2242 .with_timeout(Some(Duration::from_secs(30)));
2243 let executor = deterministic::Runner::new(cfg);
2244 executor.start(|context| async move {
2245 let peer = ed25519::PrivateKey::from_seed(0);
2246
2247 let peer_context = context.with_label("peer");
2248 let config = Config::test(
2249 peer.clone(),
2250 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000),
2251 vec![],
2252 MAX_MESSAGE_SIZE,
2253 );
2254 let (mut network, mut oracle) =
2255 Network::new(peer_context.with_label("network"), config);
2256
2257 let (_, _) =
2259 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2260 let peers: Set<ed25519::PublicKey> = vec![peer.public_key()].try_into().unwrap();
2261 oracle.update(0, peers).await;
2262
2263 let handle = network.start();
2265
2266 context.sleep(Duration::from_millis(100)).await;
2268
2269 let running_before = count_running_tasks(&context, "peer_network");
2271 assert!(
2272 running_before > 0,
2273 "at least one network task should be running"
2274 );
2275
2276 handle.abort();
2278 let _ = handle.await;
2279
2280 context.sleep(Duration::from_millis(100)).await;
2282
2283 let running_after = count_running_tasks(&context, "peer_network");
2285 assert_eq!(
2286 running_after, 0,
2287 "all network tasks should be stopped, but {running_after} still running"
2288 );
2289 });
2290 }
2291
2292 #[test_traced]
2293 fn test_clean_shutdown() {
2294 for seed in 0..25 {
2295 clean_shutdown(seed);
2296 }
2297 }
2298
2299 #[test]
2300 fn test_broadcast_slow_peer_no_blocking() {
2301 let executor = deterministic::Runner::timed(Duration::from_secs(5));
2302 executor.start(|context| async move {
2303 let cfg = RouterConfig { mailbox_size: 10 };
2305 let (router, mut mailbox, messenger) =
2306 RouterActor::<_, ed25519::PublicKey>::new(context.clone(), cfg);
2307
2308 let channels = channels::Channels::new(messenger.clone(), MAX_MESSAGE_SIZE);
2310 let _handle = router.start(channels);
2311
2312 let slow_peer = ed25519::PrivateKey::from_seed(0).public_key();
2315 let (slow_low, _slow_low_rx) = mpsc::channel(10);
2316 let (slow_high, _slow_high_rx) = mpsc::channel(10);
2317 assert!(
2318 mailbox
2319 .ready(slow_peer.clone(), Relay::new(slow_low, slow_high))
2320 .await
2321 .is_some(),
2322 "Failed to register slow peer"
2323 );
2324
2325 let fast_peer = ed25519::PrivateKey::from_seed(1).public_key();
2327 let (fast_low, mut fast_receiver) = mpsc::channel(100);
2328 let (fast_high, _fast_high_rx) = mpsc::channel(100);
2329 assert!(
2330 mailbox
2331 .ready(fast_peer.clone(), Relay::new(fast_low, fast_high))
2332 .await
2333 .is_some(),
2334 "Failed to register fast peer"
2335 );
2336
2337 let message = Bytes::from(vec![0u8; 100]);
2338 let mut messenger = messenger;
2339
2340 for i in 0..10 {
2342 let sent = messenger
2343 .content(Recipients::All, 0, message.clone(), false)
2344 .await;
2345 assert_eq!(sent.len(), 2, "Broadcast {i} should reach both peers");
2346 }
2347
2348 let sent = messenger.content(Recipients::All, 0, message, false).await;
2350 assert!(
2351 sent.contains(&fast_peer),
2352 "Fast peer should receive message"
2353 );
2354
2355 for _ in 0..11 {
2357 assert!(fast_receiver.try_next().is_ok());
2358 }
2359 });
2360 }
2361}