1mod actors;
191mod channels;
192mod config;
193mod metrics;
194mod network;
195mod types;
196
197use thiserror::Error;
198
199#[derive(Error, Debug)]
201pub enum Error {
202 #[error("network closed")]
203 NetworkClosed,
204}
205
206pub use actors::tracker::Oracle;
207pub use channels::{Receiver, Sender};
208pub use config::Config;
209pub use network::Network;
210
211#[cfg(test)]
212mod tests {
213 use super::*;
214 use crate::{
215 authenticated::{
216 lookup::actors::router::{Actor as RouterActor, Config as RouterConfig},
217 relay::Relay,
218 },
219 Address, AddressableManager, CheckedSender as _, Ingress, LimitedSender as _, Provider,
220 Receiver, Recipients, Sender,
221 };
222 use commonware_actor::{Feedback, Unreliable};
223 use commonware_cryptography::{ed25519, Signer as _};
224 use commonware_macros::{select, test_group, test_traced};
225 use commonware_runtime::{
226 deterministic, telemetry::metrics::count_running_tasks, tokio, BufferPooler, Clock, IoBuf,
227 Metrics, Network as RNetwork, Quota, Resolver, Runner, Spawner, Supervisor as _,
228 };
229 use commonware_utils::{
230 channel::mpsc,
231 hostname,
232 ordered::{Map, Set},
233 Hostname, NZUsize, TryCollect, NZU32,
234 };
235 use rand_core::{CryptoRngCore, RngCore};
236 use std::{
237 collections::HashSet,
238 net::{IpAddr, Ipv4Addr, SocketAddr},
239 time::Duration,
240 };
241
242 #[derive(Copy, Clone)]
243 enum Mode {
244 All,
245 Some,
246 One,
247 }
248
249 const MAX_MESSAGE_SIZE: u32 = 1_024 * 1_024; const DEFAULT_MESSAGE_BACKLOG: usize = 128;
251
252 fn assert_no_rate_limiting(metrics: &str) {
261 assert!(
262 !metrics.contains("messages_rate_limited_total{"),
263 "no messages should be rate limited: {metrics}"
264 );
265 }
266
267 async fn run_network(
272 context: impl Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
273 max_message_size: u32,
274 base_port: u16,
275 n: usize,
276 mode: Mode,
277 ) {
278 let mut peers_and_sks = Vec::new();
280 for i in 0..n {
281 let private_key = ed25519::PrivateKey::from_seed(i as u64);
282 let public_key = private_key.public_key();
283 let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
284 peers_and_sks.push((private_key, public_key, address));
285 }
286 let peers: Vec<(ed25519::PublicKey, Address)> = peers_and_sks
287 .iter()
288 .map(|(_, pub_key, addr)| (pub_key.clone(), (*addr).into()))
289 .collect::<Vec<_>>();
290
291 let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
293 for (i, (private_key, public_key, address)) in peers_and_sks.iter().enumerate() {
294 let public_key = public_key.clone();
295
296 let context = context.child("peer").with_attribute("index", i);
298
299 let config = Config::test(private_key.clone(), *address, max_message_size);
301 let (mut network, mut oracle) = Network::new(context.child("network"), config);
302
303 oracle.track(0, Map::try_from(peers.clone()).unwrap());
305
306 let (mut sender, mut receiver) =
308 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
309
310 network.start();
312
313 context.child("agent").spawn({
315 let complete_sender = complete_sender.clone();
316 let peers = peers.clone();
317 move |context| async move {
318 let receiver = context.child("receiver").spawn(move |_| async move {
320 let mut received = HashSet::new();
322 while received.len() < n - 1 {
323 let (sender, message) = receiver.recv().await.unwrap();
325 assert_eq!(message, sender.as_ref());
326
327 received.insert(sender);
329 }
330 complete_sender.send(()).await.unwrap();
331
332 loop {
334 receiver.recv().await.unwrap();
335 }
336 });
337
338 let sender = context.child("sender").spawn(move |context| async move {
340 let mut recipients: Vec<_> = peers
342 .iter()
343 .enumerate()
344 .filter(|(j, _)| i != *j)
345 .map(|(_, (pk, _))| pk.clone())
346 .collect();
347 recipients.sort();
348
349 loop {
351 match mode {
352 Mode::One => {
353 for pub_key in &recipients {
354 loop {
356 let sent = sender.send(
357 Recipients::One(pub_key.clone()),
358 public_key.as_ref().to_vec(),
359 true,
360 );
361 if sent.len() != 1 {
362 context.sleep(Duration::from_millis(100)).await;
363 continue;
364 }
365 break;
366 }
367 }
368 }
369 Mode::Some | Mode::All => {
370 loop {
372 let sent = sender.send(
373 match mode {
374 Mode::Some => Recipients::Some(recipients.clone()),
375 Mode::All => Recipients::All,
376 _ => unreachable!(),
377 },
378 public_key.as_ref().to_vec(),
379 true,
380 );
381 if sent.len() != recipients.len() {
382 context.sleep(Duration::from_millis(100)).await;
383 continue;
384 }
385 break;
386 }
387 }
388 };
389
390 context.sleep(Duration::from_secs(10)).await;
392 }
393 });
394
395 select! {
397 receiver = receiver => {
398 panic!("receiver exited: {receiver:?}");
399 },
400 sender = sender => {
401 panic!("sender exited: {sender:?}");
402 },
403 }
404 }
405 });
406 }
407
408 for _ in 0..n {
410 complete_receiver.recv().await.unwrap();
411 }
412
413 assert_no_rate_limiting(&context.encode());
415 }
416
417 fn run_deterministic_test(seed: u64, mode: Mode) {
418 const NUM_PEERS: usize = 25;
420 const BASE_PORT: u16 = 3000;
421
422 let executor = deterministic::Runner::seeded(seed);
424 let state = executor.start(|context| async move {
425 run_network(
426 context.child("network"),
427 MAX_MESSAGE_SIZE,
428 BASE_PORT,
429 NUM_PEERS,
430 mode,
431 )
432 .await;
433 context.auditor().state()
434 });
435
436 let executor = deterministic::Runner::seeded(seed);
438 let state2 = executor.start(|context| async move {
439 run_network(
440 context.child("network"),
441 MAX_MESSAGE_SIZE,
442 BASE_PORT,
443 NUM_PEERS,
444 mode,
445 )
446 .await;
447 context.auditor().state()
448 });
449 assert_eq!(state, state2);
450 }
451
452 #[test_group("slow")]
453 #[test_traced]
454 fn test_determinism_one() {
455 for i in 0..10 {
456 run_deterministic_test(i, Mode::One);
457 }
458 }
459
460 #[test_group("slow")]
461 #[test_traced]
462 fn test_determinism_some() {
463 for i in 0..10 {
464 run_deterministic_test(i, Mode::Some);
465 }
466 }
467
468 #[test_group("slow")]
469 #[test_traced]
470 fn test_determinism_all() {
471 for i in 0..10 {
472 run_deterministic_test(i, Mode::All);
473 }
474 }
475
476 #[test_traced]
477 fn test_tokio_connectivity() {
478 let executor = tokio::Runner::default();
479 executor.start(|context| async move {
480 let base_port = 4000;
481 let n = 10;
482 run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
483 });
484 }
485
486 #[test_traced]
487 fn test_multi_index_oracle() {
488 let base_port = 3000;
490 let n: usize = 10;
491
492 let executor = deterministic::Runner::default();
494 executor.start(|context| async move {
495 let mut peers_and_sks = Vec::new();
497 for i in 0..n {
498 let sk = ed25519::PrivateKey::from_seed(i as u64);
499 let pk = sk.public_key();
500 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
501 peers_and_sks.push((sk, pk, addr));
502 }
503 let peers = peers_and_sks
504 .iter()
505 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
506 .collect::<Vec<_>>();
507
508 let mut waiters = Vec::new();
510 for (i, (peer_sk, peer_pk, peer_addr)) in peers_and_sks.iter().enumerate() {
511 let context = context.child("peer").with_attribute("index", i);
513
514 let config = Config::test(
516 peer_sk.clone(),
517 *peer_addr,
518 1_024 * 1_024, );
520 let (mut network, mut oracle) = Network::new(context.child("network"), config);
521
522 oracle.track(0, Map::try_from([peers[0].clone()]).unwrap());
524 oracle.track(
525 1,
526 Map::try_from([peers[1].clone(), peers[2].clone()]).unwrap(),
527 );
528 oracle.track(
529 2,
530 peers
531 .iter()
532 .skip(2)
533 .cloned()
534 .try_collect::<Map<_, _>>()
535 .unwrap(),
536 );
537
538 let (mut sender, mut receiver) =
540 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
541
542 network.start();
544
545 let msg = peer_pk.clone();
547 let handler = context.child("agent").spawn(move |context| async move {
548 if i == 0 {
549 loop {
551 if sender
552 .send(Recipients::All, msg.as_ref().to_vec(), true)
553 .len()
554 == n - 1
555 {
556 break;
557 }
558
559 context.sleep(Duration::from_millis(100)).await;
561 }
562 } else {
563 let (sender, message) = receiver.recv().await.unwrap();
565 assert_eq!(message, sender.as_ref());
566 }
567 });
568
569 waiters.push(handler);
571 }
572
573 for waiter in waiters.into_iter().rev() {
575 waiter.await.unwrap();
576 }
577
578 assert_no_rate_limiting(&context.encode());
580 });
581 }
582
583 #[test_traced]
584 #[should_panic(expected = "message too large")]
585 fn test_message_too_large() {
586 let base_port = 3000;
588 let n: usize = 2;
589
590 let executor = deterministic::Runner::seeded(0);
592 executor.start(|mut context| async move {
593 let mut peers_and_sks = Vec::new();
595 for i in 0..n {
596 let peer_sk = ed25519::PrivateKey::from_seed(i as u64);
597 let peer_pk = peer_sk.public_key();
598 let peer_addr =
599 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
600 peers_and_sks.push((peer_sk, peer_pk, peer_addr));
601 }
602 let peers: Map<_, _> = peers_and_sks
603 .iter()
604 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
605 .try_collect()
606 .unwrap();
607
608 let (sk, _, addr) = peers_and_sks[0].clone();
610 let config = Config::test(
611 sk,
612 addr,
613 1_024 * 1_024, );
615 let (mut network, mut oracle) = Network::new(context.child("network"), config);
616
617 oracle.track(0, peers.clone());
619
620 let (mut sender, _) =
622 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
623
624 network.start();
626
627 let mut msg = vec![0u8; 10 * 1024 * 1024]; context.fill_bytes(&mut msg[..]);
630
631 let recipient = Recipients::One(peers[1].clone());
633 sender.send(recipient, msg, true);
634 });
635 }
636
637 #[test_traced]
638 fn test_rate_limiting() {
639 let base_port = 3000;
641 let n: usize = 2;
642
643 let executor = deterministic::Runner::seeded(0);
645 executor.start(|context| async move {
646 let mut peers_and_sks = Vec::new();
648 for i in 0..n {
649 let sk = ed25519::PrivateKey::from_seed(i as u64);
650 let pk = sk.public_key();
651 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
652 peers_and_sks.push((sk, pk, addr));
653 }
654 let peers: Map<_, _> = peers_and_sks
655 .iter()
656 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
657 .try_collect()
658 .unwrap();
659 let (sk0, _, addr0) = peers_and_sks[0].clone();
660 let (sk1, pk1, addr1) = peers_and_sks[1].clone();
661
662 let config0 = Config::test(sk0, addr0, 1_024 * 1_024); let (mut network0, mut oracle0) =
665 Network::new(context.child("peer").with_attribute("index", 0), config0);
666 oracle0.track(0, peers.clone());
667 let (mut sender0, _receiver0) =
668 network0.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
669 network0.start();
670
671 let config1 = Config::test(sk1, addr1, 1_024 * 1_024); let (mut network1, mut oracle1) =
674 Network::new(context.child("peer").with_attribute("index", 1), config1);
675 oracle1.track(0, peers.clone());
676 let (_sender1, _receiver1) =
677 network1.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
678 network1.start();
679
680 let msg = vec![0u8; 1024]; loop {
683 let checked = sender0.check(Recipients::All).unwrap();
685 if !checked.recipients().is_empty() {
686 checked.send(msg.clone(), true);
687 break;
688 }
689
690 context.sleep(Duration::from_mins(1)).await
693 }
694
695 let sent = sender0.send(Recipients::One(pk1), msg, true);
697 assert!(sent.is_empty());
698
699 for _ in 0..10 {
701 assert_no_rate_limiting(&context.encode());
702 context.sleep(Duration::from_millis(100)).await;
703 }
704 });
705 }
706
707 #[test_traced]
708 fn test_unordered_peer_sets() {
709 let (n, base_port) = (10, 3000);
710 let executor = deterministic::Runner::default();
711 executor.start(|context| async move {
712 let mut peers_and_sks = Vec::new();
714 for i in 0..n {
715 let sk = ed25519::PrivateKey::from_seed(i as u64);
716 let pk = sk.public_key();
717 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
718 peers_and_sks.push((sk, pk, addr));
719 }
720 let peer0 = peers_and_sks[0].clone();
721 let config = Config::test(peer0.0, peer0.2, 1_024 * 1_024);
722 let (network, mut oracle) = Network::new(context.child("network"), config);
723 network.start();
724
725 let mut subscription = oracle.subscribe().await;
727
728 let set10: Map<_, _> = peers_and_sks
730 .iter()
731 .take(2)
732 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
733 .try_collect()
734 .unwrap();
735 oracle.track(10, set10.clone());
736 let update = subscription.recv().await.unwrap();
737 assert_eq!(update.index, 10);
738 assert_eq!(&update.latest.primary, set10.keys());
739 assert!(update.latest.secondary.is_empty());
740 assert_eq!(&update.all.primary, set10.keys());
741 assert!(update.all.secondary.is_empty());
742
743 let set9: Map<_, _> = peers_and_sks
745 .iter()
746 .skip(2)
747 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
748 .try_collect()
749 .unwrap();
750 oracle.track(9, set9.clone());
751
752 let set11: Map<_, _> = peers_and_sks
754 .iter()
755 .skip(4)
756 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
757 .try_collect()
758 .unwrap();
759 oracle.track(11, set11.clone());
760 let update = subscription.recv().await.unwrap();
761 assert_eq!(update.index, 11);
762 assert_eq!(&update.latest.primary, set11.keys());
763 assert!(update.latest.secondary.is_empty());
764 let all_keys: Set<_> = set10
765 .into_keys()
766 .into_iter()
767 .chain(set11.into_keys().into_iter())
768 .try_collect()
769 .unwrap();
770 assert_eq!(update.all.primary, all_keys);
771 assert!(update.all.secondary.is_empty());
772 });
773 }
774
775 #[test_traced]
776 fn test_graceful_shutdown() {
777 let base_port = 3000;
778 let n: usize = 5;
779
780 let executor = deterministic::Runner::default();
781 executor.start(|context| async move {
782 let mut peers_and_sks = Vec::new();
784 for i in 0..n {
785 let sk = ed25519::PrivateKey::from_seed(i as u64);
786 let pk = sk.public_key();
787 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
788 peers_and_sks.push((sk, pk, addr));
789 }
790 let peers: Map<_, _> = peers_and_sks
791 .iter()
792 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
793 .try_collect()
794 .unwrap();
795
796 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
798 for (i, (sk, pk, addr)) in peers_and_sks.iter().enumerate() {
799 let peer_context = context.child("peer").with_attribute("index", i);
800 let config = Config::test(sk.clone(), *addr, 1_024 * 1_024);
801 let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
802
803 oracle.track(0, peers.clone());
805
806 let (mut sender, mut receiver) =
807 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
808 network.start();
809
810 peer_context.child("agent").spawn({
811 let complete_sender = complete_sender.clone();
812 let pk = pk.clone();
813 move |context| async move {
814 let expected_connections = if i == 0 { n - 1 } else { 1 };
816
817 loop {
819 let sent = sender.send(Recipients::All, pk.as_ref().to_vec(), true);
820 if sent.len() >= expected_connections {
821 break;
822 }
823 context.sleep(Duration::from_millis(100)).await;
824 }
825
826 complete_sender.send(()).await.unwrap();
828
829 loop {
831 select! {
832 result = receiver.recv() => {
833 if result.is_err() {
834 break;
836 }
837 },
838 _ = context.stopped() => {
839 break;
841 },
842 }
843 }
844 }
845 });
846 }
847
848 for _ in 0..n {
850 complete_receiver.recv().await.unwrap();
851 }
852
853 let metrics_before = context.encode();
855 let tasks_running = |metrics: &str, name: &str| -> Option<u64> {
856 metrics.lines().find_map(|line| {
857 (line.starts_with("runtime_tasks_running{")
858 && line.contains(&format!("name=\"{name}\""))
859 && line.contains("kind=\"Task\""))
860 .then(|| {
861 line.split_whitespace()
862 .next_back()
863 .expect("metric line must have a value")
864 .parse::<u64>()
865 .expect("running task count must be an integer")
866 })
867 })
868 };
869
870 for actor in ["tracker", "router", "spawner", "listener", "dialer"] {
871 let name = format!("peer_network_{actor}");
872 assert_eq!(
873 tasks_running(&metrics_before, &name),
874 Some(n as u64),
875 "{name} should have {n} running tasks before shutdown"
876 );
877 }
878
879 context.child("shutdown").spawn(move |context| async move {
881 let result = context.stop(0, Some(Duration::from_secs(5))).await;
883
884 assert!(
886 result.is_ok(),
887 "graceful shutdown should complete: {result:?}"
888 );
889 });
890
891 context.stopped().await.unwrap();
893
894 context.sleep(Duration::from_millis(100)).await;
896
897 let metrics_after = context.encode();
899 for actor in ["tracker", "router", "spawner", "listener", "dialer"] {
900 let name = format!("peer_network_{actor}");
901 assert_eq!(
902 tasks_running(&metrics_after, &name),
903 Some(0),
904 "{name} should be stopped"
905 );
906 }
907 });
908 }
909
910 #[test_traced]
911 fn test_subscription_includes_self_when_registered() {
912 let base_port = 3000;
913 let executor = deterministic::Runner::default();
914 executor.start(|context| async move {
915 let self_sk = ed25519::PrivateKey::from_seed(0);
917 let self_pk = self_sk.public_key();
918 let self_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
919
920 let other_pk = ed25519::PrivateKey::from_seed(1).public_key();
921 let other_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
922
923 let config = Config::test(self_sk, self_addr, 1_024 * 1_024);
925 let (network, mut oracle) = Network::new(context.child("network"), config);
926 network.start();
927
928 let mut subscription = oracle.subscribe().await;
930
931 let peer_set: Map<_, _> = [(other_pk.clone(), other_addr.into())].try_into().unwrap();
933 oracle.track(1, peer_set.clone());
934
935 let update = subscription.recv().await.unwrap();
937 assert_eq!(update.index, 1);
938 assert_eq!(update.latest.primary.len(), 1);
939 assert_eq!(update.all.primary.len(), 1);
940 assert!(update.all.secondary.is_empty());
941
942 assert!(
944 update.latest.primary.position(&self_pk).is_none(),
945 "new set should not include self"
946 );
947 assert!(
948 update.latest.primary.position(&other_pk).is_some(),
949 "new set should include other"
950 );
951 assert!(
952 update.latest.secondary.is_empty(),
953 "new secondary set should be empty"
954 );
955
956 assert!(
958 update.all.primary.position(&self_pk).is_none(),
959 "peer set should not include self"
960 );
961 assert!(
962 update.all.primary.position(&other_pk).is_some(),
963 "peer set should include other"
964 );
965
966 let peer_set: Map<_, _> = [
968 (self_pk.clone(), self_addr.into()),
969 (other_pk.clone(), other_addr.into()),
970 ]
971 .try_into()
972 .unwrap();
973 oracle.track(2, peer_set.clone());
974
975 let update = subscription.recv().await.unwrap();
977 assert_eq!(update.index, 2);
978 assert_eq!(update.latest.primary.len(), 2);
979 assert_eq!(update.all.primary.len(), 2);
980 assert!(update.all.secondary.is_empty());
981
982 assert!(
984 update.latest.primary.position(&self_pk).is_some(),
985 "new set should include self"
986 );
987 assert!(
988 update.latest.primary.position(&other_pk).is_some(),
989 "new set should include other"
990 );
991 assert!(
992 update.latest.secondary.is_empty(),
993 "new secondary set should be empty"
994 );
995
996 assert!(
998 update.all.primary.position(&self_pk).is_some(),
999 "peer set should include self"
1000 );
1001 assert!(
1002 update.all.primary.position(&other_pk).is_some(),
1003 "peer set should include other"
1004 );
1005 });
1006 }
1007
1008 #[test_traced]
1009 fn test_dns_peer_addresses() {
1010 let base_port = 3200;
1011 let n: usize = 3;
1012
1013 let executor = deterministic::Runner::default();
1014 executor.start(|context| async move {
1015 let mut peers_and_sks = Vec::new();
1017 for i in 0..n {
1018 let private_key = ed25519::PrivateKey::from_seed(i as u64);
1019 let public_key = private_key.public_key();
1020 let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
1021 let host_str = format!("peer-{i}.local");
1022 let host = Hostname::new(&host_str).unwrap();
1023 peers_and_sks.push((private_key, public_key, socket, host_str, host));
1024 }
1025
1026 for (_, _, socket, host_str, _) in &peers_and_sks {
1028 context.resolver_register(host_str.clone(), Some(vec![socket.ip()]));
1029 }
1030
1031 let peers: Vec<(_, Address)> = peers_and_sks
1033 .iter()
1034 .map(|(_, pk, socket, _, host)| {
1035 (
1036 pk.clone(),
1037 Address::Asymmetric {
1038 ingress: Ingress::Dns {
1039 host: host.clone(),
1040 port: socket.port(),
1041 },
1042 egress: *socket,
1043 },
1044 )
1045 })
1046 .collect();
1047
1048 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1050 for (i, (private_key, public_key, socket, _, _)) in peers_and_sks.iter().enumerate() {
1051 let context = context.child("peer").with_attribute("index", i);
1052
1053 let config = Config::test(private_key.clone(), *socket, 1_024 * 1_024);
1055 let (mut network, mut oracle) = Network::new(context.child("network"), config);
1056
1057 oracle.track(0, Map::try_from(peers.clone()).unwrap());
1059
1060 let (mut sender, mut receiver) =
1062 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1063
1064 network.start();
1065
1066 let pk = public_key.clone();
1068 context.child("agent").spawn({
1069 let complete_sender = complete_sender.clone();
1070 let peers = peers.clone();
1071 move |context| async move {
1072 let receiver = context.child("receiver").spawn(move |_| async move {
1074 let mut received = HashSet::new();
1075 while received.len() < n - 1 {
1076 let (sender, message) = receiver.recv().await.unwrap();
1077 assert_eq!(message, sender.as_ref());
1078 received.insert(sender);
1079 }
1080 complete_sender.send(()).await.unwrap();
1081
1082 loop {
1083 receiver.recv().await.unwrap();
1084 }
1085 });
1086
1087 let sender_task =
1089 context.child("sender").spawn(move |context| async move {
1090 loop {
1091 let mut recipients: Vec<_> = peers
1092 .iter()
1093 .filter(|(p, _)| p != &pk)
1094 .map(|(p, _)| p.clone())
1095 .collect();
1096 recipients.sort();
1097
1098 loop {
1099 let mut sent = sender.send(
1100 Recipients::All,
1101 pk.as_ref().to_vec(),
1102 true,
1103 );
1104 if sent.len() != n - 1 {
1105 context.sleep(Duration::from_millis(100)).await;
1106 continue;
1107 }
1108 sent.sort();
1109 assert_eq!(sent, recipients);
1110 break;
1111 }
1112
1113 context.sleep(Duration::from_secs(10)).await;
1114 }
1115 });
1116
1117 select! {
1118 receiver = receiver => {
1119 panic!("receiver exited: {receiver:?}")
1120 },
1121 sender = sender_task => {
1122 panic!("sender exited: {sender:?}")
1123 },
1124 }
1125 }
1126 });
1127 }
1128
1129 for _ in 0..n {
1131 complete_receiver.recv().await.unwrap();
1132 }
1133
1134 assert_no_rate_limiting(&context.encode());
1135 });
1136 }
1137
1138 #[test_traced]
1139 fn test_mixed_socket_and_dns_addresses() {
1140 let base_port = 3300;
1141 let n: usize = 4;
1142
1143 let executor = deterministic::Runner::default();
1144 executor.start(|context| async move {
1145 let mut peers_and_sks = Vec::new();
1147 for i in 0..n {
1148 let private_key = ed25519::PrivateKey::from_seed(i as u64);
1149 let public_key = private_key.public_key();
1150 let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
1151 peers_and_sks.push((private_key, public_key, socket));
1152 }
1153
1154 for (i, (_, _, socket)) in peers_and_sks.iter().enumerate().skip(2) {
1156 context.resolver_register(format!("peer-{i}.local"), Some(vec![socket.ip()]));
1157 }
1158
1159 let peers: Vec<(_, Address)> = peers_and_sks
1161 .iter()
1162 .enumerate()
1163 .map(|(i, (_, pk, socket))| {
1164 let addr = if i < 2 {
1165 Address::Symmetric(*socket)
1167 } else {
1168 Address::Asymmetric {
1170 ingress: Ingress::Dns {
1171 host: hostname!(&format!("peer-{i}.local")),
1172 port: socket.port(),
1173 },
1174 egress: *socket,
1175 }
1176 };
1177 (pk.clone(), addr)
1178 })
1179 .collect();
1180
1181 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1183 for (i, (private_key, public_key, socket)) in peers_and_sks.iter().enumerate() {
1184 let context = context.child("peer").with_attribute("index", i);
1185
1186 let config = Config::test(private_key.clone(), *socket, 1_024 * 1_024);
1188 let (mut network, mut oracle) = Network::new(context.child("network"), config);
1189
1190 oracle.track(0, Map::try_from(peers.clone()).unwrap());
1192
1193 let (mut sender, mut receiver) =
1195 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1196
1197 network.start();
1198
1199 let pk = public_key.clone();
1201 context.child("agent").spawn({
1202 let complete_sender = complete_sender.clone();
1203 let peers = peers.clone();
1204 move |context| async move {
1205 let receiver = context.child("receiver").spawn(move |_| async move {
1207 let mut received = HashSet::new();
1208 while received.len() < n - 1 {
1209 let (sender, message) = receiver.recv().await.unwrap();
1210 assert_eq!(message, sender.as_ref());
1211 received.insert(sender);
1212 }
1213 complete_sender.send(()).await.unwrap();
1214
1215 loop {
1216 receiver.recv().await.unwrap();
1217 }
1218 });
1219
1220 let sender_task =
1222 context.child("sender").spawn(move |context| async move {
1223 loop {
1224 let mut recipients: Vec<_> = peers
1225 .iter()
1226 .filter(|(p, _)| p != &pk)
1227 .map(|(p, _)| p.clone())
1228 .collect();
1229 recipients.sort();
1230
1231 loop {
1232 let mut sent = sender.send(
1233 Recipients::All,
1234 pk.as_ref().to_vec(),
1235 true,
1236 );
1237 if sent.len() != n - 1 {
1238 context.sleep(Duration::from_millis(100)).await;
1239 continue;
1240 }
1241 sent.sort();
1242 assert_eq!(sent, recipients);
1243 break;
1244 }
1245
1246 context.sleep(Duration::from_secs(10)).await;
1247 }
1248 });
1249
1250 select! {
1251 receiver = receiver => {
1252 panic!("receiver exited: {receiver:?}")
1253 },
1254 sender = sender_task => {
1255 panic!("sender exited: {sender:?}")
1256 },
1257 }
1258 }
1259 });
1260 }
1261
1262 for _ in 0..n {
1264 complete_receiver.recv().await.unwrap();
1265 }
1266
1267 assert_no_rate_limiting(&context.encode());
1268 });
1269 }
1270
1271 #[test_traced]
1272 fn test_dns_resolving_to_private_ip_not_dialed() {
1273 let base_port = 4400;
1276 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1277 executor.start(|context| async move {
1278 let peer0 = ed25519::PrivateKey::from_seed(0);
1279 let peer1 = ed25519::PrivateKey::from_seed(1);
1280
1281 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1282 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1283
1284 context.resolver_register("peer-0.local".to_string(), Some(vec![socket0.ip()]));
1286
1287 let mut config0 = Config::test(peer0.clone(), socket0, 1_024 * 1_024);
1289 config0.allow_private_ips = true;
1290 let (mut network0, mut oracle0) =
1291 Network::new(context.child("peer").with_attribute("index", 0), config0);
1292
1293 let peers0: Vec<(_, Address)> = vec![
1295 (peer0.public_key(), Address::Symmetric(socket0)),
1296 (peer1.public_key(), Address::Symmetric(socket1)),
1297 ];
1298 oracle0.track(0, Map::try_from(peers0).unwrap());
1299
1300 let (_sender0, mut receiver0) =
1301 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1302 network0.start();
1303
1304 let mut config1 = Config::test(peer1.clone(), socket1, 1_024 * 1_024);
1306 config1.allow_private_ips = false; let (mut network1, mut oracle1) =
1308 Network::new(context.child("peer").with_attribute("index", 1), config1);
1309
1310 let peers1: Vec<(_, Address)> = vec![
1312 (
1313 peer0.public_key(),
1314 Address::Asymmetric {
1315 ingress: Ingress::Dns {
1316 host: hostname!("peer-0.local"),
1317 port: socket0.port(),
1318 },
1319 egress: socket0,
1320 },
1321 ),
1322 (peer1.public_key(), Address::Symmetric(socket1)),
1323 ];
1324 oracle1.track(0, Map::try_from(peers1).unwrap());
1325
1326 let (mut sender1, _receiver1) =
1327 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1328 network1.start();
1329
1330 context.sleep(Duration::from_secs(5)).await;
1332
1333 let sent = sender1.send(Recipients::All, peer1.public_key().as_ref().to_vec(), true);
1335 assert!(
1336 sent.is_empty(),
1337 "peer 1 should not have connected to peer 0 (private IP)"
1338 );
1339
1340 select! {
1342 msg = receiver0.recv() => {
1343 panic!("peer 0 should not have received any message, got: {msg:?}");
1344 },
1345 _ = context.sleep(Duration::from_secs(1)) => {
1346 },
1348 }
1349 });
1350 }
1351
1352 #[test_traced]
1353 fn test_dns_mixed_ips_connectivity() {
1354 for seed in 0..25 {
1360 let base_port = 3500;
1361
1362 let cfg = deterministic::Config::default()
1363 .with_seed(seed)
1364 .with_timeout(Some(Duration::from_secs(120)));
1365 let executor = deterministic::Runner::new(cfg);
1366 executor.start(|context| async move {
1367 let peer0 = ed25519::PrivateKey::from_seed(0);
1368 let peer1 = ed25519::PrivateKey::from_seed(1);
1369
1370 let good_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1371 let socket0 = SocketAddr::new(good_ip, base_port);
1372 let socket1 = SocketAddr::new(good_ip, base_port + 1);
1373
1374 let mut all_ips0: Vec<IpAddr> = (1..=3)
1376 .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 100 + i)))
1377 .collect();
1378 all_ips0.push(good_ip);
1379 context.resolver_register("peer-0.local", Some(all_ips0));
1380
1381 let mut all_ips1: Vec<IpAddr> = (1..=3)
1382 .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 110 + i)))
1383 .collect();
1384 all_ips1.push(good_ip);
1385 context.resolver_register("peer-1.local", Some(all_ips1));
1386
1387 let peers: Vec<(_, Address)> = vec![
1389 (
1390 peer0.public_key(),
1391 Address::Asymmetric {
1392 ingress: Ingress::Dns {
1393 host: hostname!("peer-0.local"),
1394 port: base_port,
1395 },
1396 egress: socket0,
1397 },
1398 ),
1399 (
1400 peer1.public_key(),
1401 Address::Asymmetric {
1402 ingress: Ingress::Dns {
1403 host: hostname!("peer-1.local"),
1404 port: base_port + 1,
1405 },
1406 egress: socket1,
1407 },
1408 ),
1409 ];
1410
1411 let config0 = Config::test(peer0.clone(), socket0, 1_024 * 1_024);
1413 let (mut network0, mut oracle0) =
1414 Network::new(context.child("peer").with_attribute("index", 0), config0);
1415 oracle0.track(0, Map::try_from(peers.clone()).unwrap());
1416 let (_sender0, mut receiver0) =
1417 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1418 network0.start();
1419
1420 let config1 = Config::test(peer1.clone(), socket1, 1_024 * 1_024);
1422 let (mut network1, mut oracle1) =
1423 Network::new(context.child("peer").with_attribute("index", 1), config1);
1424 oracle1.track(0, Map::try_from(peers.clone()).unwrap());
1425 let (mut sender1, _receiver1) =
1426 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1427 network1.start();
1428
1429 loop {
1431 let checked = sender1.check(Recipients::All).unwrap();
1432 if !checked.recipients().is_empty() {
1433 checked.send(peer1.public_key().as_ref().to_vec(), true);
1434 }
1435
1436 select! {
1437 result = receiver0.recv() => {
1438 let (sender, msg) = result.unwrap();
1439 assert_eq!(sender, peer1.public_key());
1440 assert_eq!(msg, peer1.public_key().as_ref());
1441 break;
1442 },
1443 _ = context.sleep(Duration::from_millis(100)) => {},
1444 }
1445 }
1446 });
1447 }
1448 }
1449
1450 #[test_traced]
1451 fn test_many_peer_restart_with_new_address() {
1452 let base_port = 9500;
1453 let n = 5;
1454
1455 let executor = deterministic::Runner::default();
1456 executor.start(|context| async move {
1457 let peers: Vec<_> = (0..n)
1459 .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1460 .collect();
1461 let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1462
1463 let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1465 let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1466 let mut oracles: Vec<Option<Oracle<_>>> = (0..n).map(|_| None).collect();
1467 let mut handles: Vec<Option<commonware_runtime::Handle<()>>> =
1468 (0..n).map(|_| None).collect();
1469
1470 let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1472
1473 let peer_set: Vec<(_, Address)> = addresses
1475 .iter()
1476 .enumerate()
1477 .map(|(i, pk)| {
1478 (
1479 pk.clone(),
1480 Address::Symmetric(SocketAddr::new(
1481 IpAddr::V4(Ipv4Addr::LOCALHOST),
1482 ports[i],
1483 )),
1484 )
1485 })
1486 .collect();
1487
1488 for (i, peer) in peers.iter().enumerate() {
1490 let peer_context = context.child("peer").with_attribute("index", i);
1491
1492 let config = Config::test(
1493 peer.clone(),
1494 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1495 MAX_MESSAGE_SIZE,
1496 );
1497 let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1498
1499 oracle.track(0, Map::try_from(peer_set.clone()).unwrap());
1501
1502 let (sender, receiver) =
1503 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1504 senders[i] = Some(sender);
1505 receivers[i] = Some(receiver);
1506 oracles[i] = Some(oracle);
1507
1508 let handle = network.start();
1509 handles[i] = Some(handle);
1510 }
1511
1512 for (i, sender) in senders.iter_mut().enumerate() {
1514 let sender = sender.as_mut().unwrap();
1515 loop {
1516 let sent = sender.send(
1517 Recipients::All,
1518 peers[i].public_key().as_ref().to_vec(),
1519 true,
1520 );
1521 if sent.len() == n - 1 {
1522 break;
1523 }
1524 context.sleep(Duration::from_millis(100)).await;
1525 }
1526 }
1527
1528 for receiver in receivers.iter_mut() {
1530 let receiver = receiver.as_mut().unwrap();
1531 let mut received = HashSet::new();
1532 while received.len() < n - 1 {
1533 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1534 assert_eq!(message, sender.as_ref());
1535 received.insert(sender);
1536 }
1537 }
1538
1539 let mut restart_counter = 0u16;
1541 for round in 0..3 {
1542 for restart_peer_idx in 1..n {
1543 restart_counter += 1;
1545 let new_port = base_port + 100 + restart_counter;
1546 ports[restart_peer_idx] = new_port;
1547
1548 if let Some(handle) = handles[restart_peer_idx].take() {
1550 handle.abort();
1551 }
1552 senders[restart_peer_idx] = None;
1553 receivers[restart_peer_idx] = None;
1554 oracles[restart_peer_idx] = None;
1555
1556 let updated_peer_set: Vec<(_, Address)> = addresses
1558 .iter()
1559 .enumerate()
1560 .map(|(i, pk)| {
1561 (
1562 pk.clone(),
1563 Address::Symmetric(SocketAddr::new(
1564 IpAddr::V4(Ipv4Addr::LOCALHOST),
1565 ports[i],
1566 )),
1567 )
1568 })
1569 .collect();
1570
1571 for oracle in oracles.iter_mut().flatten() {
1573 oracle.track(
1574 (round * (n - 1) + restart_peer_idx) as u64,
1575 Map::try_from(updated_peer_set.clone()).unwrap(),
1576 );
1577 }
1578
1579 let peer_context = context
1581 .child("peer_round")
1582 .with_attribute("index", restart_peer_idx)
1583 .with_attribute("round", round);
1584 let config = Config::test(
1585 peers[restart_peer_idx].clone(),
1586 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port),
1587 MAX_MESSAGE_SIZE,
1588 );
1589 let (mut network, mut oracle) =
1590 Network::new(peer_context.child("network"), config);
1591
1592 oracle.track(
1593 (round * (n - 1) + restart_peer_idx) as u64,
1594 Map::try_from(updated_peer_set.clone()).unwrap(),
1595 );
1596
1597 let (sender, receiver) = network.register(
1598 0,
1599 Quota::per_second(NZU32!(100)),
1600 DEFAULT_MESSAGE_BACKLOG,
1601 );
1602 senders[restart_peer_idx] = Some(sender);
1603 receivers[restart_peer_idx] = Some(receiver);
1604 oracles[restart_peer_idx] = Some(oracle);
1605
1606 let handle = network.start();
1607 handles[restart_peer_idx] = Some(handle);
1608
1609 let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
1611 loop {
1612 let sent = restarted_sender.send(
1613 Recipients::All,
1614 peers[restart_peer_idx].public_key().as_ref().to_vec(),
1615 true,
1616 );
1617 if sent.len() == n - 1 {
1618 break;
1619 }
1620 context.sleep(Duration::from_millis(100)).await;
1621 }
1622
1623 for i in 0..n {
1625 if i == restart_peer_idx {
1626 continue;
1627 }
1628 let sender = senders[i].as_mut().unwrap();
1629 loop {
1630 let sent = sender.send(
1631 Recipients::One(addresses[restart_peer_idx].clone()),
1632 peers[i].public_key().as_ref().to_vec(),
1633 true,
1634 );
1635 if sent.len() == 1 {
1636 break;
1637 }
1638 context.sleep(Duration::from_millis(100)).await;
1639 }
1640 }
1641
1642 let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
1644 let mut received = HashSet::new();
1645 while received.len() < n - 1 {
1646 let (sender, message): (ed25519::PublicKey, _) =
1647 restarted_receiver.recv().await.unwrap();
1648 assert_eq!(message, sender.as_ref());
1649 received.insert(sender);
1650 }
1651 }
1652 }
1653
1654 assert_no_rate_limiting(&context.encode());
1655 });
1656 }
1657
1658 #[test_traced]
1659 fn test_simultaneous_peer_restart() {
1660 let base_port = 9700;
1661 let n = 5;
1662
1663 let executor = deterministic::Runner::default();
1664 executor.start(|context| async move {
1665 let peers: Vec<_> = (0..n)
1667 .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1668 .collect();
1669 let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1670
1671 let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1673
1674 let peer_set: Vec<(_, Address)> = addresses
1676 .iter()
1677 .enumerate()
1678 .map(|(i, pk)| {
1679 (
1680 pk.clone(),
1681 Address::Symmetric(SocketAddr::new(
1682 IpAddr::V4(Ipv4Addr::LOCALHOST),
1683 ports[i],
1684 )),
1685 )
1686 })
1687 .collect();
1688
1689 let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1691 let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1692 let mut oracles: Vec<Option<Oracle<_>>> = (0..n).map(|_| None).collect();
1693 let mut handles: Vec<Option<commonware_runtime::Handle<()>>> =
1694 (0..n).map(|_| None).collect();
1695
1696 for (i, peer) in peers.iter().enumerate() {
1698 let peer_context = context.child("peer").with_attribute("index", i);
1699
1700 let config = Config::test(
1701 peer.clone(),
1702 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1703 MAX_MESSAGE_SIZE,
1704 );
1705 let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1706
1707 oracle.track(0, Map::try_from(peer_set.clone()).unwrap());
1709
1710 let (sender, receiver) =
1711 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1712 senders[i] = Some(sender);
1713 receivers[i] = Some(receiver);
1714 oracles[i] = Some(oracle);
1715
1716 let handle = network.start();
1717 handles[i] = Some(handle);
1718 }
1719
1720 for (i, sender) in senders.iter_mut().enumerate() {
1722 let sender = sender.as_mut().unwrap();
1723 loop {
1724 let sent = sender.send(
1725 Recipients::All,
1726 peers[i].public_key().as_ref().to_vec(),
1727 true,
1728 );
1729 if sent.len() == n - 1 {
1730 break;
1731 }
1732 context.sleep(Duration::from_millis(100)).await;
1733 }
1734 }
1735
1736 for receiver in receivers.iter_mut() {
1738 let receiver = receiver.as_mut().unwrap();
1739 let mut received = HashSet::new();
1740 while received.len() < n - 1 {
1741 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1742 assert_eq!(message, sender.as_ref());
1743 received.insert(sender);
1744 }
1745 }
1746
1747 let restart_peers: Vec<usize> = (1..n).collect();
1752 for &idx in &restart_peers {
1753 if let Some(handle) = handles[idx].take() {
1754 handle.abort();
1755 }
1756 senders[idx] = None;
1757 receivers[idx] = None;
1758 oracles[idx] = None;
1759 ports[idx] = base_port + 100 + idx as u16;
1760 }
1761
1762 context.sleep(Duration::from_secs(2)).await;
1764
1765 let updated_peer_set: Vec<(_, Address)> = addresses
1767 .iter()
1768 .enumerate()
1769 .map(|(i, pk)| {
1770 (
1771 pk.clone(),
1772 Address::Symmetric(SocketAddr::new(
1773 IpAddr::V4(Ipv4Addr::LOCALHOST),
1774 ports[i],
1775 )),
1776 )
1777 })
1778 .collect();
1779
1780 oracles[0]
1782 .as_mut()
1783 .unwrap()
1784 .track(1, Map::try_from(updated_peer_set.clone()).unwrap());
1785
1786 for &idx in &restart_peers {
1788 let peer_context = context.child("peer_restarted").with_attribute("index", idx);
1789 let config = Config::test(
1790 peers[idx].clone(),
1791 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[idx]),
1792 MAX_MESSAGE_SIZE,
1793 );
1794 let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1795
1796 oracle.track(1, Map::try_from(updated_peer_set.clone()).unwrap());
1797
1798 let (sender, receiver) =
1799 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1800 senders[idx] = Some(sender);
1801 receivers[idx] = Some(receiver);
1802 oracles[idx] = Some(oracle);
1803
1804 let handle = network.start();
1805 handles[idx] = Some(handle);
1806 }
1807
1808 for (i, sender) in senders.iter_mut().enumerate() {
1810 let sender = sender.as_mut().unwrap();
1811 loop {
1812 let sent = sender.send(
1813 Recipients::All,
1814 peers[i].public_key().as_ref().to_vec(),
1815 true,
1816 );
1817 if sent.len() == n - 1 {
1818 break;
1819 }
1820 context.sleep(Duration::from_millis(100)).await;
1821 }
1822 }
1823
1824 for receiver in receivers.iter_mut() {
1826 let receiver = receiver.as_mut().unwrap();
1827 let mut received = HashSet::new();
1828 while received.len() < n - 1 {
1829 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1830 assert_eq!(message, sender.as_ref());
1831 received.insert(sender);
1832 }
1833 }
1834
1835 assert_no_rate_limiting(&context.encode());
1836 });
1837 }
1838 #[test_traced]
1839 fn test_operations_after_shutdown_do_not_panic() {
1840 let executor = deterministic::Runner::default();
1841 executor.start(|context| async move {
1842 let peer = ed25519::PrivateKey::from_seed(0);
1843 let address = peer.public_key();
1844
1845 let peer_context = context.child("peer");
1846 let config = Config::test(
1847 peer.clone(),
1848 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200),
1849 MAX_MESSAGE_SIZE,
1850 );
1851 let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1852
1853 let (mut sender, _receiver) =
1855 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1856 let peer_addr =
1857 Address::Symmetric(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200));
1858 let peers: Map<ed25519::PublicKey, Address> =
1859 vec![(address.clone(), peer_addr)].try_into().unwrap();
1860 oracle.track(0, peers.clone());
1861
1862 let handle = network.start();
1864 handle.abort();
1865
1866 context.sleep(Duration::from_millis(100)).await;
1868
1869 oracle.track(1, peers.clone());
1871 let _ = oracle.peer_set(0).await;
1872 let _ = oracle.subscribe().await;
1873 crate::block_peer(&mut oracle, address.clone());
1874
1875 let sent = sender.send(Recipients::All, address.as_ref().to_vec(), true);
1877 assert!(sent.is_empty());
1878 });
1879 }
1880
1881 fn clean_shutdown(seed: u64) {
1882 let cfg = deterministic::Config::default()
1883 .with_seed(seed)
1884 .with_timeout(Some(Duration::from_secs(30)));
1885 let executor = deterministic::Runner::new(cfg);
1886 executor.start(|context| async move {
1887 let peer = ed25519::PrivateKey::from_seed(0);
1888
1889 let peer_context = context.child("peer");
1890 let config = Config::test(
1891 peer.clone(),
1892 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200),
1893 MAX_MESSAGE_SIZE,
1894 );
1895 let (mut network, mut oracle) = Network::new(peer_context.child("network"), config);
1896
1897 let (_, _) =
1899 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1900 let peer_addr =
1901 Address::Symmetric(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200));
1902 let peers: Map<ed25519::PublicKey, Address> =
1903 vec![(peer.public_key(), peer_addr)].try_into().unwrap();
1904 oracle.track(0, peers);
1905
1906 let handle = network.start();
1908
1909 context.sleep(Duration::from_millis(100)).await;
1911
1912 let running_before = count_running_tasks(&context, "peer_network");
1914 assert!(
1915 running_before > 0,
1916 "at least one network task should be running"
1917 );
1918
1919 handle.abort();
1921 let _ = handle.await;
1922
1923 context.sleep(Duration::from_millis(100)).await;
1925
1926 let running_after = count_running_tasks(&context, "peer_network");
1928 assert_eq!(
1929 running_after, 0,
1930 "all network tasks should be stopped, but {running_after} still running"
1931 );
1932 });
1933 }
1934
1935 #[test_traced]
1936 fn test_clean_shutdown() {
1937 for seed in 0..25 {
1938 clean_shutdown(seed);
1939 }
1940 }
1941
1942 fn duplicate_addresses_disconnected(seed: u64) {
1943 let base_port = 6000;
1944 let executor = deterministic::Runner::seeded(seed);
1945 executor.start(|context| async move {
1946 let peer0 = ed25519::PrivateKey::from_seed(0);
1947 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1948 let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
1949 let peer1 = ed25519::PrivateKey::from_seed(1);
1950 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1951 let peer2 = ed25519::PrivateKey::from_seed(2);
1952
1953 let config0 = Config::test(peer0.clone(), socket0, MAX_MESSAGE_SIZE);
1955 let (mut network0, mut oracle0) =
1956 Network::new(context.child("peer").with_attribute("index", 0), config0);
1957 let (mut sender0, _receiver0) =
1958 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1959 network0.start();
1960
1961 let peer_set0: Vec<(_, Address)> = vec![
1963 (peer0.public_key(), Address::Symmetric(socket0)),
1964 (peer1.public_key(), Address::Symmetric(socket1)),
1965 (peer2.public_key(), Address::Symmetric(socket1)),
1966 ];
1967 oracle0.track(0, Map::try_from(peer_set0).unwrap());
1968
1969 context.sleep(Duration::from_secs(30)).await;
1971
1972 let sent = sender0.send(Recipients::All, peer1.public_key().as_ref().to_vec(), true);
1974 assert!(sent.is_empty());
1975
1976 let config1 = Config::test(peer1.clone(), socket1, MAX_MESSAGE_SIZE);
1978 let (mut network1, mut oracle1) =
1979 Network::new(context.child("peer").with_attribute("index", 1), config1);
1980 let (_sender1, mut receiver1) =
1981 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1982 network1.start();
1983
1984 let peer_set1: Vec<(_, Address)> = vec![
1986 (peer0.public_key(), Address::Symmetric(wrong_socket0)),
1987 (peer1.public_key(), Address::Symmetric(socket1)),
1988 (peer2.public_key(), Address::Symmetric(socket1)),
1989 ];
1990 oracle1.track(0, Map::try_from(peer_set1).unwrap());
1991
1992 context.sleep(Duration::from_secs(30)).await;
1994
1995 loop {
1997 let sent =
1998 sender0.send(Recipients::All, peer0.public_key().as_ref().to_vec(), true);
1999 if sent.len() == 1 {
2000 assert_eq!(sent[0], peer1.public_key());
2001 break;
2002 }
2003 context.sleep(Duration::from_millis(100)).await;
2004 }
2005 let (sender, _) = receiver1.recv().await.unwrap();
2006 assert_eq!(sender, peer0.public_key());
2007 });
2008 }
2009
2010 #[test_traced]
2011 fn test_duplicate_addresses_disconnected() {
2012 for seed in 0..25 {
2014 duplicate_addresses_disconnected(seed);
2015 }
2016 }
2017
2018 #[test_traced]
2019 fn test_duplicate_addresses_connected() {
2020 let base_port = 6000;
2021 let executor = deterministic::Runner::default();
2022 executor.start(|context| async move {
2023 let peer0 = ed25519::PrivateKey::from_seed(0);
2024 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
2025 let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
2026 let peer1 = ed25519::PrivateKey::from_seed(1);
2027 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
2028 let peer2 = ed25519::PrivateKey::from_seed(2);
2029 let socket2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 2);
2030
2031 let config0 = Config::test(peer0.clone(), socket0, MAX_MESSAGE_SIZE);
2033 let (mut network0, mut oracle0) =
2034 Network::new(context.child("peer").with_attribute("index", 0), config0);
2035 let (mut sender0, mut receiver0) =
2036 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2037 network0.start();
2038
2039 let config2 = Config::test(peer2.clone(), socket2, MAX_MESSAGE_SIZE);
2041 let (mut network2, mut oracle2) =
2042 Network::new(context.child("peer").with_attribute("index", 2), config2);
2043 let (_sender2, mut receiver2) =
2044 network2.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2045 network2.start();
2046
2047 let peer_set: Vec<(_, Address)> = vec![
2049 (peer0.public_key(), Address::Symmetric(socket0)),
2050 (peer1.public_key(), Address::Symmetric(socket1)),
2051 (peer2.public_key(), Address::Symmetric(socket1)),
2052 ];
2053 oracle0.track(0, Map::try_from(peer_set.clone()).unwrap());
2054 oracle2.track(0, Map::try_from(peer_set).unwrap());
2055
2056 context.sleep(Duration::from_secs(30)).await;
2058
2059 loop {
2061 let sent =
2062 sender0.send(Recipients::All, peer2.public_key().as_ref().to_vec(), true);
2063 if sent.len() == 1 {
2064 assert_eq!(sent[0], peer2.public_key());
2065 break;
2066 }
2067 context.sleep(Duration::from_millis(100)).await;
2068 }
2069 let (sender, _) = receiver2.recv().await.unwrap();
2070 assert_eq!(sender, peer0.public_key());
2071
2072 let config1 = Config::test(peer1.clone(), socket1, MAX_MESSAGE_SIZE);
2074 let (mut network1, mut oracle1) =
2075 Network::new(context.child("peer").with_attribute("index", 1), config1);
2076 let (mut sender1, _receiver1) =
2077 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2078 network1.start();
2079
2080 let peer_set1: Vec<(_, Address)> = vec![
2082 (peer0.public_key(), Address::Symmetric(wrong_socket0)),
2083 (peer1.public_key(), Address::Symmetric(socket1)),
2084 (peer2.public_key(), Address::Symmetric(socket1)),
2085 ];
2086 oracle1.track(0, Map::try_from(peer_set1).unwrap());
2087
2088 context.sleep(Duration::from_secs(30)).await;
2090
2091 loop {
2093 let sent =
2094 sender1.send(Recipients::All, peer1.public_key().as_ref().to_vec(), true);
2095 if sent.len() == 2 {
2096 assert!(sent.contains(&peer0.public_key()));
2097 assert!(sent.contains(&peer2.public_key()));
2098 break;
2099 }
2100 context.sleep(Duration::from_millis(100)).await;
2101 }
2102 let mut received0 = false;
2103 while let Ok((sender, _)) = receiver0.recv().await {
2104 if sender == peer1.public_key() {
2106 received0 = true;
2107 break;
2108 }
2109 }
2110 assert!(received0);
2111 let (sender, _) = receiver2.recv().await.unwrap();
2112 assert_eq!(sender, peer1.public_key());
2113 });
2114 }
2115
2116 #[test]
2117 fn test_broadcast_slow_peer_no_blocking() {
2118 let executor = deterministic::Runner::default();
2119 executor.start(|context| async move {
2120 let cfg = RouterConfig {
2121 mailbox_size: NZUsize!(1),
2122 };
2123 let (router, mailbox, messenger) =
2124 RouterActor::<_, ed25519::PublicKey>::new(context.child("router"), cfg);
2125
2126 let channels = channels::Channels::new(messenger.clone(), MAX_MESSAGE_SIZE);
2127 let _handle = router.start(channels);
2128
2129 let slow_peer = ed25519::PrivateKey::from_seed(0).public_key();
2130 let (slow_relay, _slow_receivers) =
2131 Relay::new(context.child("slow_relay"), NZUsize!(10));
2132 assert!(
2133 mailbox.ready(slow_peer, slow_relay).await.is_some(),
2134 "Failed to register slow peer"
2135 );
2136
2137 let fast_peer = ed25519::PrivateKey::from_seed(1).public_key();
2138 let (fast_relay, mut fast_receivers) =
2139 Relay::new(context.child("fast_relay"), NZUsize!(100));
2140 assert!(
2141 mailbox.ready(fast_peer, fast_relay).await.is_some(),
2142 "Failed to register fast peer"
2143 );
2144
2145 let message = IoBuf::from(vec![0u8; 100]);
2146 for i in 0..11 {
2147 let sent = messenger.content(Recipients::All, 0, message.clone().into(), false);
2148 assert_ne!(
2149 sent,
2150 Unreliable::new(Feedback::Closed),
2151 "Broadcast {i} should be accepted"
2152 );
2153
2154 assert!(fast_receivers.low.recv().await.is_some());
2155 }
2156 });
2157 }
2158}