mod actors;
mod channels;
mod config;
mod metrics;
mod network;
mod types;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum Error {
#[error("message too large: {0}")]
MessageTooLarge(usize),
#[error("network closed")]
NetworkClosed,
}
pub use actors::tracker::Oracle;
pub use channels::{Receiver, Sender};
pub use config::Config;
pub use network::Network;
#[cfg(test)]
mod tests {
use super::*;
use crate::{Address, AddressableManager, Ingress, Provider, Receiver, Recipients, Sender};
use commonware_cryptography::{ed25519, Signer as _};
use commonware_macros::{select, test_group, test_traced};
use commonware_runtime::{
count_running_tasks, deterministic, tokio, BufferPooler, Clock, Metrics,
Network as RNetwork, Quota, Resolver, Runner, Spawner,
};
use commonware_utils::{
channel::mpsc,
hostname,
ordered::{Map, Set},
Hostname, TryCollect, NZU32,
};
use rand_core::{CryptoRngCore, RngCore};
use std::{
collections::HashSet,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};
#[derive(Copy, Clone)]
enum Mode {
All,
Some,
One,
}
const MAX_MESSAGE_SIZE: u32 = 1_024 * 1_024; const DEFAULT_MESSAGE_BACKLOG: usize = 128;
fn assert_no_rate_limiting(context: &impl Metrics) {
let metrics = context.encode();
assert!(
!metrics.contains("messages_rate_limited_total{"),
"no messages should be rate limited: {metrics}"
);
}
async fn run_network(
context: impl Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
max_message_size: u32,
base_port: u16,
n: usize,
mode: Mode,
) {
let mut peers_and_sks = Vec::new();
for i in 0..n {
let private_key = ed25519::PrivateKey::from_seed(i as u64);
let public_key = private_key.public_key();
let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
peers_and_sks.push((private_key, public_key, address));
}
let peers: Vec<(ed25519::PublicKey, Address)> = peers_and_sks
.iter()
.map(|(_, pub_key, addr)| (pub_key.clone(), (*addr).into()))
.collect::<Vec<_>>();
let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
for (i, (private_key, public_key, address)) in peers_and_sks.iter().enumerate() {
let public_key = public_key.clone();
let context = context.with_label(&format!("peer_{i}"));
let config = Config::test(private_key.clone(), *address, max_message_size);
let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
oracle.track(0, Map::try_from(peers.clone()).unwrap()).await;
let (mut sender, mut receiver) =
network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network.start();
context.with_label("agent").spawn({
let complete_sender = complete_sender.clone();
let peers = peers.clone();
move |context| async move {
let receiver = context.with_label("receiver").spawn(move |_| async move {
let mut received = HashSet::new();
while received.len() < n - 1 {
let (sender, message) = receiver.recv().await.unwrap();
assert_eq!(message, sender.as_ref());
received.insert(sender);
}
complete_sender.send(()).await.unwrap();
loop {
receiver.recv().await.unwrap();
}
});
let sender = context
.with_label("sender")
.spawn(move |context| async move {
let mut recipients: Vec<_> = peers
.iter()
.enumerate()
.filter(|(j, _)| i != *j)
.map(|(_, (pk, _))| pk.clone())
.collect();
recipients.sort();
loop {
match mode {
Mode::One => {
for pub_key in &recipients {
loop {
let sent = sender
.send(
Recipients::One(pub_key.clone()),
public_key.as_ref().to_vec(),
true,
)
.await
.unwrap();
if sent.len() != 1 {
context.sleep(Duration::from_millis(100)).await;
continue;
}
assert_eq!(&sent[0], pub_key);
break;
}
}
}
Mode::Some | Mode::All => {
loop {
let mut sent = sender
.send(
match mode {
Mode::Some => {
Recipients::Some(recipients.clone())
}
Mode::All => Recipients::All,
_ => unreachable!(),
},
public_key.as_ref().to_vec(),
true,
)
.await
.unwrap();
if sent.len() != recipients.len() {
context.sleep(Duration::from_millis(100)).await;
continue;
}
sent.sort();
assert_eq!(sent, recipients);
break;
}
}
};
context.sleep(Duration::from_secs(10)).await;
}
});
select! {
receiver = receiver => {
panic!("receiver exited: {receiver:?}");
},
sender = sender => {
panic!("sender exited: {sender:?}");
},
}
}
});
}
for _ in 0..n {
complete_receiver.recv().await.unwrap();
}
assert_no_rate_limiting(&context);
}
fn run_deterministic_test(seed: u64, mode: Mode) {
const NUM_PEERS: usize = 25;
const BASE_PORT: u16 = 3000;
let executor = deterministic::Runner::seeded(seed);
let state = executor.start(|context| async move {
run_network(
context.clone(),
MAX_MESSAGE_SIZE,
BASE_PORT,
NUM_PEERS,
mode,
)
.await;
context.auditor().state()
});
let executor = deterministic::Runner::seeded(seed);
let state2 = executor.start(|context| async move {
run_network(
context.clone(),
MAX_MESSAGE_SIZE,
BASE_PORT,
NUM_PEERS,
mode,
)
.await;
context.auditor().state()
});
assert_eq!(state, state2);
}
#[test_group("slow")]
#[test_traced]
fn test_determinism_one() {
for i in 0..10 {
run_deterministic_test(i, Mode::One);
}
}
#[test_group("slow")]
#[test_traced]
fn test_determinism_some() {
for i in 0..10 {
run_deterministic_test(i, Mode::Some);
}
}
#[test_group("slow")]
#[test_traced]
fn test_determinism_all() {
for i in 0..10 {
run_deterministic_test(i, Mode::All);
}
}
#[test_traced]
fn test_tokio_connectivity() {
let executor = tokio::Runner::default();
executor.start(|context| async move {
let base_port = 4000;
let n = 10;
run_network(context, MAX_MESSAGE_SIZE, base_port, n, Mode::One).await;
});
}
#[test_traced]
fn test_multi_index_oracle() {
let base_port = 3000;
let n: usize = 10;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut peers_and_sks = Vec::new();
for i in 0..n {
let sk = ed25519::PrivateKey::from_seed(i as u64);
let pk = sk.public_key();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
peers_and_sks.push((sk, pk, addr));
}
let peers = peers_and_sks
.iter()
.map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
.collect::<Vec<_>>();
let mut waiters = Vec::new();
for (i, (peer_sk, peer_pk, peer_addr)) in peers_and_sks.iter().enumerate() {
let context = context.with_label(&format!("peer_{i}"));
let config = Config::test(
peer_sk.clone(),
*peer_addr,
1_024 * 1_024, );
let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
oracle
.track(0, Map::try_from([peers[0].clone()]).unwrap())
.await;
oracle
.track(
1,
Map::try_from([peers[1].clone(), peers[2].clone()]).unwrap(),
)
.await;
oracle
.track(
2,
peers
.iter()
.skip(2)
.cloned()
.try_collect::<Map<_, _>>()
.unwrap(),
)
.await;
let (mut sender, mut receiver) =
network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
network.start();
let msg = peer_pk.clone();
let handler = context
.with_label("agent")
.spawn(move |context| async move {
if i == 0 {
loop {
if sender
.send(Recipients::All, msg.as_ref().to_vec(), true)
.await
.unwrap()
.len()
== n - 1
{
break;
}
context.sleep(Duration::from_millis(100)).await;
}
} else {
let (sender, message) = receiver.recv().await.unwrap();
assert_eq!(message, sender.as_ref());
}
});
waiters.push(handler);
}
for waiter in waiters.into_iter().rev() {
waiter.await.unwrap();
}
assert_no_rate_limiting(&context);
});
}
#[test_traced]
fn test_message_too_large() {
let base_port = 3000;
let n: usize = 2;
let executor = deterministic::Runner::seeded(0);
executor.start(|mut context| async move {
let mut peers_and_sks = Vec::new();
for i in 0..n {
let peer_sk = ed25519::PrivateKey::from_seed(i as u64);
let peer_pk = peer_sk.public_key();
let peer_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
peers_and_sks.push((peer_sk, peer_pk, peer_addr));
}
let peers: Map<_, _> = peers_and_sks
.iter()
.map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
.try_collect()
.unwrap();
let (sk, _, addr) = peers_and_sks[0].clone();
let config = Config::test(
sk,
addr,
1_024 * 1_024, );
let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
oracle.track(0, peers.clone()).await;
let (mut sender, _) =
network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
network.start();
let mut msg = vec![0u8; 10 * 1024 * 1024]; context.fill_bytes(&mut msg[..]);
let recipient = Recipients::One(peers[1].clone());
let result = sender.send(recipient, msg, true).await;
assert!(matches!(result, Err(Error::MessageTooLarge(_))));
});
}
#[test_traced]
fn test_rate_limiting() {
let base_port = 3000;
let n: usize = 2;
let executor = deterministic::Runner::seeded(0);
executor.start(|context| async move {
let mut peers_and_sks = Vec::new();
for i in 0..n {
let sk = ed25519::PrivateKey::from_seed(i as u64);
let pk = sk.public_key();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
peers_and_sks.push((sk, pk, addr));
}
let peers: Map<_, _> = peers_and_sks
.iter()
.map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
.try_collect()
.unwrap();
let (sk0, _, addr0) = peers_and_sks[0].clone();
let (sk1, pk1, addr1) = peers_and_sks[1].clone();
let config0 = Config::test(sk0, addr0, 1_024 * 1_024); let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
oracle0.track(0, peers.clone()).await;
let (mut sender0, _receiver0) =
network0.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
network0.start();
let config1 = Config::test(sk1, addr1, 1_024 * 1_024); let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
oracle1.track(0, peers.clone()).await;
let (_sender1, _receiver1) =
network1.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
network1.start();
let msg = vec![0u8; 1024]; loop {
let sent = sender0
.send(Recipients::One(pk1.clone()), msg.clone(), true)
.await
.unwrap();
if !sent.is_empty() {
break;
}
context.sleep(Duration::from_mins(1)).await
}
let sent = sender0.send(Recipients::One(pk1), msg, true).await.unwrap();
assert!(sent.is_empty());
for _ in 0..10 {
assert_no_rate_limiting(&context);
context.sleep(Duration::from_millis(100)).await;
}
});
}
#[test_traced]
fn test_unordered_peer_sets() {
let (n, base_port) = (10, 3000);
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut peers_and_sks = Vec::new();
for i in 0..n {
let sk = ed25519::PrivateKey::from_seed(i as u64);
let pk = sk.public_key();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
peers_and_sks.push((sk, pk, addr));
}
let peer0 = peers_and_sks[0].clone();
let config = Config::test(peer0.0, peer0.2, 1_024 * 1_024);
let (network, mut oracle) = Network::new(context.with_label("network"), config);
network.start();
let mut subscription = oracle.subscribe().await;
let set10: Map<_, _> = peers_and_sks
.iter()
.take(2)
.map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
.try_collect()
.unwrap();
oracle.track(10, set10.clone()).await;
let update = subscription.recv().await.unwrap();
assert_eq!(update.index, 10);
assert_eq!(&update.latest.primary, set10.keys());
assert!(update.latest.secondary.is_empty());
assert_eq!(&update.all.primary, set10.keys());
assert!(update.all.secondary.is_empty());
let set9: Map<_, _> = peers_and_sks
.iter()
.skip(2)
.map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
.try_collect()
.unwrap();
oracle.track(9, set9.clone()).await;
let set11: Map<_, _> = peers_and_sks
.iter()
.skip(4)
.map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
.try_collect()
.unwrap();
oracle.track(11, set11.clone()).await;
let update = subscription.recv().await.unwrap();
assert_eq!(update.index, 11);
assert_eq!(&update.latest.primary, set11.keys());
assert!(update.latest.secondary.is_empty());
let all_keys: Set<_> = set10
.into_keys()
.into_iter()
.chain(set11.into_keys().into_iter())
.try_collect()
.unwrap();
assert_eq!(update.all.primary, all_keys);
assert!(update.all.secondary.is_empty());
});
}
#[test_traced]
fn test_graceful_shutdown() {
let base_port = 3000;
let n: usize = 5;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut peers_and_sks = Vec::new();
for i in 0..n {
let sk = ed25519::PrivateKey::from_seed(i as u64);
let pk = sk.public_key();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
peers_and_sks.push((sk, pk, addr));
}
let peers: Map<_, _> = peers_and_sks
.iter()
.map(|(_, pk, addr)| (pk.clone(), (*addr).into()))
.try_collect()
.unwrap();
let (complete_sender, mut complete_receiver) = mpsc::channel(n);
for (i, (sk, pk, addr)) in peers_and_sks.iter().enumerate() {
let peer_context = context.with_label(&format!("peer_{i}"));
let config = Config::test(sk.clone(), *addr, 1_024 * 1_024);
let (mut network, mut oracle) =
Network::new(peer_context.with_label("network"), config);
oracle.track(0, peers.clone()).await;
let (mut sender, mut receiver) =
network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network.start();
peer_context.with_label("agent").spawn({
let complete_sender = complete_sender.clone();
let pk = pk.clone();
move |context| async move {
let expected_connections = if i == 0 { n - 1 } else { 1 };
loop {
let sent = sender
.send(Recipients::All, pk.as_ref().to_vec(), true)
.await
.unwrap();
if sent.len() >= expected_connections {
break;
}
context.sleep(Duration::from_millis(100)).await;
}
complete_sender.send(()).await.unwrap();
loop {
select! {
result = receiver.recv() => {
if result.is_err() {
break;
}
},
_ = context.stopped() => {
break;
},
}
}
}
});
}
for _ in 0..n {
complete_receiver.recv().await.unwrap();
}
let metrics_before = context.encode();
let is_running = |name: &str| -> bool {
metrics_before.lines().any(|line| {
line.starts_with("runtime_tasks_running{")
&& line.contains(&format!("name=\"{name}\""))
&& line.contains("kind=\"Task\"")
&& line.trim_end().ends_with(" 1")
})
};
for i in 0..n {
let prefix = format!("peer_{i}_network");
assert!(
is_running(&format!("{prefix}_tracker")),
"peer_{i} tracker should be running"
);
assert!(
is_running(&format!("{prefix}_router")),
"peer_{i} router should be running"
);
assert!(
is_running(&format!("{prefix}_spawner")),
"peer_{i} spawner should be running"
);
assert!(
is_running(&format!("{prefix}_listener")),
"peer_{i} listener should be running"
);
assert!(
is_running(&format!("{prefix}_dialer")),
"peer_{i} dialer should be running"
);
}
let shutdown_context = context.clone();
context.with_label("shutdown").spawn(move |_| async move {
let result = shutdown_context.stop(0, Some(Duration::from_secs(5))).await;
assert!(
result.is_ok(),
"graceful shutdown should complete: {result:?}"
);
});
context.stopped().await.unwrap();
context.sleep(Duration::from_millis(100)).await;
let metrics_after = context.encode();
let is_stopped = |name: &str| -> bool {
metrics_after.lines().any(|line| {
line.starts_with("runtime_tasks_running{")
&& line.contains(&format!("name=\"{name}\""))
&& line.contains("kind=\"Task\"")
&& line.trim_end().ends_with(" 0")
})
};
for i in 0..n {
let prefix = format!("peer_{i}_network");
assert!(
is_stopped(&format!("{prefix}_tracker")),
"peer_{i} tracker should be stopped"
);
assert!(
is_stopped(&format!("{prefix}_router")),
"peer_{i} router should be stopped"
);
assert!(
is_stopped(&format!("{prefix}_spawner")),
"peer_{i} spawner should be stopped"
);
assert!(
is_stopped(&format!("{prefix}_listener")),
"peer_{i} listener should be stopped"
);
assert!(
is_stopped(&format!("{prefix}_dialer")),
"peer_{i} dialer should be stopped"
);
}
});
}
#[test_traced]
fn test_subscription_includes_self_when_registered() {
let base_port = 3000;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let self_sk = ed25519::PrivateKey::from_seed(0);
let self_pk = self_sk.public_key();
let self_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
let other_pk = ed25519::PrivateKey::from_seed(1).public_key();
let other_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
let config = Config::test(self_sk, self_addr, 1_024 * 1_024);
let (network, mut oracle) = Network::new(context.with_label("network"), config);
network.start();
let mut subscription = oracle.subscribe().await;
let peer_set: Map<_, _> = [(other_pk.clone(), other_addr.into())].try_into().unwrap();
oracle.track(1, peer_set.clone()).await;
let update = subscription.recv().await.unwrap();
assert_eq!(update.index, 1);
assert_eq!(update.latest.primary.len(), 1);
assert_eq!(update.all.primary.len(), 1);
assert!(update.all.secondary.is_empty());
assert!(
update.latest.primary.position(&self_pk).is_none(),
"new set should not include self"
);
assert!(
update.latest.primary.position(&other_pk).is_some(),
"new set should include other"
);
assert!(
update.latest.secondary.is_empty(),
"new secondary set should be empty"
);
assert!(
update.all.primary.position(&self_pk).is_none(),
"peer set should not include self"
);
assert!(
update.all.primary.position(&other_pk).is_some(),
"peer set should include other"
);
let peer_set: Map<_, _> = [
(self_pk.clone(), self_addr.into()),
(other_pk.clone(), other_addr.into()),
]
.try_into()
.unwrap();
oracle.track(2, peer_set.clone()).await;
let update = subscription.recv().await.unwrap();
assert_eq!(update.index, 2);
assert_eq!(update.latest.primary.len(), 2);
assert_eq!(update.all.primary.len(), 2);
assert!(update.all.secondary.is_empty());
assert!(
update.latest.primary.position(&self_pk).is_some(),
"new set should include self"
);
assert!(
update.latest.primary.position(&other_pk).is_some(),
"new set should include other"
);
assert!(
update.latest.secondary.is_empty(),
"new secondary set should be empty"
);
assert!(
update.all.primary.position(&self_pk).is_some(),
"peer set should include self"
);
assert!(
update.all.primary.position(&other_pk).is_some(),
"peer set should include other"
);
});
}
#[test_traced]
fn test_dns_peer_addresses() {
let base_port = 3200;
let n: usize = 3;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut peers_and_sks = Vec::new();
for i in 0..n {
let private_key = ed25519::PrivateKey::from_seed(i as u64);
let public_key = private_key.public_key();
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
let host_str = format!("peer-{i}.local");
let host = Hostname::new(&host_str).unwrap();
peers_and_sks.push((private_key, public_key, socket, host_str, host));
}
for (_, _, socket, host_str, _) in &peers_and_sks {
context.resolver_register(host_str.clone(), Some(vec![socket.ip()]));
}
let peers: Vec<(_, Address)> = peers_and_sks
.iter()
.map(|(_, pk, socket, _, host)| {
(
pk.clone(),
Address::Asymmetric {
ingress: Ingress::Dns {
host: host.clone(),
port: socket.port(),
},
egress: *socket,
},
)
})
.collect();
let (complete_sender, mut complete_receiver) = mpsc::channel(n);
for (i, (private_key, public_key, socket, _, _)) in peers_and_sks.iter().enumerate() {
let context = context.with_label(&format!("peer_{i}"));
let config = Config::test(private_key.clone(), *socket, 1_024 * 1_024);
let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
oracle.track(0, Map::try_from(peers.clone()).unwrap()).await;
let (mut sender, mut receiver) =
network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network.start();
let pk = public_key.clone();
context.with_label("agent").spawn({
let complete_sender = complete_sender.clone();
let peers = peers.clone();
move |context| async move {
let receiver = context.with_label("receiver").spawn(move |_| async move {
let mut received = HashSet::new();
while received.len() < n - 1 {
let (sender, message) = receiver.recv().await.unwrap();
assert_eq!(message, sender.as_ref());
received.insert(sender);
}
complete_sender.send(()).await.unwrap();
loop {
receiver.recv().await.unwrap();
}
});
let sender_task =
context
.with_label("sender")
.spawn(move |context| async move {
loop {
let mut recipients: Vec<_> = peers
.iter()
.filter(|(p, _)| p != &pk)
.map(|(p, _)| p.clone())
.collect();
recipients.sort();
loop {
let mut sent = sender
.send(Recipients::All, pk.as_ref().to_vec(), true)
.await
.unwrap();
if sent.len() != n - 1 {
context.sleep(Duration::from_millis(100)).await;
continue;
}
sent.sort();
assert_eq!(sent, recipients);
break;
}
context.sleep(Duration::from_secs(10)).await;
}
});
select! {
receiver = receiver => {
panic!("receiver exited: {receiver:?}")
},
sender = sender_task => {
panic!("sender exited: {sender:?}")
},
}
}
});
}
for _ in 0..n {
complete_receiver.recv().await.unwrap();
}
assert_no_rate_limiting(&context);
});
}
#[test_traced]
fn test_mixed_socket_and_dns_addresses() {
let base_port = 3300;
let n: usize = 4;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut peers_and_sks = Vec::new();
for i in 0..n {
let private_key = ed25519::PrivateKey::from_seed(i as u64);
let public_key = private_key.public_key();
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
peers_and_sks.push((private_key, public_key, socket));
}
for (i, (_, _, socket)) in peers_and_sks.iter().enumerate().skip(2) {
context.resolver_register(format!("peer-{i}.local"), Some(vec![socket.ip()]));
}
let peers: Vec<(_, Address)> = peers_and_sks
.iter()
.enumerate()
.map(|(i, (_, pk, socket))| {
let addr = if i < 2 {
Address::Symmetric(*socket)
} else {
Address::Asymmetric {
ingress: Ingress::Dns {
host: hostname!(&format!("peer-{i}.local")),
port: socket.port(),
},
egress: *socket,
}
};
(pk.clone(), addr)
})
.collect();
let (complete_sender, mut complete_receiver) = mpsc::channel(n);
for (i, (private_key, public_key, socket)) in peers_and_sks.iter().enumerate() {
let context = context.with_label(&format!("peer_{i}"));
let config = Config::test(private_key.clone(), *socket, 1_024 * 1_024);
let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
oracle.track(0, Map::try_from(peers.clone()).unwrap()).await;
let (mut sender, mut receiver) =
network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network.start();
let pk = public_key.clone();
context.with_label("agent").spawn({
let complete_sender = complete_sender.clone();
let peers = peers.clone();
move |context| async move {
let receiver = context.with_label("receiver").spawn(move |_| async move {
let mut received = HashSet::new();
while received.len() < n - 1 {
let (sender, message) = receiver.recv().await.unwrap();
assert_eq!(message, sender.as_ref());
received.insert(sender);
}
complete_sender.send(()).await.unwrap();
loop {
receiver.recv().await.unwrap();
}
});
let sender_task =
context
.with_label("sender")
.spawn(move |context| async move {
loop {
let mut recipients: Vec<_> = peers
.iter()
.filter(|(p, _)| p != &pk)
.map(|(p, _)| p.clone())
.collect();
recipients.sort();
loop {
let mut sent = sender
.send(Recipients::All, pk.as_ref().to_vec(), true)
.await
.unwrap();
if sent.len() != n - 1 {
context.sleep(Duration::from_millis(100)).await;
continue;
}
sent.sort();
assert_eq!(sent, recipients);
break;
}
context.sleep(Duration::from_secs(10)).await;
}
});
select! {
receiver = receiver => {
panic!("receiver exited: {receiver:?}")
},
sender = sender_task => {
panic!("sender exited: {sender:?}")
},
}
}
});
}
for _ in 0..n {
complete_receiver.recv().await.unwrap();
}
assert_no_rate_limiting(&context);
});
}
#[test_traced]
fn test_dns_resolving_to_private_ip_not_dialed() {
let base_port = 4400;
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let peer0 = ed25519::PrivateKey::from_seed(0);
let peer1 = ed25519::PrivateKey::from_seed(1);
let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
context.resolver_register("peer-0.local".to_string(), Some(vec![socket0.ip()]));
let mut config0 = Config::test(peer0.clone(), socket0, 1_024 * 1_024);
config0.allow_private_ips = true;
let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
let peers0: Vec<(_, Address)> = vec![
(peer0.public_key(), Address::Symmetric(socket0)),
(peer1.public_key(), Address::Symmetric(socket1)),
];
oracle0.track(0, Map::try_from(peers0).unwrap()).await;
let (_sender0, mut receiver0) =
network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network0.start();
let mut config1 = Config::test(peer1.clone(), socket1, 1_024 * 1_024);
config1.allow_private_ips = false; let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
let peers1: Vec<(_, Address)> = vec![
(
peer0.public_key(),
Address::Asymmetric {
ingress: Ingress::Dns {
host: hostname!("peer-0.local"),
port: socket0.port(),
},
egress: socket0,
},
),
(peer1.public_key(), Address::Symmetric(socket1)),
];
oracle1.track(0, Map::try_from(peers1).unwrap()).await;
let (mut sender1, _receiver1) =
network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network1.start();
context.sleep(Duration::from_secs(5)).await;
let sent = sender1
.send(Recipients::All, peer1.public_key().as_ref().to_vec(), true)
.await
.unwrap();
assert!(
sent.is_empty(),
"peer 1 should not have connected to peer 0 (private IP)"
);
select! {
msg = receiver0.recv() => {
panic!("peer 0 should not have received any message, got: {msg:?}");
},
_ = context.sleep(Duration::from_secs(1)) => {
},
}
});
}
#[test_traced]
fn test_dns_mixed_ips_connectivity() {
for seed in 0..25 {
let base_port = 3500;
let cfg = deterministic::Config::default()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(120)));
let executor = deterministic::Runner::new(cfg);
executor.start(|context| async move {
let peer0 = ed25519::PrivateKey::from_seed(0);
let peer1 = ed25519::PrivateKey::from_seed(1);
let good_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
let socket0 = SocketAddr::new(good_ip, base_port);
let socket1 = SocketAddr::new(good_ip, base_port + 1);
let mut all_ips0: Vec<IpAddr> = (1..=3)
.map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 100 + i)))
.collect();
all_ips0.push(good_ip);
context.resolver_register("peer-0.local", Some(all_ips0));
let mut all_ips1: Vec<IpAddr> = (1..=3)
.map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 110 + i)))
.collect();
all_ips1.push(good_ip);
context.resolver_register("peer-1.local", Some(all_ips1));
let peers: Vec<(_, Address)> = vec![
(
peer0.public_key(),
Address::Asymmetric {
ingress: Ingress::Dns {
host: hostname!("peer-0.local"),
port: base_port,
},
egress: socket0,
},
),
(
peer1.public_key(),
Address::Asymmetric {
ingress: Ingress::Dns {
host: hostname!("peer-1.local"),
port: base_port + 1,
},
egress: socket1,
},
),
];
let config0 = Config::test(peer0.clone(), socket0, 1_024 * 1_024);
let (mut network0, mut oracle0) =
Network::new(context.with_label("peer_0"), config0);
oracle0
.track(0, Map::try_from(peers.clone()).unwrap())
.await;
let (_sender0, mut receiver0) =
network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network0.start();
let config1 = Config::test(peer1.clone(), socket1, 1_024 * 1_024);
let (mut network1, mut oracle1) =
Network::new(context.with_label("peer_1"), config1);
oracle1
.track(0, Map::try_from(peers.clone()).unwrap())
.await;
let (mut sender1, _receiver1) =
network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network1.start();
let pk0 = peer0.public_key();
loop {
let sent = sender1
.send(
Recipients::One(pk0.clone()),
peer1.public_key().as_ref().to_vec(),
true,
)
.await
.unwrap();
if !sent.is_empty() {
break;
}
context.sleep(Duration::from_millis(100)).await;
}
let (sender, msg) = receiver0.recv().await.unwrap();
assert_eq!(sender, peer1.public_key());
assert_eq!(msg, peer1.public_key().as_ref());
});
}
}
#[test_traced]
fn test_many_peer_restart_with_new_address() {
let base_port = 9500;
let n = 5;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let peers: Vec<_> = (0..n)
.map(|i| ed25519::PrivateKey::from_seed(i as u64))
.collect();
let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
let mut oracles: Vec<Option<Oracle<_>>> = (0..n).map(|_| None).collect();
let mut handles: Vec<Option<commonware_runtime::Handle<()>>> =
(0..n).map(|_| None).collect();
let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
let peer_set: Vec<(_, Address)> = addresses
.iter()
.enumerate()
.map(|(i, pk)| {
(
pk.clone(),
Address::Symmetric(SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
ports[i],
)),
)
})
.collect();
for (i, peer) in peers.iter().enumerate() {
let peer_context = context.with_label(&format!("peer_{i}"));
let config = Config::test(
peer.clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
MAX_MESSAGE_SIZE,
);
let (mut network, mut oracle) =
Network::new(peer_context.with_label("network"), config);
oracle
.track(0, Map::try_from(peer_set.clone()).unwrap())
.await;
let (sender, receiver) =
network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
senders[i] = Some(sender);
receivers[i] = Some(receiver);
oracles[i] = Some(oracle);
let handle = network.start();
handles[i] = Some(handle);
}
for (i, sender) in senders.iter_mut().enumerate() {
let sender = sender.as_mut().unwrap();
loop {
let sent = sender
.send(
Recipients::All,
peers[i].public_key().as_ref().to_vec(),
true,
)
.await
.unwrap();
if sent.len() == n - 1 {
break;
}
context.sleep(Duration::from_millis(100)).await;
}
}
for receiver in receivers.iter_mut() {
let receiver = receiver.as_mut().unwrap();
let mut received = HashSet::new();
while received.len() < n - 1 {
let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
assert_eq!(message, sender.as_ref());
received.insert(sender);
}
}
let mut restart_counter = 0u16;
for round in 0..3 {
for restart_peer_idx in 1..n {
restart_counter += 1;
let new_port = base_port + 100 + restart_counter;
ports[restart_peer_idx] = new_port;
if let Some(handle) = handles[restart_peer_idx].take() {
handle.abort();
}
senders[restart_peer_idx] = None;
receivers[restart_peer_idx] = None;
oracles[restart_peer_idx] = None;
let updated_peer_set: Vec<(_, Address)> = addresses
.iter()
.enumerate()
.map(|(i, pk)| {
(
pk.clone(),
Address::Symmetric(SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
ports[i],
)),
)
})
.collect();
for oracle in oracles.iter_mut().flatten() {
oracle
.track(
(round * (n - 1) + restart_peer_idx) as u64,
Map::try_from(updated_peer_set.clone()).unwrap(),
)
.await;
}
let peer_context =
context.with_label(&format!("peer_{restart_peer_idx}_round_{round}"));
let config = Config::test(
peers[restart_peer_idx].clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port),
MAX_MESSAGE_SIZE,
);
let (mut network, mut oracle) =
Network::new(peer_context.with_label("network"), config);
oracle
.track(
(round * (n - 1) + restart_peer_idx) as u64,
Map::try_from(updated_peer_set.clone()).unwrap(),
)
.await;
let (sender, receiver) = network.register(
0,
Quota::per_second(NZU32!(100)),
DEFAULT_MESSAGE_BACKLOG,
);
senders[restart_peer_idx] = Some(sender);
receivers[restart_peer_idx] = Some(receiver);
oracles[restart_peer_idx] = Some(oracle);
let handle = network.start();
handles[restart_peer_idx] = Some(handle);
let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
loop {
let sent = restarted_sender
.send(
Recipients::All,
peers[restart_peer_idx].public_key().as_ref().to_vec(),
true,
)
.await
.unwrap();
if sent.len() == n - 1 {
break;
}
context.sleep(Duration::from_millis(100)).await;
}
for i in 0..n {
if i == restart_peer_idx {
continue;
}
let sender = senders[i].as_mut().unwrap();
loop {
let sent = sender
.send(
Recipients::One(addresses[restart_peer_idx].clone()),
peers[i].public_key().as_ref().to_vec(),
true,
)
.await
.unwrap();
if sent.len() == 1 {
break;
}
context.sleep(Duration::from_millis(100)).await;
}
}
let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
let mut received = HashSet::new();
while received.len() < n - 1 {
let (sender, message): (ed25519::PublicKey, _) =
restarted_receiver.recv().await.unwrap();
assert_eq!(message, sender.as_ref());
received.insert(sender);
}
}
}
assert_no_rate_limiting(&context);
});
}
#[test_traced]
fn test_simultaneous_peer_restart() {
let base_port = 9700;
let n = 5;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let peers: Vec<_> = (0..n)
.map(|i| ed25519::PrivateKey::from_seed(i as u64))
.collect();
let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
let peer_set: Vec<(_, Address)> = addresses
.iter()
.enumerate()
.map(|(i, pk)| {
(
pk.clone(),
Address::Symmetric(SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
ports[i],
)),
)
})
.collect();
let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
let mut oracles: Vec<Option<Oracle<_>>> = (0..n).map(|_| None).collect();
let mut handles: Vec<Option<commonware_runtime::Handle<()>>> =
(0..n).map(|_| None).collect();
for (i, peer) in peers.iter().enumerate() {
let peer_context = context.with_label(&format!("peer_{i}"));
let config = Config::test(
peer.clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]),
MAX_MESSAGE_SIZE,
);
let (mut network, mut oracle) =
Network::new(peer_context.with_label("network"), config);
oracle
.track(0, Map::try_from(peer_set.clone()).unwrap())
.await;
let (sender, receiver) =
network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
senders[i] = Some(sender);
receivers[i] = Some(receiver);
oracles[i] = Some(oracle);
let handle = network.start();
handles[i] = Some(handle);
}
for (i, sender) in senders.iter_mut().enumerate() {
let sender = sender.as_mut().unwrap();
loop {
let sent = sender
.send(
Recipients::All,
peers[i].public_key().as_ref().to_vec(),
true,
)
.await
.unwrap();
if sent.len() == n - 1 {
break;
}
context.sleep(Duration::from_millis(100)).await;
}
}
for receiver in receivers.iter_mut() {
let receiver = receiver.as_mut().unwrap();
let mut received = HashSet::new();
while received.len() < n - 1 {
let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
assert_eq!(message, sender.as_ref());
received.insert(sender);
}
}
let restart_peers: Vec<usize> = (1..n).collect();
for &idx in &restart_peers {
if let Some(handle) = handles[idx].take() {
handle.abort();
}
senders[idx] = None;
receivers[idx] = None;
oracles[idx] = None;
ports[idx] = base_port + 100 + idx as u16;
}
context.sleep(Duration::from_secs(2)).await;
let updated_peer_set: Vec<(_, Address)> = addresses
.iter()
.enumerate()
.map(|(i, pk)| {
(
pk.clone(),
Address::Symmetric(SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
ports[i],
)),
)
})
.collect();
oracles[0]
.as_mut()
.unwrap()
.track(1, Map::try_from(updated_peer_set.clone()).unwrap())
.await;
for &idx in &restart_peers {
let peer_context = context.with_label(&format!("peer_{idx}_restarted"));
let config = Config::test(
peers[idx].clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[idx]),
MAX_MESSAGE_SIZE,
);
let (mut network, mut oracle) =
Network::new(peer_context.with_label("network"), config);
oracle
.track(1, Map::try_from(updated_peer_set.clone()).unwrap())
.await;
let (sender, receiver) =
network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
senders[idx] = Some(sender);
receivers[idx] = Some(receiver);
oracles[idx] = Some(oracle);
let handle = network.start();
handles[idx] = Some(handle);
}
for (i, sender) in senders.iter_mut().enumerate() {
let sender = sender.as_mut().unwrap();
loop {
let sent = sender
.send(
Recipients::All,
peers[i].public_key().as_ref().to_vec(),
true,
)
.await
.unwrap();
if sent.len() == n - 1 {
break;
}
context.sleep(Duration::from_millis(100)).await;
}
}
for receiver in receivers.iter_mut() {
let receiver = receiver.as_mut().unwrap();
let mut received = HashSet::new();
while received.len() < n - 1 {
let (sender, message): (ed25519::PublicKey, _) = receiver.recv().await.unwrap();
assert_eq!(message, sender.as_ref());
received.insert(sender);
}
}
assert_no_rate_limiting(&context);
});
}
#[test_traced]
fn test_operations_after_shutdown_do_not_panic() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let peer = ed25519::PrivateKey::from_seed(0);
let address = peer.public_key();
let peer_context = context.with_label("peer");
let config = Config::test(
peer.clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200),
MAX_MESSAGE_SIZE,
);
let (mut network, mut oracle) =
Network::new(peer_context.with_label("network"), config);
let (mut sender, _receiver) =
network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
let peer_addr =
Address::Symmetric(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200));
let peers: Map<ed25519::PublicKey, Address> =
vec![(address.clone(), peer_addr)].try_into().unwrap();
oracle.track(0, peers.clone()).await;
let handle = network.start();
handle.abort();
context.sleep(Duration::from_millis(100)).await;
oracle.track(1, peers.clone()).await;
let _ = oracle.peer_set(0).await;
let _ = oracle.subscribe().await;
crate::block_peer(&mut oracle, address.clone()).await;
let sent = sender
.send(Recipients::All, address.as_ref().to_vec(), true)
.await
.unwrap();
assert!(sent.is_empty());
});
}
fn clean_shutdown(seed: u64) {
let cfg = deterministic::Config::default()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(30)));
let executor = deterministic::Runner::new(cfg);
executor.start(|context| async move {
let peer = ed25519::PrivateKey::from_seed(0);
let peer_context = context.with_label("peer");
let config = Config::test(
peer.clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200),
MAX_MESSAGE_SIZE,
);
let (mut network, mut oracle) =
Network::new(peer_context.with_label("network"), config);
let (_, _) =
network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
let peer_addr =
Address::Symmetric(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5200));
let peers: Map<ed25519::PublicKey, Address> =
vec![(peer.public_key(), peer_addr)].try_into().unwrap();
oracle.track(0, peers).await;
let handle = network.start();
context.sleep(Duration::from_millis(100)).await;
let running_before = count_running_tasks(&context, "peer_network");
assert!(
running_before > 0,
"at least one network task should be running"
);
handle.abort();
let _ = handle.await;
context.sleep(Duration::from_millis(100)).await;
let running_after = count_running_tasks(&context, "peer_network");
assert_eq!(
running_after, 0,
"all network tasks should be stopped, but {running_after} still running"
);
});
}
#[test_traced]
fn test_clean_shutdown() {
for seed in 0..25 {
clean_shutdown(seed);
}
}
fn duplicate_addresses_disconnected(seed: u64) {
let base_port = 6000;
let executor = deterministic::Runner::seeded(seed);
executor.start(|context| async move {
let peer0 = ed25519::PrivateKey::from_seed(0);
let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
let peer1 = ed25519::PrivateKey::from_seed(1);
let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
let peer2 = ed25519::PrivateKey::from_seed(2);
let config0 = Config::test(peer0.clone(), socket0, MAX_MESSAGE_SIZE);
let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
let (mut sender0, _receiver0) =
network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network0.start();
let peer_set0: Vec<(_, Address)> = vec![
(peer0.public_key(), Address::Symmetric(socket0)),
(peer1.public_key(), Address::Symmetric(socket1)),
(peer2.public_key(), Address::Symmetric(socket1)),
];
oracle0.track(0, Map::try_from(peer_set0).unwrap()).await;
context.sleep(Duration::from_secs(30)).await;
let sent = sender0
.send(Recipients::All, peer1.public_key().as_ref().to_vec(), true)
.await
.unwrap();
assert!(sent.is_empty());
let config1 = Config::test(peer1.clone(), socket1, MAX_MESSAGE_SIZE);
let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
let (_sender1, mut receiver1) =
network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network1.start();
let peer_set1: Vec<(_, Address)> = vec![
(peer0.public_key(), Address::Symmetric(wrong_socket0)),
(peer1.public_key(), Address::Symmetric(socket1)),
(peer2.public_key(), Address::Symmetric(socket1)),
];
oracle1.track(0, Map::try_from(peer_set1).unwrap()).await;
context.sleep(Duration::from_secs(30)).await;
loop {
let sent = sender0
.send(Recipients::All, peer0.public_key().as_ref().to_vec(), true)
.await
.unwrap();
if sent.len() == 1 {
assert_eq!(sent[0], peer1.public_key());
break;
}
context.sleep(Duration::from_millis(100)).await;
}
let (sender, _) = receiver1.recv().await.unwrap();
assert_eq!(sender, peer0.public_key());
});
}
#[test_traced]
fn test_duplicate_addresses_disconnected() {
for seed in 0..25 {
duplicate_addresses_disconnected(seed);
}
}
#[test_traced]
fn test_duplicate_addresses_connected() {
let base_port = 6000;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let peer0 = ed25519::PrivateKey::from_seed(0);
let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
let peer1 = ed25519::PrivateKey::from_seed(1);
let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
let peer2 = ed25519::PrivateKey::from_seed(2);
let socket2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 2);
let config0 = Config::test(peer0.clone(), socket0, MAX_MESSAGE_SIZE);
let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
let (mut sender0, mut receiver0) =
network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network0.start();
let config2 = Config::test(peer2.clone(), socket2, MAX_MESSAGE_SIZE);
let (mut network2, mut oracle2) = Network::new(context.with_label("peer_2"), config2);
let (_sender2, mut receiver2) =
network2.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network2.start();
let peer_set: Vec<(_, Address)> = vec![
(peer0.public_key(), Address::Symmetric(socket0)),
(peer1.public_key(), Address::Symmetric(socket1)),
(peer2.public_key(), Address::Symmetric(socket1)),
];
oracle0
.track(0, Map::try_from(peer_set.clone()).unwrap())
.await;
oracle2.track(0, Map::try_from(peer_set).unwrap()).await;
context.sleep(Duration::from_secs(30)).await;
loop {
let sent = sender0
.send(Recipients::All, peer2.public_key().as_ref().to_vec(), true)
.await
.unwrap();
if sent.len() == 1 {
assert_eq!(sent[0], peer2.public_key());
break;
}
context.sleep(Duration::from_millis(100)).await;
}
let (sender, _) = receiver2.recv().await.unwrap();
assert_eq!(sender, peer0.public_key());
let config1 = Config::test(peer1.clone(), socket1, MAX_MESSAGE_SIZE);
let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
let (mut sender1, _receiver1) =
network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network1.start();
let peer_set1: Vec<(_, Address)> = vec![
(peer0.public_key(), Address::Symmetric(wrong_socket0)),
(peer1.public_key(), Address::Symmetric(socket1)),
(peer2.public_key(), Address::Symmetric(socket1)),
];
oracle1.track(0, Map::try_from(peer_set1).unwrap()).await;
context.sleep(Duration::from_secs(30)).await;
loop {
let sent = sender1
.send(Recipients::All, peer1.public_key().as_ref().to_vec(), true)
.await
.unwrap();
if sent.len() == 2 {
assert!(sent.contains(&peer0.public_key()));
assert!(sent.contains(&peer2.public_key()));
break;
}
context.sleep(Duration::from_millis(100)).await;
}
let mut received0 = false;
while let Ok((sender, _)) = receiver0.recv().await {
if sender == peer1.public_key() {
received0 = true;
break;
}
}
assert!(received0);
let (sender, _) = receiver2.recv().await.unwrap();
assert_eq!(sender, peer1.public_key());
});
}
}