1mod actors;
239mod channels;
240mod config;
241mod metrics;
242mod network;
243mod types;
244
245use thiserror::Error;
246
247#[derive(Error, Debug)]
249pub enum Error {
250 #[error("network closed")]
251 NetworkClosed,
252}
253
254pub use actors::tracker::Oracle;
255pub use channels::{Receiver, Sender};
256pub use config::{Bootstrapper, Config};
257pub use network::Network;
258
259#[cfg(test)]
260mod tests {
261 use super::*;
262 use crate::{
263 authenticated::{
264 discovery::actors::router::{Actor as RouterActor, Config as RouterConfig},
265 relay::Relay,
266 },
267 CheckedSender as _, Ingress, LimitedSender as _, Manager, Provider, Receiver, Recipients,
268 Sender,
269 };
270 use commonware_actor::{Feedback, Unreliable};
271 use commonware_cryptography::{ed25519, Signer as _};
272 use commonware_macros::{select, select_loop, test_group, test_traced};
273 use commonware_runtime::{
274 deterministic, telemetry::metrics::count_running_tasks, tokio, BufferPooler, Clock, Handle,
275 IoBuf, Metrics, Network as RNetwork, Quota, Resolver, Runner, Spawner, Supervisor as _,
276 };
277 use commonware_utils::{channel::mpsc, hostname, ordered::Set, NZUsize, TryCollect, NZU32};
278 use rand_core::{CryptoRngCore, RngCore};
279 use std::{
280 collections::HashSet,
281 net::{IpAddr, Ipv4Addr, SocketAddr},
282 time::Duration,
283 };
284
285 #[derive(Copy, Clone)]
286 enum Mode {
287 All,
288 Some,
289 One,
290 }
291
292 const MAX_MESSAGE_SIZE: u32 = 1_024 * 1_024; const DEFAULT_MESSAGE_BACKLOG: usize = 128;
294
295 fn assert_no_rate_limiting(metrics: &str) {
304 assert!(
305 !metrics.contains("messages_rate_limited_total{"),
306 "no messages should be rate limited: {metrics}"
307 );
308 }
309
310 async fn run_network(
315 context: impl Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
316 max_message_size: u32,
317 base_port: u16,
318 n: usize,
319 mode: Mode,
320 ) {
321 let mut peers = Vec::new();
323 for i in 0..n {
324 peers.push(ed25519::PrivateKey::from_seed(i as u64));
325 }
326 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
327
328 let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
330 for (i, peer) in peers.iter().enumerate() {
331 let context = context.child("peer").with_attribute("index", i);
333
334 let port = base_port + i as u16;
336
337 let mut bootstrappers = Vec::new();
339 if i > 0 {
340 bootstrappers.push((
341 addresses[0].clone(),
342 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
343 ));
344 }
345
346 let signer = peer.clone();
348 let config = Config::test(
349 signer.clone(),
350 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
351 bootstrappers,
352 max_message_size,
353 );
354 let (mut network, mut oracle) = Network::new(context.child("network"), config);
355
356 oracle.track(0, Set::try_from(addresses.clone()).unwrap());
358
359 let (mut sender, mut receiver) =
361 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
362
363 network.start();
365
366 context.child("agent").spawn({
368 let complete_sender = complete_sender.clone();
369 let addresses = addresses.clone();
370 move |context| async move {
371 let receiver = context.child("receiver").spawn(move |_| async move {
373 let mut received = HashSet::new();
375 while received.len() < n - 1 {
376 let (sender, message) = receiver.recv().await.unwrap();
378 assert_eq!(message, sender.as_ref());
379
380 received.insert(sender);
382 }
383 complete_sender.send(()).await.unwrap();
384
385 loop {
387 receiver.recv().await.unwrap();
388 }
389 });
390
391 let msg = signer.public_key();
393 let sender = context.child("sender").spawn(move |context| async move {
394 let mut recipients = addresses.clone();
396 recipients.remove(i);
397 recipients.sort();
398
399 loop {
401 match mode {
402 Mode::One => {
403 for recipient in &recipients {
404 loop {
406 let sent = sender.send(
407 Recipients::One(recipient.clone()),
408 msg.as_ref().to_vec(),
409 true,
410 );
411 if sent.len() != 1 {
412 context.sleep(Duration::from_millis(100)).await;
413 continue;
414 }
415 break;
416 }
417 }
418 }
419 Mode::Some | Mode::All => {
420 loop {
422 let sent = sender.send(
423 match mode {
424 Mode::Some => Recipients::Some(recipients.clone()),
425 Mode::All => Recipients::All,
426 _ => unreachable!(),
427 },
428 msg.as_ref().to_vec(),
429 true,
430 );
431 if sent.len() != recipients.len() {
432 context.sleep(Duration::from_millis(100)).await;
433 continue;
434 }
435 break;
436 }
437 }
438 };
439
440 context.sleep(Duration::from_secs(10)).await;
442 }
443 });
444
445 select! {
447 receiver = receiver => {
448 panic!("receiver exited: {receiver:?}");
449 },
450 sender = sender => {
451 panic!("sender exited: {sender:?}");
452 },
453 }
454 }
455 });
456 }
457
458 for _ in 0..n {
460 complete_receiver.recv().await.unwrap();
461 }
462
463 assert_no_rate_limiting(&context.encode());
465 }
466
467 fn run_deterministic_test(seed: u64, mode: Mode) {
468 const NUM_PEERS: usize = 25;
470 const BASE_PORT: u16 = 3000;
471
472 let executor = deterministic::Runner::seeded(seed);
474 let state = executor.start(|context| async move {
475 run_network(
476 context.child("network"),
477 MAX_MESSAGE_SIZE,
478 BASE_PORT,
479 NUM_PEERS,
480 mode,
481 )
482 .await;
483 context.auditor().state()
484 });
485
486 let executor = deterministic::Runner::seeded(seed);
488 let state2 = executor.start(|context| async move {
489 run_network(
490 context.child("network"),
491 MAX_MESSAGE_SIZE,
492 BASE_PORT,
493 NUM_PEERS,
494 mode,
495 )
496 .await;
497 context.auditor().state()
498 });
499 assert_eq!(state, state2);
500 }
501
502 #[test_group("slow")]
503 #[test_traced]
504 fn test_determinism_one() {
505 for i in 0..10 {
506 run_deterministic_test(i, Mode::One);
507 }
508 }
509
510 #[test_group("slow")]
511 #[test_traced]
512 fn test_determinism_some() {
513 for i in 0..10 {
514 run_deterministic_test(i, Mode::Some);
515 }
516 }
517
518 #[test_group("slow")]
519 #[test_traced]
520 fn test_determinism_all() {
521 for i in 0..10 {
522 run_deterministic_test(i, Mode::All);
523 }
524 }
525
526 #[test_traced]
527 fn test_tokio_connectivity() {
528 let executor = tokio::Runner::default();
529 executor.start(|context| async move {
530 let base_port = 3000;
531 let n = 10;
532 run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
533 });
534 }
535
536 #[test_traced]
537 fn test_multi_index_oracle() {
538 let base_port = 3000;
540 let n: usize = 100;
541
542 let executor = deterministic::Runner::default();
544 executor.start(|context| async move {
545 let mut peers = Vec::new();
547 for i in 0..n {
548 peers.push(ed25519::PrivateKey::from_seed(i as u64));
549 }
550 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
551
552 let mut waiters = Vec::new();
554 for (i, peer) in peers.iter().enumerate() {
555 let context = context.child("peer").with_attribute("index", i);
557
558 let port = base_port + i as u16;
560
561 let mut bootstrappers = Vec::new();
563 if i > 0 {
564 bootstrappers.push((
565 addresses[0].clone(),
566 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
567 ));
568 }
569
570 let signer = peer.clone();
572 let config = Config::test(
573 signer.clone(),
574 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
575 bootstrappers,
576 1_024 * 1_024, );
578 let (mut network, mut oracle) = Network::new(context.child("network"), config);
579
580 oracle.track(0, Set::try_from([addresses[0].clone()]).unwrap());
582 oracle.track(
583 1,
584 Set::try_from([addresses[1].clone(), addresses[2].clone()]).unwrap(),
585 );
586 oracle.track(
587 2,
588 addresses
589 .iter()
590 .skip(2)
591 .cloned()
592 .try_collect::<Set<_>>()
593 .unwrap(),
594 );
595
596 let (mut sender, mut receiver) =
598 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
599
600 network.start();
602
603 let handler = context.child("agent").spawn(move |context| async move {
605 if i == 0 {
606 let msg = signer.public_key();
608 loop {
609 if sender
610 .send(Recipients::All, msg.as_ref().to_vec(), true)
611 .len()
612 == n - 1
613 {
614 break;
615 }
616
617 context.sleep(Duration::from_millis(100)).await;
619 }
620 } else {
621 let (sender, message) = receiver.recv().await.unwrap();
623 assert_eq!(message, sender.as_ref());
624 }
625 });
626
627 waiters.push(handler);
629 }
630
631 for waiter in waiters.into_iter().rev() {
633 waiter.await.unwrap();
634 }
635
636 assert_no_rate_limiting(&context.encode());
638 });
639 }
640
641 #[test_traced]
642 #[should_panic(expected = "message too large")]
643 fn test_message_too_large() {
644 let base_port = 3000;
646 let n: usize = 2;
647
648 let executor = deterministic::Runner::seeded(0);
650 executor.start(|mut context| async move {
651 let mut peers = Vec::new();
653 for i in 0..n {
654 peers.push(ed25519::PrivateKey::from_seed(i as u64));
655 }
656 let addresses: Set<_> = peers.iter().map(|p| p.public_key()).try_collect().unwrap();
657
658 let signer = peers[0].clone();
660 let config = Config::test(
661 signer,
662 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
663 Vec::new(),
664 1_024 * 1_024, );
666 let (mut network, mut oracle) = Network::new(context.child("network"), config);
667
668 oracle.track(0, addresses.clone());
670
671 let (mut sender, _) =
673 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
674
675 network.start();
677
678 let mut msg = vec![0u8; 10 * 1024 * 1024]; context.fill_bytes(&mut msg[..]);
681
682 let recipient = Recipients::One(addresses[1].clone());
684 sender.send(recipient, msg, true);
685 });
686 }
687
688 #[test_traced]
689 fn test_rate_limiting() {
690 let base_port = 3000;
692 let n: usize = 2;
693
694 let executor = deterministic::Runner::seeded(0);
696 executor.start(|context| async move {
697 let mut peers = Vec::new();
699 for i in 0..n {
700 peers.push(ed25519::PrivateKey::from_seed(i as u64));
701 }
702 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
703 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
704 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
705
706 let signer0 = peers[0].clone();
708 let config0 = Config::test(
709 signer0.clone(),
710 socket0,
711 vec![(peers[1].public_key(), socket1.into())],
712 1_024 * 1_024, );
714 let (mut network0, mut oracle0) =
715 Network::new(context.child("peer").with_attribute("index", 0), config0);
716 oracle0.track(0, Set::try_from(addresses.clone()).unwrap());
717 let (mut sender0, _receiver0) =
718 network0.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
719 network0.start();
720
721 let signer1 = peers[1].clone();
723 let config1 = Config::test(
724 signer1.clone(),
725 socket1,
726 vec![(peers[0].public_key(), socket0.into())],
727 1_024 * 1_024, );
729 let (mut network1, mut oracle1) =
730 Network::new(context.child("peer").with_attribute("index", 1), config1);
731 oracle1.track(0, Set::try_from(addresses.clone()).unwrap());
732 let (_sender1, _receiver1) =
733 network1.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
734 network1.start();
735
736 let msg = vec![0u8; 1024]; loop {
739 let checked = sender0.check(Recipients::All).unwrap();
741 if !checked.recipients().is_empty() {
742 checked.send(msg.clone(), true);
743 break;
744 }
745
746 context.sleep(Duration::from_mins(1)).await
749 }
750
751 let sent = sender0.send(Recipients::One(addresses[1].clone()), msg, true);
753 assert!(sent.is_empty());
754
755 for _ in 0..10 {
757 assert_no_rate_limiting(&context.encode());
758 context.sleep(Duration::from_millis(100)).await;
759 }
760 });
761 }
762
763 #[test_traced]
764 fn test_unordered_peer_sets() {
765 let (n, base_port) = (10, 3000);
766 let executor = deterministic::Runner::default();
767 executor.start(|context| async move {
768 let mut peers_and_sks = Vec::new();
770 for i in 0..n {
771 let sk = ed25519::PrivateKey::from_seed(i as u64);
772 let pk = sk.public_key();
773 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
774 peers_and_sks.push((sk, pk, addr));
775 }
776 let peer0 = peers_and_sks[0].clone();
777 let config = Config::test(
778 peer0.0,
779 peer0.2,
780 vec![(peer0.1.clone(), peer0.2.into())],
781 1_024 * 1_024,
782 );
783 let (network, mut oracle) = Network::new(context.child("network"), config);
784 network.start();
785
786 let mut subscription = oracle.subscribe().await;
788
789 let set10: Set<_> = peers_and_sks
791 .iter()
792 .take(2)
793 .map(|(_, pk, _)| pk.clone())
794 .try_collect()
795 .unwrap();
796 oracle.track(10, set10.clone());
797 let update = subscription.recv().await.unwrap();
798 assert_eq!(update.index, 10);
799 assert_eq!(update.latest.primary, set10);
800 assert!(update.latest.secondary.is_empty());
801 assert_eq!(update.all.primary, set10);
802 assert!(update.all.secondary.is_empty());
803
804 let set9: Set<_> = peers_and_sks
806 .iter()
807 .skip(2)
808 .map(|(_, pk, _)| pk.clone())
809 .try_collect()
810 .unwrap();
811 oracle.track(9, set9.clone());
812
813 let set11: Set<_> = peers_and_sks
815 .iter()
816 .skip(4)
817 .map(|(_, pk, _)| pk.clone())
818 .try_collect()
819 .unwrap();
820 oracle.track(11, set11.clone());
821 let update = subscription.recv().await.unwrap();
822 assert_eq!(update.index, 11);
823 assert_eq!(update.latest.primary, set11);
824 assert!(update.latest.secondary.is_empty());
825 let all_keys: Set<_> = set10
826 .into_iter()
827 .chain(set11.into_iter())
828 .try_collect()
829 .unwrap();
830 assert_eq!(update.all.primary, all_keys);
831 assert!(update.all.secondary.is_empty());
832 });
833 }
834
835 #[test_traced]
836 fn test_graceful_shutdown() {
837 let base_port = 3000;
838 let n: usize = 5;
839
840 let executor = deterministic::Runner::default();
841 executor.start(|context| async move {
842 let mut peers = Vec::new();
844 for i in 0..n {
845 peers.push(ed25519::PrivateKey::from_seed(i as u64));
846 }
847 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
848
849 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
851 for (i, peer) in peers.iter().enumerate() {
852 let peer_context = context.child("peer").with_attribute("index", i);
853 let port = base_port + i as u16;
854
855 let mut bootstrappers = Vec::new();
857 if i > 0 {
858 bootstrappers.push((
859 addresses[0].clone(),
860 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
861 ));
862 }
863
864 let signer = peer.clone();
865 let config = Config::test(
866 signer.clone(),
867 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
868 bootstrappers,
869 1_024 * 1_024, );
871 let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
872
873 oracle.track(0, Set::try_from(addresses.clone()).unwrap());
875
876 let (mut sender, mut receiver) =
877 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
878 network.start();
879
880 peer_context.child("agent").spawn({
881 let complete_sender = complete_sender.clone();
882 move |context| async move {
883 let expected_connections = if i == 0 { n - 1 } else { 1 };
885
886 let msg = signer.public_key();
888 loop {
889 let sent = sender.send(Recipients::All, msg.as_ref().to_vec(), true);
890 if sent.len() >= expected_connections {
891 break;
892 }
893 context.sleep(Duration::from_millis(100)).await;
894 }
895
896 complete_sender.send(()).await.unwrap();
898
899 select_loop! {
901 context,
902 on_stopped => {},
903 Ok(_) = receiver.recv() else break => {},
904 }
905 }
906 });
907 }
908
909 for _ in 0..n {
911 complete_receiver.recv().await.unwrap();
912 }
913
914 let metrics_before = context.encode();
916 let tasks_running = |metrics: &str, name: &str| -> Option<u64> {
917 metrics.lines().find_map(|line| {
918 (line.starts_with("runtime_tasks_running{")
919 && line.contains(&format!("name=\"{name}\""))
920 && line.contains("kind=\"Task\""))
921 .then(|| {
922 line.split_whitespace()
923 .next_back()
924 .expect("metric line must have a value")
925 .parse::<u64>()
926 .expect("running task count must be an integer")
927 })
928 })
929 };
930
931 for actor in ["tracker", "router", "spawner", "listener", "dialer"] {
932 let name = format!("peer_network_{actor}");
933 assert_eq!(
934 tasks_running(&metrics_before, &name),
935 Some(n as u64),
936 "{name} should have {n} running tasks before shutdown"
937 );
938 }
939
940 context.child("shutdown").spawn(move |context| async move {
943 let result = context.stop(0, Some(Duration::from_secs(5))).await;
945
946 assert!(
948 result.is_ok(),
949 "graceful shutdown should complete: {result:?}"
950 );
951 });
952
953 context.stopped().await.unwrap();
955
956 context.sleep(Duration::from_millis(100)).await;
958
959 let metrics_after = context.encode();
961 for actor in ["tracker", "router", "spawner", "listener", "dialer"] {
962 let name = format!("peer_network_{actor}");
963 assert_eq!(
964 tasks_running(&metrics_after, &name),
965 Some(0),
966 "{name} should be stopped"
967 );
968 }
969 });
970 }
971
972 #[test_traced]
973 fn test_subscription_includes_self_when_registered() {
974 let base_port = 3000;
975 let executor = deterministic::Runner::default();
976 executor.start(|context| async move {
977 let self_sk = ed25519::PrivateKey::from_seed(0);
979 let self_pk = self_sk.public_key();
980 let self_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
981
982 let other_pk = ed25519::PrivateKey::from_seed(1).public_key();
983
984 let config = Config::test(
986 self_sk,
987 self_addr,
988 vec![], 1_024 * 1_024,
990 );
991 let (network, mut oracle) = Network::new(context.child("network"), config);
992 network.start();
993
994 let mut subscription = oracle.subscribe().await;
996
997 let peer_set: Set<_> = [other_pk.clone()].try_into().unwrap();
999 oracle.track(1, peer_set.clone());
1000
1001 let update = subscription.recv().await.unwrap();
1003 assert_eq!(update.index, 1);
1004 assert_eq!(update.latest.primary.len(), 1);
1005 assert!(update.latest.secondary.is_empty());
1006 assert_eq!(update.all.primary.len(), 1);
1007 assert!(update.all.secondary.is_empty());
1008
1009 assert!(
1011 update.latest.primary.position(&self_pk).is_none(),
1012 "latest set should not include self"
1013 );
1014 assert!(
1015 update.latest.primary.position(&other_pk).is_some(),
1016 "latest set should include other"
1017 );
1018
1019 assert!(
1021 update.all.primary.position(&self_pk).is_none(),
1022 "peer set should not include self"
1023 );
1024 assert!(
1025 update.all.primary.position(&other_pk).is_some(),
1026 "peer set should include other"
1027 );
1028
1029 let peer_set: Set<_> = [self_pk.clone(), other_pk.clone()].try_into().unwrap();
1031 oracle.track(2, peer_set.clone());
1032
1033 let update = subscription.recv().await.unwrap();
1035 assert_eq!(update.index, 2);
1036 assert_eq!(update.latest.primary.len(), 2);
1037 assert!(update.latest.secondary.is_empty());
1038 assert_eq!(update.all.primary.len(), 2);
1039 assert!(update.all.secondary.is_empty());
1040
1041 assert!(
1043 update.latest.primary.position(&self_pk).is_some(),
1044 "latest set should include self"
1045 );
1046 assert!(
1047 update.latest.primary.position(&other_pk).is_some(),
1048 "latest set should include other"
1049 );
1050
1051 assert!(
1053 update.all.primary.position(&self_pk).is_some(),
1054 "peer set should include self"
1055 );
1056 assert!(
1057 update.all.primary.position(&other_pk).is_some(),
1058 "peer set should include other"
1059 );
1060 });
1061 }
1062
1063 #[test_traced]
1064 fn test_dns_bootstrapper_resolution() {
1065 let base_port = 3000;
1066 let n: usize = 3;
1067
1068 let executor = deterministic::Runner::default();
1069 executor.start(|context| async move {
1070 let mut peers = Vec::new();
1072 for i in 0..n {
1073 peers.push(ed25519::PrivateKey::from_seed(i as u64));
1074 }
1075 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
1076
1077 let bootstrapper_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1079 context.resolver_register("boot.local", Some(vec![bootstrapper_ip]));
1080
1081 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1083 for (i, peer) in peers.iter().enumerate() {
1084 let context = context.child("peer").with_attribute("index", i);
1085 let port = base_port + i as u16;
1086
1087 let bootstrappers = if i > 0 {
1089 vec![(
1090 addresses[0].clone(),
1091 Ingress::Dns {
1092 host: hostname!("boot.local"),
1093 port: base_port,
1094 },
1095 )]
1096 } else {
1097 vec![]
1098 };
1099
1100 let signer = peer.clone();
1102 let config = Config::test(
1103 signer.clone(),
1104 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
1105 bootstrappers,
1106 1_024 * 1_024,
1107 );
1108 let (mut network, mut oracle) = Network::new(context.child("network"), config);
1109
1110 oracle.track(0, Set::try_from(addresses.clone()).unwrap());
1112
1113 let (mut sender, mut receiver) =
1115 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1116
1117 network.start();
1118
1119 context.child("agent").spawn({
1121 let complete_sender = complete_sender.clone();
1122 let addresses = addresses.clone();
1123 move |context| async move {
1124 let receiver = context.child("receiver").spawn(move |_| async move {
1126 let mut received = HashSet::new();
1127 while received.len() < n - 1 {
1128 let (sender, message) = receiver.recv().await.unwrap();
1129 assert_eq!(message, sender.as_ref());
1130 received.insert(sender);
1131 }
1132 complete_sender.send(()).await.unwrap();
1133
1134 loop {
1135 receiver.recv().await.unwrap();
1136 }
1137 });
1138
1139 let msg = signer.public_key();
1141 let sender = context.child("sender").spawn(move |context| async move {
1142 loop {
1143 let mut recipients = addresses.clone();
1144 recipients.remove(i);
1145 recipients.sort();
1146
1147 loop {
1148 let mut sent =
1149 sender.send(Recipients::All, msg.as_ref().to_vec(), true);
1150 if sent.len() != recipients.len() {
1151 context.sleep(Duration::from_millis(100)).await;
1152 continue;
1153 }
1154 sent.sort();
1155 assert_eq!(sent, recipients);
1156 break;
1157 }
1158
1159 context.sleep(Duration::from_secs(10)).await;
1160 }
1161 });
1162
1163 select! {
1164 receiver = receiver => {
1165 panic!("receiver exited: {receiver:?}")
1166 },
1167 sender = sender => {
1168 panic!("sender exited: {sender:?}")
1169 },
1170 }
1171 }
1172 });
1173 }
1174
1175 for _ in 0..n {
1177 complete_receiver.recv().await.unwrap();
1178 }
1179
1180 assert_no_rate_limiting(&context.encode());
1181 });
1182 }
1183
1184 #[test_traced]
1185 fn test_dns_resolution_failure_then_success() {
1186 let base_port = 3100;
1187
1188 let executor = deterministic::Runner::default();
1189 executor.start(|context| async move {
1190 let peer0 = ed25519::PrivateKey::from_seed(0);
1192 let peer1 = ed25519::PrivateKey::from_seed(1);
1193 let addresses = vec![peer0.public_key(), peer1.public_key()];
1194
1195 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1196 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1197
1198 let config0 = Config::test(peer0.clone(), socket0, vec![], 1_024 * 1_024);
1202 let (mut network0, mut oracle0) =
1203 Network::new(context.child("peer").with_attribute("index", 0), config0);
1204 oracle0.track(0, Set::try_from(addresses.clone()).unwrap());
1205 let (mut sender0, mut receiver0) =
1206 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1207 network0.start();
1208
1209 let config1 = Config::test(
1211 peer1.clone(),
1212 socket1,
1213 vec![(
1214 peer0.public_key(),
1215 Ingress::Dns {
1216 host: hostname!("boot.local"),
1217 port: base_port,
1218 },
1219 )],
1220 1_024 * 1_024,
1221 );
1222 let (mut network1, mut oracle1) =
1223 Network::new(context.child("peer").with_attribute("index", 1), config1);
1224 oracle1.track(0, Set::try_from(addresses.clone()).unwrap());
1225 let (mut sender1, mut receiver1) =
1226 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1227 network1.start();
1228
1229 context.sleep(Duration::from_secs(2)).await;
1231
1232 let checked = sender0.check(Recipients::All).unwrap();
1234 assert!(
1235 checked.recipients().is_empty(),
1236 "should not be connected yet"
1237 );
1238
1239 context.resolver_register("boot.local", Some(vec![IpAddr::V4(Ipv4Addr::LOCALHOST)]));
1241
1242 let pk0 = peer0.public_key();
1244 let pk1 = peer1.public_key();
1245 let msg0 = pk0.to_vec();
1246 let msg1 = pk1.to_vec();
1247
1248 let (done_sender, mut done_receiver) = mpsc::channel::<()>(2);
1250 let done0 = done_sender.clone();
1251 let pk1_clone = pk1.clone();
1252 context
1253 .child("recv")
1254 .with_attribute("index", 0)
1255 .spawn(move |_| async move {
1256 let (sender, message) = receiver0.recv().await.unwrap();
1257 assert_eq!(sender, pk1_clone);
1258 assert_eq!(message, msg1.as_slice());
1259 done0.send(()).await.unwrap();
1260 });
1261 let done1 = done_sender.clone();
1262 let pk0_clone = pk0.clone();
1263 context
1264 .child("recv")
1265 .with_attribute("index", 1)
1266 .spawn(move |_| async move {
1267 let (sender, message) = receiver1.recv().await.unwrap();
1268 assert_eq!(sender, pk0_clone);
1269 assert_eq!(message, msg0.as_slice());
1270 done1.send(()).await.unwrap();
1271 });
1272
1273 let mut received = 0;
1274 while received < 2 {
1275 let sent0 = sender0.send(Recipients::One(pk1.clone()), pk0.as_ref().to_vec(), true);
1276 let sent1 = sender1.send(Recipients::One(pk0.clone()), pk1.as_ref().to_vec(), true);
1277 assert!(!sent0.is_empty());
1278 assert!(!sent1.is_empty());
1279
1280 select! {
1281 done = done_receiver.recv() => {
1282 done.expect("receiver task stopped");
1283 received += 1;
1284 },
1285 _ = context.sleep(Duration::from_millis(100)) => {},
1286 }
1287 }
1288 });
1289 }
1290
1291 fn run_dns_connectivity(seed: u64) -> String {
1293 let base_port = 3400;
1294 let n: usize = 3;
1295
1296 let executor = deterministic::Runner::seeded(seed);
1297 executor.start(|context| async move {
1298 let mut peers = Vec::new();
1300 for i in 0..n {
1301 peers.push(ed25519::PrivateKey::from_seed(i as u64));
1302 }
1303 let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
1304
1305 let bootstrapper_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1307 context.resolver_register("boot.local", Some(vec![bootstrapper_ip]));
1308
1309 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1311 for (i, peer) in peers.iter().enumerate() {
1312 let context = context.child("peer").with_attribute("index", i);
1313 let port = base_port + i as u16;
1314
1315 let bootstrappers = if i > 0 {
1317 vec![(
1318 addresses[0].clone(),
1319 Ingress::Dns {
1320 host: hostname!("boot.local"),
1321 port: base_port,
1322 },
1323 )]
1324 } else {
1325 vec![]
1326 };
1327
1328 let signer = peer.clone();
1329 let config = Config::test(
1330 signer.clone(),
1331 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
1332 bootstrappers,
1333 1_024 * 1_024,
1334 );
1335 let (mut network, mut oracle) = Network::new(context.child("network"), config);
1336 oracle.track(0, Set::try_from(addresses.clone()).unwrap());
1337 let (mut sender, mut receiver) =
1338 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1339 network.start();
1340
1341 context.child("agent").spawn({
1342 let complete_sender = complete_sender.clone();
1343 let addresses = addresses.clone();
1344 move |context| async move {
1345 let receiver = context.child("receiver").spawn(move |_| async move {
1346 let mut received = HashSet::new();
1347 while received.len() < n - 1 {
1348 let (sender, message) = receiver.recv().await.unwrap();
1349 assert_eq!(message, sender.as_ref());
1350 received.insert(sender);
1351 }
1352 complete_sender.send(()).await.unwrap();
1353 loop {
1354 receiver.recv().await.unwrap();
1355 }
1356 });
1357
1358 let msg = signer.public_key();
1359 let sender = context.child("sender").spawn(move |context| async move {
1360 loop {
1361 let mut recipients = addresses.clone();
1362 recipients.remove(i);
1363 recipients.sort();
1364
1365 loop {
1366 let mut sent =
1367 sender.send(Recipients::All, msg.as_ref().to_vec(), true);
1368 if sent.len() != recipients.len() {
1369 context.sleep(Duration::from_millis(100)).await;
1370 continue;
1371 }
1372 sent.sort();
1373 assert_eq!(sent, recipients);
1374 break;
1375 }
1376
1377 context.sleep(Duration::from_secs(10)).await;
1378 }
1379 });
1380
1381 select! {
1382 receiver = receiver => {
1383 panic!("receiver exited: {receiver:?}")
1384 },
1385 sender = sender => {
1386 panic!("sender exited: {sender:?}")
1387 },
1388 }
1389 }
1390 });
1391 }
1392
1393 for _ in 0..n {
1394 complete_receiver.recv().await.unwrap();
1395 }
1396
1397 context.auditor().state()
1398 })
1399 }
1400
1401 #[test_traced]
1402 fn test_dns_resolution_determinism() {
1403 let state1 = run_dns_connectivity(42);
1405 let state2 = run_dns_connectivity(42);
1406 assert_eq!(state1, state2, "DNS resolution should be deterministic");
1407 }
1408
1409 #[test_traced]
1410 fn test_dns_resolving_to_private_ip_not_dialed() {
1411 let base_port = 3300;
1414 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1415 executor.start(|context| async move {
1416 let peer0 = ed25519::PrivateKey::from_seed(0);
1417 let peer1 = ed25519::PrivateKey::from_seed(1);
1418
1419 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1420 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1421
1422 context.resolver_register("boot.local".to_string(), Some(vec![socket0.ip()]));
1424
1425 let addresses: Vec<_> = vec![peer0.public_key(), peer1.public_key()];
1426
1427 let mut config0 = Config::test(peer0.clone(), socket0, vec![], 1_024 * 1_024);
1429 config0.allow_private_ips = true;
1430 let (mut network0, mut oracle0) =
1431 Network::new(context.child("peer").with_attribute("index", 0), config0);
1432 oracle0.track(0, Set::try_from(addresses.clone()).unwrap());
1433 let (_sender0, mut receiver0) =
1434 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1435 network0.start();
1436
1437 let bootstrappers = vec![(
1439 peer0.public_key(),
1440 Ingress::Dns {
1441 host: hostname!("boot.local"),
1442 port: base_port,
1443 },
1444 )];
1445 let mut config1 = Config::test(peer1.clone(), socket1, bootstrappers, 1_024 * 1_024);
1446 config1.allow_private_ips = false; let (mut network1, mut oracle1) =
1448 Network::new(context.child("peer").with_attribute("index", 1), config1);
1449 oracle1.track(0, Set::try_from(addresses.clone()).unwrap());
1450 let (mut sender1, _receiver1) =
1451 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1452 network1.start();
1453
1454 context.sleep(Duration::from_secs(5)).await;
1456
1457 let checked = sender1.check(Recipients::All).unwrap();
1459 assert!(
1460 checked.recipients().is_empty(),
1461 "peer 1 should not have connected to peer 0 (private IP)"
1462 );
1463
1464 select! {
1466 msg = receiver0.recv() => {
1467 panic!("peer 0 should not have received any message, got: {msg:?}");
1468 },
1469 _ = context.sleep(Duration::from_secs(1)) => {
1470 },
1472 }
1473 });
1474 }
1475
1476 #[test_traced]
1477 fn test_dns_mixed_ips_connectivity() {
1478 for seed in 0..25 {
1484 let base_port = 3400;
1485
1486 let cfg = deterministic::Config::default()
1487 .with_seed(seed)
1488 .with_timeout(Some(Duration::from_secs(120)));
1489 let executor = deterministic::Runner::new(cfg);
1490 executor.start(|context| async move {
1491 let peer0 = ed25519::PrivateKey::from_seed(0);
1492 let peer1 = ed25519::PrivateKey::from_seed(1);
1493
1494 let good_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1495 let socket0 = SocketAddr::new(good_ip, base_port);
1496 let socket1 = SocketAddr::new(good_ip, base_port + 1);
1497
1498 let mut all_ips0: Vec<IpAddr> = (1..=3)
1500 .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 100 + i)))
1501 .collect();
1502 all_ips0.push(good_ip);
1503 context.resolver_register("peer-0.local", Some(all_ips0));
1504
1505 let mut all_ips1: Vec<IpAddr> = (1..=3)
1506 .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 110 + i)))
1507 .collect();
1508 all_ips1.push(good_ip);
1509 context.resolver_register("peer-1.local", Some(all_ips1));
1510
1511 let addresses: Vec<_> = vec![peer0.public_key(), peer1.public_key()];
1512
1513 let bootstrappers0 = vec![(
1515 peer1.public_key(),
1516 Ingress::Dns {
1517 host: hostname!("peer-1.local"),
1518 port: base_port + 1,
1519 },
1520 )];
1521 let config0 = Config::test(peer0.clone(), socket0, bootstrappers0, 1_024 * 1_024);
1522 let (mut network0, mut oracle0) =
1523 Network::new(context.child("peer").with_attribute("index", 0), config0);
1524 oracle0.track(0, Set::try_from(addresses.clone()).unwrap());
1525 let (_sender0, mut receiver0) =
1526 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1527 network0.start();
1528
1529 let bootstrappers1 = vec![(
1531 peer0.public_key(),
1532 Ingress::Dns {
1533 host: hostname!("peer-0.local"),
1534 port: base_port,
1535 },
1536 )];
1537 let config1 = Config::test(peer1.clone(), socket1, bootstrappers1, 1_024 * 1_024);
1538 let (mut network1, mut oracle1) =
1539 Network::new(context.child("peer").with_attribute("index", 1), config1);
1540 oracle1.track(0, Set::try_from(addresses.clone()).unwrap());
1541 let (mut sender1, _receiver1) =
1542 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1543 network1.start();
1544
1545 loop {
1547 let checked = sender1.check(Recipients::All).unwrap();
1548 if !checked.recipients().is_empty() {
1549 checked.send(peer1.public_key().as_ref().to_vec(), true);
1550 }
1551
1552 select! {
1553 result = receiver0.recv() => {
1554 let (sender, msg) = result.unwrap();
1555 assert_eq!(sender, peer1.public_key());
1556 assert_eq!(msg, peer1.public_key().as_ref());
1557 break;
1558 },
1559 _ = context.sleep(Duration::from_millis(100)) => {},
1560 }
1561 }
1562 });
1563 }
1564 }
1565
1566 #[test_traced]
1567 fn test_many_peer_restart_with_new_address() {
1568 let base_port = 7500;
1569 let n = 5;
1570
1571 let executor = deterministic::Runner::default();
1572 executor.start(|context| async move {
1573 let peers: Vec<_> = (0..n)
1575 .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1576 .collect();
1577 let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1578
1579 let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1581 let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1582 let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();
1583
1584 let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1586
1587 for (i, peer) in peers.iter().enumerate() {
1589 let peer_context = context.child("peer").with_attribute("index", i);
1590
1591 let mut bootstrappers = Vec::new();
1593 if i > 0 {
1594 bootstrappers.push((
1595 addresses[0].clone(),
1596 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1597 ));
1598 }
1599
1600 let config = Config::test(
1601 peer.clone(),
1602 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1603 bootstrappers,
1604 MAX_MESSAGE_SIZE,
1605 );
1606 let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1607
1608 oracle.track(0, Set::try_from(addresses.clone()).unwrap());
1610
1611 let (sender, receiver) =
1612 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1613 senders[i] = Some(sender);
1614 receivers[i] = Some(receiver);
1615
1616 let handle = network.start();
1617 handles[i] = Some(handle);
1618 }
1619
1620 for (i, sender) in senders.iter_mut().enumerate() {
1622 let sender = sender.as_mut().unwrap();
1623 loop {
1624 let sent = sender.send(
1625 Recipients::All,
1626 peers[i].public_key().as_ref().to_vec(),
1627 true,
1628 );
1629 if sent.len() == n - 1 {
1630 break;
1631 }
1632 context.sleep(Duration::from_millis(100)).await;
1633 }
1634 }
1635
1636 for receiver in receivers.iter_mut() {
1638 let receiver = receiver.as_mut().unwrap();
1639 let mut received = HashSet::new();
1640 while received.len() < n - 1 {
1641 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1642 assert_eq!(message, sender.as_ref());
1643 received.insert(sender);
1644 }
1645 }
1646
1647 let mut restart_counter = 0u16;
1649 for round in 0..3 {
1650 for restart_peer_idx in 1..n {
1651 restart_counter += 1;
1653 let new_port = base_port + 100 + restart_counter;
1654 ports[restart_peer_idx] = new_port;
1655
1656 if let Some(handle) = handles[restart_peer_idx].take() {
1658 handle.abort();
1659 }
1660 senders[restart_peer_idx] = None;
1661 receivers[restart_peer_idx] = None;
1662
1663 let peer_context = context
1665 .child("peer_round")
1666 .with_attribute("index", restart_peer_idx)
1667 .with_attribute("round", round);
1668 let bootstrappers = vec![(
1669 addresses[0].clone(),
1670 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1671 )];
1672 let config = Config::test(
1673 peers[restart_peer_idx].clone(),
1674 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port),
1675 bootstrappers,
1676 MAX_MESSAGE_SIZE,
1677 );
1678 let (mut network, mut oracle) =
1679 Network::new(peer_context.child("network"), config);
1680
1681 oracle.track(0, Set::try_from(addresses.clone()).unwrap());
1683
1684 let (sender, receiver) = network.register(
1685 0,
1686 Quota::per_second(NZU32!(100)),
1687 DEFAULT_MESSAGE_BACKLOG,
1688 );
1689 senders[restart_peer_idx] = Some(sender);
1690 receivers[restart_peer_idx] = Some(receiver);
1691
1692 let handle = network.start();
1693 handles[restart_peer_idx] = Some(handle);
1694
1695 let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
1697 loop {
1698 let sent = restarted_sender.send(
1699 Recipients::All,
1700 peers[restart_peer_idx].public_key().as_ref().to_vec(),
1701 true,
1702 );
1703 if sent.len() == n - 1 {
1704 break;
1705 }
1706 context.sleep(Duration::from_millis(100)).await;
1707 }
1708
1709 for i in 0..n {
1711 if i == restart_peer_idx {
1712 continue;
1713 }
1714 let sender = senders[i].as_mut().unwrap();
1715 loop {
1716 let sent = sender.send(
1717 Recipients::One(addresses[restart_peer_idx].clone()),
1718 peers[i].public_key().as_ref().to_vec(),
1719 true,
1720 );
1721 if sent.len() == 1 {
1722 break;
1723 }
1724 context.sleep(Duration::from_millis(100)).await;
1725 }
1726 }
1727
1728 let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
1730 let mut received = HashSet::new();
1731 while received.len() < n - 1 {
1732 let (sender, message): (ed25519::PublicKey, _) =
1733 restarted_receiver.recv().await.unwrap();
1734 assert_eq!(message, sender.as_ref());
1735 received.insert(sender);
1736 }
1737 }
1738 }
1739
1740 assert_no_rate_limiting(&context.encode());
1741 });
1742 }
1743
1744 #[test_traced]
1745 fn test_simultaneous_peer_restart() {
1746 let base_port = 7700;
1747 let n = 5;
1748
1749 let executor = deterministic::Runner::default();
1750 executor.start(|context| async move {
1751 let peers: Vec<_> = (0..n)
1753 .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1754 .collect();
1755 let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1756
1757 let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1759
1760 let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1762 let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1763 let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();
1764
1765 for (i, peer) in peers.iter().enumerate() {
1767 let peer_context = context.child("peer").with_attribute("index", i);
1768
1769 let mut bootstrappers = Vec::new();
1771 if i > 0 {
1772 bootstrappers.push((
1773 addresses[0].clone(),
1774 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1775 ));
1776 }
1777
1778 let config = Config::test(
1779 peer.clone(),
1780 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1781 bootstrappers,
1782 MAX_MESSAGE_SIZE,
1783 );
1784 let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1785
1786 oracle.track(0, Set::try_from(addresses.clone()).unwrap());
1788
1789 let (sender, receiver) =
1790 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1791 senders[i] = Some(sender);
1792 receivers[i] = Some(receiver);
1793
1794 let handle = network.start();
1795 handles[i] = Some(handle);
1796 }
1797
1798 for (i, sender) in senders.iter_mut().enumerate() {
1800 let sender = sender.as_mut().unwrap();
1801 loop {
1802 let sent = sender.send(
1803 Recipients::All,
1804 peers[i].public_key().as_ref().to_vec(),
1805 true,
1806 );
1807 if sent.len() == n - 1 {
1808 break;
1809 }
1810 context.sleep(Duration::from_millis(100)).await;
1811 }
1812 }
1813
1814 for receiver in receivers.iter_mut() {
1816 let receiver = receiver.as_mut().unwrap();
1817 let mut received = HashSet::new();
1818 while received.len() < n - 1 {
1819 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1820 assert_eq!(message, sender.as_ref());
1821 received.insert(sender);
1822 }
1823 }
1824
1825 let restart_peers: Vec<usize> = (1..n).collect();
1830 for &idx in &restart_peers {
1831 if let Some(handle) = handles[idx].take() {
1832 handle.abort();
1833 }
1834 senders[idx] = None;
1835 receivers[idx] = None;
1836 ports[idx] = base_port + 100 + idx as u16;
1838 }
1839
1840 context.sleep(Duration::from_secs(2)).await;
1842
1843 for &idx in &restart_peers {
1845 let peer_context = context.child("peer_restarted").with_attribute("index", idx);
1846 let bootstrappers = vec![(
1847 addresses[0].clone(),
1848 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1849 )];
1850 let config = Config::test(
1851 peers[idx].clone(),
1852 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[idx]),
1853 bootstrappers,
1854 MAX_MESSAGE_SIZE,
1855 );
1856 let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1857
1858 oracle.track(0, Set::try_from(addresses.clone()).unwrap());
1860
1861 let (sender, receiver) =
1862 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1863 senders[idx] = Some(sender);
1864 receivers[idx] = Some(receiver);
1865
1866 let handle = network.start();
1867 handles[idx] = Some(handle);
1868 }
1869
1870 for (i, sender) in senders.iter_mut().enumerate() {
1872 let sender = sender.as_mut().unwrap();
1873 loop {
1874 let sent = sender.send(
1875 Recipients::All,
1876 peers[i].public_key().as_ref().to_vec(),
1877 true,
1878 );
1879 if sent.len() == n - 1 {
1880 break;
1881 }
1882 context.sleep(Duration::from_millis(100)).await;
1883 }
1884 }
1885
1886 for receiver in receivers.iter_mut() {
1888 let receiver = receiver.as_mut().unwrap();
1889 let mut received = HashSet::new();
1890 while received.len() < n - 1 {
1891 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1892 assert_eq!(message, sender.as_ref());
1893 received.insert(sender);
1894 }
1895 }
1896
1897 assert_no_rate_limiting(&context.encode());
1898 });
1899 }
1900 #[test_traced]
1901 fn test_peer_restart_with_new_address_must_dial() {
1902 let base_port = 3600;
1903 let n: usize = 5;
1904
1905 let executor = deterministic::Runner::default();
1906 executor.start(|context| async move {
1907 let peers: Vec<_> = (0..n)
1909 .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1910 .collect();
1911 let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1912
1913 let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1915 let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1916 let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();
1917
1918 let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1920
1921 let wrong_ip = IpAddr::V4(Ipv4Addr::new(10, 255, 255, 1)); let wrong_address_peer_idx = 2;
1927
1928 for (i, peer) in peers.iter().enumerate() {
1929 let peer_context = context.child("peer").with_attribute("index", i);
1930 let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]);
1931
1932 let dialable_addr: Ingress = if i == wrong_address_peer_idx {
1934 SocketAddr::new(wrong_ip, ports[i]).into()
1935 } else {
1936 listen_addr.into()
1937 };
1938
1939 let mut bootstrappers = Vec::new();
1941 if i > 0 {
1942 bootstrappers.push((
1943 addresses[0].clone(),
1944 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
1945 ));
1946 }
1947
1948 let mut config =
1949 Config::test(peer.clone(), listen_addr, bootstrappers, 1_024 * 1_024);
1950 config.dialable = dialable_addr;
1951
1952 let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1953
1954 oracle.track(0, Set::try_from(addresses.clone()).unwrap());
1955
1956 let (sender, receiver) =
1957 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1958 senders[i] = Some(sender);
1959 receivers[i] = Some(receiver);
1960
1961 let handle = network.start();
1962 handles[i] = Some(handle);
1963 }
1964
1965 for (i, sender) in senders.iter_mut().enumerate() {
1967 let sender = sender.as_mut().unwrap();
1968 loop {
1969 let sent = sender.send(
1970 Recipients::All,
1971 peers[i].public_key().as_ref().to_vec(),
1972 true,
1973 );
1974 if sent.len() == n - 1 {
1975 break;
1976 }
1977 context.sleep(Duration::from_millis(100)).await;
1978 }
1979 }
1980
1981 for receiver in receivers.iter_mut() {
1983 let receiver = receiver.as_mut().unwrap();
1984 let mut received = HashSet::new();
1985 while received.len() < n - 1 {
1986 let (sender, message) = receiver.recv().await.unwrap();
1987 assert_eq!(message, sender.as_ref());
1988 received.insert(sender);
1989 }
1990 }
1991
1992 let restart_peer_idx = 1;
1998 let new_port = base_port + 100;
1999 ports[restart_peer_idx] = new_port;
2000
2001 if let Some(handle) = handles[restart_peer_idx].take() {
2003 handle.abort();
2004 }
2005 senders[restart_peer_idx] = None;
2006 receivers[restart_peer_idx] = None;
2007
2008 let peer_context = context
2010 .child("peer_restarted")
2011 .with_attribute("index", restart_peer_idx);
2012 let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port);
2013 let bootstrappers = vec![(
2014 addresses[0].clone(),
2015 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
2016 )];
2017
2018 let config = Config::test(
2019 peers[restart_peer_idx].clone(),
2020 listen_addr,
2021 bootstrappers,
2022 1_024 * 1_024,
2023 );
2024
2025 let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
2026
2027 oracle.track(0, Set::try_from(addresses.clone()).unwrap());
2028
2029 let (sender, receiver) =
2030 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2031 senders[restart_peer_idx] = Some(sender);
2032 receivers[restart_peer_idx] = Some(receiver);
2033
2034 let handle = network.start();
2035 handles[restart_peer_idx] = Some(handle);
2036
2037 let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
2043 loop {
2044 let sent = restarted_sender.send(
2045 Recipients::All,
2046 peers[restart_peer_idx].public_key().as_ref().to_vec(),
2047 true,
2048 );
2049 if sent.len() == n - 1 {
2050 break;
2051 }
2052 context.sleep(Duration::from_millis(100)).await;
2053 }
2054
2055 for i in 0..n {
2057 if i == restart_peer_idx {
2058 continue;
2059 }
2060 let sender = senders[i].as_mut().unwrap();
2061 loop {
2062 let sent = sender.send(
2063 Recipients::One(addresses[restart_peer_idx].clone()),
2064 peers[i].public_key().as_ref().to_vec(),
2065 true,
2066 );
2067 if sent.len() == 1 {
2068 break;
2069 }
2070 context.sleep(Duration::from_millis(100)).await;
2071 }
2072 }
2073
2074 let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
2076 let mut received = HashSet::new();
2077 while received.len() < n - 1 {
2078 let (sender, message) = restarted_receiver.recv().await.unwrap();
2079 assert_eq!(message, sender.as_ref());
2080 received.insert(sender);
2081 }
2082
2083 assert_no_rate_limiting(&context.encode());
2084 });
2085 }
2086
2087 fn duplicate_addresses_disconnected(seed: u64) {
2088 let base_port = 6000;
2089 let executor = deterministic::Runner::seeded(seed);
2090 executor.start(|context| async move {
2091 let peer0 = ed25519::PrivateKey::from_seed(0);
2092 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
2093 let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
2094 let peer1 = ed25519::PrivateKey::from_seed(1);
2095 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
2096 let peer2 = ed25519::PrivateKey::from_seed(2);
2097 let addresses: Vec<_> =
2098 vec![peer0.public_key(), peer1.public_key(), peer2.public_key()];
2099
2100 let config0 = Config::test(
2103 peer0.clone(),
2104 socket0,
2105 vec![
2106 (peer1.public_key(), socket1.into()),
2107 (peer2.public_key(), socket1.into()),
2108 ],
2109 MAX_MESSAGE_SIZE,
2110 );
2111 let (mut network0, mut oracle0) =
2112 Network::new(context.child("peer").with_attribute("index", 0), config0);
2113 let (mut sender0, _receiver0) =
2114 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2115 network0.start();
2116
2117 oracle0.track(0, Set::try_from(addresses.clone()).unwrap());
2118
2119 context.sleep(Duration::from_secs(30)).await;
2121
2122 let checked = sender0.check(Recipients::All).unwrap();
2124 assert!(checked.recipients().is_empty());
2125
2126 let config1 = Config::test(
2128 peer1.clone(),
2129 socket1,
2130 vec![(peer0.public_key(), wrong_socket0.into())],
2131 MAX_MESSAGE_SIZE,
2132 );
2133 let (mut network1, mut oracle1) =
2134 Network::new(context.child("peer").with_attribute("index", 1), config1);
2135 let (_sender1, mut receiver1) =
2136 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2137 network1.start();
2138
2139 oracle1.track(0, Set::try_from(addresses).unwrap());
2140
2141 context.sleep(Duration::from_secs(30)).await;
2143
2144 loop {
2146 let sent =
2147 sender0.send(Recipients::All, peer0.public_key().as_ref().to_vec(), true);
2148 if sent.len() == 1 {
2149 assert_eq!(sent[0], peer1.public_key());
2150 break;
2151 }
2152 context.sleep(Duration::from_millis(100)).await;
2153 }
2154 let (sender, _) = receiver1.recv().await.unwrap();
2155 assert_eq!(sender, peer0.public_key());
2156 });
2157 }
2158
2159 #[test_traced]
2160 fn test_duplicate_addresses_disconnected() {
2161 for seed in 0..25 {
2163 duplicate_addresses_disconnected(seed);
2164 }
2165 }
2166
2167 #[test_traced]
2168 fn test_duplicate_addresses_connected() {
2169 let base_port = 6000;
2170 let executor = deterministic::Runner::default();
2171 executor.start(|context| async move {
2172 let peer0 = ed25519::PrivateKey::from_seed(0);
2173 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
2174 let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
2175 let peer1 = ed25519::PrivateKey::from_seed(1);
2176 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
2177 let peer2 = ed25519::PrivateKey::from_seed(2);
2178 let socket2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 2);
2179 let addresses: Vec<_> =
2180 vec![peer0.public_key(), peer1.public_key(), peer2.public_key()];
2181
2182 let config0 = Config::test(
2185 peer0.clone(),
2186 socket0,
2187 vec![
2188 (peer1.public_key(), socket1.into()),
2189 (peer2.public_key(), socket1.into()),
2190 ],
2191 MAX_MESSAGE_SIZE,
2192 );
2193 let (mut network0, mut oracle0) =
2194 Network::new(context.child("peer").with_attribute("index", 0), config0);
2195 let (mut sender0, mut receiver0) =
2196 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2197 network0.start();
2198
2199 let mut config2 = Config::test(
2202 peer2.clone(),
2203 socket2,
2204 vec![(peer0.public_key(), socket0.into())],
2205 MAX_MESSAGE_SIZE,
2206 );
2207 config2.dialable = socket1.into();
2208 let (mut network2, mut oracle2) =
2209 Network::new(context.child("peer").with_attribute("index", 2), config2);
2210 let (_sender2, mut receiver2) =
2211 network2.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2212 network2.start();
2213
2214 oracle0.track(0, Set::try_from(addresses.clone()).unwrap());
2215 oracle2.track(0, Set::try_from(addresses.clone()).unwrap());
2216
2217 context.sleep(Duration::from_secs(30)).await;
2219
2220 loop {
2222 let sent =
2223 sender0.send(Recipients::All, peer2.public_key().as_ref().to_vec(), true);
2224 if sent.len() == 1 {
2225 assert_eq!(sent[0], peer2.public_key());
2226 break;
2227 }
2228 context.sleep(Duration::from_millis(100)).await;
2229 }
2230 let (sender, _) = receiver2.recv().await.unwrap();
2231 assert_eq!(sender, peer0.public_key());
2232
2233 let config1 = Config::test(
2236 peer1.clone(),
2237 socket1,
2238 vec![(peer0.public_key(), wrong_socket0.into())],
2239 MAX_MESSAGE_SIZE,
2240 );
2241 let (mut network1, mut oracle1) =
2242 Network::new(context.child("peer").with_attribute("index", 1), config1);
2243 let (mut sender1, _receiver1) =
2244 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2245 network1.start();
2246
2247 oracle1.track(0, Set::try_from(addresses).unwrap());
2248
2249 context.sleep(Duration::from_secs(30)).await;
2251
2252 loop {
2254 let sent =
2255 sender1.send(Recipients::All, peer1.public_key().as_ref().to_vec(), true);
2256 if sent.len() == 2 {
2257 assert!(sent.contains(&peer0.public_key()));
2258 assert!(sent.contains(&peer2.public_key()));
2259 break;
2260 }
2261 context.sleep(Duration::from_millis(100)).await;
2262 }
2263
2264 let mut received0 = false;
2265 while let Ok((sender, _)) = receiver0.recv().await {
2266 if sender == peer1.public_key() {
2268 received0 = true;
2269 break;
2270 }
2271 }
2272 assert!(received0);
2273 let (sender, _) = receiver2.recv().await.unwrap();
2274 assert_eq!(sender, peer1.public_key());
2275 });
2276 }
2277
2278 #[test_traced]
2279 fn test_operations_after_shutdown_do_not_panic() {
2280 let executor = deterministic::Runner::default();
2281 executor.start(|context| async move {
2282 let peer = ed25519::PrivateKey::from_seed(0);
2283 let address = peer.public_key();
2284
2285 let peer_context = context.child("peer");
2286 let config = Config::test(
2287 peer.clone(),
2288 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000),
2289 vec![],
2290 MAX_MESSAGE_SIZE,
2291 );
2292 let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
2293
2294 let (mut sender, _receiver) =
2296 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2297 let peers: Set<ed25519::PublicKey> = vec![address.clone()].try_into().unwrap();
2298 oracle.track(0, peers.clone());
2299
2300 let handle = network.start();
2302 handle.abort();
2303
2304 context.sleep(Duration::from_millis(100)).await;
2306
2307 oracle.track(1, peers.clone());
2309 let _ = oracle.peer_set(0).await;
2310 let _ = oracle.subscribe().await;
2311 crate::block_peer(&mut oracle, address.clone());
2312
2313 let _ = sender.send(Recipients::All, address.as_ref().to_vec(), true);
2315 });
2316 }
2317
2318 fn clean_shutdown(seed: u64) {
2319 let cfg = deterministic::Config::default()
2320 .with_seed(seed)
2321 .with_timeout(Some(Duration::from_secs(30)));
2322 let executor = deterministic::Runner::new(cfg);
2323 executor.start(|context| async move {
2324 let peer = ed25519::PrivateKey::from_seed(0);
2325
2326 let peer_context = context.child("peer");
2327 let config = Config::test(
2328 peer.clone(),
2329 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000),
2330 vec![],
2331 MAX_MESSAGE_SIZE,
2332 );
2333 let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
2334
2335 let (_, _) =
2337 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2338 let peers: Set<ed25519::PublicKey> = vec![peer.public_key()].try_into().unwrap();
2339 oracle.track(0, peers);
2340
2341 let handle = network.start();
2343
2344 context.sleep(Duration::from_millis(100)).await;
2346
2347 let running_before = count_running_tasks(&context, "peer_network");
2349 assert!(
2350 running_before > 0,
2351 "at least one network task should be running"
2352 );
2353
2354 handle.abort();
2356 let _ = handle.await;
2357
2358 context.sleep(Duration::from_millis(100)).await;
2360
2361 let running_after = count_running_tasks(&context, "peer_network");
2363 assert_eq!(
2364 running_after, 0,
2365 "all network tasks should be stopped, but {running_after} still running"
2366 );
2367 });
2368 }
2369
2370 #[test_traced]
2371 fn test_clean_shutdown() {
2372 for seed in 0..25 {
2373 clean_shutdown(seed);
2374 }
2375 }
2376
2377 #[test]
2378 fn test_broadcast_slow_peer_no_blocking() {
2379 let executor = deterministic::Runner::default();
2380 executor.start(|context| async move {
2381 let cfg = RouterConfig {
2383 mailbox_size: NZUsize!(1),
2384 };
2385 let (router, mailbox, messenger) =
2386 RouterActor::<_, ed25519::PublicKey>::new(context.child("router"), cfg);
2387
2388 let channels = channels::Channels::new(messenger.clone(), MAX_MESSAGE_SIZE);
2390 let _handle = router.start(channels);
2391
2392 let slow_peer = ed25519::PrivateKey::from_seed(0).public_key();
2395 let (slow_relay, _slow_receivers) =
2396 Relay::new(context.child("slow_relay"), NZUsize!(10));
2397 assert!(
2398 mailbox.ready(slow_peer.clone(), slow_relay).await.is_some(),
2399 "Failed to register slow peer"
2400 );
2401
2402 let fast_peer = ed25519::PrivateKey::from_seed(1).public_key();
2404 let (fast_relay, mut fast_receivers) =
2405 Relay::new(context.child("fast_relay"), NZUsize!(100));
2406 assert!(
2407 mailbox.ready(fast_peer.clone(), fast_relay).await.is_some(),
2408 "Failed to register fast peer"
2409 );
2410
2411 let message = IoBuf::from(vec![0u8; 100]);
2412
2413 for i in 0..11 {
2417 let sent = messenger.content(Recipients::All, 0, message.clone().into(), false);
2418 assert_ne!(
2419 sent,
2420 Unreliable::new(Feedback::Closed),
2421 "Broadcast {i} should be accepted"
2422 );
2423
2424 assert!(fast_receivers.low.recv().await.is_some());
2425 }
2426 });
2427 }
2428}