1mod actors;
161mod channels;
162mod config;
163mod metrics;
164mod network;
165mod types;
166
167use thiserror::Error;
168
/// Error type exported by this module.
#[derive(Error, Debug)]
pub enum Error {
    /// A message was rejected for being too large.
    ///
    /// Carries a `usize` that is interpolated into the error text
    /// (presumably the offending message size — confirm at the construction site).
    #[error("message too large: {0}")]
    MessageTooLarge(usize),
    /// The network has been closed.
    #[error("network closed")]
    NetworkClosed,
}
177
178pub use actors::tracker::Oracle;
179pub use channels::{Receiver, Sender};
180pub use config::Config;
181pub use network::Network;
182
183#[cfg(test)]
184mod tests {
185 use super::*;
186 use crate::{
187 Address, AddressableManager, Blocker, Ingress, Provider, Receiver, Recipients, Sender,
188 };
189 use commonware_cryptography::{ed25519, Signer as _};
190 use commonware_macros::{select, test_group, test_traced};
191 use commonware_runtime::{
192 count_running_tasks, deterministic, tokio, BufferPooler, Clock, Metrics,
193 Network as RNetwork, Quota, Resolver, Runner, Spawner,
194 };
195 use commonware_utils::{
196 channel::mpsc,
197 hostname,
198 ordered::{Map, Set},
199 Hostname, TryCollect, NZU32,
200 };
201 use rand_core::{CryptoRngCore, RngCore};
202 use std::{
203 collections::HashSet,
204 net::{IpAddr, Ipv4Addr, SocketAddr},
205 time::Duration,
206 };
207
// Selects which `Recipients` form the sender tasks in `run_network` use.
#[derive(Copy, Clone)]
enum Mode {
    // Send once per round using `Recipients::All`.
    All,
    // Send once per round using `Recipients::Some` listing every other peer.
    Some,
    // Send to each other peer in turn using `Recipients::One`.
    One,
}
214
// Maximum message size handed to `Config::test` (1_024 * 1_024; 1 MiB assuming the unit is bytes).
const MAX_MESSAGE_SIZE: u32 = 1_024 * 1_024;
// Value passed as the last argument of `Network::register` by the tests in this module.
const DEFAULT_MESSAGE_BACKLOG: usize = 128;
217
218 fn assert_no_rate_limiting(context: &impl Metrics) {
227 let metrics = context.encode();
228 assert!(
229 !metrics.contains("messages_rate_limited_total{"),
230 "no messages should be rate limited: {metrics}"
231 );
232 }
233
/// Starts `n` peers on the loopback interface and waits until every peer has
/// received a message from each of the other `n - 1` peers.
///
/// * `context` - runtime context from which per-peer contexts are derived via `with_label`.
/// * `max_message_size` - forwarded to `Config::test` for every peer.
/// * `base_port` - peer `i` listens on `base_port + i`.
/// * `n` - number of peers.
/// * `mode` - which `Recipients` form the sender tasks use.
///
/// Each peer sends its own public key bytes as the message payload; receivers
/// assert that the payload matches the reported sender. Once all peers have
/// reported completion, asserts that no rate-limiting metric was emitted.
async fn run_network(
    context: impl Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
    max_message_size: u32,
    base_port: u16,
    n: usize,
    mode: Mode,
) {
    // Derive a key pair (seeded by index) and a loopback address for every peer
    let mut peers_and_sks = Vec::new();
    for i in 0..n {
        let private_key = ed25519::PrivateKey::from_seed(i as u64);
        let public_key = private_key.public_key();
        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
        peers_and_sks.push((private_key, public_key, address));
    }
    // (public key, address) pairs shared with every peer's oracle
    let peers = peers_and_sks
        .iter()
        .map(|(_, pub_key, addr)| (pub_key.clone(), (*addr).into()))
        .collect::<Vec<_>>();

    // Each peer signals on this channel once it has heard from all others
    let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
    for (i, (private_key, public_key, address)) in peers_and_sks.iter().enumerate() {
        let public_key = public_key.clone();

        // Give every peer its own labeled context
        let context = context.with_label(&format!("peer_{i}"));

        // Create the network for this peer
        let config = Config::test(private_key.clone(), *address, max_message_size);
        let (mut network, mut oracle) = Network::new(context.with_label("network"), config);

        // Track the full peer set (including this peer) under index 0
        oracle.track(0, peers.clone().try_into().unwrap()).await;

        // Register channel 0 before starting the network
        let (mut sender, mut receiver) =
            network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);

        network.start();

        // Agent task: owns one receiver task and one sender task for this peer
        context.with_label("agent").spawn({
            let complete_sender = complete_sender.clone();
            let peers = peers.clone();
            move |context| async move {
                let receiver = context.with_label("receiver").spawn(move |_| async move {
                    // Collect distinct senders until all n - 1 other peers were heard
                    let mut received = HashSet::new();
                    while received.len() < n - 1 {
                        let (sender, message) = receiver.recv().await.unwrap();
                        // Payload must be the sender's own public key bytes
                        assert_eq!(message, sender.as_ref());

                        received.insert(sender);
                    }
                    complete_sender.send(()).await.unwrap();

                    // Keep draining so the channel stays serviced after completion
                    loop {
                        receiver.recv().await.unwrap();
                    }
                });

                let sender = context
                    .with_label("sender")
                    .spawn(move |context| async move {
                        // Sorted list of every peer except this one
                        let mut recipients: Vec<_> = peers
                            .iter()
                            .enumerate()
                            .filter(|(j, _)| i != *j)
                            .map(|(_, (pk, _))| pk.clone())
                            .collect();
                        recipients.sort();

                        // Send rounds forever; the agent panics if this task ever exits
                        loop {
                            match mode {
                                Mode::One => {
                                    // Address each peer individually, retrying until delivered
                                    for pub_key in &recipients {
                                        loop {
                                            // NOTE(review): meaning of the trailing `true` is not
                                            // visible here (presumably a priority flag) — confirm.
                                            let sent = sender
                                                .send(
                                                    Recipients::One(pub_key.clone()),
                                                    public_key.as_ref().to_vec(),
                                                    true,
                                                )
                                                .await
                                                .unwrap();
                                            if sent.len() != 1 {
                                                // Not accepted for this peer yet; back off and retry
                                                context.sleep(Duration::from_millis(100)).await;
                                                continue;
                                            }
                                            assert_eq!(&sent[0], pub_key);
                                            break;
                                        }
                                    }
                                }
                                Mode::Some | Mode::All => {
                                    // Address everyone at once, retrying until all are reported
                                    loop {
                                        let mut sent = sender
                                            .send(
                                                match mode {
                                                    Mode::Some => {
                                                        Recipients::Some(recipients.clone())
                                                    }
                                                    Mode::All => Recipients::All,
                                                    // Outer arm only matches Some | All
                                                    _ => unreachable!(),
                                                },
                                                public_key.as_ref().to_vec(),
                                                true,
                                            )
                                            .await
                                            .unwrap();
                                        if sent.len() != recipients.len() {
                                            context.sleep(Duration::from_millis(100)).await;
                                            continue;
                                        }

                                        // Compare against the sorted expectation
                                        sent.sort();
                                        assert_eq!(sent, recipients);
                                        break;
                                    }
                                }
                            };

                            // Pause between rounds
                            context.sleep(Duration::from_secs(10)).await;
                        }
                    });

                // Neither task is expected to finish; either one exiting is a failure
                select! {
                    receiver = receiver => {
                        panic!("receiver exited: {receiver:?}");
                    },
                    sender = sender => {
                        panic!("sender exited: {sender:?}");
                    },
                }
            }
        });
    }

    // Wait for one completion signal per peer
    for _ in 0..n {
        complete_receiver.recv().await.unwrap();
    }

    // Nothing should have been rate limited along the way
    assert_no_rate_limiting(&context);
}
398
399 fn run_deterministic_test(seed: u64, mode: Mode) {
400 const NUM_PEERS: usize = 25;
402 const BASE_PORT: u16 = 3000;
403
404 let executor = deterministic::Runner::seeded(seed);
406 let state = executor.start(|context| async move {
407 run_network(
408 context.clone(),
409 MAX_MESSAGE_SIZE,
410 BASE_PORT,
411 NUM_PEERS,
412 mode,
413 )
414 .await;
415 context.auditor().state()
416 });
417
418 let executor = deterministic::Runner::seeded(seed);
420 let state2 = executor.start(|context| async move {
421 run_network(
422 context.clone(),
423 MAX_MESSAGE_SIZE,
424 BASE_PORT,
425 NUM_PEERS,
426 mode,
427 )
428 .await;
429 context.auditor().state()
430 });
431 assert_eq!(state, state2);
432 }
433
434 #[test_group("slow")]
435 #[test_traced]
436 fn test_determinism_one() {
437 for i in 0..10 {
438 run_deterministic_test(i, Mode::One);
439 }
440 }
441
442 #[test_group("slow")]
443 #[test_traced]
444 fn test_determinism_some() {
445 for i in 0..10 {
446 run_deterministic_test(i, Mode::Some);
447 }
448 }
449
450 #[test_group("slow")]
451 #[test_traced]
452 fn test_determinism_all() {
453 for i in 0..10 {
454 run_deterministic_test(i, Mode::All);
455 }
456 }
457
458 #[test_traced]
459 fn test_tokio_connectivity() {
460 let executor = tokio::Runner::default();
461 executor.start(|context| async move {
462 let base_port = 4000;
463 let n = 10;
464 run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
465 });
466 }
467
468 #[test_traced]
469 fn test_multi_index_oracle() {
470 let base_port = 3000;
472 let n: usize = 10;
473
474 let executor = deterministic::Runner::default();
476 executor.start(|context| async move {
477 let mut peers_and_sks = Vec::new();
479 for i in 0..n {
480 let sk = ed25519::PrivateKey::from_seed(i as u64);
481 let pk = sk.public_key();
482 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
483 peers_and_sks.push((sk, pk, addr));
484 }
485 let peers = peers_and_sks
486 .iter()
487 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
488 .collect::<Vec<_>>();
489
490 let mut waiters = Vec::new();
492 for (i, (peer_sk, peer_pk, peer_addr)) in peers_and_sks.iter().enumerate() {
493 let context = context.with_label(&format!("peer_{i}"));
495
496 let config = Config::test(
498 peer_sk.clone(),
499 *peer_addr,
500 1_024 * 1_024, );
502 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
503
504 oracle
506 .track(0, [peers[0].clone()].try_into().unwrap())
507 .await;
508 oracle
509 .track(1, [peers[1].clone(), peers[2].clone()].try_into().unwrap())
510 .await;
511 oracle
512 .track(2, peers.iter().skip(2).cloned().try_collect().unwrap())
513 .await;
514
515 let (mut sender, mut receiver) =
517 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
518
519 network.start();
521
522 let msg = peer_pk.clone();
524 let handler = context
525 .with_label("agent")
526 .spawn(move |context| async move {
527 if i == 0 {
528 loop {
530 if sender
531 .send(Recipients::All, msg.as_ref().to_vec(), true)
532 .await
533 .unwrap()
534 .len()
535 == n - 1
536 {
537 break;
538 }
539
540 context.sleep(Duration::from_millis(100)).await;
542 }
543 } else {
544 let (sender, message) = receiver.recv().await.unwrap();
546 assert_eq!(message, sender.as_ref());
547 }
548 });
549
550 waiters.push(handler);
552 }
553
554 for waiter in waiters.into_iter().rev() {
556 waiter.await.unwrap();
557 }
558
559 assert_no_rate_limiting(&context);
561 });
562 }
563
564 #[test_traced]
565 fn test_message_too_large() {
566 let base_port = 3000;
568 let n: usize = 2;
569
570 let executor = deterministic::Runner::seeded(0);
572 executor.start(|mut context| async move {
573 let mut peers_and_sks = Vec::new();
575 for i in 0..n {
576 let peer_sk = ed25519::PrivateKey::from_seed(i as u64);
577 let peer_pk = peer_sk.public_key();
578 let peer_addr =
579 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
580 peers_and_sks.push((peer_sk, peer_pk, peer_addr));
581 }
582 let peers: Map<_, _> = peers_and_sks
583 .iter()
584 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
585 .try_collect()
586 .unwrap();
587
588 let (sk, _, addr) = peers_and_sks[0].clone();
590 let config = Config::test(
591 sk,
592 addr,
593 1_024 * 1_024, );
595 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
596
597 oracle.track(0, peers.clone()).await;
599
600 let (mut sender, _) =
602 network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
603
604 network.start();
606
607 let mut msg = vec![0u8; 10 * 1024 * 1024]; context.fill_bytes(&mut msg[..]);
610
611 let recipient = Recipients::One(peers[1].clone());
613 let result = sender.send(recipient, msg, true).await;
614 assert!(matches!(result, Err(Error::MessageTooLarge(_))));
615 });
616 }
617
618 #[test_traced]
619 fn test_rate_limiting() {
620 let base_port = 3000;
622 let n: usize = 2;
623
624 let executor = deterministic::Runner::seeded(0);
626 executor.start(|context| async move {
627 let mut peers_and_sks = Vec::new();
629 for i in 0..n {
630 let sk = ed25519::PrivateKey::from_seed(i as u64);
631 let pk = sk.public_key();
632 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
633 peers_and_sks.push((sk, pk, addr));
634 }
635 let peers: Map<_, _> = peers_and_sks
636 .iter()
637 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
638 .try_collect()
639 .unwrap();
640 let (sk0, _, addr0) = peers_and_sks[0].clone();
641 let (sk1, pk1, addr1) = peers_and_sks[1].clone();
642
643 let config0 = Config::test(sk0, addr0, 1_024 * 1_024); let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
646 oracle0.track(0, peers.clone()).await;
647 let (mut sender0, _receiver0) =
648 network0.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
649 network0.start();
650
651 let config1 = Config::test(sk1, addr1, 1_024 * 1_024); let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
654 oracle1.track(0, peers.clone()).await;
655 let (_sender1, _receiver1) =
656 network1.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
657 network1.start();
658
659 let msg = vec![0u8; 1024]; loop {
662 let sent = sender0
664 .send(Recipients::One(pk1.clone()), msg.clone(), true)
665 .await
666 .unwrap();
667 if !sent.is_empty() {
668 break;
669 }
670
671 context.sleep(Duration::from_mins(1)).await
674 }
675
676 let sent = sender0.send(Recipients::One(pk1), msg, true).await.unwrap();
680 assert!(sent.is_empty());
681
682 for _ in 0..10 {
684 assert_no_rate_limiting(&context);
685 context.sleep(Duration::from_millis(100)).await;
686 }
687 });
688 }
689
690 #[test_traced]
691 fn test_unordered_peer_sets() {
692 let (n, base_port) = (10, 3000);
693 let executor = deterministic::Runner::default();
694 executor.start(|context| async move {
695 let mut peers_and_sks = Vec::new();
697 for i in 0..n {
698 let sk = ed25519::PrivateKey::from_seed(i as u64);
699 let pk = sk.public_key();
700 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
701 peers_and_sks.push((sk, pk, addr));
702 }
703 let peer0 = peers_and_sks[0].clone();
704 let config = Config::test(peer0.0, peer0.2, 1_024 * 1_024);
705 let (network, mut oracle) = Network::new(context.with_label("network"), config);
706 network.start();
707
708 let mut subscription = oracle.subscribe().await;
710
711 let set10: Map<_, _> = peers_and_sks
713 .iter()
714 .take(2)
715 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
716 .try_collect()
717 .unwrap();
718 oracle.track(10, set10.clone()).await;
719 let (id, new, all) = subscription.recv().await.unwrap();
720 assert_eq!(id, 10);
721 assert_eq!(&new, set10.keys());
722 assert_eq!(&all, set10.keys());
723
724 let set9: Map<_, _> = peers_and_sks
726 .iter()
727 .skip(2)
728 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
729 .try_collect()
730 .unwrap();
731 oracle.track(9, set9.clone()).await;
732
733 let set11: Map<_, _> = peers_and_sks
735 .iter()
736 .skip(4)
737 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
738 .try_collect()
739 .unwrap();
740 oracle.track(11, set11.clone()).await;
741 let (id, new, all) = subscription.recv().await.unwrap();
742 assert_eq!(id, 11);
743 assert_eq!(&new, set11.keys());
744 let all_keys: Set<_> = set10
745 .into_keys()
746 .into_iter()
747 .chain(set11.into_keys().into_iter())
748 .try_collect()
749 .unwrap();
750 assert_eq!(all, all_keys);
751 });
752 }
753
754 #[test_traced]
755 fn test_graceful_shutdown() {
756 let base_port = 3000;
757 let n: usize = 5;
758
759 let executor = deterministic::Runner::default();
760 executor.start(|context| async move {
761 let mut peers_and_sks = Vec::new();
763 for i in 0..n {
764 let sk = ed25519::PrivateKey::from_seed(i as u64);
765 let pk = sk.public_key();
766 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
767 peers_and_sks.push((sk, pk, addr));
768 }
769 let peers: Map<_, _> = peers_and_sks
770 .iter()
771 .map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
772 .try_collect()
773 .unwrap();
774
775 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
777 for (i, (sk, pk, addr)) in peers_and_sks.iter().enumerate() {
778 let peer_context = context.with_label(&format!("peer_{i}"));
779 let config = Config::test(sk.clone(), *addr, 1_024 * 1_024);
780 let (mut network, mut oracle) =
781 Network::new(peer_context.with_label("network"), config);
782
783 oracle.track(0, peers.clone()).await;
785
786 let (mut sender, mut receiver) =
787 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
788 network.start();
789
790 peer_context.with_label("agent").spawn({
791 let complete_sender = complete_sender.clone();
792 let pk = pk.clone();
793 move |context| async move {
794 let expected_connections = if i == 0 { n - 1 } else { 1 };
796
797 loop {
799 let sent = sender
800 .send(Recipients::All, pk.as_ref().to_vec(), true)
801 .await
802 .unwrap();
803 if sent.len() >= expected_connections {
804 break;
805 }
806 context.sleep(Duration::from_millis(100)).await;
807 }
808
809 complete_sender.send(()).await.unwrap();
811
812 loop {
814 select! {
815 result = receiver.recv() => {
816 if result.is_err() {
817 break;
819 }
820 },
821 _ = context.stopped() => {
822 break;
824 },
825 }
826 }
827 }
828 });
829 }
830
831 for _ in 0..n {
833 complete_receiver.recv().await.unwrap();
834 }
835
836 let metrics_before = context.encode();
838 let is_running = |name: &str| -> bool {
839 metrics_before.lines().any(|line| {
840 line.starts_with("runtime_tasks_running{")
841 && line.contains(&format!("name=\"{name}\""))
842 && line.contains("kind=\"Task\"")
843 && line.trim_end().ends_with(" 1")
844 })
845 };
846 for i in 0..n {
847 let prefix = format!("peer_{i}_network");
848 assert!(
849 is_running(&format!("{prefix}_tracker")),
850 "peer_{i} tracker should be running"
851 );
852 assert!(
853 is_running(&format!("{prefix}_router")),
854 "peer_{i} router should be running"
855 );
856 assert!(
857 is_running(&format!("{prefix}_spawner")),
858 "peer_{i} spawner should be running"
859 );
860 assert!(
861 is_running(&format!("{prefix}_listener")),
862 "peer_{i} listener should be running"
863 );
864 assert!(
865 is_running(&format!("{prefix}_dialer")),
866 "peer_{i} dialer should be running"
867 );
868 }
869
870 let shutdown_context = context.clone();
872 context.with_label("shutdown").spawn(move |_| async move {
873 let result = shutdown_context.stop(0, Some(Duration::from_secs(5))).await;
875
876 assert!(
878 result.is_ok(),
879 "graceful shutdown should complete: {result:?}"
880 );
881 });
882
883 context.stopped().await.unwrap();
885
886 context.sleep(Duration::from_millis(100)).await;
888
889 let metrics_after = context.encode();
891 let is_stopped = |name: &str| -> bool {
892 metrics_after.lines().any(|line| {
893 line.starts_with("runtime_tasks_running{")
894 && line.contains(&format!("name=\"{name}\""))
895 && line.contains("kind=\"Task\"")
896 && line.trim_end().ends_with(" 0")
897 })
898 };
899 for i in 0..n {
900 let prefix = format!("peer_{i}_network");
901 assert!(
902 is_stopped(&format!("{prefix}_tracker")),
903 "peer_{i} tracker should be stopped"
904 );
905 assert!(
906 is_stopped(&format!("{prefix}_router")),
907 "peer_{i} router should be stopped"
908 );
909 assert!(
910 is_stopped(&format!("{prefix}_spawner")),
911 "peer_{i} spawner should be stopped"
912 );
913 assert!(
914 is_stopped(&format!("{prefix}_listener")),
915 "peer_{i} listener should be stopped"
916 );
917 assert!(
918 is_stopped(&format!("{prefix}_dialer")),
919 "peer_{i} dialer should be stopped"
920 );
921 }
922 });
923 }
924
925 #[test_traced]
926 fn test_subscription_includes_self_when_registered() {
927 let base_port = 3000;
928 let executor = deterministic::Runner::default();
929 executor.start(|context| async move {
930 let self_sk = ed25519::PrivateKey::from_seed(0);
932 let self_pk = self_sk.public_key();
933 let self_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
934
935 let other_pk = ed25519::PrivateKey::from_seed(1).public_key();
936 let other_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
937
938 let config = Config::test(self_sk, self_addr, 1_024 * 1_024);
940 let (network, mut oracle) = Network::new(context.with_label("network"), config);
941 network.start();
942
943 let mut subscription = oracle.subscribe().await;
945
946 let peer_set: Map<_, _> = [(other_pk.clone(), other_addr.into())].try_into().unwrap();
948 oracle.track(1, peer_set.clone()).await;
949
950 let (id, new, all) = subscription.recv().await.unwrap();
952 assert_eq!(id, 1);
953 assert_eq!(new.len(), 1);
954 assert_eq!(all.len(), 1);
955
956 assert!(
958 new.position(&self_pk).is_none(),
959 "new set should not include self"
960 );
961 assert!(
962 new.position(&other_pk).is_some(),
963 "new set should include other"
964 );
965
966 assert!(
968 all.position(&self_pk).is_none(),
969 "tracked peers should not include self"
970 );
971 assert!(
972 all.position(&other_pk).is_some(),
973 "tracked peers should include other"
974 );
975
976 let peer_set: Map<_, _> = [
978 (self_pk.clone(), self_addr.into()),
979 (other_pk.clone(), other_addr.into()),
980 ]
981 .try_into()
982 .unwrap();
983 oracle.track(2, peer_set.clone()).await;
984
985 let (id, new, all) = subscription.recv().await.unwrap();
987 assert_eq!(id, 2);
988 assert_eq!(new.len(), 2);
989 assert_eq!(all.len(), 2);
990
991 assert!(
993 new.position(&self_pk).is_some(),
994 "new set should include self"
995 );
996 assert!(
997 new.position(&other_pk).is_some(),
998 "new set should include other"
999 );
1000
1001 assert!(
1003 all.position(&self_pk).is_some(),
1004 "tracked peers should include self"
1005 );
1006 assert!(
1007 all.position(&other_pk).is_some(),
1008 "tracked peers should include other"
1009 );
1010 });
1011 }
1012
1013 #[test_traced]
1014 fn test_dns_peer_addresses() {
1015 let base_port = 3200;
1016 let n: usize = 3;
1017
1018 let executor = deterministic::Runner::default();
1019 executor.start(|context| async move {
1020 let mut peers_and_sks = Vec::new();
1022 for i in 0..n {
1023 let private_key = ed25519::PrivateKey::from_seed(i as u64);
1024 let public_key = private_key.public_key();
1025 let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
1026 let host_str = format!("peer-{i}.local");
1027 let host = Hostname::new(&host_str).unwrap();
1028 peers_and_sks.push((private_key, public_key, socket, host_str, host));
1029 }
1030
1031 for (_, _, socket, host_str, _) in &peers_and_sks {
1033 context.resolver_register(host_str.clone(), Some(vec![socket.ip()]));
1034 }
1035
1036 let peers: Vec<(_, Address)> = peers_and_sks
1038 .iter()
1039 .map(|(_, pk, socket, _, host)| {
1040 (
1041 pk.clone(),
1042 Address::Asymmetric {
1043 ingress: Ingress::Dns {
1044 host: host.clone(),
1045 port: socket.port(),
1046 },
1047 egress: *socket,
1048 },
1049 )
1050 })
1051 .collect();
1052
1053 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1055 for (i, (private_key, public_key, socket, _, _)) in peers_and_sks.iter().enumerate() {
1056 let context = context.with_label(&format!("peer_{i}"));
1057
1058 let config = Config::test(private_key.clone(), *socket, 1_024 * 1_024);
1060 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
1061
1062 oracle.track(0, peers.clone().try_into().unwrap()).await;
1064
1065 let (mut sender, mut receiver) =
1067 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1068
1069 network.start();
1070
1071 let pk = public_key.clone();
1073 context.with_label("agent").spawn({
1074 let complete_sender = complete_sender.clone();
1075 let peers = peers.clone();
1076 move |context| async move {
1077 let receiver = context.with_label("receiver").spawn(move |_| async move {
1079 let mut received = HashSet::new();
1080 while received.len() < n - 1 {
1081 let (sender, message) = receiver.recv().await.unwrap();
1082 assert_eq!(message, sender.as_ref());
1083 received.insert(sender);
1084 }
1085 complete_sender.send(()).await.unwrap();
1086
1087 loop {
1088 receiver.recv().await.unwrap();
1089 }
1090 });
1091
1092 let sender_task =
1094 context
1095 .with_label("sender")
1096 .spawn(move |context| async move {
1097 loop {
1098 let mut recipients: Vec<_> = peers
1099 .iter()
1100 .filter(|(p, _)| p != &pk)
1101 .map(|(p, _)| p.clone())
1102 .collect();
1103 recipients.sort();
1104
1105 loop {
1106 let mut sent = sender
1107 .send(Recipients::All, pk.as_ref().to_vec(), true)
1108 .await
1109 .unwrap();
1110 if sent.len() != n - 1 {
1111 context.sleep(Duration::from_millis(100)).await;
1112 continue;
1113 }
1114 sent.sort();
1115 assert_eq!(sent, recipients);
1116 break;
1117 }
1118
1119 context.sleep(Duration::from_secs(10)).await;
1120 }
1121 });
1122
1123 select! {
1124 receiver = receiver => {
1125 panic!("receiver exited: {receiver:?}")
1126 },
1127 sender = sender_task => {
1128 panic!("sender exited: {sender:?}")
1129 },
1130 }
1131 }
1132 });
1133 }
1134
1135 for _ in 0..n {
1137 complete_receiver.recv().await.unwrap();
1138 }
1139
1140 assert_no_rate_limiting(&context);
1141 });
1142 }
1143
1144 #[test_traced]
1145 fn test_mixed_socket_and_dns_addresses() {
1146 let base_port = 3300;
1147 let n: usize = 4;
1148
1149 let executor = deterministic::Runner::default();
1150 executor.start(|context| async move {
1151 let mut peers_and_sks = Vec::new();
1153 for i in 0..n {
1154 let private_key = ed25519::PrivateKey::from_seed(i as u64);
1155 let public_key = private_key.public_key();
1156 let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
1157 peers_and_sks.push((private_key, public_key, socket));
1158 }
1159
1160 for (i, (_, _, socket)) in peers_and_sks.iter().enumerate().skip(2) {
1162 context.resolver_register(format!("peer-{i}.local"), Some(vec![socket.ip()]));
1163 }
1164
1165 let peers: Vec<(_, Address)> = peers_and_sks
1167 .iter()
1168 .enumerate()
1169 .map(|(i, (_, pk, socket))| {
1170 let addr = if i < 2 {
1171 Address::Symmetric(*socket)
1173 } else {
1174 Address::Asymmetric {
1176 ingress: Ingress::Dns {
1177 host: hostname!(&format!("peer-{i}.local")),
1178 port: socket.port(),
1179 },
1180 egress: *socket,
1181 }
1182 };
1183 (pk.clone(), addr)
1184 })
1185 .collect();
1186
1187 let (complete_sender, mut complete_receiver) = mpsc::channel(n);
1189 for (i, (private_key, public_key, socket)) in peers_and_sks.iter().enumerate() {
1190 let context = context.with_label(&format!("peer_{i}"));
1191
1192 let config = Config::test(private_key.clone(), *socket, 1_024 * 1_024);
1194 let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
1195
1196 oracle.track(0, peers.clone().try_into().unwrap()).await;
1198
1199 let (mut sender, mut receiver) =
1201 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1202
1203 network.start();
1204
1205 let pk = public_key.clone();
1207 context.with_label("agent").spawn({
1208 let complete_sender = complete_sender.clone();
1209 let peers = peers.clone();
1210 move |context| async move {
1211 let receiver = context.with_label("receiver").spawn(move |_| async move {
1213 let mut received = HashSet::new();
1214 while received.len() < n - 1 {
1215 let (sender, message) = receiver.recv().await.unwrap();
1216 assert_eq!(message, sender.as_ref());
1217 received.insert(sender);
1218 }
1219 complete_sender.send(()).await.unwrap();
1220
1221 loop {
1222 receiver.recv().await.unwrap();
1223 }
1224 });
1225
1226 let sender_task =
1228 context
1229 .with_label("sender")
1230 .spawn(move |context| async move {
1231 loop {
1232 let mut recipients: Vec<_> = peers
1233 .iter()
1234 .filter(|(p, _)| p != &pk)
1235 .map(|(p, _)| p.clone())
1236 .collect();
1237 recipients.sort();
1238
1239 loop {
1240 let mut sent = sender
1241 .send(Recipients::All, pk.as_ref().to_vec(), true)
1242 .await
1243 .unwrap();
1244 if sent.len() != n - 1 {
1245 context.sleep(Duration::from_millis(100)).await;
1246 continue;
1247 }
1248 sent.sort();
1249 assert_eq!(sent, recipients);
1250 break;
1251 }
1252
1253 context.sleep(Duration::from_secs(10)).await;
1254 }
1255 });
1256
1257 select! {
1258 receiver = receiver => {
1259 panic!("receiver exited: {receiver:?}")
1260 },
1261 sender = sender_task => {
1262 panic!("sender exited: {sender:?}")
1263 },
1264 }
1265 }
1266 });
1267 }
1268
1269 for _ in 0..n {
1271 complete_receiver.recv().await.unwrap();
1272 }
1273
1274 assert_no_rate_limiting(&context);
1275 });
1276 }
1277
1278 #[test_traced]
1279 fn test_dns_resolving_to_private_ip_not_dialed() {
1280 let base_port = 4400;
1283 let executor = deterministic::Runner::timed(Duration::from_secs(10));
1284 executor.start(|context| async move {
1285 let peer0 = ed25519::PrivateKey::from_seed(0);
1286 let peer1 = ed25519::PrivateKey::from_seed(1);
1287
1288 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1289 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1290
1291 context.resolver_register("peer-0.local".to_string(), Some(vec![socket0.ip()]));
1293
1294 let mut config0 = Config::test(peer0.clone(), socket0, 1_024 * 1_024);
1296 config0.allow_private_ips = true;
1297 let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
1298
1299 let peers0: Vec<(_, Address)> = vec![
1301 (peer0.public_key(), Address::Symmetric(socket0)),
1302 (peer1.public_key(), Address::Symmetric(socket1)),
1303 ];
1304 oracle0.track(0, peers0.try_into().unwrap()).await;
1305
1306 let (_sender0, mut receiver0) =
1307 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1308 network0.start();
1309
1310 let mut config1 = Config::test(peer1.clone(), socket1, 1_024 * 1_024);
1312 config1.allow_private_ips = false; let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
1314
1315 let peers1: Vec<(_, Address)> = vec![
1317 (
1318 peer0.public_key(),
1319 Address::Asymmetric {
1320 ingress: Ingress::Dns {
1321 host: hostname!("peer-0.local"),
1322 port: socket0.port(),
1323 },
1324 egress: socket0,
1325 },
1326 ),
1327 (peer1.public_key(), Address::Symmetric(socket1)),
1328 ];
1329 oracle1.track(0, peers1.try_into().unwrap()).await;
1330
1331 let (mut sender1, _receiver1) =
1332 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1333 network1.start();
1334
1335 context.sleep(Duration::from_secs(5)).await;
1337
1338 let sent = sender1
1340 .send(Recipients::All, peer1.public_key().as_ref().to_vec(), true)
1341 .await
1342 .unwrap();
1343 assert!(
1344 sent.is_empty(),
1345 "peer 1 should not have connected to peer 0 (private IP)"
1346 );
1347
1348 select! {
1350 msg = receiver0.recv() => {
1351 panic!("peer 0 should not have received any message, got: {msg:?}");
1352 },
1353 _ = context.sleep(Duration::from_secs(1)) => {
1354 },
1356 }
1357 });
1358 }
1359
1360 #[test_traced]
1361 fn test_dns_mixed_ips_connectivity() {
1362 for seed in 0..25 {
1368 let base_port = 3500;
1369
1370 let cfg = deterministic::Config::default()
1371 .with_seed(seed)
1372 .with_timeout(Some(Duration::from_secs(120)));
1373 let executor = deterministic::Runner::new(cfg);
1374 executor.start(|context| async move {
1375 let peer0 = ed25519::PrivateKey::from_seed(0);
1376 let peer1 = ed25519::PrivateKey::from_seed(1);
1377
1378 let good_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
1379 let socket0 = SocketAddr::new(good_ip, base_port);
1380 let socket1 = SocketAddr::new(good_ip, base_port + 1);
1381
1382 let mut all_ips0: Vec<IpAddr> = (1..=3)
1384 .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 100 + i)))
1385 .collect();
1386 all_ips0.push(good_ip);
1387 context.resolver_register("peer-0.local", Some(all_ips0));
1388
1389 let mut all_ips1: Vec<IpAddr> = (1..=3)
1390 .map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 110 + i)))
1391 .collect();
1392 all_ips1.push(good_ip);
1393 context.resolver_register("peer-1.local", Some(all_ips1));
1394
1395 let peers: Vec<(_, Address)> = vec![
1397 (
1398 peer0.public_key(),
1399 Address::Asymmetric {
1400 ingress: Ingress::Dns {
1401 host: hostname!("peer-0.local"),
1402 port: base_port,
1403 },
1404 egress: socket0,
1405 },
1406 ),
1407 (
1408 peer1.public_key(),
1409 Address::Asymmetric {
1410 ingress: Ingress::Dns {
1411 host: hostname!("peer-1.local"),
1412 port: base_port + 1,
1413 },
1414 egress: socket1,
1415 },
1416 ),
1417 ];
1418
1419 let config0 = Config::test(peer0.clone(), socket0, 1_024 * 1_024);
1421 let (mut network0, mut oracle0) =
1422 Network::new(context.with_label("peer_0"), config0);
1423 oracle0.track(0, peers.clone().try_into().unwrap()).await;
1424 let (_sender0, mut receiver0) =
1425 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1426 network0.start();
1427
1428 let config1 = Config::test(peer1.clone(), socket1, 1_024 * 1_024);
1430 let (mut network1, mut oracle1) =
1431 Network::new(context.with_label("peer_1"), config1);
1432 oracle1.track(0, peers.clone().try_into().unwrap()).await;
1433 let (mut sender1, _receiver1) =
1434 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1435 network1.start();
1436
1437 let pk0 = peer0.public_key();
1439 loop {
1440 let sent = sender1
1441 .send(
1442 Recipients::One(pk0.clone()),
1443 peer1.public_key().as_ref().to_vec(),
1444 true,
1445 )
1446 .await
1447 .unwrap();
1448 if !sent.is_empty() {
1449 break;
1450 }
1451 context.sleep(Duration::from_millis(100)).await;
1452 }
1453
1454 let (sender, msg) = receiver0.recv().await.unwrap();
1456 assert_eq!(sender, peer1.public_key());
1457 assert_eq!(msg, peer1.public_key().as_ref());
1458 });
1459 }
1460 }
1461
1462 #[test_traced]
1463 fn test_many_peer_restart_with_new_address() {
1464 let base_port = 9500;
1465 let n = 5;
1466
1467 let executor = deterministic::Runner::default();
1468 executor.start(|context| async move {
1469 let peers: Vec<_> = (0..n)
1471 .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1472 .collect();
1473 let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1474
1475 let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1477 let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1478 let mut oracles: Vec<Option<Oracle<_>>> = (0..n).map(|_| None).collect();
1479 let mut handles: Vec<Option<commonware_runtime::Handle<()>>> =
1480 (0..n).map(|_| None).collect();
1481
1482 let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1484
1485 let peer_set: Vec<(_, Address)> = addresses
1487 .iter()
1488 .enumerate()
1489 .map(|(i, pk)| {
1490 (
1491 pk.clone(),
1492 Address::Symmetric(SocketAddr::new(
1493 IpAddr::V4(Ipv4Addr::LOCALHOST),
1494 ports[i],
1495 )),
1496 )
1497 })
1498 .collect();
1499
1500 for (i, peer) in peers.iter().enumerate() {
1502 let peer_context = context.with_label(&format!("peer_{i}"));
1503
1504 let config = Config::test(
1505 peer.clone(),
1506 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1507 MAX_MESSAGE_SIZE,
1508 );
1509 let (mut network, mut oracle) =
1510 Network::new(peer_context.with_label("network"), config);
1511
1512 oracle.track(0, peer_set.clone().try_into().unwrap()).await;
1514
1515 let (sender, receiver) =
1516 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1517 senders[i] = Some(sender);
1518 receivers[i] = Some(receiver);
1519 oracles[i] = Some(oracle);
1520
1521 let handle = network.start();
1522 handles[i] = Some(handle);
1523 }
1524
1525 for (i, sender) in senders.iter_mut().enumerate() {
1527 let sender = sender.as_mut().unwrap();
1528 loop {
1529 let sent = sender
1530 .send(
1531 Recipients::All,
1532 peers[i].public_key().as_ref().to_vec(),
1533 true,
1534 )
1535 .await
1536 .unwrap();
1537 if sent.len() == n - 1 {
1538 break;
1539 }
1540 context.sleep(Duration::from_millis(100)).await;
1541 }
1542 }
1543
1544 for receiver in receivers.iter_mut() {
1546 let receiver = receiver.as_mut().unwrap();
1547 let mut received = HashSet::new();
1548 while received.len() < n - 1 {
1549 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1550 assert_eq!(message, sender.as_ref());
1551 received.insert(sender);
1552 }
1553 }
1554
1555 let mut restart_counter = 0u16;
1557 for round in 0..3 {
1558 for restart_peer_idx in 1..n {
1559 restart_counter += 1;
1561 let new_port = base_port + 100 + restart_counter;
1562 ports[restart_peer_idx] = new_port;
1563
1564 if let Some(handle) = handles[restart_peer_idx].take() {
1566 handle.abort();
1567 }
1568 senders[restart_peer_idx] = None;
1569 receivers[restart_peer_idx] = None;
1570 oracles[restart_peer_idx] = None;
1571
1572 let updated_peer_set: Vec<(_, Address)> = addresses
1574 .iter()
1575 .enumerate()
1576 .map(|(i, pk)| {
1577 (
1578 pk.clone(),
1579 Address::Symmetric(SocketAddr::new(
1580 IpAddr::V4(Ipv4Addr::LOCALHOST),
1581 ports[i],
1582 )),
1583 )
1584 })
1585 .collect();
1586
1587 for oracle in oracles.iter_mut().flatten() {
1589 oracle
1590 .track(
1591 (round * (n - 1) + restart_peer_idx) as u64,
1592 updated_peer_set.clone().try_into().unwrap(),
1593 )
1594 .await;
1595 }
1596
1597 let peer_context =
1599 context.with_label(&format!("peer_{restart_peer_idx}_round_{round}"));
1600 let config = Config::test(
1601 peers[restart_peer_idx].clone(),
1602 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port),
1603 MAX_MESSAGE_SIZE,
1604 );
1605 let (mut network, mut oracle) =
1606 Network::new(peer_context.with_label("network"), config);
1607
1608 oracle
1609 .track(
1610 (round * (n - 1) + restart_peer_idx) as u64,
1611 updated_peer_set.clone().try_into().unwrap(),
1612 )
1613 .await;
1614
1615 let (sender, receiver) = network.register(
1616 0,
1617 Quota::per_second(NZU32!(100)),
1618 DEFAULT_MESSAGE_BACKLOG,
1619 );
1620 senders[restart_peer_idx] = Some(sender);
1621 receivers[restart_peer_idx] = Some(receiver);
1622 oracles[restart_peer_idx] = Some(oracle);
1623
1624 let handle = network.start();
1625 handles[restart_peer_idx] = Some(handle);
1626
1627 let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
1629 loop {
1630 let sent = restarted_sender
1631 .send(
1632 Recipients::All,
1633 peers[restart_peer_idx].public_key().as_ref().to_vec(),
1634 true,
1635 )
1636 .await
1637 .unwrap();
1638 if sent.len() == n - 1 {
1639 break;
1640 }
1641 context.sleep(Duration::from_millis(100)).await;
1642 }
1643
1644 for i in 0..n {
1646 if i == restart_peer_idx {
1647 continue;
1648 }
1649 let sender = senders[i].as_mut().unwrap();
1650 loop {
1651 let sent = sender
1652 .send(
1653 Recipients::One(addresses[restart_peer_idx].clone()),
1654 peers[i].public_key().as_ref().to_vec(),
1655 true,
1656 )
1657 .await
1658 .unwrap();
1659 if sent.len() == 1 {
1660 break;
1661 }
1662 context.sleep(Duration::from_millis(100)).await;
1663 }
1664 }
1665
1666 let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
1668 let mut received = HashSet::new();
1669 while received.len() < n - 1 {
1670 let (sender, message): (ed25519::PublicKey, _) =
1671 restarted_receiver.recv().await.unwrap();
1672 assert_eq!(message, sender.as_ref());
1673 received.insert(sender);
1674 }
1675 }
1676 }
1677
1678 assert_no_rate_limiting(&context);
1679 });
1680 }
1681
1682 #[test_traced]
1683 fn test_simultaneous_peer_restart() {
1684 let base_port = 9700;
1685 let n = 5;
1686
1687 let executor = deterministic::Runner::default();
1688 executor.start(|context| async move {
1689 let peers: Vec<_> = (0..n)
1691 .map(|i| ed25519::PrivateKey::from_seed(i as u64))
1692 .collect();
1693 let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
1694
1695 let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
1697
1698 let peer_set: Vec<(_, Address)> = addresses
1700 .iter()
1701 .enumerate()
1702 .map(|(i, pk)| {
1703 (
1704 pk.clone(),
1705 Address::Symmetric(SocketAddr::new(
1706 IpAddr::V4(Ipv4Addr::LOCALHOST),
1707 ports[i],
1708 )),
1709 )
1710 })
1711 .collect();
1712
1713 let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
1715 let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
1716 let mut oracles: Vec<Option<Oracle<_>>> = (0..n).map(|_| None).collect();
1717 let mut handles: Vec<Option<commonware_runtime::Handle<()>>> =
1718 (0..n).map(|_| None).collect();
1719
1720 for (i, peer) in peers.iter().enumerate() {
1722 let peer_context = context.with_label(&format!("peer_{i}"));
1723
1724 let config = Config::test(
1725 peer.clone(),
1726 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
1727 MAX_MESSAGE_SIZE,
1728 );
1729 let (mut network, mut oracle) =
1730 Network::new(peer_context.with_label("network"), config);
1731
1732 oracle.track(0, peer_set.clone().try_into().unwrap()).await;
1734
1735 let (sender, receiver) =
1736 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1737 senders[i] = Some(sender);
1738 receivers[i] = Some(receiver);
1739 oracles[i] = Some(oracle);
1740
1741 let handle = network.start();
1742 handles[i] = Some(handle);
1743 }
1744
1745 for (i, sender) in senders.iter_mut().enumerate() {
1747 let sender = sender.as_mut().unwrap();
1748 loop {
1749 let sent = sender
1750 .send(
1751 Recipients::All,
1752 peers[i].public_key().as_ref().to_vec(),
1753 true,
1754 )
1755 .await
1756 .unwrap();
1757 if sent.len() == n - 1 {
1758 break;
1759 }
1760 context.sleep(Duration::from_millis(100)).await;
1761 }
1762 }
1763
1764 for receiver in receivers.iter_mut() {
1766 let receiver = receiver.as_mut().unwrap();
1767 let mut received = HashSet::new();
1768 while received.len() < n - 1 {
1769 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1770 assert_eq!(message, sender.as_ref());
1771 received.insert(sender);
1772 }
1773 }
1774
1775 let restart_peers: Vec<usize> = (1..n).collect();
1780 for &idx in &restart_peers {
1781 if let Some(handle) = handles[idx].take() {
1782 handle.abort();
1783 }
1784 senders[idx] = None;
1785 receivers[idx] = None;
1786 oracles[idx] = None;
1787 ports[idx] = base_port + 100 + idx as u16;
1788 }
1789
1790 context.sleep(Duration::from_secs(2)).await;
1792
1793 let updated_peer_set: Vec<(_, Address)> = addresses
1795 .iter()
1796 .enumerate()
1797 .map(|(i, pk)| {
1798 (
1799 pk.clone(),
1800 Address::Symmetric(SocketAddr::new(
1801 IpAddr::V4(Ipv4Addr::LOCALHOST),
1802 ports[i],
1803 )),
1804 )
1805 })
1806 .collect();
1807
1808 oracles[0]
1810 .as_mut()
1811 .unwrap()
1812 .track(1, updated_peer_set.clone().try_into().unwrap())
1813 .await;
1814
1815 for &idx in &restart_peers {
1817 let peer_context = context.with_label(&format!("peer_{idx}_restarted"));
1818 let config = Config::test(
1819 peers[idx].clone(),
1820 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[idx]),
1821 MAX_MESSAGE_SIZE,
1822 );
1823 let (mut network, mut oracle) =
1824 Network::new(peer_context.with_label("network"), config);
1825
1826 oracle
1827 .track(1, updated_peer_set.clone().try_into().unwrap())
1828 .await;
1829
1830 let (sender, receiver) =
1831 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1832 senders[idx] = Some(sender);
1833 receivers[idx] = Some(receiver);
1834 oracles[idx] = Some(oracle);
1835
1836 let handle = network.start();
1837 handles[idx] = Some(handle);
1838 }
1839
1840 for (i, sender) in senders.iter_mut().enumerate() {
1842 let sender = sender.as_mut().unwrap();
1843 loop {
1844 let sent = sender
1845 .send(
1846 Recipients::All,
1847 peers[i].public_key().as_ref().to_vec(),
1848 true,
1849 )
1850 .await
1851 .unwrap();
1852 if sent.len() == n - 1 {
1853 break;
1854 }
1855 context.sleep(Duration::from_millis(100)).await;
1856 }
1857 }
1858
1859 for receiver in receivers.iter_mut() {
1861 let receiver = receiver.as_mut().unwrap();
1862 let mut received = HashSet::new();
1863 while received.len() < n - 1 {
1864 let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
1865 assert_eq!(message, sender.as_ref());
1866 received.insert(sender);
1867 }
1868 }
1869
1870 assert_no_rate_limiting(&context);
1871 });
1872 }
1873 #[test_traced]
1874 fn test_operations_after_shutdown_do_not_panic() {
1875 let executor = deterministic::Runner::default();
1876 executor.start(|context| async move {
1877 let peer = ed25519::PrivateKey::from_seed(0);
1878 let address = peer.public_key();
1879
1880 let peer_context = context.with_label("peer");
1881 let config = Config::test(
1882 peer.clone(),
1883 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200),
1884 MAX_MESSAGE_SIZE,
1885 );
1886 let (mut network, mut oracle) =
1887 Network::new(peer_context.with_label("network"), config);
1888
1889 let (mut sender, _receiver) =
1891 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1892 let peer_addr =
1893 Address::Symmetric(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200));
1894 let peers: Map<ed25519::PublicKey, Address> =
1895 vec![(address.clone(), peer_addr)].try_into().unwrap();
1896 oracle.track(0, peers.clone()).await;
1897
1898 let handle = network.start();
1900 handle.abort();
1901
1902 context.sleep(Duration::from_millis(100)).await;
1904
1905 oracle.track(1, peers.clone()).await;
1907 let _ = oracle.peer_set(0).await;
1908 let _ = oracle.subscribe().await;
1909 oracle.block(address.clone()).await;
1910
1911 let sent = sender
1913 .send(Recipients::All, address.as_ref().to_vec(), true)
1914 .await
1915 .unwrap();
1916 assert!(sent.is_empty());
1917 });
1918 }
1919
1920 fn clean_shutdown(seed: u64) {
1921 let cfg = deterministic::Config::default()
1922 .with_seed(seed)
1923 .with_timeout(Some(Duration::from_secs(30)));
1924 let executor = deterministic::Runner::new(cfg);
1925 executor.start(|context| async move {
1926 let peer = ed25519::PrivateKey::from_seed(0);
1927
1928 let peer_context = context.with_label("peer");
1929 let config = Config::test(
1930 peer.clone(),
1931 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200),
1932 MAX_MESSAGE_SIZE,
1933 );
1934 let (mut network, mut oracle) =
1935 Network::new(peer_context.with_label("network"), config);
1936
1937 let (_, _) =
1939 network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1940 let peer_addr =
1941 Address::Symmetric(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200));
1942 let peers: Map<ed25519::PublicKey, Address> =
1943 vec![(peer.public_key(), peer_addr)].try_into().unwrap();
1944 oracle.track(0, peers).await;
1945
1946 let handle = network.start();
1948
1949 context.sleep(Duration::from_millis(100)).await;
1951
1952 let running_before = count_running_tasks(&context, "peer_network");
1954 assert!(
1955 running_before > 0,
1956 "at least one network task should be running"
1957 );
1958
1959 handle.abort();
1961 let _ = handle.await;
1962
1963 context.sleep(Duration::from_millis(100)).await;
1965
1966 let running_after = count_running_tasks(&context, "peer_network");
1968 assert_eq!(
1969 running_after, 0,
1970 "all network tasks should be stopped, but {running_after} still running"
1971 );
1972 });
1973 }
1974
1975 #[test_traced]
1976 fn test_clean_shutdown() {
1977 for seed in 0..25 {
1978 clean_shutdown(seed);
1979 }
1980 }
1981
1982 fn duplicate_addresses_disconnected(seed: u64) {
1983 let base_port = 6000;
1984 let executor = deterministic::Runner::seeded(seed);
1985 executor.start(|context| async move {
1986 let peer0 = ed25519::PrivateKey::from_seed(0);
1987 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
1988 let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
1989 let peer1 = ed25519::PrivateKey::from_seed(1);
1990 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
1991 let peer2 = ed25519::PrivateKey::from_seed(2);
1992
1993 let config0 = Config::test(peer0.clone(), socket0, MAX_MESSAGE_SIZE);
1995 let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
1996 let (mut sender0, _receiver0) =
1997 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
1998 network0.start();
1999
2000 let peer_set0: Vec<(_, Address)> = vec![
2002 (peer0.public_key(), Address::Symmetric(socket0)),
2003 (peer1.public_key(), Address::Symmetric(socket1)),
2004 (peer2.public_key(), Address::Symmetric(socket1)),
2005 ];
2006 oracle0.track(0, peer_set0.try_into().unwrap()).await;
2007
2008 context.sleep(Duration::from_secs(30)).await;
2010
2011 let sent = sender0
2013 .send(Recipients::All, peer1.public_key().as_ref(), true)
2014 .await
2015 .unwrap();
2016 assert!(sent.is_empty());
2017
2018 let config1 = Config::test(peer1.clone(), socket1, MAX_MESSAGE_SIZE);
2020 let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
2021 let (_sender1, mut receiver1) =
2022 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2023 network1.start();
2024
2025 let peer_set1: Vec<(_, Address)> = vec![
2027 (peer0.public_key(), Address::Symmetric(wrong_socket0)),
2028 (peer1.public_key(), Address::Symmetric(socket1)),
2029 (peer2.public_key(), Address::Symmetric(socket1)),
2030 ];
2031 oracle1.track(0, peer_set1.try_into().unwrap()).await;
2032
2033 context.sleep(Duration::from_secs(30)).await;
2035
2036 loop {
2038 let sent = sender0
2039 .send(Recipients::All, peer0.public_key().as_ref(), true)
2040 .await
2041 .unwrap();
2042 if sent.len() == 1 {
2043 assert_eq!(sent[0], peer1.public_key());
2044 break;
2045 }
2046 context.sleep(Duration::from_millis(100)).await;
2047 }
2048 let (sender, _) = receiver1.recv().await.unwrap();
2049 assert_eq!(sender, peer0.public_key());
2050 });
2051 }
2052
2053 #[test_traced]
2054 fn test_duplicate_addresses_disconnected() {
2055 for seed in 0..25 {
2057 duplicate_addresses_disconnected(seed);
2058 }
2059 }
2060
2061 #[test_traced]
2062 fn test_duplicate_addresses_connected() {
2063 let base_port = 6000;
2064 let executor = deterministic::Runner::default();
2065 executor.start(|context| async move {
2066 let peer0 = ed25519::PrivateKey::from_seed(0);
2067 let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
2068 let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
2069 let peer1 = ed25519::PrivateKey::from_seed(1);
2070 let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
2071 let peer2 = ed25519::PrivateKey::from_seed(2);
2072 let socket2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 2);
2073
2074 let config0 = Config::test(peer0.clone(), socket0, MAX_MESSAGE_SIZE);
2076 let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
2077 let (mut sender0, mut receiver0) =
2078 network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2079 network0.start();
2080
2081 let config2 = Config::test(peer2.clone(), socket2, MAX_MESSAGE_SIZE);
2083 let (mut network2, mut oracle2) = Network::new(context.with_label("peer_2"), config2);
2084 let (_sender2, mut receiver2) =
2085 network2.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2086 network2.start();
2087
2088 let peer_set: Vec<(_, Address)> = vec![
2090 (peer0.public_key(), Address::Symmetric(socket0)),
2091 (peer1.public_key(), Address::Symmetric(socket1)),
2092 (peer2.public_key(), Address::Symmetric(socket1)),
2093 ];
2094 oracle0.track(0, peer_set.clone().try_into().unwrap()).await;
2095 oracle2.track(0, peer_set.try_into().unwrap()).await;
2096
2097 context.sleep(Duration::from_secs(30)).await;
2099
2100 loop {
2102 let sent = sender0
2103 .send(Recipients::All, peer2.public_key().as_ref(), true)
2104 .await
2105 .unwrap();
2106 if sent.len() == 1 {
2107 assert_eq!(sent[0], peer2.public_key());
2108 break;
2109 }
2110 context.sleep(Duration::from_millis(100)).await;
2111 }
2112 let (sender, _) = receiver2.recv().await.unwrap();
2113 assert_eq!(sender, peer0.public_key());
2114
2115 let config1 = Config::test(peer1.clone(), socket1, MAX_MESSAGE_SIZE);
2117 let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
2118 let (mut sender1, _receiver1) =
2119 network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
2120 network1.start();
2121
2122 let peer_set1: Vec<(_, Address)> = vec![
2124 (peer0.public_key(), Address::Symmetric(wrong_socket0)),
2125 (peer1.public_key(), Address::Symmetric(socket1)),
2126 (peer2.public_key(), Address::Symmetric(socket1)),
2127 ];
2128 oracle1.track(0, peer_set1.try_into().unwrap()).await;
2129
2130 context.sleep(Duration::from_secs(30)).await;
2132
2133 loop {
2135 let sent = sender1
2136 .send(Recipients::All, peer1.public_key().as_ref(), true)
2137 .await
2138 .unwrap();
2139 if sent.len() == 2 {
2140 assert!(sent.contains(&peer0.public_key()));
2141 assert!(sent.contains(&peer2.public_key()));
2142 break;
2143 }
2144 context.sleep(Duration::from_millis(100)).await;
2145 }
2146 let mut received0 = false;
2147 while let Ok((sender, _)) = receiver0.recv().await {
2148 if sender == peer1.public_key() {
2150 received0 = true;
2151 break;
2152 }
2153 }
2154 assert!(received0);
2155 let (sender, _) = receiver2.recv().await.unwrap();
2156 assert_eq!(sender, peer1.public_key());
2157 });
2158 }
2159}