mod bandwidth;
mod ingress;
mod metrics;
mod network;
mod transmitter;
use thiserror::Error;
/// Errors returned by the public API of this module.
#[derive(Debug, Error)]
pub enum Error {
/// A message was rejected for being too large. The `usize` payload is printed in the
/// error text (presumably the offending message length -- confirm at the construction site).
#[error("message too large: {0}")]
MessageTooLarge(usize),
/// The network (or the channel registration backing a handle) is no longer available.
#[error("network closed")]
NetworkClosed,
/// A link was requested from a peer to itself.
#[error("not valid to link self")]
LinkingSelf,
/// The link being added already exists.
#[error("link already exists")]
LinkExists,
/// The link being referenced (for example, removed) does not exist.
#[error("link missing")]
LinkMissing,
/// A link was configured with a success rate outside `[0, 1]`; the rejected value is attached.
#[error("invalid success rate (must be in [0, 1]): {0}")]
InvalidSuccessRate(f64),
/// Sending a frame failed.
#[error("send_frame failed")]
SendFrameFailed,
/// Receiving a frame failed.
#[error("recv_frame failed")]
RecvFrameFailed,
/// Binding failed.
#[error("bind failed")]
BindFailed,
/// Accepting a connection failed.
#[error("accept failed")]
AcceptFailed,
/// Dialing a peer failed.
#[error("dial failed")]
DialFailed,
/// The referenced peer is not known.
#[error("peer missing")]
PeerMissing,
}
pub use ingress::{Control, Link, Manager, Oracle, SocketManager};
pub use network::{
Config, ConnectedPeerProvider, Network, Receiver, Sender, SplitForwarder, SplitOrigin,
SplitRouter, SplitSender, SplitTarget, UnlimitedSender,
};
#[cfg(test)]
mod tests {
use super::*;
use crate::{
Address, AddressableManager, AddressableTrackedPeers, Ingress, Manager, Provider, Receiver,
Recipients, Sender, TrackedPeers,
};
use commonware_cryptography::{
ed25519::{self, PrivateKey, PublicKey},
Signer as _,
};
use commonware_macros::{select, test_group};
use commonware_runtime::{
count_running_tasks, deterministic, Clock, IoBuf, Metrics, Quota, Runner, Spawner,
};
use commonware_utils::{
channel::mpsc,
hostname, ordered,
ordered::{Map, Set},
NZUsize, NZU32,
};
use rand::Rng;
use std::{
collections::{BTreeMap, HashMap, HashSet},
net::SocketAddr,
num::NonZeroU32,
time::Duration,
};
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
/// Tracks `peers` as peer set `0` through a manager obtained from `oracle`.
///
/// The peers are gathered into a `Set` with `Set::from_iter_dedup` before being tracked.
async fn track_peers<I: IntoIterator<Item = PublicKey>>(
    oracle: &Oracle<PublicKey, deterministic::Context>,
    peers: I,
) {
    let mut peer_manager = oracle.manager();
    let peer_set = Set::from_iter_dedup(peers);
    peer_manager.track(0, peer_set).await;
}
/// Runs one seeded, deterministic simulation with `size` agents exchanging broadcast
/// messages over lossy links.
///
/// Returns the auditor state at the end of the run together with the agent indices in the
/// order in which each agent finished receiving `size` messages.
fn simulate_messages(seed: u64, size: usize) -> (String, Vec<usize>) {
let executor = deterministic::Runner::seeded(seed);
executor.start(|context| async move {
let (network, oracle) = Network::new(
context.with_label("network"),
Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(1),
},
);
network.start();
// Register every agent on channel 0 and spawn a task that reports the agent's index
// once it has received `size` messages.
let mut agents = BTreeMap::new();
let (seen_sender, mut seen_receiver) = mpsc::channel(1024);
for i in 0..size {
let pk = PrivateKey::from_seed(i as u64).public_key();
let (sender, mut receiver) = oracle
.control(pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
agents.insert(pk, sender);
let agent_sender = seen_sender.clone();
context
.with_label("agent_receiver")
.spawn(move |_| async move {
for _ in 0..size {
receiver.recv().await.unwrap();
}
agent_sender.send(i).await.unwrap();
});
}
track_peers(&oracle, agents.keys().cloned()).await;
// The agent derived from seed 0 gets no outbound links: it can receive but not send.
let only_inbound = PrivateKey::from_seed(0).public_key();
for agent in agents.keys() {
if agent == &only_inbound {
continue;
}
for other in agents.keys() {
let result = oracle
.add_link(
agent.clone(),
other.clone(),
Link {
latency: Duration::from_millis(5),
jitter: Duration::from_millis(2),
success_rate: 0.75,
},
)
.await;
// Linking an agent to itself must be rejected; every other link must succeed.
if agent == other {
assert!(matches!(result, Err(Error::LinkingSelf)));
} else {
assert!(result.is_ok());
}
}
}
// Background task: forever pick a random agent and broadcast from it.
context
.with_label("agent_sender")
.spawn(|mut context| async move {
let keys = agents.keys().collect::<Vec<_>>();
loop {
let index = context.gen_range(0..keys.len());
let sender = keys[index];
let msg = format!("hello from {sender:?}");
let msg = IoBuf::copy_from_slice(msg.as_bytes());
let mut message_sender = agents.get(sender).unwrap().clone();
let sent = message_sender
.send(Recipients::All, msg.clone(), false)
.await
.unwrap();
// The inbound-only agent has no links, so nothing is sent; everyone else
// reaches all other agents.
if sender == &only_inbound {
assert_eq!(sent.len(), 0);
} else {
assert_eq!(sent.len(), keys.len() - 1);
}
}
});
// Wait until every agent has reported completion, recording the completion order.
let mut results = Vec::new();
for _ in 0..size {
results.push(seen_receiver.recv().await.unwrap());
}
(context.auditor().state(), results)
})
}
/// Runs `simulate_messages` for every seed in `0..seeds` twice and asserts that the second
/// run of each seed produces exactly the same output as the first.
fn compare_outputs(seeds: u64, size: usize) {
    // First pass: record the output for every seed.
    let baseline: Vec<_> = (0..seeds)
        .map(|seed| simulate_messages(seed, size))
        .collect();

    // Second pass: replay every seed and compare against the recorded output.
    for (seed, expected) in (0..seeds).zip(&baseline) {
        let replay = simulate_messages(seed, size);
        assert_eq!(&replay, expected);
    }
}
/// Checks that the simulation is reproducible across 25 seeds with 25 agents each.
#[test_group("slow")]
#[test]
fn test_determinism() {
    let (seeds, agents): (u64, usize) = (25, 25);
    compare_outputs(seeds, agents);
}
/// Sending a payload one byte larger than the configured `max_size` must fail with
/// `Error::MessageTooLarge`.
#[test]
fn test_message_too_big() {
    let runner = deterministic::Runner::default();
    runner.start(|mut context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), config);
        network.start();

        // Register ten peers on channel 0, keeping only their senders.
        let mut senders = HashMap::new();
        for seed in 0..10 {
            let public_key = PrivateKey::from_seed(seed as u64).public_key();
            let (sender, _) = oracle
                .control(public_key.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            senders.insert(public_key, sender);
        }

        // Pick one of the registered peers at random to send from.
        let candidates: Vec<_> = senders.keys().collect();
        let choice = context.gen_range(0..candidates.len());
        let chosen = candidates[choice];
        let mut message_sender = senders.get(chosen).unwrap().clone();

        // Random payload that is exactly one byte over the limit.
        let mut payload = vec![0u8; 1024 * 1024 + 1];
        context.fill(&mut payload[..]);

        let err = message_sender
            .send(Recipients::All, payload, false)
            .await
            .unwrap_err();
        assert!(matches!(err, Error::MessageTooLarge(_)));
    });
}
/// Adding a link from a peer to itself must be rejected with `Error::LinkingSelf`.
#[test]
fn test_linking_self() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), config);
        network.start();

        // Register the single peer; the returned channel halves are not needed.
        let peer = PrivateKey::from_seed(0).public_key();
        oracle
            .control(peer.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        let self_link = Link {
            latency: Duration::from_millis(5),
            jitter: Duration::from_millis(2),
            success_rate: 0.75,
        };
        let outcome = oracle.add_link(peer.clone(), peer, self_link).await;
        assert!(matches!(outcome, Err(Error::LinkingSelf)));
    });
}
/// Registering the same channel twice for one peer: the new handles work, the first
/// receiver reports `Error::NetworkClosed`, and `send` on the first sender still returns `Ok`.
#[test]
fn test_duplicate_channel() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), config);
        network.start();

        let local = PrivateKey::from_seed(0).public_key();
        let remote = PrivateKey::from_seed(1).public_key();

        // Lossless links in both directions.
        let link = || Link {
            latency: Duration::from_millis(10),
            jitter: Duration::from_millis(1),
            success_rate: 1.0,
        };
        oracle
            .add_link(local.clone(), remote.clone(), link())
            .await
            .unwrap();
        oracle
            .add_link(remote.clone(), local.clone(), link())
            .await
            .unwrap();

        // Initial registrations on channel 0.
        let (mut local_tx, mut local_rx) = oracle
            .control(local.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (mut remote_tx, mut remote_rx) = oracle
            .control(remote.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, [local.clone(), remote.clone()]).await;

        // Round trip over the initial registration.
        let hello = IoBuf::from(b"hello");
        local_tx
            .send(Recipients::One(remote.clone()), hello.clone(), false)
            .await
            .unwrap();
        let (origin, payload) = remote_rx.recv().await.unwrap();
        assert_eq!(origin, local);
        assert_eq!(payload, hello);
        remote_tx
            .send(Recipients::One(local.clone()), hello.clone(), false)
            .await
            .unwrap();
        let (origin, payload) = local_rx.recv().await.unwrap();
        assert_eq!(origin, remote);
        assert_eq!(payload, hello);

        // Register channel 0 again for the local peer.
        let (mut local_tx_2, mut local_rx_2) = oracle
            .control(local.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // Round trip over the second registration.
        let hello_again = IoBuf::from(b"hello again");
        local_tx_2
            .send(Recipients::One(remote.clone()), hello_again.clone(), false)
            .await
            .unwrap();
        let (origin, payload) = remote_rx.recv().await.unwrap();
        assert_eq!(origin, local);
        assert_eq!(payload, hello_again);
        remote_tx
            .send(Recipients::One(local.clone()), hello_again.clone(), false)
            .await
            .unwrap();
        let (origin, payload) = local_rx_2.recv().await.unwrap();
        assert_eq!(origin, remote);
        assert_eq!(payload, hello_again);

        // The first receiver is closed, while sending on the first sender still succeeds.
        let stale_recv = local_rx.recv().await;
        assert!(matches!(stale_recv, Err(Error::NetworkClosed)));
        let stale_send = local_tx
            .send(Recipients::One(remote.clone()), hello_again, false)
            .await;
        assert!(stale_send.is_ok());
    });
}
/// A link whose `success_rate` is `1.5` must be rejected with `Error::InvalidSuccessRate`.
#[test]
fn test_invalid_success_rate() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), config);
        network.start();

        // Register both endpoints; their channel halves are discarded immediately.
        let first = PrivateKey::from_seed(0).public_key();
        let second = PrivateKey::from_seed(1).public_key();
        oracle
            .control(first.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        oracle
            .control(second.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        let bad_link = Link {
            latency: Duration::from_millis(5),
            jitter: Duration::from_millis(2),
            success_rate: 1.5,
        };
        let outcome = oracle.add_link(first, second, bad_link).await;
        assert!(matches!(outcome, Err(Error::InvalidSuccessRate(_))));
    });
}
/// A link added before either peer registers a channel must still carry messages once the
/// channels are registered afterwards.
#[test]
fn test_add_link_before_channel_registration() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let origin_key = PrivateKey::from_seed(0).public_key();
        let target_key = PrivateKey::from_seed(1).public_key();

        // The network is created with both peers supplied up front.
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new_with_peers(
            context.with_label("network"),
            config,
            [origin_key.clone(), target_key.clone()],
        )
        .await;
        network.start();

        // Link first, with no latency, no jitter and no loss.
        let instant_link = Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        oracle
            .add_link(origin_key.clone(), target_key.clone(), instant_link)
            .await
            .unwrap();

        // Register channels only after the link exists.
        let (mut origin_tx, _origin_rx) = oracle
            .control(origin_key.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_, mut target_rx) = oracle
            .control(target_key.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        let payload = IoBuf::from(b"link-before-register-1");
        origin_tx
            .send(Recipients::One(target_key.clone()), payload.clone(), false)
            .await
            .unwrap();
        let (from, received) = target_rx.recv().await.unwrap();
        assert_eq!(from, origin_key);
        assert_eq!(received, payload);
    });
}
/// Two peers linked in both directions each send one message on channel 0 and each receives
/// the other's message intact.
#[test]
fn test_simple_message_delivery() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), config);
        network.start();

        let alice = PrivateKey::from_seed(0).public_key();
        let bob = PrivateKey::from_seed(1).public_key();
        let (mut alice_tx, mut alice_rx) = oracle
            .control(alice.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (mut bob_tx, mut bob_rx) = oracle
            .control(bob.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // Additional channels are registered and their handles dropped right away.
        let _ = oracle
            .control(alice.clone())
            .register(1, TEST_QUOTA)
            .await
            .unwrap();
        let _ = oracle
            .control(bob.clone())
            .register(2, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, [alice.clone(), bob.clone()]).await;

        // Lossless links in both directions.
        let link = || Link {
            latency: Duration::from_millis(5),
            jitter: Duration::from_millis(2),
            success_rate: 1.0,
        };
        oracle
            .add_link(alice.clone(), bob.clone(), link())
            .await
            .unwrap();
        oracle
            .add_link(bob.clone(), alice.clone(), link())
            .await
            .unwrap();

        // Send in both directions, then check both deliveries.
        let from_alice = IoBuf::from(b"hello from pk1");
        let from_bob = IoBuf::from(b"hello from pk2");
        alice_tx
            .send(Recipients::One(bob.clone()), from_alice.clone(), false)
            .await
            .unwrap();
        bob_tx
            .send(Recipients::One(alice.clone()), from_bob.clone(), false)
            .await
            .unwrap();

        let (origin, payload) = alice_rx.recv().await.unwrap();
        assert_eq!(origin, bob);
        assert_eq!(payload, from_bob);
        let (origin, payload) = bob_rx.recv().await.unwrap();
        assert_eq!(origin, alice);
        assert_eq!(payload, from_alice);
    });
}
/// A message sent on channel 0 must not be delivered to a peer that only registered
/// channel 1: nothing may arrive within one second.
#[test]
fn test_send_wrong_channel() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), config);
        network.start();

        // The sender registers channel 0, the receiver registers channel 1.
        let source = PrivateKey::from_seed(0).public_key();
        let target = PrivateKey::from_seed(1).public_key();
        let (mut source_tx, _) = oracle
            .control(source.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_, mut target_rx) = oracle
            .control(target.clone())
            .register(1, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, [source.clone(), target.clone()]).await;

        let one_way = Link {
            latency: Duration::from_millis(5),
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        oracle
            .add_link(source, target.clone(), one_way)
            .await
            .unwrap();

        let payload = IoBuf::from(b"hello from pk1");
        source_tx
            .send(Recipients::One(target), payload, false)
            .await
            .unwrap();

        // Any delivery before the timer fires is a failure.
        select! {
            _ = target_rx.recv() => {
                panic!("unexpected message");
            },
            _ = context.sleep(Duration::from_secs(1)) => {},
        }
    });
}
/// Registers two peers, tracks them, links them in both directions, and checks that one
/// message in each direction is delivered.
#[test]
fn test_dynamic_peers() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), config);
        network.start();

        let alice = PrivateKey::from_seed(0).public_key();
        let bob = PrivateKey::from_seed(1).public_key();
        let (mut alice_tx, mut alice_rx) = oracle
            .control(alice.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (mut bob_tx, mut bob_rx) = oracle
            .control(bob.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, [alice.clone(), bob.clone()]).await;

        // Lossless links in both directions.
        let link = || Link {
            latency: Duration::from_millis(5),
            jitter: Duration::from_millis(2),
            success_rate: 1.0,
        };
        oracle
            .add_link(alice.clone(), bob.clone(), link())
            .await
            .unwrap();
        oracle
            .add_link(bob.clone(), alice.clone(), link())
            .await
            .unwrap();

        let from_alice = IoBuf::from(b"attempt 1: hello from pk1");
        let from_bob = IoBuf::from(b"attempt 1: hello from pk2");
        alice_tx
            .send(Recipients::One(bob.clone()), from_alice.clone(), false)
            .await
            .unwrap();
        bob_tx
            .send(Recipients::One(alice.clone()), from_bob.clone(), false)
            .await
            .unwrap();

        let (origin, payload) = alice_rx.recv().await.unwrap();
        assert_eq!(origin, bob);
        assert_eq!(payload, from_bob);
        let (origin, payload) = bob_rx.recv().await.unwrap();
        assert_eq!(origin, alice);
        assert_eq!(payload, from_alice);
    });
}
/// Messages are delivered only while links exist: nothing arrives before links are added,
/// both directions work once they are, nothing arrives after they are removed, and removing
/// an already-removed link yields `Error::LinkMissing`.
#[test]
fn test_dynamic_links() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), config);
        network.start();

        let alice = PrivateKey::from_seed(0).public_key();
        let bob = PrivateKey::from_seed(1).public_key();
        let (mut alice_tx, mut alice_rx) = oracle
            .control(alice.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (mut bob_tx, mut bob_rx) = oracle
            .control(bob.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, [alice.clone(), bob.clone()]).await;

        // Attempt 1: no links exist yet, so neither side may receive anything.
        let from_alice = IoBuf::from(b"attempt 1: hello from pk1");
        let from_bob = IoBuf::from(b"attempt 1: hello from pk2");
        alice_tx
            .send(Recipients::One(bob.clone()), from_alice, false)
            .await
            .unwrap();
        bob_tx
            .send(Recipients::One(alice.clone()), from_bob, false)
            .await
            .unwrap();
        select! {
            _ = alice_rx.recv() => {
                panic!("unexpected message");
            },
            _ = bob_rx.recv() => {
                panic!("unexpected message");
            },
            _ = context.sleep(Duration::from_secs(1)) => {},
        }

        // Attempt 2: add lossless links in both directions and expect delivery.
        let link = || Link {
            latency: Duration::from_millis(5),
            jitter: Duration::from_millis(2),
            success_rate: 1.0,
        };
        oracle
            .add_link(alice.clone(), bob.clone(), link())
            .await
            .unwrap();
        oracle
            .add_link(bob.clone(), alice.clone(), link())
            .await
            .unwrap();
        let from_alice = IoBuf::from(b"attempt 2: hello from pk1");
        let from_bob = IoBuf::from(b"attempt 2: hello from pk2");
        alice_tx
            .send(Recipients::One(bob.clone()), from_alice.clone(), false)
            .await
            .unwrap();
        bob_tx
            .send(Recipients::One(alice.clone()), from_bob.clone(), false)
            .await
            .unwrap();
        let (origin, payload) = alice_rx.recv().await.unwrap();
        assert_eq!(origin, bob);
        assert_eq!(payload, from_bob);
        let (origin, payload) = bob_rx.recv().await.unwrap();
        assert_eq!(origin, alice);
        assert_eq!(payload, from_alice);

        // Attempt 3: remove both links; again nothing may be delivered.
        oracle
            .remove_link(alice.clone(), bob.clone())
            .await
            .unwrap();
        oracle
            .remove_link(bob.clone(), alice.clone())
            .await
            .unwrap();
        let from_alice = IoBuf::from(b"attempt 3: hello from pk1");
        let from_bob = IoBuf::from(b"attempt 3: hello from pk2");
        alice_tx
            .send(Recipients::One(bob.clone()), from_alice, false)
            .await
            .unwrap();
        bob_tx
            .send(Recipients::One(alice.clone()), from_bob, false)
            .await
            .unwrap();
        select! {
            _ = alice_rx.recv() => {
                panic!("unexpected message");
            },
            _ = bob_rx.recv() => {
                panic!("unexpected message");
            },
            _ = context.sleep(Duration::from_secs(1)) => {},
        }

        // Removing a link that is already gone is an error.
        let outcome = oracle.remove_link(alice, bob).await;
        assert!(matches!(outcome, Err(Error::LinkMissing)));
    });
}
/// Sends a single `message_size`-byte message between two freshly generated peers and checks
/// how long delivery takes.
///
/// The sending peer is limited with `sender_bps` and the receiving peer with `receiver_bps`
/// (`None` leaves the respective side unlimited). The pair is tracked as peer set `index` and
/// joined by a link with zero latency, zero jitter and a success rate of `1.0`. The elapsed
/// time must be at least `expected_duration_ms` and less than `expected_duration_ms + 100`
/// milliseconds.
async fn test_bandwidth_between_peers(
    context: &mut deterministic::Context,
    oracle: &Oracle<PublicKey, deterministic::Context>,
    index: u64,
    sender_bps: Option<usize>,
    receiver_bps: Option<usize>,
    message_size: usize,
    expected_duration_ms: u64,
) {
    // Fresh random identities so repeated calls do not collide.
    let source = PrivateKey::from_seed(context.gen::<u64>()).public_key();
    let target = PrivateKey::from_seed(context.gen::<u64>()).public_key();

    let (mut sender, _) = oracle
        .control(source.clone())
        .register(0, TEST_QUOTA)
        .await
        .unwrap();
    let (_, mut receiver) = oracle
        .control(target.clone())
        .register(0, TEST_QUOTA)
        .await
        .unwrap();

    let mut manager = oracle.manager();
    let pair = Set::from_iter_dedup([source.clone(), target.clone()]);
    manager.track(index, pair).await;

    // Apply the requested limits: one on the source, the other on the target.
    oracle
        .limit_bandwidth(source.clone(), sender_bps, None)
        .await
        .unwrap();
    oracle
        .limit_bandwidth(target.clone(), None, receiver_bps)
        .await
        .unwrap();

    let instant_link = Link {
        latency: Duration::ZERO,
        jitter: Duration::ZERO,
        success_rate: 1.0,
    };
    oracle
        .add_link(source.clone(), target.clone(), instant_link)
        .await
        .unwrap();

    // Time a single transfer.
    let payload = IoBuf::from(vec![42u8; message_size]);
    let start = context.current();
    sender
        .send(Recipients::One(target.clone()), payload.clone(), true)
        .await
        .unwrap();
    let (origin, received) = receiver.recv().await.unwrap();
    let elapsed = context.current().duration_since(start).unwrap();

    assert_eq!(origin, source);
    assert_eq!(received, payload);

    let lower_bound = Duration::from_millis(expected_duration_ms);
    let upper_bound = Duration::from_millis(expected_duration_ms + 100);
    assert!(
        elapsed >= lower_bound,
        "Message arrived too quickly: {elapsed:?} (expected >= {expected_duration_ms}ms)"
    );
    assert!(
        elapsed < upper_bound,
        "Message took too long: {elapsed:?} (expected ~{expected_duration_ms}ms)"
    );
}
/// Runs `test_bandwidth_between_peers` over a table of sender/receiver limit combinations,
/// one peer-set index per case.
#[test]
fn test_bandwidth() {
    let runner = deterministic::Runner::default();
    runner.start(|mut context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), config);
        network.start();

        // (sender_bps, receiver_bps, message_size, expected_duration_ms)
        let cases: [(Option<usize>, Option<usize>, usize, u64); 6] = [
            (Some(1000), Some(1000), 500, 500),
            (Some(500), Some(2000), 250, 500),
            (Some(2000), Some(500), 250, 500),
            (None, Some(1000), 500, 500),
            (Some(1000), None, 500, 500),
            (None, None, 500, 0),
        ];

        // Cases run strictly one after another, with indices 0, 1, 2, ...
        for (index, (sender_bps, receiver_bps, message_size, expected_ms)) in
            (0u64..).zip(cases)
        {
            test_bandwidth_between_peers(
                &mut context,
                &oracle,
                index,
                sender_bps,
                receiver_bps,
                message_size,
                expected_ms,
            )
            .await;
        }
    });
}
/// Checks transfer time when one hub peer exchanges messages with `NUM_PEERS` other peers
/// while every peer has both bandwidth limits set to `EFFECTIVE_BPS`.
///
/// Two phases are timed: the hub sending one message to every peer (one-to-many), and every
/// peer sending one message to the hub (many-to-one). Each phase must take at least
/// `NUM_PEERS * MESSAGE_SIZE * 1000 / EFFECTIVE_BPS` milliseconds and less than that plus 500ms.
#[test]
fn test_bandwidth_contention() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (network, oracle) = Network::new(
context.with_label("network"),
Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(1),
},
);
network.start();
// NUM_PEERS spokes plus one hub (index 0); MESSAGE_SIZE is in bytes. EFFECTIVE_BPS is
// presumably bytes per second, given how `expected_ms` is derived below -- confirm.
const NUM_PEERS: usize = 100;
const MESSAGE_SIZE: usize = 1000; const EFFECTIVE_BPS: usize = 10_000;
let mut peers = Vec::with_capacity(NUM_PEERS + 1);
let mut senders = Vec::with_capacity(NUM_PEERS + 1);
let mut receivers = Vec::with_capacity(NUM_PEERS + 1);
// Register the hub and all spokes on channel 0.
for i in 0..=NUM_PEERS {
let pk = PrivateKey::from_seed(i as u64).public_key();
let (sender, receiver) = oracle
.control(pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
peers.push(pk);
senders.push(sender);
receivers.push(receiver);
}
track_peers(&oracle, peers.iter().cloned()).await;
// Apply the same limit to both bandwidth directions of every peer.
for pk in &peers {
oracle
.limit_bandwidth(pk.clone(), Some(EFFECTIVE_BPS), Some(EFFECTIVE_BPS))
.await
.unwrap();
}
// Star topology: zero-latency, lossless links between the hub and each spoke, both ways.
for peer in peers.iter().skip(1) {
oracle
.add_link(
peer.clone(),
peers[0].clone(),
Link {
latency: Duration::ZERO,
jitter: Duration::ZERO,
success_rate: 1.0,
},
)
.await
.unwrap();
oracle
.add_link(
peers[0].clone(),
peer.clone(),
Link {
latency: Duration::ZERO,
jitter: Duration::ZERO,
success_rate: 1.0,
},
)
.await
.unwrap();
}
// Phase 1 (one-to-many): the hub sends one message to every spoke.
let start = context.current();
let msg = IoBuf::from(vec![0u8; MESSAGE_SIZE]);
for peer in peers.iter().skip(1) {
senders[0]
.send(Recipients::One(peer.clone()), msg.clone(), true)
.await
.unwrap();
}
for receiver in receivers.iter_mut().skip(1) {
let (origin, received) = receiver.recv().await.unwrap();
assert_eq!(origin, peers[0]);
assert_eq!(received.len(), MESSAGE_SIZE);
}
let elapsed = context.current().duration_since(start).unwrap();
// Total bytes divided by the limit, converted to milliseconds.
let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
assert!(
elapsed >= Duration::from_millis(expected_ms as u64),
"One-to-many completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
);
assert!(
elapsed < Duration::from_millis((expected_ms as u64) + 500),
"One-to-many took too long: {elapsed:?} (expected ~{expected_ms}ms)"
);
// Phase 2 (many-to-one): every spoke sends one message to the hub.
let start = context.current();
let msg = IoBuf::from(vec![0; MESSAGE_SIZE]);
for mut sender in senders.into_iter().skip(1) {
sender
.send(Recipients::One(peers[0].clone()), msg.clone(), true)
.await
.unwrap();
}
// Collect exactly one message per spoke, rejecting duplicates.
let mut received_from = HashSet::new();
for _ in 1..=NUM_PEERS {
let (origin, received) = receivers[0].recv().await.unwrap();
assert_eq!(received.len(), MESSAGE_SIZE);
assert!(
received_from.insert(origin.clone()),
"Received duplicate from {origin:?}"
);
}
let elapsed = context.current().duration_since(start).unwrap();
let expected_ms = (NUM_PEERS * MESSAGE_SIZE * 1000) / EFFECTIVE_BPS;
assert!(
elapsed >= Duration::from_millis(expected_ms as u64),
"Many-to-one completed too quickly: {elapsed:?} (expected >= {expected_ms}ms)"
);
assert!(
elapsed < Duration::from_millis((expected_ms as u64) + 500),
"Many-to-one took too long: {elapsed:?} (expected ~{expected_ms}ms)"
);
// Every spoke must be represented among the origins.
assert_eq!(received_from.len(), NUM_PEERS);
for peer in peers.iter().skip(1) {
assert!(received_from.contains(peer));
}
});
}
/// Five messages sent back to back over a link with 50ms latency and 40ms jitter must be
/// received in the order they were sent.
#[test]
fn test_message_ordering() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), config);
        network.start();

        let source = PrivateKey::from_seed(1).public_key();
        let target = PrivateKey::from_seed(2).public_key();
        let (mut sender, _) = oracle
            .control(source.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_, mut receiver) = oracle
            .control(target.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, [source.clone(), target.clone()]).await;

        // Jitter is large relative to latency, so per-message delays can differ widely.
        let jittery_link = Link {
            latency: Duration::from_millis(50),
            jitter: Duration::from_millis(40),
            success_rate: 1.0,
        };
        oracle
            .add_link(source.clone(), target.clone(), jittery_link)
            .await
            .unwrap();

        let messages = vec![
            IoBuf::from(b"message 1"),
            IoBuf::from(b"message 2"),
            IoBuf::from(b"message 3"),
            IoBuf::from(b"message 4"),
            IoBuf::from(b"message 5"),
        ];

        // Send everything first, then verify the arrival order.
        for outgoing in &messages {
            sender
                .send(Recipients::One(target.clone()), outgoing.clone(), true)
                .await
                .unwrap();
        }
        for expected_msg in messages {
            let (origin, received_msg) = receiver.recv().await.unwrap();
            assert_eq!(origin, source);
            assert_eq!(received_msg, expected_msg);
        }
    })
}
/// Checks that a message sent over a high-latency link is received before a later message
/// sent to the same peer, even after the link has been replaced by a low-latency one.
///
/// The first message is sent over a 5s-latency link, the link is removed and re-added with
/// 1ms latency, and a second message is sent. The test asserts that the first message arrives
/// roughly 6s after timing starts and that the second arrives roughly 1s after the first.
#[test]
fn test_high_latency_message_blocks_followup() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (network, oracle) = Network::new(
context.with_label("network"),
Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(1),
},
);
network.start();
let pk1 = PrivateKey::from_seed(1).public_key();
let pk2 = PrivateKey::from_seed(2).public_key();
let (mut sender, _) = oracle.control(pk1.clone()).register(0, TEST_QUOTA).await.unwrap();
let (_, mut receiver) = oracle.control(pk2.clone()).register(0, TEST_QUOTA).await.unwrap();
track_peers(&oracle, [pk1.clone(), pk2.clone()]).await;
// Limit the sender on one side and the receiver on the other to the same rate; with
// 1_000-byte messages the assertions below expect one second of transmit time each.
const BPS: usize = 1_000;
oracle
.limit_bandwidth(pk1.clone(), Some(BPS), None)
.await
.unwrap();
oracle
.limit_bandwidth(pk2.clone(), None, Some(BPS))
.await
.unwrap();
// Initial link: 5 seconds of latency, no jitter, no loss.
oracle
.add_link(
pk1.clone(),
pk2.clone(),
Link {
latency: Duration::from_millis(5_000),
jitter: Duration::ZERO,
success_rate: 1.0,
},
)
.await
.unwrap();
let slow = IoBuf::from(vec![0u8; 1_000]);
sender
.send(Recipients::One(pk2.clone()), slow.clone(), true)
.await
.unwrap();
// Replace the link with a 1ms-latency one while the first message is outstanding.
oracle.remove_link(pk1.clone(), pk2.clone()).await.unwrap();
oracle
.add_link(
pk1.clone(),
pk2.clone(),
Link {
latency: Duration::from_millis(1),
jitter: Duration::ZERO,
success_rate: 1.0,
},
)
.await
.unwrap();
let fast = IoBuf::from(vec![1u8; 1_000]);
sender
.send(Recipients::One(pk2.clone()), fast.clone(), true)
.await
.unwrap();
// Timing starts only after both sends have returned.
let start = context.current();
let (origin1, message1) = receiver.recv().await.unwrap();
assert_eq!(origin1, pk1);
assert_eq!(message1, slow);
let first_elapsed = context.current().duration_since(start).unwrap();
let (origin2, message2) = receiver.recv().await.unwrap();
let second_elapsed = context.current().duration_since(start).unwrap();
assert_eq!(origin2, pk1);
assert_eq!(message2, fast);
// Expected arrival of the slow message: transmit time plus the original link latency.
let egress_time = Duration::from_secs(1);
let slow_latency = Duration::from_millis(5_000);
let expected_first = egress_time + slow_latency;
let tolerance = Duration::from_millis(10);
assert!(
first_elapsed >= expected_first.saturating_sub(tolerance)
&& first_elapsed <= expected_first + tolerance,
"slow message arrived outside expected window: {first_elapsed:?} (expected {expected_first:?} ± {tolerance:?})"
);
assert!(
second_elapsed >= first_elapsed,
"fast message arrived before slow transmission completed"
);
// The gap between the two arrivals must match one transmit duration.
let arrival_gap = second_elapsed
.checked_sub(first_elapsed)
.expect("timestamps ordered");
assert!(
arrival_gap >= egress_time.saturating_sub(tolerance)
&& arrival_gap <= egress_time + tolerance,
"next arrival deviated from transmit duration (gap = {arrival_gap:?}, expected {egress_time:?} ± {tolerance:?})"
);
})
}
/// Ten senders, each limited to 10_000 on the sending side, send a 10_000-byte message to a
/// single receiver limited to 100_000 on the receiving side. Every message must be received
/// between 950ms and 1100ms after sending begins.
#[test]
fn test_many_to_one_bandwidth_sharing() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), config);
        network.start();

        // Register and rate-limit each sender in turn.
        let mut senders = Vec::new();
        let mut sender_txs = Vec::new();
        for seed in 0..10 {
            let public_key = ed25519::PrivateKey::from_seed(seed).public_key();
            senders.push(public_key.clone());
            let (tx, _) = oracle
                .control(public_key.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            sender_txs.push(tx);
            oracle
                .limit_bandwidth(public_key.clone(), Some(10_000), None)
                .await
                .unwrap();
        }

        let receiver = ed25519::PrivateKey::from_seed(100).public_key();
        let (_, mut receiver_rx) = oracle
            .control(receiver.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let everyone = senders.iter().cloned().chain([receiver.clone()]);
        track_peers(&oracle, everyone).await;
        oracle
            .limit_bandwidth(receiver.clone(), None, Some(100_000))
            .await
            .unwrap();

        // Zero-latency, lossless link from every sender to the receiver.
        let link = || Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        for sender in &senders {
            oracle
                .add_link(sender.clone(), receiver.clone(), link())
                .await
                .unwrap();
        }

        // Every sender transmits one message filled with its own index.
        let start = context.current();
        for (index, mut tx) in sender_txs.into_iter().enumerate() {
            let payload = IoBuf::from(vec![index as u8; 10_000]);
            tx.send(Recipients::One(receiver.clone()), payload, true)
                .await
                .unwrap();
        }

        // All ten deliveries must land inside the accepted window.
        let window = Duration::from_millis(950)..=Duration::from_millis(1100);
        for i in 0..10 {
            let (_, _msg) = receiver_rx.recv().await.unwrap();
            let recv_time = context.current().duration_since(start).unwrap();
            assert!(
                window.contains(&recv_time),
                "Message {i} received at {recv_time:?}, expected ~1s",
            );
        }
    });
}
/// One sender limited to 100_000 on the sending side sends a 10_000-byte message to each of
/// ten receivers limited to 10_000 on the receiving side. Each receiver must get its own
/// message between 950ms and 1100ms after sending begins.
#[test]
fn test_one_to_many_fast_sender() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), config);
        network.start();

        let sender = ed25519::PrivateKey::from_seed(0).public_key();
        let (sender_tx, _) = oracle
            .control(sender.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        oracle
            .limit_bandwidth(sender.clone(), Some(100_000), None)
            .await
            .unwrap();

        // Register, rate-limit and link each receiver in turn.
        let link = || Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        let mut receivers = Vec::new();
        let mut receiver_rxs = Vec::new();
        for seed in 0..10 {
            let public_key = ed25519::PrivateKey::from_seed(seed + 1).public_key();
            receivers.push(public_key.clone());
            let (_, rx) = oracle
                .control(public_key.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            receiver_rxs.push(rx);
            oracle
                .limit_bandwidth(public_key.clone(), None, Some(10_000))
                .await
                .unwrap();
            oracle
                .add_link(sender.clone(), public_key.clone(), link())
                .await
                .unwrap();
        }
        let everyone = core::iter::once(sender.clone()).chain(receivers.iter().cloned());
        track_peers(&oracle, everyone).await;

        // One message per receiver, filled with the receiver's index.
        let start = context.current();
        for (index, target) in receivers.iter().enumerate() {
            let mut tx = sender_tx.clone();
            let payload = IoBuf::from(vec![index as u8; 10_000]);
            tx.send(Recipients::One(target.clone()), payload, true)
                .await
                .unwrap();
        }

        // Each receiver must see its own payload inside the accepted window.
        let window = Duration::from_millis(950)..=Duration::from_millis(1100);
        for (i, mut rx) in receiver_rxs.into_iter().enumerate() {
            let (_, msg) = rx.recv().await.unwrap();
            assert_eq!(msg.as_ref()[0], i as u8);
            let recv_time = context.current().duration_since(start).unwrap();
            assert!(
                window.contains(&recv_time),
                "Receiver {i} received at {recv_time:?}, expected ~1s",
            );
        }
    });
}
/// Ten senders, each limited to 1_000 on the sending side, send a 1_000-byte message to a
/// single receiver limited to 10_000 on the receiving side. Every message must be received
/// between 950ms and 1100ms after sending begins.
#[test]
fn test_many_slow_senders_to_fast_receiver() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), config);
        network.start();

        // Register and rate-limit each sender in turn.
        let mut senders = Vec::new();
        let mut sender_txs = Vec::new();
        for seed in 0..10 {
            let public_key = ed25519::PrivateKey::from_seed(seed).public_key();
            senders.push(public_key.clone());
            let (tx, _) = oracle
                .control(public_key.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            sender_txs.push(tx);
            oracle
                .limit_bandwidth(public_key.clone(), Some(1_000), None)
                .await
                .unwrap();
        }

        let receiver = ed25519::PrivateKey::from_seed(100).public_key();
        let (_, mut receiver_rx) = oracle
            .control(receiver.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let everyone = senders.iter().cloned().chain([receiver.clone()]);
        track_peers(&oracle, everyone).await;
        oracle
            .limit_bandwidth(receiver.clone(), None, Some(10_000))
            .await
            .unwrap();

        // Zero-latency, lossless link from every sender to the receiver.
        let link = || Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        for sender in &senders {
            oracle
                .add_link(sender.clone(), receiver.clone(), link())
                .await
                .unwrap();
        }

        // Every sender transmits one message filled with its own index.
        let start = context.current();
        for (index, mut tx) in sender_txs.into_iter().enumerate() {
            let payload = IoBuf::from(vec![index as u8; 1_000]);
            tx.send(Recipients::One(receiver.clone()), payload, true)
                .await
                .unwrap();
        }

        // All ten deliveries must land inside the accepted window.
        let window = Duration::from_millis(950)..=Duration::from_millis(1100);
        for i in 0..10 {
            let (_, _msg) = receiver_rx.recv().await.unwrap();
            let recv_time = context.current().duration_since(start).unwrap();
            assert!(
                window.contains(&recv_time),
                "Message {i} received at {recv_time:?}, expected ~1s",
            );
        }
    });
}
#[test]
fn test_dynamic_bandwidth_allocation_staggered() {
// Three senders start transfers to one receiver at staggered times (0ms, 500ms,
// 1500ms) and the test checks when each message is delivered.
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (network, oracle) = Network::new(
context.with_label("network"),
Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(1),
},
);
network.start();
// Register three senders, each with its first bandwidth limit set to 30_000
// (the egress side, judging by the zero-egress test in this module).
let mut senders = Vec::new();
let mut sender_txs = Vec::new();
for i in 0..3 {
let sender = ed25519::PrivateKey::from_seed(i).public_key();
senders.push(sender.clone());
// The `_` pattern drops the sender's receiving half right away.
let (tx, _) = oracle
.control(sender.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
sender_txs.push(tx);
oracle
.limit_bandwidth(sender.clone(), Some(30_000), None)
.await
.unwrap();
}
// Single receiver whose second bandwidth limit (ingress) is also 30_000.
let receiver = ed25519::PrivateKey::from_seed(100).public_key();
let (_, mut receiver_rx) = oracle
.control(receiver.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
track_peers(&oracle, senders.iter().cloned().chain([receiver.clone()])).await;
oracle
.limit_bandwidth(receiver.clone(), None, Some(30_000))
.await
.unwrap();
// One lossless 1ms link from every sender to the receiver.
for sender in &senders {
oracle
.add_link(
sender.clone(),
receiver.clone(),
Link {
latency: Duration::from_millis(1),
jitter: Duration::ZERO,
success_rate: 1.0,
},
)
.await
.unwrap();
}
let start = context.current();
// Sender 0: 30_000 bytes, sent immediately.
let mut tx0 = sender_txs[0].clone();
let rx_clone = receiver.clone();
context.clone().spawn(move |_| async move {
let msg = IoBuf::from(vec![0u8; 30_000]);
tx0.send(Recipients::One(rx_clone), msg, true)
.await
.unwrap();
});
// Sender 1: 30_000 bytes, sent after 500ms.
let mut tx1 = sender_txs[1].clone();
let rx_clone = receiver.clone();
context.clone().spawn(move |context| async move {
context.sleep(Duration::from_millis(500)).await;
let msg = IoBuf::from(vec![1u8; 30_000]);
tx1.send(Recipients::One(rx_clone), msg, true)
.await
.unwrap();
});
// Sender 2: 15_000 bytes, sent after 1500ms.
let mut tx2 = sender_txs[2].clone();
let rx_clone = receiver.clone();
context.clone().spawn(move |context| async move {
context.sleep(Duration::from_millis(1500)).await;
let msg = IoBuf::from(vec![2u8; 15_000]);
tx2.send(Recipients::One(rx_clone), msg, true)
.await
.unwrap();
});
// The first delivery must be sender 0's message (payload byte 0) at ~1.5s.
// NOTE(review): 1.5s matches an even split of the receiver's ingress once
// sender 1 joins at 500ms — confirm against the bandwidth module.
let (_, msg0) = receiver_rx.recv().await.unwrap();
assert_eq!(msg0.as_ref()[0], 0);
let t0 = context.current().duration_since(start).unwrap();
assert!(
t0 >= Duration::from_millis(1490) && t0 <= Duration::from_millis(1600),
"Message 0 received at {t0:?}, expected ~1.5s",
);
// The remaining two messages may arrive in either order; sort them out by
// their first payload byte before checking the arrival windows.
let (_, msg_a) = receiver_rx.recv().await.unwrap();
let t_a = context.current().duration_since(start).unwrap();
let (_, msg_b) = receiver_rx.recv().await.unwrap();
let t_b = context.current().duration_since(start).unwrap();
let (msg1, t1, msg2, t2) = if msg_a.as_ref()[0] == 1 {
(msg_a, t_a, msg_b, t_b)
} else {
(msg_b, t_b, msg_a, t_a)
};
assert_eq!(msg1.as_ref()[0], 1);
assert_eq!(msg2.as_ref()[0], 2);
assert!(
t1 >= Duration::from_millis(1500) && t1 <= Duration::from_millis(2600),
"Message 1 received at {t1:?}, expected between 1.5s-2.6s",
);
assert!(
t2 >= Duration::from_millis(1500) && t2 <= Duration::from_millis(2600),
"Message 2 received at {t2:?}, expected between 1.5s-2.6s",
);
});
}
#[test]
fn test_dynamic_bandwidth_varied_sizes() {
    // Three senders with no bandwidth limits push 10_000, 20_000 and 30_000 bytes
    // at a receiver whose ingress limit is 30_000; the last delivery must not
    // land before the 2s mark.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), cfg);
        network.start();

        // Register the senders and explicitly clear both of their limits.
        let mut sender_keys = Vec::new();
        let mut outboxes = Vec::new();
        for seed in 0..3 {
            let key = ed25519::PrivateKey::from_seed(seed).public_key();
            sender_keys.push(key.clone());
            // `_` drops the unused receiving half immediately.
            let (outbox, _) = oracle
                .control(key.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            outboxes.push(outbox);
            oracle
                .limit_bandwidth(key.clone(), None, None)
                .await
                .unwrap();
        }

        // The receiver is the only throttled party.
        let receiver = ed25519::PrivateKey::from_seed(100).public_key();
        let (_, mut inbox) = oracle
            .control(receiver.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(
            &oracle,
            sender_keys.iter().cloned().chain([receiver.clone()]),
        )
        .await;
        oracle
            .limit_bandwidth(receiver.clone(), None, Some(30_000))
            .await
            .unwrap();

        let link = Link {
            latency: Duration::from_millis(1),
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        for key in &sender_keys {
            oracle
                .add_link(key.clone(), receiver.clone(), link.clone())
                .await
                .unwrap();
        }

        // Each sender transmits one payload filled with its own index.
        let start = context.current();
        let payload_sizes = [10_000usize, 20_000, 30_000];
        for (idx, (mut outbox, len)) in outboxes.into_iter().zip(payload_sizes).enumerate() {
            let payload = IoBuf::from(vec![idx as u8; len]);
            outbox
                .send(Recipients::One(receiver.clone()), payload, true)
                .await
                .unwrap();
        }

        // Record (sender index, payload length, arrival time) per delivery.
        let mut arrivals = Vec::new();
        for _ in 0..3 {
            let (_, payload) = inbox.recv().await.unwrap();
            let elapsed = context.current().duration_since(start).unwrap();
            arrivals.push((payload.as_ref()[0] as usize, payload.len(), elapsed));
        }
        assert_eq!(arrivals.len(), 3);

        let max_time = arrivals.iter().map(|entry| entry.2).max().unwrap();
        assert!(
            max_time >= Duration::from_millis(2000),
            "Total time {max_time:?} should be at least 2s for 60KB at 30KB/s",
        );
    });
}
#[test]
fn test_bandwidth_pipe_reservation_duration() {
// Sends three 500-byte messages over a 1s-latency link with both limits set
// to 1000 and checks that they arrive 500ms apart, starting at ~1500ms.
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (network, oracle) = Network::new(
context.with_label("network"),
Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(1),
},
);
network.start();
let sender = PrivateKey::from_seed(1).public_key();
let receiver = PrivateKey::from_seed(2).public_key();
// Only the sender's sending half and the receiver's receiving half are kept;
// the `_` patterns drop the other halves immediately.
let (sender_tx, _) = oracle
.control(sender.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_, mut receiver_rx) = oracle
.control(receiver.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
track_peers(&oracle, [sender.clone(), receiver.clone()]).await;
// Sender's first limit and receiver's second limit are both 1000.
oracle
.limit_bandwidth(sender.clone(), Some(1000), None)
.await
.unwrap();
oracle
.limit_bandwidth(receiver.clone(), None, Some(1000))
.await
.unwrap();
oracle
.add_link(
sender.clone(),
receiver.clone(),
Link {
latency: Duration::from_secs(1), jitter: Duration::ZERO,
success_rate: 1.0,
},
)
.await
.unwrap();
let start = context.current();
// Queue three 500-byte messages back to back; each is filled with its index.
for i in 0..3 {
let mut sender_tx = sender_tx.clone();
let receiver = receiver.clone();
let msg = IoBuf::from(vec![i; 500]);
sender_tx
.send(Recipients::One(receiver), msg, false)
.await
.unwrap();
}
// Messages must come out in the order they were sent.
let mut receive_times = Vec::new();
for i in 0..3 {
let (_, received) = receiver_rx.recv().await.unwrap();
receive_times.push(context.current().duration_since(start).unwrap());
assert_eq!(received.as_ref()[0], i);
}
// Expected arrival of message i is 1500ms + i * 500ms, with 100ms of slack.
// NOTE(review): this matches 500ms of transmission per message plus the 1s
// link latency, with transfers serialized — confirm against the bandwidth module.
for (i, time) in receive_times.iter().enumerate() {
let expected_min = (i as u64 * 500) + 1500;
let expected_max = expected_min + 100;
assert!(
*time >= Duration::from_millis(expected_min)
&& *time < Duration::from_millis(expected_max),
"Message {} should arrive at ~{}ms, got {:?}",
i + 1,
expected_min,
time
);
}
});
}
#[test]
fn test_dynamic_bandwidth_affects_new_transfers() {
// Lowers the sender's bandwidth limit between two transfers and checks that
// the second transfer is paced by the new, lower limit.
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let (network, oracle) = Network::new(
context.with_label("network"),
Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(1),
},
);
network.start();
let pk_sender = PrivateKey::from_seed(1).public_key();
let pk_receiver = PrivateKey::from_seed(2).public_key();
let (mut sender_tx, _) = oracle
.control(pk_sender.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_, mut receiver_rx) = oracle
.control(pk_receiver.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
track_peers(&oracle, [pk_sender.clone(), pk_receiver.clone()]).await;
oracle
.add_link(
pk_sender.clone(),
pk_receiver.clone(),
Link {
latency: Duration::from_millis(1), jitter: Duration::ZERO,
success_rate: 1.0,
},
)
.await
.unwrap();
// Both ends start at a limit of 10_000.
oracle
.limit_bandwidth(pk_sender.clone(), Some(10_000), None)
.await
.unwrap();
oracle
.limit_bandwidth(pk_receiver.clone(), None, Some(10_000))
.await
.unwrap();
// First transfer: 20_000 bytes under the 10_000 limit, expected to take ~2s.
let msg1 = IoBuf::from(vec![1u8; 20_000]); let start_time = context.current();
sender_tx
.send(Recipients::One(pk_receiver.clone()), msg1.clone(), false)
.await
.unwrap();
let (_sender, received_msg1) = receiver_rx.recv().await.unwrap();
let msg1_time = context.current().duration_since(start_time).unwrap();
assert_eq!(received_msg1.len(), 20_000);
assert!(
msg1_time >= Duration::from_millis(1999)
&& msg1_time <= Duration::from_millis(2010),
"First message should take ~2s, got {msg1_time:?}",
);
// Drop the sender's limit to 2_000 before starting the next transfer.
oracle
.limit_bandwidth(pk_sender.clone(), Some(2_000), None)
.await
.unwrap();
// Second transfer: 10_000 bytes under the 2_000 limit, expected to take ~5s.
let msg2 = IoBuf::from(vec![2u8; 10_000]); let msg2_start = context.current();
sender_tx
.send(Recipients::One(pk_receiver.clone()), msg2.clone(), false)
.await
.unwrap();
let (_sender, received_msg2) = receiver_rx.recv().await.unwrap();
let msg2_time = context.current().duration_since(msg2_start).unwrap();
assert_eq!(received_msg2.len(), 10_000);
assert!(
msg2_time >= Duration::from_millis(4999)
&& msg2_time <= Duration::from_millis(5010),
"Second message should take ~5s at reduced bandwidth, got {msg2_time:?}",
);
});
}
#[test]
fn test_zero_receiver_ingress_bandwidth() {
    // With the receiver's ingress limit at zero nothing may be delivered, even
    // though the send reports the recipient; lifting the limit lets the message
    // through within one second.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), cfg);
        network.start();

        let pk_sender = PrivateKey::from_seed(1).public_key();
        let pk_receiver = PrivateKey::from_seed(2).public_key();
        // `_` drops the unused channel halves immediately.
        let (mut outbox, _) = oracle
            .control(pk_sender.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_, mut inbox) = oracle
            .control(pk_receiver.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, [pk_sender.clone(), pk_receiver.clone()]).await;

        let instant_link = Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        oracle
            .add_link(pk_sender.clone(), pk_receiver.clone(), instant_link)
            .await
            .unwrap();

        // Choke the receiver completely.
        oracle
            .limit_bandwidth(pk_receiver.clone(), None, Some(0))
            .await
            .unwrap();

        let payload = IoBuf::from(vec![1u8; 20_000]);
        let recipients = outbox
            .send(Recipients::One(pk_receiver.clone()), payload.clone(), false)
            .await
            .unwrap();
        assert_eq!(recipients.len(), 1);
        assert_eq!(recipients[0], pk_receiver);

        // Ten seconds pass without any delivery.
        select! {
            _ = inbox.recv() => {
                panic!("unexpected message");
            },
            _ = context.sleep(Duration::from_secs(10)) => {},
        }

        // Remove the limit; the pending message must now arrive promptly.
        oracle
            .limit_bandwidth(pk_receiver.clone(), None, None)
            .await
            .unwrap();
        select! {
            _ = inbox.recv() => {},
            _ = context.sleep(Duration::from_secs(1)) => {
                panic!("timeout");
            },
        }
    });
}
#[test]
fn test_zero_sender_egress_bandwidth() {
    // With the sender's egress limit at zero nothing may be delivered, even
    // though the send reports the recipient; lifting the limit lets the message
    // through within one second.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), cfg);
        network.start();

        let pk_sender = PrivateKey::from_seed(1).public_key();
        let pk_receiver = PrivateKey::from_seed(2).public_key();
        // `_` drops the unused channel halves immediately.
        let (mut outbox, _) = oracle
            .control(pk_sender.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_, mut inbox) = oracle
            .control(pk_receiver.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        track_peers(&oracle, [pk_sender.clone(), pk_receiver.clone()]).await;

        let instant_link = Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        oracle
            .add_link(pk_sender.clone(), pk_receiver.clone(), instant_link)
            .await
            .unwrap();

        // Choke the sender completely.
        oracle
            .limit_bandwidth(pk_sender.clone(), Some(0), None)
            .await
            .unwrap();

        let payload = IoBuf::from(vec![1u8; 20_000]);
        let recipients = outbox
            .send(Recipients::One(pk_receiver.clone()), payload.clone(), false)
            .await
            .unwrap();
        assert_eq!(recipients.len(), 1);
        assert_eq!(recipients[0], pk_receiver);

        // Ten seconds pass without any delivery.
        select! {
            _ = inbox.recv() => {
                panic!("unexpected message");
            },
            _ = context.sleep(Duration::from_secs(10)) => {},
        }

        // Remove the limit; the pending message must now arrive promptly.
        oracle
            .limit_bandwidth(pk_sender.clone(), None, None)
            .await
            .unwrap();
        select! {
            _ = inbox.recv() => {},
            _ = context.sleep(Duration::from_secs(1)) => {
                panic!("timeout");
            },
        }
    });
}
#[test]
fn register_peer_set() {
    // A peer set tracked under an id can be read back as a primary-only set.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.with_label("network"), cfg);
        network.start();

        let mut manager = oracle.manager();
        // Nothing has been tracked yet.
        assert_eq!(manager.peer_set(0).await, None);

        let first = PrivateKey::from_seed(1).public_key();
        let second = PrivateKey::from_seed(2).public_key();
        let tracked = Set::try_from([first.clone(), second.clone()]).unwrap();
        manager.track(0xFF, tracked).await;

        let stored = manager.peer_set(0xFF).await.unwrap();
        let expected = TrackedPeers::primary(Set::try_from([first, second]).unwrap());
        assert_eq!(stored, expected);
    });
}
#[test]
fn test_socket_manager() {
    // Tracks two address-carrying peer sets through the socket manager and checks
    // both the stored set and the updates seen by a subscriber.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.with_label("network"), cfg);
        network.start();

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();
        let addr1: Address = SocketAddr::from(([127, 0, 0, 1], 4000)).into();
        let addr2: Address = SocketAddr::from(([127, 0, 0, 1], 4001)).into();

        let mut manager = oracle.socket_manager();
        let first_set = Map::<_, Address>::try_from([
            (pk1.clone(), addr1.clone()),
            (pk2.clone(), addr2.clone()),
        ])
        .unwrap();
        manager.track(1, first_set).await;

        // The stored set exposes both keys as primary peers.
        let stored = manager.peer_set(1).await.expect("peer set missing");
        let stored_keys: Vec<_> = Vec::from(stored.primary.clone());
        assert_eq!(stored_keys, vec![pk1.clone(), pk2.clone()]);

        // A subscription opened after the first `track` still yields that set.
        let mut subscription = manager.subscribe().await;
        let first_update = subscription.recv().await.unwrap();
        assert_eq!(first_update.index, 1);
        let latest: Vec<_> = Vec::from(first_update.latest.primary.clone());
        assert_eq!(latest, vec![pk1.clone(), pk2.clone()]);
        assert!(first_update.latest.secondary.is_empty());
        let all_primary: Vec<_> = Vec::from(first_update.all.primary.clone());
        assert_eq!(all_primary, vec![pk1.clone(), pk2.clone()]);
        assert!(first_update.all.secondary.is_empty());

        // Track a second, smaller set: `latest` shrinks while `all` keeps both keys.
        let second_set = Map::<_, Address>::try_from([(pk2.clone(), addr2)]).unwrap();
        manager.track(2, second_set).await;
        let second_update = subscription.recv().await.unwrap();
        assert_eq!(second_update.index, 2);
        let latest: Vec<_> = Vec::from(second_update.latest.primary);
        assert_eq!(latest, vec![pk2.clone()]);
        assert!(second_update.latest.secondary.is_empty());
        let all_primary: Vec<_> = Vec::from(second_update.all.primary);
        assert_eq!(all_primary, vec![pk1, pk2]);
        assert!(second_update.all.secondary.is_empty());
    });
}
#[test]
fn test_manager_track_accepts_tracked_peers() {
    // `track` accepts a `TrackedPeers` value directly and stores both the
    // primary and the secondary set unchanged.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.with_label("network"), cfg);
        network.start();

        let primary_pk = PrivateKey::from_seed(1).public_key();
        let secondary_pk = PrivateKey::from_seed(2).public_key();
        let mut manager = oracle.manager();

        let submitted = TrackedPeers::new(
            Set::try_from([primary_pk.clone()]).unwrap(),
            Set::try_from([secondary_pk.clone()]).unwrap(),
        );
        manager.track(7, submitted).await;

        let stored = manager.peer_set(7).await.unwrap();
        let expected = TrackedPeers::new(
            Set::try_from([primary_pk]).unwrap(),
            Set::try_from([secondary_pk]).unwrap(),
        );
        assert_eq!(stored, expected);
    });
}
#[test]
fn test_manager_track_tracked_peers_overlap_primary_wins() {
    // A key listed in both the primary and the secondary set is kept only as
    // a primary peer, in the stored set and in the subscriber update alike.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.with_label("network"), cfg);
        network.start();

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();
        let pk3 = PrivateKey::from_seed(3).public_key();
        let mut manager = oracle.manager();

        // pk2 appears on both sides of the submitted value.
        let overlapping = TrackedPeers::new(
            Set::try_from([pk1.clone(), pk2.clone()]).unwrap(),
            Set::try_from([pk2.clone(), pk3.clone()]).unwrap(),
        );
        manager.track(9, overlapping).await;

        let stored = manager.peer_set(9).await.unwrap();
        let expected = TrackedPeers::new(
            Set::try_from([pk1.clone(), pk2.clone()]).unwrap(),
            Set::try_from([pk3.clone()]).unwrap(),
        );
        assert_eq!(stored, expected);

        let mut subscription = manager.subscribe().await;
        let update = subscription.recv().await.unwrap();
        assert_eq!(update.index, 9);
        let latest = &update.latest;
        let all = &update.all;
        assert!(latest.primary.position(&pk2).is_some());
        assert!(latest.secondary.position(&pk2).is_none());
        assert!(latest.secondary.position(&pk3).is_some());
        assert!(all.secondary.position(&pk2).is_none());
        assert!(all.primary.position(&pk2).is_some());
    });
}
#[test]
fn test_socket_manager_track_accepts_addressable_tracked_peers() {
    // The socket manager accepts an `AddressableTrackedPeers` value and reports
    // it back as a `TrackedPeers` holding the same keys.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.with_label("network"), cfg);
        network.start();

        let primary_pk = PrivateKey::from_seed(1).public_key();
        let secondary_pk = PrivateKey::from_seed(2).public_key();
        let primary_addr: Address = SocketAddr::from(([127, 0, 0, 1], 4000)).into();
        let secondary_addr: Address = SocketAddr::from(([127, 0, 0, 1], 4001)).into();

        let mut manager = oracle.socket_manager();
        let submitted = AddressableTrackedPeers::new(
            Map::<_, Address>::try_from([(primary_pk.clone(), primary_addr)]).unwrap(),
            Map::<_, Address>::try_from([(secondary_pk.clone(), secondary_addr)]).unwrap(),
        );
        manager.track(7, submitted).await;

        let stored = manager.peer_set(7).await.unwrap();
        let expected = TrackedPeers::new(
            Set::try_from([primary_pk]).unwrap(),
            Set::try_from([secondary_pk]).unwrap(),
        );
        assert_eq!(stored, expected);
    });
}
#[test]
fn test_socket_manager_track_addressable_overlap_primary_wins() {
    // The same key submitted as both primary and secondary (with different
    // addresses) shows up only in the primary set of the resulting update.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.with_label("network"), cfg);
        network.start();

        let pk = PrivateKey::from_seed(1).public_key();
        let addr_primary: Address = SocketAddr::from(([127, 0, 0, 1], 4000)).into();
        let addr_secondary: Address = SocketAddr::from(([127, 0, 0, 1], 5000)).into();

        let mut manager = oracle.socket_manager();
        let mut subscription = manager.subscribe().await;

        let submitted = AddressableTrackedPeers::new(
            Map::<_, Address>::try_from([(pk.clone(), addr_primary.clone())]).unwrap(),
            Map::<_, Address>::try_from([(pk.clone(), addr_secondary)]).unwrap(),
        );
        manager.track(11, submitted).await;

        let update = subscription.recv().await.unwrap();
        assert_eq!(update.index, 11);
        let latest = &update.latest;
        assert_eq!(latest.primary.len(), 1);
        assert!(latest.secondary.is_empty());
        assert!(update.all.secondary.is_empty());
        assert_eq!(latest.primary, Set::try_from([pk.clone()]).unwrap());
    });
}
#[test]
fn test_socket_manager_with_asymmetric_addresses() {
    // Peers whose addresses use distinct ingress/egress endpoints (socket and
    // DNS ingress) are tracked and reported like any other peer.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.with_label("network"), cfg);
        network.start();

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();

        // Socket-based ingress for the first peer, DNS-based for the second.
        let socket_ingress = Address::Asymmetric {
            ingress: Ingress::Socket(SocketAddr::from(([10, 0, 0, 1], 8080))),
            egress: SocketAddr::from(([192, 168, 1, 1], 9090)),
        };
        let dns_ingress = Address::Asymmetric {
            ingress: Ingress::Dns {
                host: hostname!("node2.example.com"),
                port: 8080,
            },
            egress: SocketAddr::from(([192, 168, 1, 2], 9090)),
        };

        let mut manager = oracle.socket_manager();
        let peers = Map::<_, Address>::try_from([
            (pk1.clone(), socket_ingress),
            (pk2.clone(), dns_ingress),
        ])
        .unwrap();
        manager.track(1, peers).await;

        let stored = manager.peer_set(1).await.expect("peer set missing");
        let stored_keys: Vec<_> = Vec::from(stored.primary);
        assert_eq!(stored_keys, vec![pk1.clone(), pk2.clone()]);

        let mut subscription = manager.subscribe().await;
        let update = subscription.recv().await.unwrap();
        assert_eq!(update.index, 1);
        let latest_keys: Vec<_> = Vec::from(update.latest.primary);
        assert_eq!(latest_keys, vec![pk1, pk2]);
        assert!(update.latest.secondary.is_empty());
    });
}
#[test]
fn test_peer_set_window_management() {
    // With `tracked_peer_sets` set to 2, tracking a third set makes the first
    // one unavailable; sends only report recipients while both ends are covered
    // by a tracked set.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(2),
        };
        let (network, oracle) = Network::new(context.with_label("network"), cfg);
        network.start();

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();
        let pk3 = PrivateKey::from_seed(3).public_key();
        let pk4 = PrivateKey::from_seed(4).public_key();

        let mut manager = oracle.manager();
        manager
            .track(1, Set::try_from(vec![pk1.clone(), pk2.clone()]).unwrap())
            .await;

        // Register all four peers; the unused halves stay alive for the whole test.
        let (mut sender1, _receiver1) = oracle
            .control(pk1.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (mut sender2, _receiver2) = oracle
            .control(pk2.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (mut sender3, _receiver3) = oracle
            .control(pk3.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_sender4, _receiver4) = oracle
            .control(pk4.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // Fully connect the four peers (every ordered pair except self-links).
        let everyone = [pk1.clone(), pk2.clone(), pk3.clone(), pk4.clone()];
        let link = Link {
            latency: Duration::from_millis(1),
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        for from in &everyone {
            for to in &everyone {
                if from == to {
                    continue;
                }
                oracle
                    .add_link(from.clone(), to.clone(), link.clone())
                    .await
                    .unwrap();
            }
        }

        // Only set 1 = {pk1, pk2} is tracked: pk2 is reachable, pk3 is not.
        let sent = sender1
            .send(Recipients::One(pk2.clone()), IoBuf::from(b"msg1"), false)
            .await
            .unwrap();
        assert_eq!(sent.len(), 1);
        let sent = sender1
            .send(Recipients::One(pk3.clone()), IoBuf::from(b"msg2"), false)
            .await
            .unwrap();
        assert_eq!(sent.len(), 0);

        // Set 2 = {pk2, pk3} joins the window: pk1 can now reach pk3.
        manager
            .track(2, Set::try_from(vec![pk2.clone(), pk3.clone()]).unwrap())
            .await;
        let sent = sender1
            .send(Recipients::One(pk3.clone()), IoBuf::from(b"msg3"), false)
            .await
            .unwrap();
        assert_eq!(sent.len(), 1);

        // Set 3 = {pk3, pk4} is tracked next; pk1 is no longer a valid recipient.
        manager
            .track(3, Set::try_from(vec![pk3.clone(), pk4.clone()]).unwrap())
            .await;
        let sent = sender2
            .send(Recipients::One(pk1.clone()), IoBuf::from(b"msg4"), false)
            .await
            .unwrap();
        assert_eq!(sent.len(), 0);
        let sent = sender2
            .send(Recipients::One(pk3.clone()), IoBuf::from(b"msg5"), false)
            .await
            .unwrap();
        assert_eq!(sent.len(), 1);
        let sent = sender3
            .send(Recipients::One(pk4.clone()), IoBuf::from(b"msg6"), false)
            .await
            .unwrap();
        assert_eq!(sent.len(), 1);

        // Sets 2 and 3 are still retrievable, set 1 is gone.
        let second = manager.peer_set(2).await.unwrap();
        assert!(second.primary.position(&pk2).is_some());
        assert!(second.primary.position(&pk3).is_some());
        let third = manager.peer_set(3).await.unwrap();
        assert!(third.primary.position(&pk3).is_some());
        assert!(third.primary.position(&pk4).is_some());
        assert!(manager.peer_set(1).await.is_none());
    });
}
#[test]
fn test_sender_removed_from_peer_set_drops_message() {
    // A sender that falls out of the (single) tracked peer set can no longer
    // deliver messages; once it is tracked again, delivery resumes.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(1),
        };
        let (network, oracle) = Network::new(context.with_label("network"), cfg);
        network.start();

        let mut manager = oracle.manager();
        let mut subscription = manager.subscribe().await;
        let sender_pk = PrivateKey::from_seed(1).public_key();
        let recipient_pk = PrivateKey::from_seed(2).public_key();

        // Phase 1: both peers are tracked.
        let both = Set::try_from(vec![sender_pk.clone(), recipient_pk.clone()]).unwrap();
        manager.track(1, both).await;
        let update = subscription.recv().await.unwrap();
        assert_eq!(update.index, 1);

        // `_` drops the sender's receiving half immediately; the recipient's
        // sending half is kept alive for the duration of the test.
        let (mut outbox, _) = oracle
            .control(sender_pk.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_recipient_outbox, mut inbox) = oracle
            .control(recipient_pk.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let link = Link {
            latency: Duration::from_millis(1),
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        oracle
            .add_link(sender_pk.clone(), recipient_pk.clone(), link)
            .await
            .unwrap();

        let initial_msg = IoBuf::from(b"tracked");
        let sent = outbox
            .send(
                Recipients::One(recipient_pk.clone()),
                initial_msg.clone(),
                false,
            )
            .await
            .unwrap();
        assert_eq!(sent.len(), 1);
        assert_eq!(sent[0], recipient_pk);
        let (_from, received) = inbox.recv().await.unwrap();
        assert_eq!(received, initial_msg);

        // Phase 2: the new set no longer contains the sender.
        let other_pk = PrivateKey::from_seed(3).public_key();
        let without_sender = Set::try_from(vec![recipient_pk.clone(), other_pk]).unwrap();
        manager.track(2, without_sender).await;
        let update = subscription.recv().await.unwrap();
        assert_eq!(update.index, 2);

        let sent = outbox
            .send(
                Recipients::One(recipient_pk.clone()),
                IoBuf::from(b"untracked"),
                false,
            )
            .await
            .unwrap();
        assert!(sent.is_empty());
        select! {
            _ = inbox.recv() => {
                panic!("unexpected message");
            },
            _ = context.sleep(Duration::from_secs(10)) => {},
        }

        // Phase 3: the sender is tracked again and delivery works once more.
        let both_again = Set::try_from(vec![sender_pk.clone(), recipient_pk.clone()]).unwrap();
        manager.track(3, both_again).await;
        let update = subscription.recv().await.unwrap();
        assert_eq!(update.index, 3);

        let sent = outbox
            .send(
                Recipients::One(recipient_pk.clone()),
                initial_msg.clone(),
                false,
            )
            .await
            .unwrap();
        assert_eq!(sent.len(), 1);
        assert_eq!(sent[0], recipient_pk);
        let (_from, received) = inbox.recv().await.unwrap();
        assert_eq!(received, initial_msg);
    });
}
#[test]
fn test_subscribe_to_peer_sets() {
    // Tracks four peer sets in a row with a window of two and checks, for each
    // update, the latest primary set and the union reported in `all`.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(2),
        };
        let (network, oracle) = Network::new(context.with_label("network"), cfg);
        network.start();

        let mut manager = oracle.manager();
        let mut subscription = manager.subscribe().await;
        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();
        let pk3 = PrivateKey::from_seed(3).public_key();

        // (index to track, keys tracked at that index, expected `all.primary` keys)
        let steps = vec![
            (
                1,
                vec![pk1.clone(), pk2.clone()],
                vec![pk1.clone(), pk2.clone()],
            ),
            (
                2,
                vec![pk2.clone(), pk3.clone()],
                vec![pk1.clone(), pk2.clone(), pk3.clone()],
            ),
            (
                3,
                vec![pk1.clone(), pk3.clone()],
                vec![pk1.clone(), pk2.clone(), pk3.clone()],
            ),
            // By index 4 only sets 3 and 4 remain in the window, so pk2 is gone.
            (
                4,
                vec![pk1.clone(), pk3.clone()],
                vec![pk1.clone(), pk3.clone()],
            ),
        ];

        for (index, tracked, expected_all) in steps {
            manager
                .track(index, Set::try_from(tracked.clone()).unwrap())
                .await;
            let update = subscription.recv().await.unwrap();
            assert_eq!(update.index, index);
            assert_eq!(update.latest.primary, Set::try_from(tracked).unwrap());
            assert!(update.latest.secondary.is_empty());
            assert_eq!(update.all.primary, Set::try_from(expected_all).unwrap());
            assert!(update.all.secondary.is_empty());
        }
    });
}
#[test]
fn test_multiple_subscriptions() {
    // Every live subscription receives each update; dropping one subscription
    // does not disturb the others.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.with_label("network"), cfg);
        network.start();

        let mut manager = oracle.manager();
        let mut first = manager.subscribe().await;
        let mut second = manager.subscribe().await;
        let mut third = manager.subscribe().await;

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();

        // All three subscribers see index 1.
        manager
            .track(1, Set::try_from(vec![pk1.clone(), pk2.clone()]).unwrap())
            .await;
        let from_first = first.recv().await.unwrap();
        let from_second = second.recv().await.unwrap();
        let from_third = third.recv().await.unwrap();
        assert_eq!(from_first.index, 1);
        assert_eq!(from_second.index, 1);
        assert_eq!(from_third.index, 1);

        // After one subscriber goes away the remaining two still see index 2.
        drop(second);
        manager
            .track(2, Set::try_from(vec![pk1.clone(), pk2.clone()]).unwrap())
            .await;
        let from_first = first.recv().await.unwrap();
        let from_third = third.recv().await.unwrap();
        assert_eq!(from_first.index, 2);
        assert_eq!(from_third.index, 2);
    });
}
#[test]
fn test_subscription_includes_self_when_registered() {
    // A registered peer appears in subscription updates only when it is part of
    // the tracked set: absent for index 1, present for index 2.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(2),
        };
        let (network, oracle) = Network::new(context.with_label("network"), cfg);
        network.start();

        let self_pk = PrivateKey::from_seed(0).public_key();
        let other_pk = PrivateKey::from_seed(1).public_key();
        // Register "self" and keep both channel halves alive for the whole test.
        let (_sender, _receiver) = oracle
            .control(self_pk.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        let mut manager = oracle.manager();
        let mut subscription = manager.subscribe().await;

        // Index 1 tracks only the other peer.
        let only_other = Set::try_from(vec![other_pk.clone()]).unwrap();
        manager.track(1, only_other).await;
        let update = subscription.recv().await.unwrap();
        assert_eq!(update.index, 1);
        let latest = &update.latest;
        let all = &update.all;
        assert_eq!(latest.primary.len(), 1);
        assert!(latest.secondary.is_empty());
        assert_eq!(all.primary.len(), 1);
        assert!(all.secondary.is_empty());
        assert!(
            latest.primary.position(&self_pk).is_none(),
            "latest primary set should not include self"
        );
        assert!(
            latest.primary.position(&other_pk).is_some(),
            "latest primary set should include other"
        );
        assert!(
            all.primary.position(&self_pk).is_none(),
            "peer set should not include self"
        );
        assert!(
            all.primary.position(&other_pk).is_some(),
            "peer set should include other"
        );

        // Index 2 tracks both peers.
        let with_self = Set::try_from(vec![self_pk.clone(), other_pk.clone()]).unwrap();
        manager.track(2, with_self).await;
        let update = subscription.recv().await.unwrap();
        assert_eq!(update.index, 2);
        let latest = &update.latest;
        let all = &update.all;
        assert_eq!(latest.primary.len(), 2);
        assert!(latest.secondary.is_empty());
        assert_eq!(all.primary.len(), 2);
        assert!(all.secondary.is_empty());
        assert!(
            latest.primary.position(&self_pk).is_some(),
            "latest primary set should include self"
        );
        assert!(
            latest.primary.position(&other_pk).is_some(),
            "latest primary set should include other"
        );
        assert!(
            all.primary.position(&self_pk).is_some(),
            "peer set should include self"
        );
        assert!(
            all.primary.position(&other_pk).is_some(),
            "peer set should include other"
        );
    });
}
#[test]
fn test_rate_limiting() {
    // A sender registered with a one-per-second quota gets its second immediate
    // send skipped, and can send again after waiting one second.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let network_context = context.with_label("network");
        let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
        let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
        let initial_peers = [pk1.clone(), pk2.clone()];
        let (network, oracle) =
            Network::new_with_peers(network_context.clone(), cfg, initial_peers).await;
        network.start();

        // pk1 sends under the restrictive quota; pk2 receives under the test quota.
        let restrictive_quota = Quota::per_second(NZU32!(1));
        let control1 = oracle.control(pk1.clone());
        let (mut outbox, _) = control1.register(0, restrictive_quota).await.unwrap();
        let control2 = oracle.control(pk2.clone());
        let (_, mut inbox) = control2.register(0, TEST_QUOTA).await.unwrap();

        // Instant, lossless links in both directions.
        let link = Link {
            latency: Duration::ZERO,
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        oracle
            .add_link(pk1.clone(), pk2.clone(), link.clone())
            .await
            .unwrap();
        oracle.add_link(pk2.clone(), pk1, link).await.unwrap();

        let msg1 = IoBuf::from(b"message1");
        let result1 = outbox
            .send(Recipients::One(pk2.clone()), msg1.clone(), false)
            .await
            .unwrap();
        assert_eq!(result1.len(), 1, "first message should be sent");
        let (_, received1) = inbox.recv().await.unwrap();
        assert_eq!(received1, msg1);

        // The quota is exhausted, so this send reports no recipients.
        let msg2 = IoBuf::from(b"message2");
        let result2 = outbox
            .send(Recipients::One(pk2.clone()), msg2.clone(), false)
            .await
            .unwrap();
        assert_eq!(
            result2.len(),
            0,
            "second message should be rate-limited (skipped)"
        );

        // After one second the quota allows another message.
        context.sleep(Duration::from_secs(1)).await;
        let msg3 = IoBuf::from(b"message3");
        let result3 = outbox
            .send(Recipients::One(pk2.clone()), msg3.clone(), false)
            .await
            .unwrap();
        assert_eq!(result3.len(), 1, "third message should be sent after wait");
        let (_, received3) = inbox.recv().await.unwrap();
        assert_eq!(received3, msg3);
    });
}
#[test]
fn test_operations_after_shutdown_do_not_panic() {
    // After the network task is aborted, every handle-based operation must
    // return (with an error or an empty result) instead of panicking.
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let cfg = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let network_context = context.with_label("network");
        let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
        let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
        let initial_peers = [pk1.clone(), pk2.clone()];
        let (network, oracle) =
            Network::new_with_peers(network_context.clone(), cfg, initial_peers).await;
        let handle = network.start();

        // Collect every kind of handle while the network is still running.
        let mut manager = oracle.manager();
        let control1 = oracle.control(pk1.clone());
        let (mut outbox, _receiver) = control1.register(0, TEST_QUOTA).await.unwrap();
        let link = Link {
            latency: Duration::from_millis(10),
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        oracle
            .add_link(pk1.clone(), pk2.clone(), link.clone())
            .await
            .unwrap();

        // Stop the network and give the abort time to take effect.
        handle.abort();
        context.sleep(Duration::from_millis(100)).await;

        let msg = IoBuf::from(b"test");
        let outcome = outbox.send(Recipients::One(pk2.clone()), msg, false).await;
        let harmless = match outcome {
            Err(_) => true,
            Ok(recipients) => recipients.is_empty(),
        };
        assert!(harmless, "send after shutdown should fail or return empty");

        // None of the following may panic; their results are irrelevant.
        manager
            .track(1, Set::try_from([pk1.clone()]).unwrap())
            .await;
        let _ = manager.peer_set(0).await;
        let _ = manager.subscribe().await;
        let _ = oracle
            .add_link(pk1.clone(), pk2.clone(), link.clone())
            .await;
        let _ = oracle.remove_link(pk1.clone(), pk2.clone()).await;
        let _ = oracle.blocked().await;
        let _ = control1.register(1, TEST_QUOTA).await;
    });
}
fn clean_shutdown(seed: u64) {
let cfg = deterministic::Config::default()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(30)));
let executor = deterministic::Runner::new(cfg);
executor.start(|context| async move {
let cfg = Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(3),
};
let network_context = context.with_label("network");
let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
let (network, oracle) =
Network::new_with_peers(network_context, cfg, [pk1.clone(), pk2.clone()]).await;
let handle = network.start();
let control1 = oracle.control(pk1.clone());
let control2 = oracle.control(pk2.clone());
let (mut sender, _) = control1.register(0, TEST_QUOTA).await.unwrap();
let (_, mut receiver) = control2.register(0, TEST_QUOTA).await.unwrap();
let link = ingress::Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
oracle
.add_link(pk1.clone(), pk2.clone(), link.clone())
.await
.unwrap();
oracle
.add_link(pk2.clone(), pk1.clone(), link)
.await
.unwrap();
context.sleep(Duration::from_millis(100)).await;
let running_before = count_running_tasks(&context, "network");
assert!(
running_before > 0,
"at least one network task should be running"
);
let msg = IoBuf::from(b"test_message");
let result = sender
.send(Recipients::One(pk2.clone()), msg.clone(), false)
.await
.unwrap();
assert_eq!(result.len(), 1, "message should be sent");
let (_, received) = receiver.recv().await.unwrap();
assert_eq!(received, msg, "message should be received");
handle.abort();
let _ = handle.await;
context.sleep(Duration::from_millis(100)).await;
let running_after = count_running_tasks(&context, "network");
assert_eq!(
running_after, 0,
"all network tasks should be stopped, but {running_after} still running"
);
});
}
/// Runs `clean_shutdown` once for each seed in `0..25`.
#[test]
fn test_clean_shutdown() {
    (0..25).for_each(clean_shutdown);
}
/// Smoke test for the oracle's socket manager: tracks two peers with
/// addresses under peer set `0`, then overwrites the address of the first.
///
/// The test contains no assertions; it passes when both calls complete
/// without panicking.
#[test]
fn test_socket_manager_overwrite() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let (network, oracle) = Network::new(
            context.with_label("network"),
            Config {
                max_size: 1024 * 1024,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(3),
            },
        );
        network.start();

        let pk1 = PrivateKey::from_seed(1).public_key();
        let pk2 = PrivateKey::from_seed(2).public_key();
        let mut socket_manager = oracle.socket_manager();
        // Replacement address for pk1; differs from the one tracked below.
        let addr: Address = "127.0.0.1:8000".parse::<SocketAddr>().unwrap().into();

        // Register both peers with their initial addresses.
        socket_manager
            .track(
                0,
                Map::<PublicKey, Address>::try_from([
                    (
                        pk1.clone(),
                        "127.0.0.1:8001".parse::<SocketAddr>().unwrap().into(),
                    ),
                    (
                        pk2,
                        "127.0.0.1:8002".parse::<SocketAddr>().unwrap().into(),
                    ),
                ])
                .unwrap(),
            )
            .await;

        // Overwrite only pk1's address; pk2 is left out of the overwrite map.
        socket_manager
            .overwrite([(pk1, addr)].try_into().unwrap())
            .await;
    });
}
/// Checks that a subscription opened after a peer set has been tracked can
/// yield that peer set via `try_recv` right away, with index `0`, the tracked
/// peers as `primary`, and an empty `secondary`.
#[test]
fn test_subscribe_returns_current_peer_set() {
    deterministic::Runner::default().start(|context| async move {
        let config = Config {
            max_size: 1024 * 1024,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(3),
        };
        let (network, oracle) = Network::new(context.with_label("network"), config);
        network.start();

        // Build the peer set that will be tracked at index 0.
        let first = PrivateKey::from_seed(0).public_key();
        let second = PrivateKey::from_seed(1).public_key();
        let expected = ordered::Set::try_from(vec![first.clone(), second.clone()]).unwrap();

        // Track first, subscribe second: the update must already be queued.
        let mut manager = oracle.manager();
        Manager::track(&mut manager, 0, expected.clone()).await;
        let mut updates = Provider::subscribe(&mut manager).await;
        let current = updates
            .try_recv()
            .expect("current peer set should be available immediately after subscribe");

        assert_eq!(current.index, 0);
        assert_eq!(current.latest.primary, expected);
        assert!(current.latest.secondary.is_empty());
    });
}
}