mod actors;
mod channels;
mod config;
mod metrics;
mod network;
mod types;
use thiserror::Error;
/// Errors surfaced by this module's public API.
#[derive(Error, Debug)]
pub enum Error {
/// A message was rejected for being too large.
///
/// NOTE(review): the `usize` payload is presumably the offending message size in bytes;
/// confirm against the code that constructs this variant.
#[error("message too large: {0}")]
MessageTooLarge(usize),
/// The network has been closed.
#[error("network closed")]
NetworkClosed,
}
pub use actors::tracker::Oracle;
pub use channels::{Receiver, Sender};
pub use config::{Bootstrapper, Config};
pub use network::Network;
#[cfg(test)]
mod tests {
use super::*;
use crate::{
authenticated::{
discovery::actors::router::{Actor as RouterActor, Config as RouterConfig},
relay::Relay,
},
Ingress, Manager, Provider, Receiver, Recipients, Sender,
};
use commonware_cryptography::{ed25519, Signer as _};
use commonware_macros::{select, select_loop, test_group, test_traced};
use commonware_runtime::{
count_running_tasks, deterministic, tokio, BufferPooler, Clock, Handle, IoBuf, Metrics,
Network as RNetwork, Quota, Resolver, Runner, Spawner,
};
use commonware_utils::{channel::mpsc, hostname, ordered::Set, TryCollect, NZU32};
use rand_core::{CryptoRngCore, RngCore};
use std::{
collections::HashSet,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};
/// Selects which `Recipients` variant `run_network` uses when a peer sends its message.
#[derive(Clone, Copy)]
enum Mode {
    /// Send one broadcast using `Recipients::All`.
    All,
    /// Send one message to an explicit list using `Recipients::Some`.
    Some,
    /// Send to every other peer individually using `Recipients::One`.
    One,
}
/// Value passed as `max_message_size` to `Config::test` by the shared test helpers (1 MiB).
const MAX_MESSAGE_SIZE: u32 = 1 << 20;
/// Backlog passed to `Network::register` for every channel registered in these tests.
const DEFAULT_MESSAGE_BACKLOG: usize = 128;
/// Panics if the metrics encoded from `context` contain any
/// `messages_rate_limited_total{` series, printing the full encoding in the panic message.
fn assert_no_rate_limiting(context: &impl Metrics) {
    let metrics = context.encode();
    let rate_limited = metrics.contains("messages_rate_limited_total{");
    assert!(
        !rate_limited,
        "no messages should be rate limited: {metrics}"
    );
}
/// Runs `n` peers on consecutive localhost ports starting at `base_port` and returns once
/// every peer has received a message from each of the other `n - 1` peers.
///
/// Peer `i` is keyed by `ed25519::PrivateKey::from_seed(i)`. Every peer except peer 0 lists
/// peer 0 as its only bootstrapper. `mode` selects the `Recipients` variant used when
/// sending. Before returning, asserts that no rate-limiting metric was recorded.
async fn run_network(
context: impl Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
max_message_size: u32,
base_port: u16,
n: usize,
mode: Mode,
) {
// Derive one identity per peer from its index.
let mut peers = Vec::new();
for i in 0..n {
peers.push(ed25519::PrivateKey::from_seed(i as u64));
}
let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
// Each peer signals on this channel once it has heard from all other peers.
let (complete_sender, mut complete_receiver) = mpsc::channel(peers.len());
for (i, peer) in peers.iter().enumerate() {
let context = context.with_label(&format!("peer_{i}"));
let port = base_port + i as u16;
// Every peer except the first bootstraps from peer 0.
let mut bootstrappers = Vec::new();
if i > 0 {
bootstrappers.push((
addresses[0].clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
));
}
let signer = peer.clone();
let config = Config::test(
signer.clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
bootstrappers,
max_message_size,
);
let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
// Track the full peer list as peer set 0.
oracle
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
// Register channel 0 before starting the network.
let (mut sender, mut receiver) =
network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network.start();
context.with_label("agent").spawn({
let complete_sender = complete_sender.clone();
let addresses = addresses.clone();
move |context| async move {
// Receiver task: collect one message from each distinct other peer, report
// completion, then keep draining the channel forever.
let receiver = context.with_label("receiver").spawn(move |_| async move {
let mut received = HashSet::new();
while received.len() < n - 1 {
let (sender, message) = receiver.recv().await.unwrap();
// Every payload is the sending peer's own public key.
assert_eq!(message, sender.as_ref());
received.insert(sender);
}
complete_sender.send(()).await.unwrap();
loop {
receiver.recv().await.unwrap();
}
});
let msg = signer.public_key();
// Sender task: deliver our public key to all other peers, repeating every 10 seconds.
let sender = context
.with_label("sender")
.spawn(move |context| async move {
// Everyone but ourselves, sorted so it can be compared with the sorted `sent` list.
let mut recipients = addresses.clone();
recipients.remove(i);
recipients.sort();
loop {
match mode {
Mode::One => {
for recipient in &recipients {
// Retry until the send reports exactly this recipient.
loop {
let sent = sender
.send(
Recipients::One(recipient.clone()),
msg.as_ref().to_vec(),
true,
)
.await
.unwrap();
if sent.len() != 1 {
context.sleep(Duration::from_millis(100)).await;
continue;
}
assert_eq!(&sent[0], recipient);
break;
}
}
}
Mode::Some | Mode::All => {
// Retry until the send reports every other peer as a recipient.
loop {
let mut sent = sender
.send(
match mode {
Mode::Some => {
Recipients::Some(recipients.clone())
}
Mode::All => Recipients::All,
_ => unreachable!(),
},
msg.as_ref().to_vec(),
true,
)
.await
.unwrap();
if sent.len() != recipients.len() {
context.sleep(Duration::from_millis(100)).await;
continue;
}
sent.sort();
assert_eq!(sent, recipients);
break;
}
}
};
context.sleep(Duration::from_secs(10)).await;
}
});
// Both tasks loop forever; either one finishing is a test failure.
select! {
receiver = receiver => {
panic!("receiver exited: {receiver:?}");
},
sender = sender => {
panic!("sender exited: {sender:?}");
},
}
}
});
}
// Wait for one completion signal per peer.
for _ in 0..n {
complete_receiver.recv().await.unwrap();
}
assert_no_rate_limiting(&context);
}
/// Runs `run_network` twice on deterministic executors seeded with `seed` and asserts that
/// both runs end with the same auditor state.
fn run_deterministic_test(seed: u64, mode: Mode) {
    const NUM_PEERS: usize = 25;
    const BASE_PORT: u16 = 3000;

    /// Executes one full network run and returns the resulting auditor state.
    fn audit_state(seed: u64, mode: Mode) -> String {
        deterministic::Runner::seeded(seed).start(|context| async move {
            run_network(
                context.clone(),
                MAX_MESSAGE_SIZE,
                BASE_PORT,
                NUM_PEERS,
                mode,
            )
            .await;
            context.auditor().state()
        })
    }

    let state = audit_state(seed, mode);
    let state2 = audit_state(seed, mode);
    assert_eq!(state, state2);
}
/// Runs the determinism check with `Mode::One` for seeds `0..10`.
#[test_group("slow")]
#[test_traced]
fn test_determinism_one() {
    (0..10).for_each(|seed| run_deterministic_test(seed, Mode::One));
}
/// Runs the determinism check with `Mode::Some` for seeds `0..10`.
#[test_group("slow")]
#[test_traced]
fn test_determinism_some() {
    (0..10).for_each(|seed| run_deterministic_test(seed, Mode::Some));
}
/// Runs the determinism check with `Mode::All` for seeds `0..10`.
#[test_group("slow")]
#[test_traced]
fn test_determinism_all() {
    (0..10).for_each(|seed| run_deterministic_test(seed, Mode::All));
}
/// Runs a ten-peer network in `Mode::One` on the tokio runner.
#[test_traced]
fn test_tokio_connectivity() {
    const BASE_PORT: u16 = 3000;
    const NUM_PEERS: usize = 10;
    tokio::Runner::default().start(|context| async move {
        run_network(context, MAX_MESSAGE_SIZE, BASE_PORT, NUM_PEERS, Mode::One).await;
    });
}
/// Peers spread across three tracked peer-set indices can all be reached by one broadcast.
///
/// Every node tracks index 0 = {peer 0}, index 1 = {peers 1, 2} and index 2 = {peers 2..n}.
/// Peer 0 broadcasts its public key until `n - 1` recipients are reported; every other peer
/// must receive one message whose payload equals the sender's public key.
#[test_traced]
fn test_multi_index_oracle() {
let base_port = 3000;
let n: usize = 100;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
// Derive one identity per peer from its index.
let mut peers = Vec::new();
for i in 0..n {
peers.push(ed25519::PrivateKey::from_seed(i as u64));
}
let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
// Handles of the per-peer agent tasks, awaited at the end.
let mut waiters = Vec::new();
for (i, peer) in peers.iter().enumerate() {
let context = context.with_label(&format!("peer_{i}"));
let port = base_port + i as u16;
// Every peer except the first bootstraps from peer 0.
let mut bootstrappers = Vec::new();
if i > 0 {
bootstrappers.push((
addresses[0].clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
));
}
let signer = peer.clone();
let config = Config::test(
signer.clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
bootstrappers,
1_024 * 1_024, );
let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
// Index 0: only peer 0.
oracle
.track(0, Set::try_from([addresses[0].clone()]).unwrap())
.await;
// Index 1: peers 1 and 2.
oracle
.track(
1,
Set::try_from([addresses[1].clone(), addresses[2].clone()]).unwrap(),
)
.await;
// Index 2: peer 2 onwards (peer 2 appears in both index 1 and index 2).
oracle
.track(
2,
addresses
.iter()
.skip(2)
.cloned()
.try_collect::<Set<_>>()
.unwrap(),
)
.await;
let (mut sender, mut receiver) =
network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
network.start();
let handler = context
.with_label("agent")
.spawn(move |context| async move {
if i == 0 {
// Peer 0 rebroadcasts every 100ms until all other peers are reported as recipients.
let msg = signer.public_key();
loop {
if sender
.send(Recipients::All, msg.as_ref().to_vec(), true)
.await
.unwrap()
.len()
== n - 1
{
break;
}
context.sleep(Duration::from_millis(100)).await;
}
} else {
// Every other peer waits for a single message carrying the sender's public key.
let (sender, message) = receiver.recv().await.unwrap();
assert_eq!(message, sender.as_ref());
}
});
waiters.push(handler);
}
// Await the agents in reverse spawn order.
for waiter in waiters.into_iter().rev() {
waiter.await.unwrap();
}
assert_no_rate_limiting(&context);
});
}
#[test_traced]
fn test_message_too_large() {
let base_port = 3000;
let n: usize = 2;
let executor = deterministic::Runner::seeded(0);
executor.start(|mut context| async move {
let mut peers = Vec::new();
for i in 0..n {
peers.push(ed25519::PrivateKey::from_seed(i as u64));
}
let addresses: Set<_> = peers.iter().map(|p| p.public_key()).try_collect().unwrap();
let signer = peers[0].clone();
let config = Config::test(
signer.clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port),
Vec::new(),
1_024 * 1_024, );
let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
oracle.track(0, addresses.clone()).await;
let (mut sender, _) =
network.register(0, Quota::per_second(NZU32!(10)), DEFAULT_MESSAGE_BACKLOG);
network.start();
let mut msg = vec![0u8; 10 * 1024 * 1024]; context.fill_bytes(&mut msg[..]);
let recipient = Recipients::One(addresses[1].clone());
let result = sender.send(recipient, msg, true).await;
assert!(matches!(result, Err(Error::MessageTooLarge(_))));
});
}
#[test_traced]
fn test_rate_limiting() {
let base_port = 3000;
let n: usize = 2;
let executor = deterministic::Runner::seeded(0);
executor.start(|context| async move {
let mut peers = Vec::new();
for i in 0..n {
peers.push(ed25519::PrivateKey::from_seed(i as u64));
}
let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
let signer0 = peers[0].clone();
let config0 = Config::test(
signer0.clone(),
socket0,
vec![(peers[1].public_key(), socket1.into())],
1_024 * 1_024, );
let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
oracle0
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
let (mut sender0, _receiver0) =
network0.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
network0.start();
let signer1 = peers[1].clone();
let config1 = Config::test(
signer1.clone(),
socket1,
vec![(peers[0].public_key(), socket0.into())],
1_024 * 1_024, );
let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
oracle1
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
let (_sender1, _receiver1) =
network1.register(0, Quota::per_minute(NZU32!(1)), DEFAULT_MESSAGE_BACKLOG);
network1.start();
let msg = vec![0u8; 1024]; loop {
let sent = sender0
.send(Recipients::One(addresses[1].clone()), msg.clone(), true)
.await
.unwrap();
if !sent.is_empty() {
break;
}
context.sleep(Duration::from_mins(1)).await
}
let sent = sender0
.send(Recipients::One(addresses[1].clone()), msg, true)
.await
.unwrap();
assert!(sent.is_empty());
for _ in 0..10 {
assert_no_rate_limiting(&context);
context.sleep(Duration::from_millis(100)).await;
}
});
}
#[test_traced]
fn test_unordered_peer_sets() {
let (n, base_port) = (10, 3000);
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let mut peers_and_sks = Vec::new();
for i in 0..n {
let sk = ed25519::PrivateKey::from_seed(i as u64);
let pk = sk.public_key();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + i as u16);
peers_and_sks.push((sk, pk, addr));
}
let peer0 = peers_and_sks[0].clone();
let config = Config::test(
peer0.0,
peer0.2,
vec![(peer0.1.clone(), peer0.2.into())],
1_024 * 1_024,
);
let (network, mut oracle) = Network::new(context.with_label("network"), config);
network.start();
let mut subscription = oracle.subscribe().await;
let set10: Set<_> = peers_and_sks
.iter()
.take(2)
.map(|(_, pk, _)| pk.clone())
.try_collect()
.unwrap();
oracle.track(10, set10.clone()).await;
let update = subscription.recv().await.unwrap();
assert_eq!(update.index, 10);
assert_eq!(update.latest.primary, set10);
assert!(update.latest.secondary.is_empty());
assert_eq!(update.all.primary, set10);
assert!(update.all.secondary.is_empty());
let set9: Set<_> = peers_and_sks
.iter()
.skip(2)
.map(|(_, pk, _)| pk.clone())
.try_collect()
.unwrap();
oracle.track(9, set9.clone()).await;
let set11: Set<_> = peers_and_sks
.iter()
.skip(4)
.map(|(_, pk, _)| pk.clone())
.try_collect()
.unwrap();
oracle.track(11, set11.clone()).await;
let update = subscription.recv().await.unwrap();
assert_eq!(update.index, 11);
assert_eq!(update.latest.primary, set11);
assert!(update.latest.secondary.is_empty());
let all_keys: Set<_> = set10
.into_iter()
.chain(set11.into_iter())
.try_collect()
.unwrap();
assert_eq!(update.all.primary, all_keys);
assert!(update.all.secondary.is_empty());
});
}
/// Starts five peers, waits until they are connected, verifies via the
/// `runtime_tasks_running` metric that each peer's tracker, router, spawner, listener and
/// dialer tasks are running, then stops the runtime and verifies the same tasks report 0.
#[test_traced]
fn test_graceful_shutdown() {
let base_port = 3000;
let n: usize = 5;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
// Derive one identity per peer from its index.
let mut peers = Vec::new();
for i in 0..n {
peers.push(ed25519::PrivateKey::from_seed(i as u64));
}
let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
// Each peer signals here once it has reached its expected number of connections.
let (complete_sender, mut complete_receiver) = mpsc::channel(n);
for (i, peer) in peers.iter().enumerate() {
let peer_context = context.with_label(&format!("peer_{i}"));
let port = base_port + i as u16;
// Every peer except the first bootstraps from peer 0.
let mut bootstrappers = Vec::new();
if i > 0 {
bootstrappers.push((
addresses[0].clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port).into(),
));
}
let signer = peer.clone();
let config = Config::test(
signer.clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
bootstrappers,
1_024 * 1_024, );
let (mut network, mut oracle) =
Network::new(peer_context.with_label("network"), config);
oracle
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
let (mut sender, mut receiver) =
network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network.start();
peer_context.with_label("agent").spawn({
let complete_sender = complete_sender.clone();
move |context| async move {
// Peer 0 waits for all other peers; every other peer only needs one recipient.
let expected_connections = if i == 0 { n - 1 } else { 1 };
let msg = signer.public_key();
loop {
let sent = sender
.send(Recipients::All, msg.as_ref().to_vec(), true)
.await
.unwrap();
if sent.len() >= expected_connections {
break;
}
context.sleep(Duration::from_millis(100)).await;
}
complete_sender.send(()).await.unwrap();
// NOTE(review): presumably keeps draining the receiver until the context is stopped
// or `recv` fails; confirm against the `select_loop!` macro semantics.
select_loop! {
context,
on_stopped => {},
Ok(_) = receiver.recv() else break => {},
}
}
});
}
// Wait for every peer to report that it is connected.
for _ in 0..n {
complete_receiver.recv().await.unwrap();
}
// Snapshot metrics before shutdown. A task counts as running when its
// `runtime_tasks_running` line has the given name, kind "Task" and value 1.
let metrics_before = context.encode();
let is_running = |name: &str| -> bool {
metrics_before.lines().any(|line| {
line.starts_with("runtime_tasks_running{")
&& line.contains(&format!("name=\"{name}\""))
&& line.contains("kind=\"Task\"")
&& line.trim_end().ends_with(" 1")
})
};
for i in 0..n {
let prefix = format!("peer_{i}_network");
assert!(
is_running(&format!("{prefix}_tracker")),
"peer_{i} tracker should be running"
);
assert!(
is_running(&format!("{prefix}_router")),
"peer_{i} router should be running"
);
assert!(
is_running(&format!("{prefix}_spawner")),
"peer_{i} spawner should be running"
);
assert!(
is_running(&format!("{prefix}_listener")),
"peer_{i} listener should be running"
);
assert!(
is_running(&format!("{prefix}_dialer")),
"peer_{i} dialer should be running"
);
}
// Request the stop from a separate task so this one can await the stop signal below.
let shutdown_context = context.clone();
context.with_label("shutdown").spawn(move |_| async move {
let result = shutdown_context.stop(0, Some(Duration::from_secs(5))).await;
assert!(
result.is_ok(),
"graceful shutdown should complete: {result:?}"
);
});
context.stopped().await.unwrap();
// Wait 100ms after the stop signal before sampling the metrics again.
context.sleep(Duration::from_millis(100)).await;
// Same check as before shutdown, but now expecting value 0 for every task.
let metrics_after = context.encode();
let is_stopped = |name: &str| -> bool {
metrics_after.lines().any(|line| {
line.starts_with("runtime_tasks_running{")
&& line.contains(&format!("name=\"{name}\""))
&& line.contains("kind=\"Task\"")
&& line.trim_end().ends_with(" 0")
})
};
for i in 0..n {
let prefix = format!("peer_{i}_network");
assert!(
is_stopped(&format!("{prefix}_tracker")),
"peer_{i} tracker should be stopped"
);
assert!(
is_stopped(&format!("{prefix}_router")),
"peer_{i} router should be stopped"
);
assert!(
is_stopped(&format!("{prefix}_spawner")),
"peer_{i} spawner should be stopped"
);
assert!(
is_stopped(&format!("{prefix}_listener")),
"peer_{i} listener should be stopped"
);
assert!(
is_stopped(&format!("{prefix}_dialer")),
"peer_{i} dialer should be stopped"
);
}
});
}
#[test_traced]
fn test_subscription_includes_self_when_registered() {
let base_port = 3000;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let self_sk = ed25519::PrivateKey::from_seed(0);
let self_pk = self_sk.public_key();
let self_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
let other_pk = ed25519::PrivateKey::from_seed(1).public_key();
let config = Config::test(
self_sk,
self_addr,
vec![], 1_024 * 1_024,
);
let (network, mut oracle) = Network::new(context.with_label("network"), config);
network.start();
let mut subscription = oracle.subscribe().await;
let peer_set: Set<_> = [other_pk.clone()].try_into().unwrap();
oracle.track(1, peer_set.clone()).await;
let update = subscription.recv().await.unwrap();
assert_eq!(update.index, 1);
assert_eq!(update.latest.primary.len(), 1);
assert!(update.latest.secondary.is_empty());
assert_eq!(update.all.primary.len(), 1);
assert!(update.all.secondary.is_empty());
assert!(
update.latest.primary.position(&self_pk).is_none(),
"latest set should not include self"
);
assert!(
update.latest.primary.position(&other_pk).is_some(),
"latest set should include other"
);
assert!(
update.all.primary.position(&self_pk).is_none(),
"peer set should not include self"
);
assert!(
update.all.primary.position(&other_pk).is_some(),
"peer set should include other"
);
let peer_set: Set<_> = [self_pk.clone(), other_pk.clone()].try_into().unwrap();
oracle.track(2, peer_set.clone()).await;
let update = subscription.recv().await.unwrap();
assert_eq!(update.index, 2);
assert_eq!(update.latest.primary.len(), 2);
assert!(update.latest.secondary.is_empty());
assert_eq!(update.all.primary.len(), 2);
assert!(update.all.secondary.is_empty());
assert!(
update.latest.primary.position(&self_pk).is_some(),
"latest set should include self"
);
assert!(
update.latest.primary.position(&other_pk).is_some(),
"latest set should include other"
);
assert!(
update.all.primary.position(&self_pk).is_some(),
"peer set should include self"
);
assert!(
update.all.primary.position(&other_pk).is_some(),
"peer set should include other"
);
});
}
/// Three peers reach full connectivity when peers 1 and 2 reach their bootstrapper (peer 0)
/// through the DNS name `boot.local`, which is registered to resolve to localhost.
#[test_traced]
fn test_dns_bootstrapper_resolution() {
let base_port = 3000;
let n: usize = 3;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
// Derive one identity per peer from its index.
let mut peers = Vec::new();
for i in 0..n {
peers.push(ed25519::PrivateKey::from_seed(i as u64));
}
let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
// Register the bootstrapper hostname with the resolver before any peer starts.
let bootstrapper_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
context.resolver_register("boot.local", Some(vec![bootstrapper_ip]));
// Each peer signals here once it has heard from all other peers.
let (complete_sender, mut complete_receiver) = mpsc::channel(n);
for (i, peer) in peers.iter().enumerate() {
let context = context.with_label(&format!("peer_{i}"));
let port = base_port + i as u16;
// Peers other than peer 0 bootstrap from peer 0 via its DNS ingress.
let bootstrappers = if i > 0 {
vec![(
addresses[0].clone(),
Ingress::Dns {
host: hostname!("boot.local"),
port: base_port,
},
)]
} else {
vec![]
};
let signer = peer.clone();
let config = Config::test(
signer.clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
bootstrappers,
1_024 * 1_024,
);
let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
oracle
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
let (mut sender, mut receiver) =
network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network.start();
context.with_label("agent").spawn({
let complete_sender = complete_sender.clone();
let addresses = addresses.clone();
move |context| async move {
// Receiver task: collect one message from each distinct other peer, report
// completion, then keep draining the channel forever.
let receiver = context.with_label("receiver").spawn(move |_| async move {
let mut received = HashSet::new();
while received.len() < n - 1 {
let (sender, message) = receiver.recv().await.unwrap();
// Every payload is the sending peer's own public key.
assert_eq!(message, sender.as_ref());
received.insert(sender);
}
complete_sender.send(()).await.unwrap();
loop {
receiver.recv().await.unwrap();
}
});
let msg = signer.public_key();
// Sender task: broadcast our public key until every other peer is reported as a
// recipient, then repeat every 10 seconds.
let sender =
context
.with_label("sender")
.spawn(move |context| async move {
loop {
// Everyone but ourselves, sorted for comparison with the sorted `sent` list.
let mut recipients = addresses.clone();
recipients.remove(i);
recipients.sort();
loop {
let mut sent = sender
.send(Recipients::All, msg.as_ref().to_vec(), true)
.await
.unwrap();
if sent.len() != recipients.len() {
context.sleep(Duration::from_millis(100)).await;
continue;
}
sent.sort();
assert_eq!(sent, recipients);
break;
}
context.sleep(Duration::from_secs(10)).await;
}
});
// Both tasks loop forever; either one finishing is a test failure.
select! {
receiver = receiver => {
panic!("receiver exited: {receiver:?}")
},
sender = sender => {
panic!("sender exited: {sender:?}")
},
}
}
});
}
// Wait for one completion signal per peer.
for _ in 0..n {
complete_receiver.recv().await.unwrap();
}
assert_no_rate_limiting(&context);
});
}
/// Peer 1 bootstraps from peer 0 through the hostname `boot.local`, which is not registered
/// with the resolver at startup. After two seconds the peers must still be disconnected;
/// once the hostname is registered, both peers must exchange their public keys.
#[test_traced]
fn test_dns_resolution_failure_then_success() {
let base_port = 3100;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let peer0 = ed25519::PrivateKey::from_seed(0);
let peer1 = ed25519::PrivateKey::from_seed(1);
let addresses = vec![peer0.public_key(), peer1.public_key()];
let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
// Peer 0 has no bootstrappers.
let config0 = Config::test(peer0.clone(), socket0, vec![], 1_024 * 1_024);
let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
oracle0
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
let (mut sender0, mut receiver0) =
network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network0.start();
// Peer 1 bootstraps from peer 0 via a hostname that has not been registered yet.
let config1 = Config::test(
peer1.clone(),
socket1,
vec![(
peer0.public_key(),
Ingress::Dns {
host: hostname!("boot.local"),
port: base_port,
},
)],
1_024 * 1_024,
);
let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
oracle1
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
let (mut sender1, mut receiver1) =
network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network1.start();
// With the hostname still unregistered, a send after two seconds must reach nobody.
context.sleep(Duration::from_secs(2)).await;
let sent = sender0
.send(Recipients::One(peer1.public_key()), b"test", true)
.await
.unwrap();
assert!(sent.is_empty(), "should not be connected yet");
// Register the hostname so it now resolves to localhost.
context.resolver_register("boot.local", Some(vec![IpAddr::V4(Ipv4Addr::LOCALHOST)]));
let pk0 = peer0.public_key();
let pk1 = peer1.public_key();
let msg0 = pk0.to_vec();
let msg1 = pk1.to_vec();
// Each receiver task signals here after validating one message.
let (done_sender, mut done_receiver) = mpsc::channel::<()>(2);
let done0 = done_sender.clone();
let pk1_clone = pk1.clone();
// Peer 0 expects peer 1's public key, sent by peer 1.
context.with_label("recv0").spawn(move |_| async move {
let (sender, message) = receiver0.recv().await.unwrap();
assert_eq!(sender, pk1_clone);
assert_eq!(message, msg1.as_slice());
done0.send(()).await.unwrap();
});
let done1 = done_sender.clone();
let pk0_clone = pk0.clone();
// Peer 1 expects peer 0's public key, sent by peer 0.
context.with_label("recv1").spawn(move |_| async move {
let (sender, message) = receiver1.recv().await.unwrap();
assert_eq!(sender, pk0_clone);
assert_eq!(message, msg0.as_slice());
done1.send(()).await.unwrap();
});
// Keep sending in both directions every 100ms until both sends report a recipient.
context.with_label("sender").spawn({
let pk0 = pk0.clone();
let pk1 = pk1.clone();
move |context| async move {
loop {
let sent0 = sender0
.send(Recipients::One(pk1.clone()), pk0.as_ref().to_vec(), true)
.await
.unwrap();
let sent1 = sender1
.send(Recipients::One(pk0.clone()), pk1.as_ref().to_vec(), true)
.await
.unwrap();
if !sent0.is_empty() && !sent1.is_empty() {
break;
}
context.sleep(Duration::from_millis(100)).await;
}
}
});
// Wait for both receiver tasks to finish.
done_receiver.recv().await.unwrap();
done_receiver.recv().await.unwrap();
});
}
/// Runs a three-peer network on a deterministic executor seeded with `seed`, where peers 1
/// and 2 bootstrap from peer 0 through the DNS name `boot.local`, and returns the auditor
/// state once every peer has heard from all other peers.
fn run_dns_connectivity(seed: u64) -> String {
let base_port = 3400;
let n: usize = 3;
let executor = deterministic::Runner::seeded(seed);
executor.start(|context| async move {
// Derive one identity per peer from its index.
let mut peers = Vec::new();
for i in 0..n {
peers.push(ed25519::PrivateKey::from_seed(i as u64));
}
let addresses = peers.iter().map(|p| p.public_key()).collect::<Vec<_>>();
// Register the bootstrapper hostname with the resolver before any peer starts.
let bootstrapper_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
context.resolver_register("boot.local", Some(vec![bootstrapper_ip]));
// Each peer signals here once it has heard from all other peers.
let (complete_sender, mut complete_receiver) = mpsc::channel(n);
for (i, peer) in peers.iter().enumerate() {
let context = context.with_label(&format!("peer_{i}"));
let port = base_port + i as u16;
// Peers other than peer 0 bootstrap from peer 0 via its DNS ingress.
let bootstrappers = if i > 0 {
vec![(
addresses[0].clone(),
Ingress::Dns {
host: hostname!("boot.local"),
port: base_port,
},
)]
} else {
vec![]
};
let signer = peer.clone();
let config = Config::test(
signer.clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
bootstrappers,
1_024 * 1_024,
);
let (mut network, mut oracle) = Network::new(context.with_label("network"), config);
oracle
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
let (mut sender, mut receiver) =
network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network.start();
context.with_label("agent").spawn({
let complete_sender = complete_sender.clone();
let addresses = addresses.clone();
move |context| async move {
// Receiver task: collect one message from each distinct other peer, report
// completion, then keep draining the channel forever.
let receiver = context.with_label("receiver").spawn(move |_| async move {
let mut received = HashSet::new();
while received.len() < n - 1 {
let (sender, message) = receiver.recv().await.unwrap();
// Every payload is the sending peer's own public key.
assert_eq!(message, sender.as_ref());
received.insert(sender);
}
complete_sender.send(()).await.unwrap();
loop {
receiver.recv().await.unwrap();
}
});
let msg = signer.public_key();
// Sender task: broadcast our public key until every other peer is reported as a
// recipient, then repeat every 10 seconds.
let sender =
context
.with_label("sender")
.spawn(move |context| async move {
loop {
// Everyone but ourselves, sorted for comparison with the sorted `sent` list.
let mut recipients = addresses.clone();
recipients.remove(i);
recipients.sort();
loop {
let mut sent = sender
.send(Recipients::All, msg.as_ref().to_vec(), true)
.await
.unwrap();
if sent.len() != recipients.len() {
context.sleep(Duration::from_millis(100)).await;
continue;
}
sent.sort();
assert_eq!(sent, recipients);
break;
}
context.sleep(Duration::from_secs(10)).await;
}
});
// Both tasks loop forever; either one finishing is a test failure.
select! {
receiver = receiver => {
panic!("receiver exited: {receiver:?}")
},
sender = sender => {
panic!("sender exited: {sender:?}")
},
}
}
});
}
// Wait for one completion signal per peer.
for _ in 0..n {
complete_receiver.recv().await.unwrap();
}
// The auditor state is what callers compare across runs.
context.auditor().state()
})
}
#[test_traced]
fn test_dns_resolution_determinism() {
let state1 = run_dns_connectivity(42);
let state2 = run_dns_connectivity(42);
assert_eq!(state1, state2, "DNS resolution should be deterministic");
}
/// A peer configured with `allow_private_ips = false` must not connect to a bootstrapper
/// whose DNS name resolves to the localhost address used by peer 0.
///
/// After five seconds peer 1's broadcast must report no recipients, and peer 0 must not
/// receive anything during the following second.
#[test_traced]
fn test_dns_resolving_to_private_ip_not_dialed() {
let base_port = 3300;
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
let peer0 = ed25519::PrivateKey::from_seed(0);
let peer1 = ed25519::PrivateKey::from_seed(1);
let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
// The bootstrapper hostname resolves to peer 0's (localhost) address.
context.resolver_register("boot.local".to_string(), Some(vec![socket0.ip()]));
let addresses: Vec<_> = vec![peer0.public_key(), peer1.public_key()];
// Peer 0 allows private IPs and has no bootstrappers.
let mut config0 = Config::test(peer0.clone(), socket0, vec![], 1_024 * 1_024);
config0.allow_private_ips = true;
let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
oracle0
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
let (_sender0, mut receiver0) =
network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network0.start();
// Peer 1 bootstraps from peer 0 via DNS but disallows private IPs.
let bootstrappers = vec![(
peer0.public_key(),
Ingress::Dns {
host: hostname!("boot.local"),
port: base_port,
},
)];
let mut config1 = Config::test(peer1.clone(), socket1, bootstrappers, 1_024 * 1_024);
config1.allow_private_ips = false; let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
oracle1
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
let (mut sender1, _receiver1) =
network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network1.start();
// Wait five seconds before checking that no connection exists.
context.sleep(Duration::from_secs(5)).await;
let sent = sender1
.send(Recipients::All, peer1.public_key().as_ref().to_vec(), true)
.await
.unwrap();
assert!(
sent.is_empty(),
"peer 1 should not have connected to peer 0 (private IP)"
);
// Peer 0 must stay silent for one more second; the sleep branch is the passing outcome.
select! {
msg = receiver0.recv() => {
panic!("peer 0 should not have received any message, got: {msg:?}");
},
_ = context.sleep(Duration::from_secs(1)) => {
},
}
});
}
/// Two peers that bootstrap from each other via DNS still connect when each hostname
/// resolves to four addresses, only one of which (localhost) is the address a peer is
/// configured with. Repeated for seeds `0..25`.
#[test_traced]
fn test_dns_mixed_ips_connectivity() {
for seed in 0..25 {
let base_port = 3400;
// Fresh deterministic runtime per seed, configured with a 120-second timeout.
let cfg = deterministic::Config::default()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(120)));
let executor = deterministic::Runner::new(cfg);
executor.start(|context| async move {
let peer0 = ed25519::PrivateKey::from_seed(0);
let peer1 = ed25519::PrivateKey::from_seed(1);
let good_ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
let socket0 = SocketAddr::new(good_ip, base_port);
let socket1 = SocketAddr::new(good_ip, base_port + 1);
// peer-0.local resolves to 127.0.0.101-103 followed by the good address.
let mut all_ips0: Vec<IpAddr> = (1..=3)
.map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 100 + i)))
.collect();
all_ips0.push(good_ip);
context.resolver_register("peer-0.local", Some(all_ips0));
// peer-1.local resolves to 127.0.0.111-113 followed by the good address.
let mut all_ips1: Vec<IpAddr> = (1..=3)
.map(|i| IpAddr::V4(Ipv4Addr::new(127, 0, 0, 110 + i)))
.collect();
all_ips1.push(good_ip);
context.resolver_register("peer-1.local", Some(all_ips1));
let addresses: Vec<_> = vec![peer0.public_key(), peer1.public_key()];
// Peer 0 bootstraps from peer 1 via DNS.
let bootstrappers0 = vec![(
peer1.public_key(),
Ingress::Dns {
host: hostname!("peer-1.local"),
port: base_port + 1,
},
)];
let config0 = Config::test(peer0.clone(), socket0, bootstrappers0, 1_024 * 1_024);
let (mut network0, mut oracle0) =
Network::new(context.with_label("peer_0"), config0);
oracle0
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
let (_sender0, mut receiver0) =
network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network0.start();
// Peer 1 bootstraps from peer 0 via DNS.
let bootstrappers1 = vec![(
peer0.public_key(),
Ingress::Dns {
host: hostname!("peer-0.local"),
port: base_port,
},
)];
let config1 = Config::test(peer1.clone(), socket1, bootstrappers1, 1_024 * 1_024);
let (mut network1, mut oracle1) =
Network::new(context.with_label("peer_1"), config1);
oracle1
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
let (mut sender1, _receiver1) =
network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network1.start();
let pk0 = peer0.public_key();
// Retry every 100ms until peer 1's send to peer 0 reports a recipient.
loop {
let sent = sender1
.send(
Recipients::One(pk0.clone()),
peer1.public_key().as_ref().to_vec(),
true,
)
.await
.unwrap();
if !sent.is_empty() {
break;
}
context.sleep(Duration::from_millis(100)).await;
}
// Peer 0 must receive peer 1's public key from peer 1.
let (sender, msg) = receiver0.recv().await.unwrap();
assert_eq!(sender, peer1.public_key());
assert_eq!(msg, peer1.public_key().as_ref());
});
}
}
// Brings up `n` peers that all bootstrap through peer 0, confirms every peer can exchange a
// message with every other peer, and then restarts each non-bootstrapper peer on a brand-new
// port (three rounds over peers `1..n`). After every restart the restarted peer must be able
// to broadcast to all other peers and to receive a direct message from each of them.
#[test_traced]
fn test_many_peer_restart_with_new_address() {
    let base_port = 7500;
    let n = 5;
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // One deterministic identity per peer.
        let peers: Vec<_> = (0..n)
            .map(|seed| ed25519::PrivateKey::from_seed(seed as u64))
            .collect();
        let addresses: Vec<_> = peers.iter().map(|key| key.public_key()).collect();

        // Per-peer state indexed by peer number; an entry is `None` while that peer is down.
        let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
        let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
        let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();
        let mut ports: Vec<u16> = (0..n).map(|offset| base_port + offset as u16).collect();

        // Initial bring-up: peer 0 has no bootstrappers, everyone else bootstraps via peer 0.
        for (i, peer) in peers.iter().enumerate() {
            let peer_context = context.with_label(&format!("peer_{i}"));
            let bootstrappers = if i > 0 {
                let bootstrap_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]);
                vec![(addresses[0].clone(), bootstrap_addr.into())]
            } else {
                Vec::new()
            };
            let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]);
            let config = Config::test(peer.clone(), listen_addr, bootstrappers, MAX_MESSAGE_SIZE);
            let (mut network, mut oracle) =
                Network::new(peer_context.with_label("network"), config);
            let tracked = Set::try_from(addresses.clone()).unwrap();
            oracle.track(0, tracked).await;
            let (sender, receiver) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            senders[i] = Some(sender);
            receivers[i] = Some(receiver);
            handles[i] = Some(network.start());
        }

        // Each peer keeps broadcasting its own public key until all `n - 1` others are reached.
        for (i, slot) in senders.iter_mut().enumerate() {
            let outbound = slot.as_mut().unwrap();
            loop {
                let payload = peers[i].public_key().as_ref().to_vec();
                let sent = outbound
                    .send(Recipients::All, payload, true)
                    .await
                    .unwrap();
                if sent.len() == n - 1 {
                    break;
                }
                context.sleep(Duration::from_millis(100)).await;
            }
        }

        // Each peer must hear from `n - 1` distinct senders, every payload being the sender's key.
        for slot in receivers.iter_mut() {
            let inbound = slot.as_mut().unwrap();
            let mut seen = HashSet::new();
            while seen.len() < n - 1 {
                let (from, payload): (ed25519::PublicKey, _) = inbound.recv().await.unwrap();
                assert_eq!(payload, from.as_ref());
                seen.insert(from);
            }
        }

        // Restart rounds: the counter guarantees a never-before-used port for every restart.
        let mut restart_counter = 0u16;
        for round in 0..3 {
            for restart_peer_idx in 1..n {
                restart_counter += 1;
                let new_port = base_port + 100 + restart_counter;
                ports[restart_peer_idx] = new_port;

                // Stop the running instance and drop its channel endpoints.
                if let Some(handle) = handles[restart_peer_idx].take() {
                    handle.abort();
                }
                senders[restart_peer_idx] = None;
                receivers[restart_peer_idx] = None;

                // Bring the same identity back up on the new port, again bootstrapping via peer 0.
                let peer_context =
                    context.with_label(&format!("peer_{restart_peer_idx}_round_{round}"));
                let bootstrap_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]);
                let bootstrappers = vec![(addresses[0].clone(), bootstrap_addr.into())];
                let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port);
                let config = Config::test(
                    peers[restart_peer_idx].clone(),
                    listen_addr,
                    bootstrappers,
                    MAX_MESSAGE_SIZE,
                );
                let (mut network, mut oracle) =
                    Network::new(peer_context.with_label("network"), config);
                let tracked = Set::try_from(addresses.clone()).unwrap();
                oracle.track(0, tracked).await;
                let (sender, receiver) = network.register(
                    0,
                    Quota::per_second(NZU32!(100)),
                    DEFAULT_MESSAGE_BACKLOG,
                );
                senders[restart_peer_idx] = Some(sender);
                receivers[restart_peer_idx] = Some(receiver);
                handles[restart_peer_idx] = Some(network.start());

                // The restarted peer must be able to reach every other peer again.
                let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
                loop {
                    let payload = peers[restart_peer_idx].public_key().as_ref().to_vec();
                    let sent = restarted_sender
                        .send(Recipients::All, payload, true)
                        .await
                        .unwrap();
                    if sent.len() == n - 1 {
                        break;
                    }
                    context.sleep(Duration::from_millis(100)).await;
                }

                // Every other peer must be able to address the restarted peer directly.
                for i in (0..n).filter(|&i| i != restart_peer_idx) {
                    let outbound = senders[i].as_mut().unwrap();
                    loop {
                        let payload = peers[i].public_key().as_ref().to_vec();
                        let sent = outbound
                            .send(
                                Recipients::One(addresses[restart_peer_idx].clone()),
                                payload,
                                true,
                            )
                            .await
                            .unwrap();
                        if sent.len() == 1 {
                            break;
                        }
                        context.sleep(Duration::from_millis(100)).await;
                    }
                }

                // The restarted peer must receive a message from each of the other peers.
                let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
                let mut seen = HashSet::new();
                while seen.len() < n - 1 {
                    let (from, payload): (ed25519::PublicKey, _) =
                        restarted_receiver.recv().await.unwrap();
                    assert_eq!(payload, from.as_ref());
                    seen.insert(from);
                }
            }
        }
        assert_no_rate_limiting(&context);
    });
}
// Brings up `n` peers bootstrapped through peer 0, verifies full connectivity, then aborts
// every peer except peer 0 at once, waits two seconds, restarts them all on new ports, and
// verifies full connectivity again.
#[test_traced]
fn test_simultaneous_peer_restart() {
    let base_port = 7700;
    let n = 5;
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // One deterministic identity per peer.
        let peers: Vec<_> = (0..n)
            .map(|seed| ed25519::PrivateKey::from_seed(seed as u64))
            .collect();
        let addresses: Vec<_> = peers.iter().map(|key| key.public_key()).collect();

        // Per-peer state indexed by peer number; an entry is `None` while that peer is down.
        let mut ports: Vec<u16> = (0..n).map(|offset| base_port + offset as u16).collect();
        let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
        let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
        let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();

        // Initial bring-up: peer 0 has no bootstrappers, everyone else bootstraps via peer 0.
        for (i, peer) in peers.iter().enumerate() {
            let peer_context = context.with_label(&format!("peer_{i}"));
            let bootstrappers = if i > 0 {
                let bootstrap_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]);
                vec![(addresses[0].clone(), bootstrap_addr.into())]
            } else {
                Vec::new()
            };
            let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]);
            let config = Config::test(peer.clone(), listen_addr, bootstrappers, MAX_MESSAGE_SIZE);
            let (mut network, mut oracle) =
                Network::new(peer_context.with_label("network"), config);
            let tracked = Set::try_from(addresses.clone()).unwrap();
            oracle.track(0, tracked).await;
            let (sender, receiver) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            senders[i] = Some(sender);
            receivers[i] = Some(receiver);
            handles[i] = Some(network.start());
        }

        // Each peer keeps broadcasting its own public key until all `n - 1` others are reached.
        for (i, slot) in senders.iter_mut().enumerate() {
            let outbound = slot.as_mut().unwrap();
            loop {
                let payload = peers[i].public_key().as_ref().to_vec();
                let sent = outbound
                    .send(Recipients::All, payload, true)
                    .await
                    .unwrap();
                if sent.len() == n - 1 {
                    break;
                }
                context.sleep(Duration::from_millis(100)).await;
            }
        }

        // Each peer must hear from `n - 1` distinct senders, every payload being the sender's key.
        for slot in receivers.iter_mut() {
            let inbound = slot.as_mut().unwrap();
            let mut seen = HashSet::new();
            while seen.len() < n - 1 {
                let (from, payload): (ed25519::PublicKey, _) = inbound.recv().await.unwrap();
                assert_eq!(payload, from.as_ref());
                seen.insert(from);
            }
        }

        // Take down every peer except the bootstrapper before restarting any of them, and
        // assign each one a new port for its next incarnation.
        let restart_peers: Vec<usize> = (1..n).collect();
        for &idx in &restart_peers {
            if let Some(handle) = handles[idx].take() {
                handle.abort();
            }
            senders[idx] = None;
            receivers[idx] = None;
            ports[idx] = base_port + 100 + idx as u16;
        }

        // Leave all of them down for a while.
        context.sleep(Duration::from_secs(2)).await;

        // Restart all stopped peers on their new ports, bootstrapping through peer 0 again.
        for &idx in &restart_peers {
            let peer_context = context.with_label(&format!("peer_{idx}_restarted"));
            let bootstrap_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]);
            let bootstrappers = vec![(addresses[0].clone(), bootstrap_addr.into())];
            let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[idx]);
            let config = Config::test(
                peers[idx].clone(),
                listen_addr,
                bootstrappers,
                MAX_MESSAGE_SIZE,
            );
            let (mut network, mut oracle) =
                Network::new(peer_context.with_label("network"), config);
            let tracked = Set::try_from(addresses.clone()).unwrap();
            oracle.track(0, tracked).await;
            let (sender, receiver) =
                network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
            senders[idx] = Some(sender);
            receivers[idx] = Some(receiver);
            handles[idx] = Some(network.start());
        }

        // Full connectivity must be re-established: everyone reaches everyone...
        for (i, slot) in senders.iter_mut().enumerate() {
            let outbound = slot.as_mut().unwrap();
            loop {
                let payload = peers[i].public_key().as_ref().to_vec();
                let sent = outbound
                    .send(Recipients::All, payload, true)
                    .await
                    .unwrap();
                if sent.len() == n - 1 {
                    break;
                }
                context.sleep(Duration::from_millis(100)).await;
            }
        }

        // ...and everyone hears from everyone.
        for slot in receivers.iter_mut() {
            let inbound = slot.as_mut().unwrap();
            let mut seen = HashSet::new();
            while seen.len() < n - 1 {
                let (from, payload): (ed25519::PublicKey, _) = inbound.recv().await.unwrap();
                assert_eq!(payload, from.as_ref());
                seen.insert(from);
            }
        }
        assert_no_rate_limiting(&context);
    });
}
#[test_traced]
fn test_peer_restart_with_new_address_must_dial() {
let base_port = 3600;
let n: usize = 5;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let peers: Vec<_> = (0..n)
.map(|i| ed25519::PrivateKey::from_seed(i as u64))
.collect();
let addresses: Vec<_> = peers.iter().map(|p| p.public_key()).collect();
let mut senders: Vec<Option<channels::Sender<_, _>>> = (0..n).map(|_| None).collect();
let mut receivers: Vec<Option<channels::Receiver<_>>> = (0..n).map(|_| None).collect();
let mut handles: Vec<Option<Handle<()>>> = (0..n).map(|_| None).collect();
let mut ports: Vec<u16> = (0..n).map(|i| base_port + i as u16).collect();
let wrong_ip = IpAddr::V4(Ipv4Addr::new(10, 255, 255, 1)); let wrong_address_peer_idx = 2;
for (i, peer) in peers.iter().enumerate() {
let peer_context = context.with_label(&format!("peer_{i}"));
let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[i]);
let dialable_addr: Ingress = if i == wrong_address_peer_idx {
SocketAddr::new(wrong_ip, ports[i]).into()
} else {
listen_addr.into()
};
let mut bootstrappers = Vec::new();
if i > 0 {
bootstrappers.push((
addresses[0].clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
));
}
let mut config =
Config::test(peer.clone(), listen_addr, bootstrappers, 1_024 * 1_024);
config.dialable = dialable_addr;
let (mut network, mut oracle) =
Network::new(peer_context.with_label("network"), config);
oracle
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
let (sender, receiver) =
network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
senders[i] = Some(sender);
receivers[i] = Some(receiver);
let handle = network.start();
handles[i] = Some(handle);
}
for (i, sender) in senders.iter_mut().enumerate() {
let sender = sender.as_mut().unwrap();
loop {
let sent = sender
.send(
Recipients::All,
peers[i].public_key().as_ref().to_vec(),
true,
)
.await
.unwrap();
if sent.len() == n - 1 {
break;
}
context.sleep(Duration::from_millis(100)).await;
}
}
for receiver in receivers.iter_mut() {
let receiver = receiver.as_mut().unwrap();
let mut received = HashSet::new();
while received.len() < n - 1 {
let (sender, message) = receiver.recv().await.unwrap();
assert_eq!(message, sender.as_ref());
received.insert(sender);
}
}
let restart_peer_idx = 1;
let new_port = base_port + 100;
ports[restart_peer_idx] = new_port;
if let Some(handle) = handles[restart_peer_idx].take() {
handle.abort();
}
senders[restart_peer_idx] = None;
receivers[restart_peer_idx] = None;
let peer_context = context.with_label(&format!("peer_{restart_peer_idx}_restarted"));
let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), new_port);
let bootstrappers = vec![(
addresses[0].clone(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ports[0]).into(),
)];
let config = Config::test(
peers[restart_peer_idx].clone(),
listen_addr,
bootstrappers,
1_024 * 1_024,
);
let (mut network, mut oracle) =
Network::new(peer_context.with_label("network"), config);
oracle
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
let (sender, receiver) =
network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
senders[restart_peer_idx] = Some(sender);
receivers[restart_peer_idx] = Some(receiver);
let handle = network.start();
handles[restart_peer_idx] = Some(handle);
let restarted_sender = senders[restart_peer_idx].as_mut().unwrap();
loop {
let sent = restarted_sender
.send(
Recipients::All,
peers[restart_peer_idx].public_key().as_ref().to_vec(),
true,
)
.await
.unwrap();
if sent.len() == n - 1 {
break;
}
context.sleep(Duration::from_millis(100)).await;
}
for i in 0..n {
if i == restart_peer_idx {
continue;
}
let sender = senders[i].as_mut().unwrap();
loop {
let sent = sender
.send(
Recipients::One(addresses[restart_peer_idx].clone()),
peers[i].public_key().as_ref().to_vec(),
true,
)
.await
.unwrap();
if sent.len() == 1 {
break;
}
context.sleep(Duration::from_millis(100)).await;
}
}
let restarted_receiver = receivers[restart_peer_idx].as_mut().unwrap();
let mut received = HashSet::new();
while received.len() < n - 1 {
let (sender, message) = restarted_receiver.recv().await.unwrap();
assert_eq!(message, sender.as_ref());
received.insert(sender);
}
assert_no_rate_limiting(&context);
});
}
fn duplicate_addresses_disconnected(seed: u64) {
let base_port = 6000;
let executor = deterministic::Runner::seeded(seed);
executor.start(|context| async move {
let peer0 = ed25519::PrivateKey::from_seed(0);
let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
let peer1 = ed25519::PrivateKey::from_seed(1);
let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
let peer2 = ed25519::PrivateKey::from_seed(2);
let addresses: Vec<_> =
vec![peer0.public_key(), peer1.public_key(), peer2.public_key()];
let config0 = Config::test(
peer0.clone(),
socket0,
vec![
(peer1.public_key(), socket1.into()),
(peer2.public_key(), socket1.into()),
],
MAX_MESSAGE_SIZE,
);
let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
let (mut sender0, _receiver0) =
network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network0.start();
oracle0
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
context.sleep(Duration::from_secs(30)).await;
let sent = sender0
.send(Recipients::All, peer1.public_key().as_ref().to_vec(), true)
.await
.unwrap();
assert!(sent.is_empty());
let config1 = Config::test(
peer1.clone(),
socket1,
vec![(peer0.public_key(), wrong_socket0.into())],
MAX_MESSAGE_SIZE,
);
let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
let (_sender1, mut receiver1) =
network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network1.start();
oracle1.track(0, Set::try_from(addresses).unwrap()).await;
context.sleep(Duration::from_secs(30)).await;
loop {
let sent = sender0
.send(Recipients::All, peer0.public_key().as_ref().to_vec(), true)
.await
.unwrap();
if sent.len() == 1 {
assert_eq!(sent[0], peer1.public_key());
break;
}
context.sleep(Duration::from_millis(100)).await;
}
let (sender, _) = receiver1.recv().await.unwrap();
assert_eq!(sender, peer0.public_key());
});
}
#[test_traced]
fn test_duplicate_addresses_disconnected() {
for seed in 0..25 {
duplicate_addresses_disconnected(seed);
}
}
#[test_traced]
fn test_duplicate_addresses_connected() {
let base_port = 6000;
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let peer0 = ed25519::PrivateKey::from_seed(0);
let socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port);
let wrong_socket0 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 100);
let peer1 = ed25519::PrivateKey::from_seed(1);
let socket1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 1);
let peer2 = ed25519::PrivateKey::from_seed(2);
let socket2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), base_port + 2);
let addresses: Vec<_> =
vec![peer0.public_key(), peer1.public_key(), peer2.public_key()];
let config0 = Config::test(
peer0.clone(),
socket0,
vec![
(peer1.public_key(), socket1.into()),
(peer2.public_key(), socket1.into()),
],
MAX_MESSAGE_SIZE,
);
let (mut network0, mut oracle0) = Network::new(context.with_label("peer_0"), config0);
let (mut sender0, mut receiver0) =
network0.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network0.start();
let mut config2 = Config::test(
peer2.clone(),
socket2,
vec![(peer0.public_key(), socket0.into())],
MAX_MESSAGE_SIZE,
);
config2.dialable = socket1.into();
let (mut network2, mut oracle2) = Network::new(context.with_label("peer_2"), config2);
let (_sender2, mut receiver2) =
network2.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network2.start();
oracle0
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
oracle2
.track(0, Set::try_from(addresses.clone()).unwrap())
.await;
context.sleep(Duration::from_secs(30)).await;
loop {
let sent = sender0
.send(Recipients::All, peer2.public_key().as_ref().to_vec(), true)
.await
.unwrap();
if sent.len() == 1 {
assert_eq!(sent[0], peer2.public_key());
break;
}
context.sleep(Duration::from_millis(100)).await;
}
let (sender, _) = receiver2.recv().await.unwrap();
assert_eq!(sender, peer0.public_key());
let config1 = Config::test(
peer1.clone(),
socket1,
vec![(peer0.public_key(), wrong_socket0.into())],
MAX_MESSAGE_SIZE,
);
let (mut network1, mut oracle1) = Network::new(context.with_label("peer_1"), config1);
let (mut sender1, _receiver1) =
network1.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);
network1.start();
oracle1.track(0, Set::try_from(addresses).unwrap()).await;
context.sleep(Duration::from_secs(30)).await;
loop {
let sent = sender1
.send(Recipients::All, peer1.public_key().as_ref().to_vec(), true)
.await
.unwrap();
if sent.len() == 2 {
assert!(sent.contains(&peer0.public_key()));
assert!(sent.contains(&peer2.public_key()));
break;
}
context.sleep(Duration::from_millis(100)).await;
}
let mut received0 = false;
while let Ok((sender, _)) = receiver0.recv().await {
if sender == peer1.public_key() {
received0 = true;
break;
}
}
assert!(received0);
let (sender, _) = receiver2.recv().await.unwrap();
assert_eq!(sender, peer1.public_key());
});
}
// Starts a single-peer network, aborts it, and then exercises the oracle and sender handles
// that outlive it. None of these calls may panic, and a broadcast must report no recipients.
#[test_traced]
fn test_operations_after_shutdown_do_not_panic() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let peer = ed25519::PrivateKey::from_seed(0);
        let address = peer.public_key();
        let peer_context = context.with_label("peer");
        let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
        let config = Config::test(peer.clone(), listen_addr, vec![], MAX_MESSAGE_SIZE);
        let (mut network, mut oracle) =
            Network::new(peer_context.with_label("network"), config);
        let (mut sender, _receiver) =
            network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);

        // Track a peer set containing only this peer itself.
        let peers: Set<ed25519::PublicKey> = vec![address.clone()].try_into().unwrap();
        oracle.track(0, peers.clone()).await;

        // Start the network and abort it right away, then allow some time to pass.
        let handle = network.start();
        handle.abort();
        context.sleep(Duration::from_millis(100)).await;

        // Oracle operations issued after the abort; results are intentionally ignored.
        oracle.track(1, peers.clone()).await;
        let _ = oracle.peer_set(0).await;
        let _ = oracle.subscribe().await;
        crate::block_peer(&mut oracle, address.clone()).await;

        // Sending must still return successfully, with an empty recipient list.
        let payload = address.as_ref().to_vec();
        let sent = sender
            .send(Recipients::All, payload, true)
            .await
            .unwrap();
        assert!(sent.is_empty());
    });
}
// Runs one seeded scenario (30 second runner timeout) checking that aborting a network's
// handle stops every task running under the "peer_network" label.
fn clean_shutdown(seed: u64) {
    let cfg = deterministic::Config::default()
        .with_seed(seed)
        .with_timeout(Some(Duration::from_secs(30)));
    let executor = deterministic::Runner::new(cfg);
    executor.start(|context| async move {
        let peer = ed25519::PrivateKey::from_seed(0);
        let peer_context = context.with_label("peer");
        let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
        let config = Config::test(peer.clone(), listen_addr, vec![], MAX_MESSAGE_SIZE);
        let (mut network, mut oracle) =
            Network::new(peer_context.with_label("network"), config);

        // Register a channel; both endpoints are discarded immediately.
        let (_, _) =
            network.register(0, Quota::per_second(NZU32!(100)), DEFAULT_MESSAGE_BACKLOG);

        // Track a peer set containing only this peer itself.
        let peers: Set<ed25519::PublicKey> = vec![peer.public_key()].try_into().unwrap();
        oracle.track(0, peers).await;

        // Start the network and give it time to spin up before counting its tasks.
        let handle = network.start();
        context.sleep(Duration::from_millis(100)).await;
        let running_before = count_running_tasks(&context, "peer_network");
        assert!(
            running_before > 0,
            "at least one network task should be running"
        );

        // Abort, wait for the handle to resolve, and allow time for the remaining tasks to stop.
        handle.abort();
        let _ = handle.await;
        context.sleep(Duration::from_millis(100)).await;
        let running_after = count_running_tasks(&context, "peer_network");
        assert_eq!(
            running_after, 0,
            "all network tasks should be stopped, but {running_after} still running"
        );
    });
}
#[test_traced]
fn test_clean_shutdown() {
for seed in 0..25 {
clean_shutdown(seed);
}
}
// Drives the router actor directly with two registered peers: a "slow" one whose relay
// channels (capacity 10) are never read, and a "fast" one with capacity-100 channels. Eleven
// broadcasts are issued, one more than the slow peer's channel capacity, inside a runner
// created with a 5 second limit. The fast peer must still receive all eleven.
#[test]
fn test_broadcast_slow_peer_no_blocking() {
    let executor = deterministic::Runner::timed(Duration::from_secs(5));
    executor.start(|context| async move {
        // Router under test, with its mailbox and messenger handles.
        let cfg = RouterConfig { mailbox_size: 10 };
        let (router, mut mailbox, messenger) =
            RouterActor::<_, ed25519::PublicKey>::new(context.clone(), cfg);
        let channels = channels::Channels::new(messenger.clone(), MAX_MESSAGE_SIZE);
        let _handle = router.start(channels);

        // Slow peer: the receiving ends are kept alive but never drained.
        let slow_peer = ed25519::PrivateKey::from_seed(0).public_key();
        let (slow_low, _slow_low_rx) = mpsc::channel(10);
        let (slow_high, _slow_high_rx) = mpsc::channel(10);
        assert!(
            mailbox
                .ready(slow_peer.clone(), Relay::new(slow_low, slow_high))
                .await
                .is_some(),
            "Failed to register slow peer"
        );

        // Fast peer: only the receiver paired with the first relay sender is inspected below.
        let fast_peer = ed25519::PrivateKey::from_seed(1).public_key();
        let (fast_low, mut fast_receiver) = mpsc::channel(100);
        let (fast_high, _fast_high_rx) = mpsc::channel(100);
        assert!(
            mailbox
                .ready(fast_peer.clone(), Relay::new(fast_low, fast_high))
                .await
                .is_some(),
            "Failed to register fast peer"
        );

        let message = IoBuf::from(vec![0u8; 100]);
        let mut messenger = messenger;

        // The first ten broadcasts must each be reported as delivered to both peers.
        for i in 0..10 {
            let payload = message.clone().into();
            let sent = messenger.content(Recipients::All, 0, payload, false).await;
            assert_eq!(sent.len(), 2, "Broadcast {i} should reach both peers");
        }

        // The eleventh broadcast must still be reported as delivered to the fast peer.
        let sent = messenger
            .content(Recipients::All, 0, message.into(), false)
            .await;
        assert!(
            sent.contains(&fast_peer),
            "Fast peer should receive message"
        );

        // All eleven messages must already be queued for the fast peer.
        for _ in 0..11 {
            assert!(fast_receiver.try_recv().is_ok());
        }
    });
}
}