use super::{
ingress::{self, Oracle},
metrics,
transmitter::{self, Completion},
Error,
};
use crate::{
utils::{
limited::{CheckedSender as LimitedCheckedSender, Connected, LimitedSender},
PeerSetsAtIndex as PeerSetsAtIndexBase,
},
Channel, Message as NetworkMessage, PeerSetUpdate, Recipients, TrackedPeers,
UnlimitedSender as _,
};
use commonware_actor::{Feedback, Unreliable};
use commonware_codec::{DecodeExt, FixedSize};
use commonware_cryptography::PublicKey;
use commonware_macros::select_loop;
use commonware_runtime::{
spawn_cell,
telemetry::metrics::{CounterFamily, MetricsExt as _},
Clock, ContextCell, Handle, IoBuf, IoBufs, Listener as _, Metrics, Network as RNetwork, Quota,
Spawner,
};
use commonware_stream::utils::codec::{recv_frame, send_frame};
use commonware_utils::{
channel::{fallible::FallibleExt, mpsc, oneshot, ring},
ordered::Set,
NZUsize, TryCollect,
};
use either::Either;
use futures::{future, Sink};
use rand::Rng;
use rand_distr::{Distribution, Normal};
use std::{
collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
fmt::Debug,
net::{IpAddr, Ipv4Addr, SocketAddr},
num::NonZeroUsize,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::{Duration, SystemTime},
};
use tracing::{debug, error, trace, warn};
/// Primary and secondary peer sets stored together under one tracked index.
type PeerSetsAtIndex<P> = PeerSetsAtIndexBase<Set<P>, Set<P>>;
/// A queued send, laid out as `(channel, origin, recipients, payload)`.
type Task<P> = (Channel, P, Recipients<P>, IoBuf);
/// Clears a shared "registration is live" flag when dropped.
///
/// The same flag is handed to the `UnlimitedSender` created for the registration, which
/// reports `Feedback::Closed` once the flag reads `false`.
struct RegistrationGuard {
// Shared liveness flag; set to `true` when the sender/guard pair is created.
active: Arc<AtomicBool>,
}
impl Drop for RegistrationGuard {
fn drop(&mut self) {
// `Release` pairs with the `Acquire` load the sender performs before each send.
self.active.store(false, Ordering::Release);
}
}
/// Destination chosen by a [`SplitRouter`] for an inbound message on a split receiver.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub enum SplitTarget {
/// Deliver to neither half (the message is discarded).
None,
/// Deliver only to the primary half.
Primary,
/// Deliver only to the secondary half.
Secondary,
/// Deliver a copy to both halves.
Both,
}
/// Identifies which half of a split sender a message is being sent from.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub enum SplitOrigin {
/// The first sender returned by `Sender::split_with`.
Primary,
/// The second sender returned by `Sender::split_with`.
Secondary,
}
/// Callback deciding where a message sent through a split sender actually goes.
///
/// It receives the sending half, the recipients the caller requested, and the payload.
/// Returning `Some(recipients)` sends to those recipients; returning `None` rejects the send.
pub trait SplitForwarder<P: PublicKey>:
Fn(SplitOrigin, &Recipients<P>, &IoBuf) -> Option<Recipients<P>> + Send + Sync + Clone + 'static
{
}
// Blanket impl: any closure/function with the matching signature is a `SplitForwarder`.
impl<P: PublicKey, F> SplitForwarder<P> for F where
F: Fn(SplitOrigin, &Recipients<P>, &IoBuf) -> Option<Recipients<P>>
+ Send
+ Sync
+ Clone
+ 'static
{
}
/// Callback that inspects an inbound message and picks which half of a split receiver
/// (if any) should get it.
pub trait SplitRouter<P: PublicKey>:
Fn(&NetworkMessage<P>) -> SplitTarget + Send + Sync + 'static
{
}
// Blanket impl: any closure/function with the matching signature is a `SplitRouter`.
impl<P: PublicKey, F> SplitRouter<P> for F where
F: Fn(&NetworkMessage<P>) -> SplitTarget + Send + Sync + 'static
{
}
/// Per-peer counts of how many tracked peer sets currently reference the peer.
#[derive(Clone, Copy, Default)]
struct PeerRefCounts {
// Number of tracked peer sets listing the peer as primary.
primary: usize,
// Number of tracked peer sets listing the peer as secondary (and not as primary in that set).
secondary: usize,
}
/// Configuration for the simulated [`Network`].
pub struct Config {
/// Maximum message size in bytes; sends larger than this panic and frames are bounded by it.
pub max_size: u32,
/// When `true`, messages between a pair of peers are dropped if either has blocked the other.
pub disconnect_on_block: bool,
/// Maximum number of peer sets retained; older sets are evicted once this is exceeded.
pub tracked_peer_sets: NonZeroUsize,
}
/// Actor implementing a simulated peer-to-peer network.
///
/// It owns all peers, links and tracked peer sets and is driven by `ingress::Message`s
/// sent through the [`Oracle`] returned by `Network::new`.
pub struct Network<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> {
// Runtime context (clock, RNG, spawner, metrics) used by the actor.
context: ContextCell<E>,
// Maximum message size in bytes, copied from `Config`.
max_size: u32,
// Whether blocked pairs stop exchanging messages, copied from `Config`.
disconnect_on_block: bool,
// Next socket address to assign to a newly created peer.
next_addr: SocketAddr,
// Receiving end of the control channel.
ingress: mpsc::UnboundedReceiver<ingress::Message<P, E>>,
// Sending end of the control channel, cloned into every `Sender` created on registration.
ingress_sender: mpsc::UnboundedSender<ingress::Message<P, E>>,
// Directed links keyed by `(sender, receiver)`.
links: HashMap<(P, P), Link>,
// All peers ever created, keyed by public key.
peers: BTreeMap<P, Peer<P>>,
// Tracked peer sets keyed by their (monotonically increasing) id.
peer_sets: BTreeMap<u64, PeerSetsAtIndex<P>>,
// Reference counts for peers that appear in at least one tracked peer set.
peer_ref_counts: BTreeMap<P, PeerRefCounts>,
// Maximum number of peer sets kept in `peer_sets`.
tracked_peer_sets: NonZeroUsize,
// Recorded `(from, to)` block requests.
blocks: BTreeSet<(P, P)>,
// Transmission scheduling state (enqueue / bandwidth limits / completions).
transmitter: transmitter::State<P>,
// Subscribers that receive a `PeerSetUpdate` whenever a new peer set is tracked.
subscribers: Vec<mpsc::UnboundedSender<PeerSetUpdate<P>>>,
// Subscribers to connected-peer lists, stored with the peer each list must exclude.
peer_subscribers: Vec<(P, ring::Sender<Vec<P>>)>,
// Counter family incremented by links after a frame has been written.
received_messages: CounterFamily<metrics::Message>,
// Counter family incremented when a message is accepted for transmission.
sent_messages: CounterFamily<metrics::Message>,
}
impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
pub fn new(mut context: E, cfg: Config) -> (Self, Oracle<P, E>) {
let (oracle_mailbox, oracle_receiver) = mpsc::unbounded_channel();
let sent_messages = context.family("messages_sent", "messages sent");
let received_messages = context.family("messages_received", "messages received");
let next_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(context.next_u32())), 0);
(
Self {
context: ContextCell::new(context),
max_size: cfg.max_size,
disconnect_on_block: cfg.disconnect_on_block,
tracked_peer_sets: cfg.tracked_peer_sets,
next_addr,
ingress: oracle_receiver,
ingress_sender: oracle_mailbox.clone(),
links: HashMap::new(),
peers: BTreeMap::new(),
peer_sets: BTreeMap::new(),
peer_ref_counts: BTreeMap::new(),
blocks: BTreeSet::new(),
transmitter: transmitter::State::new(),
subscribers: Vec::new(),
peer_subscribers: Vec::new(),
received_messages,
sent_messages,
},
Oracle::new(oracle_mailbox),
)
}
/// Creates a network whose peer set `0` contains `peers` as primary peers and no
/// secondary peers.
pub async fn new_with_peers<I>(context: E, cfg: Config, peers: I) -> (Self, Oracle<P, E>)
where
    I: IntoIterator<Item = P>,
{
    let no_secondary = std::iter::empty::<P>();
    Self::new_with_split_peers(context, cfg, peers, no_secondary).await
}
/// Creates a network and registers `primary`/`secondary` as tracked peer set `0`.
///
/// Duplicate keys within either iterator are removed before registration.
pub async fn new_with_split_peers<I, J>(
    context: E,
    cfg: Config,
    primary: I,
    secondary: J,
) -> (Self, Oracle<P, E>)
where
    I: IntoIterator<Item = P>,
    J: IntoIterator<Item = P>,
{
    let (mut network, oracle) = Self::new(context, cfg);
    let primary_set = Set::from_iter_dedup(primary);
    let secondary_set = Set::from_iter_dedup(secondary);
    let initial = TrackedPeers::new(primary_set, secondary_set);
    network.register_tracked_peer_set(0, initial).await;
    (network, oracle)
}
/// Registers `peers` as the tracked peer set with index `id`.
///
/// Returns `false` (leaving all state untouched) when `id` is already registered or is not
/// strictly greater than the most recent index. On success:
/// - every listed peer is created if it does not exist yet and its reference count is bumped,
/// - peers listed as both primary and secondary are treated as primary only,
/// - the oldest peer sets are evicted until at most `tracked_peer_sets` remain, releasing
///   the references they held.
///
/// # Panics
///
/// Panics if the reference-count map is out of sync with the stored peer sets.
async fn register_tracked_peer_set(&mut self, id: u64, peers: TrackedPeers<P>) -> bool {
    let primary = peers.primary;
    let secondary = peers.secondary;
    let tracked_peer_sets = self.tracked_peer_sets;

    // Reject ids that already exist or would break monotonic ordering.
    if self.peer_sets.contains_key(&id) {
        warn!(id, "peer set already exists");
        return false;
    }
    if let Some((last, _)) = self.peer_sets.last_key_value() {
        if id <= *last {
            warn!(
                new_id = id,
                old_id = last,
                "attempted to register peer set with non-monotonically increasing ID"
            );
            return false;
        }
    }

    // Take a primary reference for every primary peer.
    for public_key in primary.iter() {
        self.ensure_peer_exists(public_key).await;
        self.peer_ref_counts
            .entry(public_key.clone())
            .or_default()
            .primary += 1;
    }

    // A peer listed in both roles only counts as primary for this set.
    let secondary_filtered = Set::from_iter_dedup(
        secondary
            .iter()
            .filter(|s| primary.position(s).is_none())
            .cloned(),
    );
    for public_key in secondary_filtered.iter() {
        self.ensure_peer_exists(public_key).await;
        self.peer_ref_counts
            .entry(public_key.clone())
            .or_default()
            .secondary += 1;
    }

    // `primary` is owned and no longer needed, so it is moved rather than cloned.
    self.peer_sets.insert(
        id,
        PeerSetsAtIndex {
            primary,
            secondary: secondary_filtered,
        },
    );

    // Evict the oldest sets beyond the retention limit and release their references.
    while self.peer_sets.len() > tracked_peer_sets.get() {
        let (removed_index, sets) = self
            .peer_sets
            .pop_first()
            .expect("peer_sets is non-empty while over the retention limit");
        debug!(index = removed_index, "removed oldest tracked peer sets");
        for public_key in sets.primary.iter() {
            self.release_tracked_peer_ref(public_key, true);
        }
        for public_key in sets.secondary.iter() {
            self.release_tracked_peer_ref(public_key, false);
        }
    }
    true
}

/// Releases one reference held on `public_key` by an evicted peer set.
///
/// `primary` selects which counter is decremented. When both counters reach zero the peer
/// is removed from the reference map.
///
/// # Panics
///
/// Panics if the peer has no entry or the selected counter is already zero.
fn release_tracked_peer_ref(&mut self, public_key: &P, primary: bool) {
    let counts = self
        .peer_ref_counts
        .get_mut(public_key)
        .expect("reference map out of sync with peer sets");
    let count = if primary {
        &mut counts.primary
    } else {
        &mut counts.secondary
    };
    *count = count.checked_sub(1).expect("reference count underflow");
    if counts.primary == 0 && counts.secondary == 0 {
        self.peer_ref_counts.remove(public_key);
        debug!(
            ?public_key,
            "removed peer no longer in any tracked peer set"
        );
    }
}
/// Hands out the next unused socket address and advances the internal cursor.
///
/// Ports are consumed in increasing order; once the port space of the current IPv4 address
/// is exhausted the cursor moves to the next address (wrapping) at port `0`.
fn get_next_socket(&mut self) -> SocketAddr {
    let allocated = self.next_addr;
    if let Some(port) = allocated.port().checked_add(1) {
        self.next_addr.set_port(port);
        return allocated;
    }

    // Port overflow: roll over to the following IPv4 address.
    let IpAddr::V4(current) = allocated.ip() else {
        unreachable!()
    };
    let following = Ipv4Addr::from_bits(current.to_bits().wrapping_add(1));
    self.next_addr = SocketAddr::new(IpAddr::V4(following), 0);
    allocated
}
/// Applies a single control message to the network state and, where the message carries a
/// response channel, replies on it.
async fn handle_ingress(&mut self, message: ingress::Message<P, E>) {
// Sends `value` back to the requester, logging (not panicking) if the requester is gone.
fn send_result<T: std::fmt::Debug>(
result: oneshot::Sender<Result<T, Error>>,
value: Result<T, Error>,
) {
let success = value.is_ok();
if let Err(e) = result.send(value) {
error!(?e, "failed to send result to oracle (ok = {})", success);
}
}
match message {
// Transmit immediately; any remaining fields (e.g. priority) are ignored on this path.
ingress::Message::Send {
channel,
origin,
recipients,
message,
..
} => {
self.handle_task((channel, origin, recipients, message));
}
// Track a new peer set; subscribers are only notified when registration succeeds.
ingress::Message::Track { id, peers } => {
if !self.register_tracked_peer_set(id, peers).await {
return;
}
let update = self
.latest_update()
.expect("latest update missing after successful track");
// Subscribers whose channel rejects the update are dropped.
self.subscribers
.retain(|subscriber| subscriber.send_lossy(update.clone()));
self.broadcast_peer_list();
}
// Register a channel for a peer, creating the peer first if needed.
ingress::Message::Register {
channel,
public_key,
quota,
result,
} => {
let _ = self.ensure_peer_exists(&public_key).await;
let clock = self
.context
.child("rate_limiter")
.with_attribute("channel", channel)
.with_attribute("peer", &public_key);
// The guard is handed to the peer so the sender is deactivated when the
// registration is dropped.
let (sender, guard) = Sender::new(
public_key.clone(),
channel,
self.max_size,
self.ingress_sender.clone(),
self.connected_peers_for(&public_key),
clock,
quota,
);
// The peer was ensured above, so the lookup cannot fail.
let peer = self.peers.get_mut(&public_key).unwrap();
let receiver = match peer.register(channel, guard).await {
Ok(receiver) => Receiver { receiver },
Err(err) => return send_result(result, Err(err)),
};
send_result(result, Ok((sender, receiver)))
}
// Look up a tracked peer set by id (`None` if it is not tracked).
ingress::Message::PeerSet { id, response } => {
let _ = response.send(
self.peer_sets
.get(&id)
.map(|e| TrackedPeers::new(e.primary.clone(), e.secondary.clone())),
);
}
// Subscribe to peer set updates; the latest update (if any) is delivered up front.
ingress::Message::Subscribe { response } => {
let (sender, receiver) = mpsc::unbounded_channel();
if let Some(update) = self.latest_update() {
sender.send_lossy(update);
}
self.subscribers.push(sender);
let _ = response.send(receiver);
}
// Subscribe to connected-peer lists that exclude `exclude`.
ingress::Message::SubscribePeers { exclude, sender } => {
self.subscribe_connected(exclude, sender);
}
// Update bandwidth caps for a peer and deliver anything that completes as a result.
ingress::Message::LimitBandwidth {
public_key,
egress_cap,
ingress_cap,
result,
} => {
let _ = self.ensure_peer_exists(&public_key).await;
let now = self.context.current();
let completions = self
.transmitter
.limit(now, &public_key, egress_cap, ingress_cap);
self.process_completions(completions);
let _ = result.send(());
}
// Create a directed link; both endpoints are created even if the link already exists.
ingress::Message::AddLink {
sender,
receiver,
sampler,
success_rate,
result,
} => {
let _ = self.ensure_peer_exists(&sender).await;
let (receiver_socket, _) = self.ensure_peer_exists(&receiver).await;
let key = (sender.clone(), receiver.clone());
if self.links.contains_key(&key) {
return send_result(result, Err(Error::LinkExists));
}
let link = Link::new(
self.context.as_mut(),
sender,
receiver,
receiver_socket,
sampler,
success_rate,
self.max_size,
self.received_messages.clone(),
);
self.links.insert(key, link);
send_result(result, Ok(()))
}
// Remove a directed link, reporting `LinkMissing` if it does not exist.
ingress::Message::RemoveLink {
sender,
receiver,
result,
} => {
match self.links.remove(&(sender, receiver)) {
Some(_) => (),
None => return send_result(result, Err(Error::LinkMissing)),
}
send_result(result, Ok(()))
}
// Record that `from` blocked `to`.
ingress::Message::Block { from, to } => {
self.blocks.insert((from, to));
}
// Report every recorded block pair.
ingress::Message::Blocked { result } => {
send_result(result, Ok(self.blocks.iter().cloned().collect()))
}
}
}
/// Looks up the peer for `public_key`, creating it on a fresh socket if it is unknown.
///
/// Returns the peer's socket address and whether the peer was created by this call.
async fn ensure_peer_exists(&mut self, public_key: &P) -> (SocketAddr, bool) {
    if let Some(existing) = self.peers.get(public_key) {
        return (existing.socket, false);
    }

    let socket = self.get_next_socket();
    let created = Peer::new(
        self.context.child("peer"),
        public_key.clone(),
        socket,
        self.max_size,
    )
    .await;
    self.peers.insert(public_key.clone(), created);
    (socket, true)
}
/// Registers `sender` for connected-peer updates that omit `exclude`.
///
/// The current peer list is pushed first; the subscriber is only retained if that initial
/// push is accepted.
fn subscribe_connected(&mut self, exclude: P, mut sender: ring::Sender<Vec<P>>) {
    let snapshot = self.connected_peers_for(&exclude);
    if Pin::new(&mut sender).start_send(snapshot).is_err() {
        return;
    }
    self.peer_subscribers.push((exclude, sender));
}
fn broadcast_peer_list(&mut self) {
if self.peer_subscribers.is_empty() {
return;
}
let peers: Vec<P> = self.peer_ref_counts.keys().cloned().collect();
let mut live_subscribers = Vec::with_capacity(self.peer_subscribers.len());
for (exclude, mut subscriber) in self.peer_subscribers.drain(..) {
let peer_list = if peers.contains(&exclude) {
peers
.iter()
.filter(|peer| *peer != &exclude)
.cloned()
.collect()
} else {
Vec::new()
};
if Pin::new(&mut subscriber).start_send(peer_list).is_ok() {
live_subscribers.push((exclude, subscriber));
}
}
self.peer_subscribers = live_subscribers;
}
/// Computes the union of all tracked peer sets.
///
/// A peer is reported as primary if any tracked set lists it as primary; it is reported as
/// secondary only if it is secondary somewhere and primary nowhere.
fn aggregate_peer_membership(&self) -> TrackedPeers<P> {
    let primary = self
        .peer_ref_counts
        .iter()
        .filter_map(|(key, counts)| (counts.primary > 0).then(|| key.clone()))
        .try_collect()
        .expect("BTreeMap keys are unique");
    let secondary_only = self.peer_ref_counts.iter().filter_map(|(key, counts)| {
        (counts.secondary > 0 && counts.primary == 0).then(|| key.clone())
    });
    TrackedPeers::new(primary, Set::from_iter_dedup(secondary_only))
}
/// Builds the update describing the most recently tracked peer set, or `None` when no
/// peer set has been tracked yet.
fn latest_update(&self) -> Option<PeerSetUpdate<P>> {
    self.peer_sets
        .last_key_value()
        .map(|(index, entry)| PeerSetUpdate {
            index: *index,
            latest: TrackedPeers::new(entry.primary.clone(), entry.secondary.clone()),
            all: self.aggregate_peer_membership(),
        })
}
/// Lists every tracked peer other than `sender`.
///
/// The list is empty when `sender` itself is not part of any tracked peer set.
fn connected_peers_for(&self, sender: &P) -> Vec<P> {
    let mut others = Vec::new();
    if self.peer_ref_counts.contains_key(sender) {
        others.extend(
            self.peer_ref_counts
                .keys()
                .filter(|candidate| *candidate != sender)
                .cloned(),
        );
    }
    others
}
/// Returns `true` when `peer` is referenced by at least one tracked peer set
/// (as primary or secondary).
fn is_connectable(&self, peer: &P) -> bool {
self.peer_ref_counts.contains_key(peer)
}
}
impl<E: RNetwork + Spawner + Rng + Clock + Metrics, P: PublicKey> Network<E, P> {
/// Hands finished transmissions to their links for delivery.
///
/// Completions without a delivery time, or whose link no longer exists, are dropped with a
/// trace log. Link send failures are logged and otherwise ignored.
fn process_completions(&mut self, completions: Vec<Completion<P>>) {
    for done in completions {
        let deliver_at = match done.deliver_at {
            Some(at) => at,
            None => {
                trace!(
                    origin = ?done.origin,
                    recipient = ?done.recipient,
                    "message dropped before delivery",
                );
                continue;
            }
        };

        let route = (done.origin.clone(), done.recipient.clone());
        match self.links.get_mut(&route) {
            Some(link) => {
                if let Err(err) = link.send(done.channel, done.message, deliver_at) {
                    error!(?err, "failed to send");
                }
            }
            None => {
                trace!(
                    origin = ?done.origin,
                    recipient = ?done.recipient,
                    "missing link for completion",
                );
            }
        }
    }
}
/// Processes one send task: expands the recipients and enqueues a transmission for every
/// recipient that is reachable.
///
/// A recipient is skipped when it is the origin itself, is not in any tracked peer set, is
/// part of a blocked pair (only when `disconnect_on_block` is set), or has no link from the
/// origin. The whole task is dropped when the origin is not in any tracked peer set.
fn handle_task(&mut self, task: Task<P>) {
let (channel, origin, recipients, message) = task;
if !self.is_connectable(&origin) {
warn!(
?origin,
reason = "not primary or secondary",
"dropping message"
);
return;
}
// `All` expands to every tracked peer except the origin.
let recipients = match recipients {
Recipients::All => self.connected_peers_for(&origin),
Recipients::Some(keys) => keys,
Recipients::One(key) => vec![key],
};
// A single timestamp is used for every recipient of this task.
let now = self.context.current();
for recipient in recipients {
if recipient == origin {
trace!(?recipient, reason = "self", "dropping message");
continue;
}
if !self.is_connectable(&recipient) {
trace!(
?origin,
?recipient,
reason = "not primary or secondary",
"dropping message"
);
continue;
}
// A block in either direction severs the pair.
let o_r = (origin.clone(), recipient.clone());
let r_o = (recipient.clone(), origin.clone());
if self.disconnect_on_block
&& (self.blocks.contains(&o_r) || self.blocks.contains(&r_o))
{
trace!(?origin, ?recipient, reason = "blocked", "dropping message");
continue;
}
let Some(link) = self.links.get_mut(&o_r) else {
trace!(?origin, ?recipient, reason = "no link", "dropping message");
continue;
};
self.sent_messages
.get_or_create(&metrics::Message::new(&origin, &recipient, channel))
.inc();
// RNG draws happen in a fixed order: latency first, then the delivery decision.
// The `as u64` cast saturates, so a negative sample becomes a latency of 0 ms.
let latency = Duration::from_millis(link.sampler.sample(self.context.as_mut()) as u64);
let should_deliver = self.context.gen_bool(link.success_rate);
let completions = self.transmitter.enqueue(
now,
origin.clone(),
recipient.clone(),
channel,
message.clone(),
latency,
should_deliver,
);
// Enqueueing may complete transmissions immediately; forward them right away.
self.process_completions(completions);
}
}
/// Appends `task` to the high-priority queue when `priority` is set, otherwise to the
/// low-priority queue.
fn queue_task(
    high: &mut VecDeque<Task<P>>,
    low: &mut VecDeque<Task<P>>,
    task: Task<P>,
    priority: bool,
) {
    let queue = if priority { high } else { low };
    queue.push_back(task);
}
/// Drains both queues, processing every high-priority task (in FIFO order) before any
/// low-priority task. Both queues are empty afterwards.
fn handle_tasks(&mut self, high: &mut VecDeque<Task<P>>, low: &mut VecDeque<Task<P>>) {
    for task in high.drain(..).chain(low.drain(..)) {
        self.handle_task(task);
    }
}
/// Handles `message` plus any further messages that are already waiting on the ingress
/// channel, batching consecutive sends so that priority sends are processed first.
///
/// `Send` messages are queued into `high`/`low`. The batch ends when either a non-`Send`
/// message is seen or the channel has nothing ready; in both cases the queued sends are
/// flushed (high before low), and a non-`Send` message is handled only after that flush.
async fn handle_ordered_ingress(
&mut self,
mut message: ingress::Message<P, E>,
high: &mut VecDeque<Task<P>>,
low: &mut VecDeque<Task<P>>,
) {
loop {
match message {
ingress::Message::Send {
channel,
origin,
recipients,
message,
priority,
} => {
Self::queue_task(high, low, (channel, origin, recipients, message), priority);
}
// Any other message acts as a barrier: flush queued sends, handle it, and stop.
message => {
self.handle_tasks(high, low);
self.handle_ingress(message).await;
return;
}
}
// Non-blocking poll: only messages that are already queued join the batch.
message = match self.ingress.try_recv() {
Ok(message) => message,
Err(_) => {
self.handle_tasks(high, low);
return;
}
};
}
}
/// Spawns the network actor on its context and returns a handle to the spawned task.
pub fn start(mut self) -> Handle<()> {
spawn_cell!(self.context, self.run())
}
/// Main actor loop: advances the transmitter when its next deadline fires and processes
/// control messages as they arrive. The loop ends when the ingress channel is closed.
async fn run(mut self) {
// Send queues reused across iterations; they are drained by `handle_ordered_ingress`.
let mut high = VecDeque::new();
let mut low = VecDeque::new();
select_loop! {
self.context,
// NOTE(review): `on_start` presumably runs before each select round — confirm against
// the `select_loop!` documentation.
on_start => {
// Sleep until the transmitter's next deadline, or wait forever if it has none.
let tick = match self.transmitter.next() {
Some(when) => Either::Left(self.context.sleep_until(when)),
None => Either::Right(future::pending()),
};
},
on_stopped => {},
_ = tick => {
let now = self.context.current();
let completions = self.transmitter.advance(now);
self.process_completions(completions);
},
Some(message) = self.ingress.recv() else break => {
self.handle_ordered_ingress(message, &mut high, &mut low)
.await;
},
}
}
}
/// Source of connected-peer information for a rate-limited sender.
///
/// It carries a snapshot of the peer list taken at construction time and can ask the
/// network actor for a stream of fresh lists.
pub struct ConnectedPeerProvider<P: PublicKey, E: Clock> {
    /// Peer that is excluded from the lists requested on behalf of this provider.
    me: P,
    /// Control channel into the network actor.
    ingress: mpsc::UnboundedSender<ingress::Message<P, E>>,
    /// Peer list captured when the provider was created.
    peers: Vec<P>,
    _clock: std::marker::PhantomData<E>,
}

impl<P: PublicKey, E: Clock> Clone for ConnectedPeerProvider<P, E> {
    fn clone(&self) -> Self {
        // Implemented by hand so that `E` is not required to be `Clone`.
        Self::new(self.me.clone(), self.ingress.clone(), self.peers.clone())
    }
}

impl<P: PublicKey, E: Clock> ConnectedPeerProvider<P, E> {
    /// Creates a provider for `me` with the given control channel and initial peer list.
    const fn new(
        me: P,
        ingress: mpsc::UnboundedSender<ingress::Message<P, E>>,
        peers: Vec<P>,
    ) -> Self {
        Self {
            me,
            ingress,
            peers,
            _clock: std::marker::PhantomData,
        }
    }
}
impl<P: PublicKey, E: Clock> Connected for ConnectedPeerProvider<P, E> {
    type PublicKey = P;

    /// Returns a copy of the peer list captured at construction time.
    fn peers(&self) -> Vec<Self::PublicKey> {
        self.peers.to_vec()
    }

    /// Asks the network actor to stream connected-peer lists (excluding `me`) into a
    /// single-slot ring channel and returns the receiving end.
    ///
    /// A failure to reach the actor is ignored; the returned receiver is still handed out.
    fn subscribe(&self) -> ring::Receiver<Vec<Self::PublicKey>> {
        let (sender, receiver) = ring::channel(NZUsize!(1));
        let request = ingress::Message::SubscribePeers {
            exclude: self.me.clone(),
            sender,
        };
        let _ = self.ingress.send_lossy(request);
        receiver
    }
}
/// Sender that forwards messages to the network actor without any rate limiting.
pub struct UnlimitedSender<P: PublicKey, E: Clock> {
    /// Public key used as the origin of every message.
    me: P,
    /// Channel the sender was registered on.
    channel: Channel,
    /// Maximum accepted message size in bytes.
    max_size: u32,
    /// Control channel into the network actor.
    sender: mpsc::UnboundedSender<ingress::Message<P, E>>,
    /// Cleared when the matching registration guard is dropped.
    active: Arc<AtomicBool>,
}

impl<P: PublicKey, E: Clock> Clone for UnlimitedSender<P, E> {
    fn clone(&self) -> Self {
        // Implemented by hand so that `E` is not required to be `Clone`.
        let Self {
            me,
            channel,
            max_size,
            sender,
            active,
        } = self;
        Self {
            me: me.clone(),
            channel: *channel,
            max_size: *max_size,
            sender: sender.clone(),
            active: Arc::clone(active),
        }
    }
}
impl<P: PublicKey, E: Clock> crate::UnlimitedSender for UnlimitedSender<P, E> {
    type PublicKey = P;

    /// Queues `message` for delivery to `recipients` via the network actor.
    ///
    /// Yields `Feedback::Closed` when the registration is no longer active or the actor's
    /// channel is closed, and `Feedback::Ok` once the message has been handed over.
    ///
    /// # Panics
    ///
    /// Panics if the coalesced message is larger than `max_size`.
    fn send(
        &mut self,
        recipients: Recipients<P>,
        message: impl Into<IoBufs> + Send,
        priority: bool,
    ) -> Unreliable<Feedback> {
        let payload = message.into().coalesce();
        let size = payload.len();
        assert!(
            size <= self.max_size as usize,
            "message too large: {} > {}",
            size,
            self.max_size
        );

        // The size check comes first so oversized messages panic even on a dead sender.
        let registered = self.active.load(Ordering::Acquire);
        if !registered || self.sender.is_closed() {
            return Unreliable::new(Feedback::Closed);
        }

        let request = ingress::Message::Send {
            channel: self.channel,
            origin: self.me.clone(),
            recipients,
            message: payload,
            priority,
        };
        let feedback = if self.sender.send_lossy(request) {
            Feedback::Ok
        } else {
            Feedback::Closed
        };
        Unreliable::new(feedback)
    }
}
/// Rate-limited sender handed out when a channel is registered.
pub struct Sender<P: PublicKey, E: Clock> {
    /// Rate limiter wrapped around the raw sender and its peer provider.
    limited_sender: LimitedSender<E, UnlimitedSender<P, E>, ConnectedPeerProvider<P, E>>,
}

impl<P: PublicKey, E: Clock> Clone for Sender<P, E> {
    fn clone(&self) -> Self {
        let limited_sender = self.limited_sender.clone();
        Self { limited_sender }
    }
}

impl<P: PublicKey, E: Clock> Debug for Sender<P, E> {
    /// Prints only the type name; the inner limiter is intentionally elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Sender");
        repr.finish_non_exhaustive()
    }
}
impl<P: PublicKey, E: Clock> Sender<P, E> {
/// Builds a rate-limited sender for `me` on `channel`, together with the guard that
/// deactivates it.
///
/// The sender and the guard share one flag: once the guard is dropped, sends report
/// `Feedback::Closed`.
#[allow(clippy::too_many_arguments)]
fn new(
    me: P,
    channel: Channel,
    max_size: u32,
    ingress: mpsc::UnboundedSender<ingress::Message<P, E>>,
    connected_peers: Vec<P>,
    clock: E,
    quota: Quota,
) -> (Self, RegistrationGuard) {
    let active = Arc::new(AtomicBool::new(true));
    let guard = RegistrationGuard {
        active: Arc::clone(&active),
    };
    let peer_source = ConnectedPeerProvider::new(me.clone(), ingress.clone(), connected_peers);
    let unlimited_sender = UnlimitedSender {
        me,
        channel,
        max_size,
        sender: ingress,
        active,
    };
    let limited_sender = LimitedSender::new(unlimited_sender, quota, clock, peer_source);
    (Self { limited_sender }, guard)
}
/// Splits this sender into a primary and a secondary half that share the same underlying
/// sender. `forwarder` decides, per message, which recipients each half really sends to.
///
/// Returns `(primary, secondary)`.
pub fn split_with<F: SplitForwarder<P>>(
    self,
    forwarder: F,
) -> (SplitSender<P, E, F>, SplitSender<P, E, F>) {
    let primary = SplitSender {
        replica: SplitOrigin::Primary,
        inner: self.clone(),
        forwarder: forwarder.clone(),
    };
    let secondary = SplitSender {
        replica: SplitOrigin::Secondary,
        inner: self,
        forwarder,
    };
    (primary, secondary)
}
}
// Delegates rate-limit checks to the wrapped `LimitedSender`.
impl<P: PublicKey, E: Clock> crate::LimitedSender for Sender<P, E> {
type PublicKey = P;
type Checked<'a>
= crate::utils::limited::CheckedSender<'a, UnlimitedSender<P, E>>
where
Self: 'a;
/// Checks `recipients` against the rate limiter, returning a checked sender on success
/// or the `SystemTime` reported by the limiter on failure.
fn check(
&mut self,
recipients: Recipients<Self::PublicKey>,
) -> Result<Self::Checked<'_>, SystemTime> {
self.limited_sender.check(recipients)
}
}
/// One half of a sender produced by `Sender::split_with`.
pub struct SplitSender<P: PublicKey, E: Clock, F: SplitForwarder<P>> {
    /// Which half this is.
    replica: SplitOrigin,
    /// Underlying sender shared with the other half.
    inner: Sender<P, E>,
    /// Callback choosing the effective recipients of each message.
    forwarder: F,
}

impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> Clone for SplitSender<P, E, F> {
    fn clone(&self) -> Self {
        let Self {
            replica,
            inner,
            forwarder,
        } = self;
        Self {
            replica: *replica,
            inner: inner.clone(),
            forwarder: forwarder.clone(),
        }
    }
}

impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> std::fmt::Debug for SplitSender<P, E, F> {
    /// Prints the replica and the inner sender; the forwarder is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("SplitSender");
        repr.field("replica", &self.replica);
        repr.field("inner", &self.inner);
        repr.finish()
    }
}
impl<P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::LimitedSender for SplitSender<P, E, F> {
    type PublicKey = P;
    type Checked<'a> = SplitCheckedSender<'a, P, E, F>;

    /// Runs the rate-limit check of the shared sender for `recipients` and, on success,
    /// wraps the result so the forwarder is consulted when the message is sent.
    fn check(
        &mut self,
        recipients: Recipients<Self::PublicKey>,
    ) -> Result<Self::Checked<'_>, SystemTime> {
        let checked = self.inner.limited_sender.check(recipients.clone())?;
        let replica = self.replica;
        let forwarder = self.forwarder.clone();
        Ok(SplitCheckedSender {
            checked,
            replica,
            forwarder,
            recipients,
            _phantom: std::marker::PhantomData,
        })
    }
}
/// Rate-limit-checked handle produced by `SplitSender::check`.
pub struct SplitCheckedSender<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> {
    /// Checked handle of the underlying sender.
    checked: LimitedCheckedSender<'a, UnlimitedSender<P, E>>,
    /// Half of the split sender that performed the check.
    replica: SplitOrigin,
    /// Callback choosing the effective recipients.
    forwarder: F,
    /// Recipients originally requested by the caller.
    recipients: Recipients<P>,
    _phantom: std::marker::PhantomData<E>,
}

impl<'a, P: PublicKey, E: Clock, F: SplitForwarder<P>> crate::CheckedSender
    for SplitCheckedSender<'a, P, E, F>
{
    type PublicKey = P;

    /// Recipients reported by the underlying checked sender.
    fn recipients(&self) -> Vec<Self::PublicKey> {
        crate::CheckedSender::recipients(&self.checked)
    }

    /// Sends `message` to the recipients selected by the forwarder, or rejects the send when
    /// the forwarder returns `None`.
    fn send(self, message: impl Into<IoBufs> + Send, priority: bool) -> Unreliable<Feedback> {
        let payload = message.into().coalesce();
        match (self.forwarder)(self.replica, &self.recipients, &payload) {
            Some(recipients) => self
                .checked
                .into_inner()
                .send(recipients, payload, priority),
            None => Unreliable::Rejected,
        }
    }
}
/// Receiving end of a per-channel mailbox.
type MessageReceiver<P> = mpsc::UnboundedReceiver<NetworkMessage<P>>;
/// Request sent to a peer's router task to register a channel: the channel id, the guard
/// kept alive alongside the mailbox, and the reply slot for the new mailbox receiver.
type ChannelRegistration<P> = (
Channel,
RegistrationGuard,
oneshot::Sender<MessageReceiver<P>>,
);
#[derive(Debug)]
pub struct Receiver<P: PublicKey> {
receiver: MessageReceiver<P>,
}
impl<P: PublicKey> crate::Receiver for Receiver<P> {
type Error = Error;
type PublicKey = P;
async fn recv(&mut self) -> Result<NetworkMessage<Self::PublicKey>, Error> {
self.receiver.recv().await.ok_or(Error::NetworkClosed)
}
}
impl<P: PublicKey> Receiver<P> {
/// Splits this receiver into a primary and a secondary receiver.
///
/// A task spawned on `context` pulls messages from the original receiver and forwards each
/// one according to `router`. Forwarding failures are logged. The task stops when the
/// original receiver is exhausted or, after handling a message, both halves are closed.
///
/// Returns `(primary, secondary)`.
pub fn split_with<E: Spawner, R: SplitRouter<P>>(
    mut self,
    context: E,
    router: R,
) -> (Self, Self) {
    let (to_primary, from_primary) = mpsc::unbounded_channel();
    let (to_secondary, from_secondary) = mpsc::unbounded_channel();

    context.spawn(move |_| async move {
        loop {
            let Some(message) = self.receiver.recv().await else {
                break;
            };
            let target = router(&message);
            match target {
                SplitTarget::None => {}
                SplitTarget::Primary => {
                    if let Err(err) = to_primary.send(message) {
                        error!(?err, "failed to send message to primary");
                    }
                }
                SplitTarget::Secondary => {
                    if let Err(err) = to_secondary.send(message) {
                        error!(?err, "failed to send message to secondary");
                    }
                }
                SplitTarget::Both => {
                    // Primary gets a copy, secondary takes ownership of the original.
                    if let Err(err) = to_primary.send(message.clone()) {
                        error!(?err, "failed to send message to primary");
                    }
                    if let Err(err) = to_secondary.send(message) {
                        error!(?err, "failed to send message to secondary");
                    }
                }
            }

            // Nobody is listening any more: stop forwarding.
            if to_primary.is_closed() && to_secondary.is_closed() {
                break;
            }
        }
    });

    let primary = Self {
        receiver: from_primary,
    };
    let secondary = Self {
        receiver: from_secondary,
    };
    (primary, secondary)
}
}
/// Handle to a simulated peer: its listening socket and the control channel of its router
/// task.
struct Peer<P: PublicKey> {
// Address the peer's listener is bound to.
socket: SocketAddr,
// Channel used to register new mailboxes with the router task.
control: mpsc::UnboundedSender<ChannelRegistration<P>>,
}
impl<P: PublicKey> Peer<P> {
async fn new<E: Spawner + RNetwork + Metrics + Clock>(
context: E,
public_key: P,
socket: SocketAddr,
max_size: u32,
) -> Self {
let (control_sender, mut control_receiver): (
mpsc::UnboundedSender<ChannelRegistration<P>>,
_,
) = mpsc::unbounded_channel();
let (inbox_sender, mut inbox_receiver) = mpsc::unbounded_channel();
context.child("router").spawn(|context| async move {
let mut mailboxes = HashMap::new();
select_loop! {
context,
on_stopped => {},
Some((channel, guard, result_tx)) = control_receiver.recv() else break => {
let (receiver_tx, receiver_rx) = mpsc::unbounded_channel();
if mailboxes.insert(channel, (receiver_tx, guard)).is_some() {
warn!(?public_key, ?channel, "overwriting existing channel");
}
result_tx.send(receiver_rx).unwrap();
},
Some((channel, message)) = inbox_receiver.recv() else break => {
match mailboxes.get_mut(&channel) {
Some((receiver_tx, _)) => {
if let Err(err) = receiver_tx.send(message) {
debug!(?err, "failed to send message to mailbox");
}
}
None => {
trace!(
recipient = ?public_key,
channel,
reason = "missing channel",
"dropping message",
);
}
}
},
}
});
let (ready_tx, ready_rx) = oneshot::channel();
context.child("listener").spawn(move |context| async move {
let mut listener = context.bind(socket).await.unwrap();
let _ = ready_tx.send(());
while let Ok((_, _, mut stream)) = listener.accept().await {
context.child("receiver").spawn({
let inbox_sender = inbox_sender.clone();
move |_| async move {
let dialer = match recv_frame(&mut stream, max_size).await {
Ok(data) => data,
Err(_) => {
error!("failed to receive public key from dialer");
return;
}
};
let Ok(dialer) = P::decode(dialer.coalesce()) else {
error!("received public key is invalid");
return;
};
while let Ok(data) = recv_frame(&mut stream, max_size).await {
let data = data.coalesce();
let channel = Channel::from_be_bytes(
data.as_ref()[..Channel::SIZE].try_into().unwrap(),
);
let message = data.slice(Channel::SIZE..);
if let Err(err) =
inbox_sender.send((channel, (dialer.clone(), message)))
{
debug!(?err, "failed to send message to mailbox");
break;
}
}
}
});
}
});
let _ = ready_rx.await;
Self {
socket,
control: control_sender,
}
}
async fn register(
&mut self,
channel: Channel,
guard: RegistrationGuard,
) -> Result<MessageReceiver<P>, Error> {
let (result_tx, result_rx) = oneshot::channel();
self.control
.send((channel, guard, result_tx))
.map_err(|_| Error::NetworkClosed)?;
result_rx.await.map_err(|_| Error::NetworkClosed)
}
}
/// A directed link between two peers.
struct Link {
// Distribution the per-message latency is sampled from (interpreted as milliseconds).
sampler: Normal<f64>,
// Probability used to decide whether a message should be delivered.
success_rate: f64,
// Queue of `(channel, payload, delivery time)` consumed by the link's task.
inbox: mpsc::UnboundedSender<(Channel, IoBuf, SystemTime)>,
}
impl Link {
/// Creates a link from `dialer` to `receiver` and spawns the task that drives it.
///
/// The task dials `socket`, sends the dialer's public key as the first frame and then, for
/// every queued message, waits until its delivery time before writing a
/// `channel || payload` frame. Messages are written strictly in queue order.
///
/// The spawned task panics if dialing `socket` fails.
#[allow(clippy::too_many_arguments)]
fn new<E: Spawner + RNetwork + Clock + Metrics, P: PublicKey>(
context: &mut E,
dialer: P,
receiver: P,
socket: SocketAddr,
sampler: Normal<f64>,
success_rate: f64,
max_size: u32,
received_messages: CounterFamily<metrics::Message>,
) -> Self {
let (inbox, mut outbox) = mpsc::unbounded_channel::<(Channel, IoBuf, SystemTime)>();
context.child("link").spawn(move |context| async move {
let (mut sink, _) = context.dial(socket).await.unwrap();
// Handshake: identify the dialer to the listener.
if let Err(err) = send_frame(&mut sink, dialer.as_ref().to_vec(), max_size).await {
error!(?err, "failed to send public key to listener");
return;
}
while let Some((channel, message, receive_complete_at)) = outbox.recv().await {
// Hold the message back until its scheduled delivery time.
context.sleep_until(receive_complete_at).await;
// Frame layout: big-endian channel id followed by the payload.
let channel_bytes = channel.to_be_bytes();
let mut data = Vec::with_capacity(channel_bytes.len() + message.len());
data.extend_from_slice(&channel_bytes);
data.extend_from_slice(message.as_ref());
// NOTE(review): the write result is ignored and the metric below is incremented
// even if the write failed — confirm this is intended.
let _ = send_frame(&mut sink, data, max_size).await;
received_messages
.get_or_create(&metrics::Message::new(&dialer, &receiver, channel))
.inc();
}
});
Self {
sampler,
success_rate,
inbox,
}
}
fn send(
&mut self,
channel: Channel,
message: IoBuf,
receive_complete_at: SystemTime,
) -> Result<(), Error> {
self.inbox
.send((channel, message, receive_complete_at))
.map_err(|_| Error::NetworkClosed)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
CheckedSender as _, LimitedSender as _, Manager as _, Provider, Receiver as _, Recipients,
Sender as _, TrackedPeers,
};
use commonware_cryptography::{ed25519, Signer as _};
use commonware_runtime::{deterministic, Quota, Runner as _, Supervisor as _};
use commonware_utils::{ordered::Set, NZUsize};
use futures::FutureExt;
use std::num::NonZeroU32;
// Maximum message size used by the tests (1 MiB).
const MAX_MESSAGE_SIZE: u32 = 1024 * 1024;
// Quota of `u32::MAX` messages per second, so rate limiting does not affect the tests.
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
/// Polls `sender` every millisecond until a check for `recipients` resolves to exactly
/// `expected_recipients` peers, then sends `message` and returns the time of the send.
///
/// Panics if the rate-limit check fails.
async fn send_when_ready(
    context: &deterministic::Context,
    sender: &mut Sender<ed25519::PublicKey, deterministic::Context>,
    recipients: Recipients<ed25519::PublicKey>,
    expected_recipients: usize,
    message: Vec<u8>,
    priority: bool,
) -> SystemTime {
    loop {
        let checked = sender.check(recipients.clone()).unwrap();
        let ready = checked.recipients().len() == expected_recipients;
        if !ready {
            // Not all peers are visible yet; back off briefly and retry.
            context.sleep(Duration::from_millis(1)).await;
            continue;
        }
        checked.send(message, priority);
        return context.current();
    }
}
// Registering channels (including re-registering an existing one) succeeds, and adding the
// same link twice fails with `Error::LinkExists`.
#[test]
fn test_register_and_link() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
max_size: MAX_MESSAGE_SIZE,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(3),
};
let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
let peers = [pk1.clone(), pk2.clone()];
let (network, oracle) =
Network::new_with_peers(context.child("network"), cfg, peers).await;
network.start();
// Register two channels for the first peer.
let control = oracle.control(pk1.clone());
control.register(0, TEST_QUOTA).await.unwrap();
control.register(1, TEST_QUOTA).await.unwrap();
// Register channels for the second peer; channel 1 is registered twice on purpose.
let control = oracle.control(pk2.clone());
control.register(0, TEST_QUOTA).await.unwrap();
control.register(1, TEST_QUOTA).await.unwrap();
control.register(1, TEST_QUOTA).await.unwrap();
let link = ingress::Link {
latency: Duration::from_millis(2),
jitter: Duration::from_millis(1),
success_rate: 0.9,
};
oracle
.add_link(pk1.clone(), pk2.clone(), link.clone())
.await
.unwrap();
// A second link for the same ordered pair must be rejected.
assert!(matches!(
oracle.add_link(pk1, pk2, link).await,
Err(Error::LinkExists)
));
});
}
// A network built with `new_with_split_peers` exposes peer set 0 and delivers it as the
// first update to a new subscriber.
#[test]
fn test_new_with_split_peers_seeds_initial_update() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
max_size: MAX_MESSAGE_SIZE,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(3),
};
let primary = ed25519::PrivateKey::from_seed(11).public_key();
let secondary = ed25519::PrivateKey::from_seed(12).public_key();
let (network, oracle) = Network::new_with_split_peers(
context.child("network"),
cfg,
[primary.clone()],
[secondary.clone()],
)
.await;
network.start();
// The seeded peer set is queryable by index.
let mut manager = oracle.manager();
let peer_set = manager.peer_set(0).await.unwrap();
assert_eq!(peer_set.primary, Set::try_from([primary.clone()]).unwrap());
assert_eq!(
peer_set.secondary,
Set::try_from([secondary.clone()]).unwrap()
);
// A fresh subscription immediately yields the seeded set as its first update.
let mut updates = manager.subscribe().await;
let update = updates.recv().await.unwrap();
assert_eq!(update.index, 0);
assert_eq!(
update.latest.primary,
Set::try_from([primary.clone()]).unwrap()
);
assert_eq!(
update.latest.secondary,
Set::try_from([secondary.clone()]).unwrap()
);
assert_eq!(update.all.primary, Set::try_from([primary]).unwrap());
assert_eq!(update.all.secondary, Set::try_from([secondary]).unwrap());
});
}
/// Splits the twin's channel into a primary and a secondary half and checks that each
/// half only talks to its own counterpart: the primary half exchanges messages with
/// `peer_a`, the secondary half with `peer_b`.
#[test]
fn test_split_channel_single() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (network, oracle) = Network::new(
            context.child("network"),
            Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(3),
            },
        );
        network.start();

        // Identities: one split "twin" plus two ordinary peers.
        let twin = ed25519::PrivateKey::from_seed(20).public_key();
        let peer_a = ed25519::PrivateKey::from_seed(21).public_key();
        let peer_b = ed25519::PrivateKey::from_seed(22).public_key();

        let mut manager = oracle.manager();
        let everyone = Set::try_from([twin.clone(), peer_a.clone(), peer_b.clone()]).unwrap();
        manager.track(0, everyone);

        // Register channel 0 for every participant (same order as before: a, b, twin).
        let (mut a_tx, mut a_rx) = oracle
            .control(peer_a.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (mut b_tx, mut b_rx) = oracle
            .control(peer_b.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (twin_tx, twin_rx) = oracle
            .control(twin.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // Outbound: whatever recipients were requested, the primary half is redirected
        // to `peer_a` and the secondary half to `peer_b`.
        let primary_dest = peer_a.clone();
        let secondary_dest = peer_b.clone();
        let (mut primary_tx, mut secondary_tx) =
            twin_tx.split_with(move |origin, _, _| match origin {
                SplitOrigin::Primary => Some(Recipients::One(primary_dest.clone())),
                SplitOrigin::Secondary => Some(Recipients::One(secondary_dest.clone())),
            });

        // Inbound: route by who sent the message; any other sender is a test bug.
        let primary_src = peer_a.clone();
        let secondary_src = peer_b.clone();
        let (mut primary_rx, mut secondary_rx) =
            twin_rx.split_with(context.child("split_receiver"), move |(sender, _)| {
                match sender {
                    from if from == &primary_src => SplitTarget::Primary,
                    from if from == &secondary_src => SplitTarget::Secondary,
                    _ => panic!("unexpected sender"),
                }
            });

        // Zero-latency, lossless links in both directions between the twin and each peer.
        let link = ingress::Link {
            latency: Duration::from_millis(0),
            jitter: Duration::from_millis(0),
            success_rate: 1.0,
        };
        for (src, dst) in [
            (&peer_a, &twin),
            (&twin, &peer_a),
            (&peer_b, &twin),
            (&twin, &peer_b),
        ] {
            oracle
                .add_link(src.clone(), dst.clone(), link.clone())
                .await
                .unwrap();
        }

        // Fire all four messages before reading anything back.
        a_tx.send(Recipients::One(twin.clone()), b"from_a", false);
        b_tx.send(Recipients::One(twin.clone()), b"from_b", false);
        primary_tx.send(Recipients::All, b"primary_out", false);
        secondary_tx.send(Recipients::All, b"secondary_out", false);

        // Inbound traffic lands on the half selected by the router.
        let (from, body) = primary_rx.recv().await.unwrap();
        assert_eq!(from, peer_a);
        assert_eq!(body, b"from_a");
        let (from, body) = secondary_rx.recv().await.unwrap();
        assert_eq!(from, peer_b);
        assert_eq!(body, b"from_b");

        // Outbound traffic reaches only the peer chosen by the forwarder.
        let (from, body) = a_rx.recv().await.unwrap();
        assert_eq!(from, twin);
        assert_eq!(body, b"primary_out");
        let (from, body) = b_rx.recv().await.unwrap();
        assert_eq!(from, twin);
        assert_eq!(body, b"secondary_out");
    });
}
/// Checks that a receiver split whose router always answers `SplitTarget::Both`
/// hands the same inbound message to both the primary and the secondary half.
#[test]
fn test_split_channel_both() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (network, oracle) = Network::new(
            context.child("network"),
            Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(3),
            },
        );
        network.start();

        let twin = ed25519::PrivateKey::from_seed(30).public_key();
        let peer_c = ed25519::PrivateKey::from_seed(31).public_key();

        let mut manager = oracle.manager();
        let members = Set::try_from([twin.clone(), peer_c.clone()]).unwrap();
        manager.track(0, members);

        // The unused halves are bound to `_`-prefixed names so they stay alive
        // until the end of the test.
        let (mut c_tx, _c_rx) = oracle
            .control(peer_c.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (twin_tx, twin_rx) = oracle
            .control(twin.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // Sender side: forward to the originally requested recipients unchanged.
        let (_primary_tx, _secondary_tx) =
            twin_tx.split_with(|_origin, recipients, _| Some(recipients.clone()));
        // Receiver side: duplicate every inbound message onto both halves.
        let (mut primary_rx, mut secondary_rx) =
            twin_rx.split_with(context.child("split_receiver_both"), |_| SplitTarget::Both);

        // Zero-latency, lossless links in both directions.
        let link = ingress::Link {
            latency: Duration::from_millis(0),
            jitter: Duration::from_millis(0),
            success_rate: 1.0,
        };
        for (src, dst) in [(&peer_c, &twin), (&twin, &peer_c)] {
            oracle
                .add_link(src.clone(), dst.clone(), link.clone())
                .await
                .unwrap();
        }

        c_tx.send(Recipients::One(twin.clone()), b"to_both", false);

        // Both halves must observe the identical message from `peer_c`.
        let (from, body) = primary_rx.recv().await.unwrap();
        assert_eq!(from, peer_c);
        assert_eq!(body, b"to_both");
        let (from, body) = secondary_rx.recv().await.unwrap();
        assert_eq!(from, peer_c);
        assert_eq!(body, b"to_both");
    });
}
/// Checks the "drop everything" configuration of a split channel: a router that
/// returns `SplitTarget::None` delivers to neither half, and a forwarder that
/// returns `None` makes both sender halves report an empty recipient list.
#[test]
fn test_split_channel_none() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (network, oracle) = Network::new(
            context.child("network"),
            Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(3),
            },
        );
        network.start();

        let twin = ed25519::PrivateKey::from_seed(30).public_key();
        let peer_c = ed25519::PrivateKey::from_seed(31).public_key();

        let mut manager = oracle.manager();
        let members = Set::try_from([twin.clone(), peer_c.clone()]).unwrap();
        manager.track(0, members);

        let (mut c_tx, _c_rx) = oracle
            .control(peer_c.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (twin_tx, twin_rx) = oracle
            .control(twin.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // Forwarder and router both refuse every message.
        let (mut primary_tx, mut secondary_tx) = twin_tx.split_with(|_origin, _, _| None);
        let (mut primary_rx, mut secondary_rx) =
            twin_rx.split_with(context.child("split_receiver_both"), |_| SplitTarget::None);

        // Zero-latency, lossless links in both directions.
        let link = ingress::Link {
            latency: Duration::from_millis(0),
            jitter: Duration::from_millis(0),
            success_rate: 1.0,
        };
        for (src, dst) in [(&peer_c, &twin), (&twin, &peer_c)] {
            oracle
                .add_link(src.clone(), dst.clone(), link.clone())
                .await
                .unwrap();
        }

        // From peer_c's point of view the send is accepted for exactly the twin.
        let accepted = c_tx.send(Recipients::One(twin.clone()), b"to_both", false);
        assert_eq!(accepted.len(), 1);
        assert_eq!(accepted[0], twin);

        // Let virtual time pass so a delivery would have happened by now, then
        // confirm neither half has anything ready.
        context.sleep(Duration::from_millis(100)).await;
        assert!(primary_rx.recv().now_or_never().is_none());
        assert!(secondary_rx.recv().now_or_never().is_none());

        // Outbound sends from either half are dropped by the forwarder.
        let accepted = primary_tx.send(Recipients::One(peer_c.clone()), b"to_both", false);
        assert!(accepted.is_empty());
        let accepted = secondary_tx.send(Recipients::One(peer_c.clone()), b"to_both", false);
        assert!(accepted.is_empty());
    });
}
/// Tracks peer sets at indices 10, 9 and 11 (in that order) and checks the two
/// updates the subscriber observes: one for index 10 and then one for index 11,
/// whose aggregate primary set contains the peers of indices 10 and 11 only.
#[test]
fn test_unordered_peer_sets() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (network, oracle) = Network::new(
            context.child("network"),
            Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(3),
            },
        );
        network.start();

        let mut manager = oracle.manager();
        let mut updates = manager.subscribe().await;

        // First set: index 10 with two primaries.
        let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
        let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
        manager.track(10, Set::try_from([pk1.clone(), pk2.clone()]).unwrap());

        let first = updates.recv().await.unwrap();
        assert_eq!(first.index, 10);
        assert_eq!(first.latest.primary.len(), 2);
        assert!(first.latest.secondary.is_empty());
        assert_eq!(first.all.primary.len(), 2);
        assert!(first.all.secondary.is_empty());

        // An older index (9) followed by a newer one (11); the next update the
        // subscriber sees is expected to be the one for index 11.
        let pk3 = ed25519::PrivateKey::from_seed(3).public_key();
        manager.track(9, Set::try_from([pk3.clone()]).unwrap());
        let pk4 = ed25519::PrivateKey::from_seed(4).public_key();
        manager.track(11, Set::try_from([pk4.clone()]).unwrap());

        let second = updates.recv().await.unwrap();
        assert_eq!(second.index, 11);
        assert_eq!(second.latest.primary, Set::try_from([pk4.clone()]).unwrap());
        assert!(second.latest.secondary.is_empty());
        // pk3 (index 9) must not show up in the aggregate.
        assert_eq!(second.all.primary, Set::try_from([pk1, pk2, pk4]).unwrap());
        assert!(second.all.secondary.is_empty());
    });
}
/// Checks how the aggregate (`all`) view resolves a peer that is primary at one
/// index and secondary at another: it must be reported as primary only.
#[test]
fn test_peer_set_update_all_cross_index_primary_wins() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (network, oracle) = Network::new(
            context.child("network"),
            Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(3),
            },
        );
        network.start();

        let pk_a = ed25519::PrivateKey::from_seed(21).public_key();
        let pk_b = ed25519::PrivateKey::from_seed(22).public_key();
        let pk_overlap = ed25519::PrivateKey::from_seed(23).public_key();
        let pk_sec = ed25519::PrivateKey::from_seed(24).public_key();

        let mut manager = oracle.manager();
        let mut updates = manager.subscribe().await;

        // Index 10: `pk_overlap` is a primary, no secondaries.
        let index_10 = TrackedPeers::new(
            Set::try_from([pk_a.clone(), pk_overlap.clone()]).unwrap(),
            Set::default(),
        );
        manager.track(10, index_10);
        let _ = updates.recv().await.unwrap();

        // Index 11: `pk_overlap` reappears, this time as a secondary.
        let index_11 = TrackedPeers::new(
            Set::try_from([pk_b.clone()]).unwrap(),
            Set::try_from([pk_overlap.clone(), pk_sec.clone()]).unwrap(),
        );
        manager.track(11, index_11);

        let update = updates.recv().await.unwrap();
        assert_eq!(update.index, 11);

        // The per-index view reflects index 11 exactly as submitted.
        assert_eq!(
            update.latest.primary,
            Set::try_from([pk_b.clone()]).unwrap()
        );
        for pk in [&pk_overlap, &pk_sec] {
            assert!(update.latest.secondary.position(pk).is_some());
        }

        // The aggregate view lists every primary from both indices...
        for pk in [&pk_a, &pk_b, &pk_overlap] {
            assert!(update.all.primary.position(pk).is_some());
        }
        // ...and keeps the overlapping peer out of the secondary side.
        assert!(
            update.all.secondary.position(&pk_overlap).is_none(),
            "aggregate secondary must omit peers who have any primary membership"
        );
        assert!(update.all.secondary.position(&pk_sec).is_some());
    });
}
/// Exercises `get_next_socket`: it returns the current `next_addr`, the following
/// call differs only in the port (1), and after handing out
/// `255.0.255.255:65535` the next address is `255.1.0.0:0`.
#[test]
fn test_get_next_socket() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (mut network, _) = Network::<deterministic::Context, ed25519::PublicKey>::new(
            context.child("network"),
            Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(1),
            },
        );

        // The first call hands out the address the network started with.
        let initial = network.next_addr;
        assert_eq!(network.get_next_socket(), initial);

        // The second call yields the same address with port 1.
        let mut bumped = initial;
        bumped.set_port(1);
        assert_eq!(network.get_next_socket(), bumped);

        // Force the cursor to the last port and last two octets of a block.
        let boundary = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 0, 255, 255)), 65535);
        network.next_addr = boundary;
        assert_eq!(network.get_next_socket(), boundary);

        // The following address carries into the second octet and resets the port.
        let rolled_over = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 1, 0, 0)), 0);
        assert_eq!(network.get_next_socket(), rolled_over);
    });
}
/// Sends a burst of 50 distinct 64-byte messages to a single recipient without
/// awaiting in between, then checks they are received in exactly the order sent.
#[test]
fn test_fifo_burst_same_recipient() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (network, oracle) = Network::new(
            context.child("network"),
            Config {
                max_size: MAX_MESSAGE_SIZE,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(3),
            },
        );
        let network_handle = network.start();

        let sender_pk = ed25519::PrivateKey::from_seed(10).public_key();
        let recipient_pk = ed25519::PrivateKey::from_seed(11).public_key();

        let mut manager = oracle.manager();
        let members = Set::try_from([sender_pk.clone(), recipient_pk.clone()]).unwrap();
        manager.track(0, members);

        let (mut sender, _sender_recv) = oracle
            .control(sender_pk.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_sender2, mut receiver) = oracle
            .control(recipient_pk.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // Throttle both ends so the burst cannot be delivered all at once.
        oracle
            .limit_bandwidth(sender_pk.clone(), Some(5_000), None)
            .await
            .unwrap();
        oracle
            .limit_bandwidth(recipient_pk.clone(), None, Some(5_000))
            .await
            .unwrap();

        // One-way, zero-latency, lossless link from sender to recipient.
        let link = ingress::Link {
            latency: Duration::from_millis(0),
            jitter: Duration::from_millis(0),
            success_rate: 1.0,
        };
        oracle
            .add_link(sender_pk.clone(), recipient_pk.clone(), link)
            .await
            .unwrap();

        // Message `i` is 64 copies of the byte `i`, so every payload is unique.
        const COUNT: usize = 50;
        let payloads: Vec<Vec<u8>> = (0..COUNT).map(|i| vec![i as u8; 64]).collect();

        for payload in &payloads {
            sender
                .check(Recipients::One(recipient_pk.clone()))
                .unwrap()
                .send(payload.clone(), false);
        }

        // Arrival order must match send order.
        for payload in &payloads {
            let (_pk, bytes) = receiver.recv().await.unwrap();
            assert_eq!(bytes, payload.as_slice());
        }

        drop(oracle);
        drop(sender);
        network_handle.abort();
    });
}
/// Broadcasts one 10_000-byte message to two recipients over bandwidth-limited
/// peers and checks the observed delivery times: each recipient sees the message
/// at least 20 seconds after `start`, and the two observations are at most one
/// second apart.
#[test]
fn test_broadcast_respects_transmit_latency() {
let cfg = Config {
max_size: MAX_MESSAGE_SIZE,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(3),
};
let runner = deterministic::Runner::default();
runner.start(|context| async move {
let (network, oracle) = Network::new(context.child("network"), cfg);
let network_handle = network.start();
let sender_pk = ed25519::PrivateKey::from_seed(42).public_key();
let recipient_a = ed25519::PrivateKey::from_seed(43).public_key();
let recipient_b = ed25519::PrivateKey::from_seed(44).public_key();
let mut manager = oracle.manager();
manager.track(
0,
Set::try_from([sender_pk.clone(), recipient_a.clone(), recipient_b.clone()])
.unwrap(),
);
// Register channel 0 for all three peers; the unused halves are kept alive in
// `_`-prefixed bindings for the duration of the test.
let (mut sender, _recv_sender) = oracle
.control(sender_pk.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_sender2, mut recv_a) = oracle
.control(recipient_a.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_sender3, mut recv_b) = oracle
.control(recipient_b.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
// NOTE(review): the two optional arguments are presumably (egress, ingress) limits
// in bytes per second, which would make two 10_000-byte copies take 20s to leave
// the sender — confirm against `limit_bandwidth`.
oracle
.limit_bandwidth(sender_pk.clone(), Some(1_000), None)
.await
.unwrap();
oracle
.limit_bandwidth(recipient_a.clone(), None, Some(1_000))
.await
.unwrap();
oracle
.limit_bandwidth(recipient_b.clone(), None, Some(1_000))
.await
.unwrap();
// The link itself adds no latency, jitter, or loss, so any delay measured below
// does not come from the link configuration.
let link = ingress::Link {
latency: Duration::from_millis(0),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
// Links are only added in the sender -> recipient direction.
oracle
.add_link(sender_pk.clone(), recipient_a.clone(), link.clone())
.await
.unwrap();
oracle
.add_link(sender_pk.clone(), recipient_b.clone(), link)
.await
.unwrap();
let big_msg = vec![7u8; 10_000];
// `send_when_ready` is defined elsewhere in this file; its return value is used as
// the reference instant for the elapsed-time checks. The literal `2` presumably is
// the number of recipients to wait for — TODO confirm.
let start = send_when_ready(
&context,
&mut sender,
Recipients::All,
2,
big_msg.clone(),
false,
)
.await;
let (_pk, received_a) = recv_a.recv().await.unwrap();
assert_eq!(received_a, big_msg.as_slice());
// Elapsed times are taken when the test observes each message, in a fixed order
// (recipient A first, then recipient B).
let elapsed_a = context.current().duration_since(start).unwrap();
assert!(elapsed_a >= Duration::from_secs(20));
let (_pk, received_b) = recv_b.recv().await.unwrap();
assert_eq!(received_b, big_msg.as_slice());
let elapsed_b = context.current().duration_since(start).unwrap();
assert!(elapsed_b >= Duration::from_secs(20));
// Neither recipient may lag the other by more than one second.
assert!(elapsed_a.abs_diff(elapsed_b) <= Duration::from_secs(1));
drop(oracle);
drop(sender);
network_handle.abort();
});
}
/// Tracks a single peer set in which `pk2` is listed as both primary and secondary,
/// then checks that (a) the published update reports `pk2` as primary only and
/// (b) a broadcast from `pk1` targets and reaches `pk2` exactly once.
#[test]
fn test_overlapping_primary_secondary_no_duplicate_recipients() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let cfg = Config {
max_size: MAX_MESSAGE_SIZE,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(3),
};
let (network, oracle) = Network::new(context.child("network"), cfg);
network.start();
let pk1 = ed25519::PrivateKey::from_seed(1).public_key();
let pk2 = ed25519::PrivateKey::from_seed(2).public_key();
let pk3 = ed25519::PrivateKey::from_seed(3).public_key();
let mut manager = oracle.manager();
// primary = {pk1, pk2}, secondary = {pk2, pk3}: pk2 deliberately appears in both.
manager.track(
0,
TrackedPeers::new(
Set::try_from([pk1.clone(), pk2.clone()]).unwrap(),
Set::try_from([pk2.clone(), pk3.clone()]).unwrap(),
),
);
// Subscribing after `track` still yields an update for index 0 (asserted below).
let mut updates = manager.subscribe().await;
let update = updates.recv().await.unwrap();
assert_eq!(update.index, 0);
assert!(update.latest.primary.position(&pk2).is_some());
assert!(
update.latest.secondary.position(&pk2).is_none(),
"overlap peer must not appear in latest.secondary"
);
assert!(update.latest.secondary.position(&pk3).is_some());
assert!(update.all.primary.position(&pk2).is_some());
assert!(
update.all.secondary.position(&pk2).is_none(),
"aggregate secondary must not list peers who are primary"
);
assert!(update.all.secondary.position(&pk3).is_some());
let link = ingress::Link {
latency: Duration::from_millis(1),
jitter: Duration::ZERO,
success_rate: 1.0,
};
// One-directional links: pk1 -> pk2, pk1 -> pk3, pk2 -> pk3.
for (a, b) in [(&pk1, &pk2), (&pk1, &pk3), (&pk2, &pk3)] {
oracle
.add_link(a.clone(), b.clone(), link.clone())
.await
.unwrap();
}
// Only the halves needed by the test are bound; the `_` halves are not kept.
let (mut sender1, _) = oracle
.control(pk1.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_, mut recv2) = oracle
.control(pk2.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (_, mut recv3) = oracle
.control(pk3.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let msg = vec![42u8; 10];
// Capture the resolved recipient list before `send` consumes the checked sender.
let checked = sender1.check(Recipients::All).unwrap();
let sent_to = crate::CheckedSender::recipients(&checked);
checked.send(msg.clone(), true);
let pk2_count = sent_to.iter().filter(|pk| *pk == &pk2).count();
assert_eq!(pk2_count, 1, "pk2 received duplicate sends");
assert!(sent_to.iter().any(|pk| pk == &pk3));
// Wait well past the 1ms link latency so all deliveries have been made.
context.sleep(Duration::from_millis(10)).await;
let (from2, data2) = recv2.recv().await.unwrap();
assert_eq!(from2, pk1);
assert_eq!(data2, msg.as_slice());
let (from3, data3) = recv3.recv().await.unwrap();
assert_eq!(from3, pk1);
assert_eq!(data3, msg.as_slice());
// No second copy may be queued for pk2.
assert!(recv2.recv().now_or_never().is_none());
});
}
/// Follows `pk_x` through three tracked indices (with only two retained at a time):
/// primary at index 0, secondary at indices 1 and 2. The aggregate view must keep
/// reporting it as primary while index 0 is in the window and as secondary once the
/// update for index 2 is published.
#[test]
fn test_demotion_from_primary_to_secondary() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let (network, oracle) = Network::new(
            context.child("network"),
            Config {
                max_size: 1024,
                disconnect_on_block: true,
                tracked_peer_sets: NZUsize!(2),
            },
        );
        network.start();

        let pk_x = ed25519::PrivateKey::from_seed(1).public_key();
        let pk_y = ed25519::PrivateKey::from_seed(2).public_key();

        let mut manager = oracle.manager();
        let mut updates = manager.subscribe().await;

        // Index 0: x is primary, y is secondary.
        let x_primary = TrackedPeers::new(
            Set::try_from([pk_x.clone()]).unwrap(),
            Set::try_from([pk_y.clone()]).unwrap(),
        );
        manager.track(0, x_primary);
        let at_0 = updates.recv().await.unwrap();
        assert!(at_0.all.primary.position(&pk_x).is_some());
        assert!(at_0.all.secondary.position(&pk_y).is_some());

        // Index 1: roles swapped. Across indices 0 and 1 both peers hold a primary
        // role somewhere, so the aggregate secondary set is empty.
        let y_primary = TrackedPeers::new(
            Set::try_from([pk_y.clone()]).unwrap(),
            Set::try_from([pk_x.clone()]).unwrap(),
        );
        manager.track(1, y_primary);
        let at_1 = updates.recv().await.unwrap();
        assert!(at_1.all.primary.position(&pk_x).is_some());
        assert!(at_1.all.primary.position(&pk_y).is_some());
        assert!(at_1.all.secondary.is_empty());

        // Index 2: same roles as index 1. Presumably index 0 has now left the
        // two-set window, so x is no longer primary anywhere.
        let y_primary_again = TrackedPeers::new(
            Set::try_from([pk_y.clone()]).unwrap(),
            Set::try_from([pk_x.clone()]).unwrap(),
        );
        manager.track(2, y_primary_again);
        let at_2 = updates.recv().await.unwrap();
        assert!(at_2.all.primary.position(&pk_y).is_some());
        assert!(at_2.all.secondary.position(&pk_x).is_some());
        assert!(at_2.all.primary.position(&pk_x).is_none());
    });
}
/// Verifies that secondary peers stay reachable for as long as the peer set that
/// lists them is retained, and stop receiving once that set has been pushed out.
///
/// `tracked_peer_sets` is 2. Sets 0 and 1 each carry one secondary; while both are
/// tracked, a message addressed to both secondaries reaches both. Tracking set 2
/// (primary only) is expected to evict set 0, after which only `secondary_1` may
/// receive.
#[test]
fn test_secondary_sets_remain_until_eviction() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            max_size: MAX_MESSAGE_SIZE,
            disconnect_on_block: true,
            tracked_peer_sets: NZUsize!(2),
        };
        let (network, oracle) = Network::new(context.child("network"), cfg);
        network.start();

        let primary_0 = ed25519::PrivateKey::from_seed(1).public_key();
        let primary_1 = ed25519::PrivateKey::from_seed(2).public_key();
        let primary_2 = ed25519::PrivateKey::from_seed(3).public_key();
        let secondary_0 = ed25519::PrivateKey::from_seed(4).public_key();
        let secondary_1 = ed25519::PrivateKey::from_seed(5).public_key();

        // Fill the two-set window with indices 0 and 1.
        let mut manager = oracle.manager();
        manager.track(
            0,
            TrackedPeers::new(
                Set::try_from([primary_0.clone()]).unwrap(),
                Set::try_from([secondary_0.clone()]).unwrap(),
            ),
        );
        manager.track(
            1,
            TrackedPeers::new(
                Set::try_from([primary_1.clone()]).unwrap(),
                Set::try_from([secondary_1.clone()]).unwrap(),
            ),
        );

        let link = ingress::Link {
            latency: Duration::from_millis(1),
            jitter: Duration::ZERO,
            success_rate: 1.0,
        };
        oracle
            .add_link(primary_1.clone(), secondary_0.clone(), link.clone())
            .await
            .unwrap();
        oracle
            .add_link(primary_1.clone(), secondary_1.clone(), link.clone())
            .await
            .unwrap();

        let (mut sender_1, _) = oracle
            .control(primary_1.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_, mut receiver_0) = oracle
            .control(secondary_0.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_, mut receiver_1) = oracle
            .control(secondary_1.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        // Phase 1: both secondaries are reachable while sets 0 and 1 are tracked.
        let msg_1 = vec![1u8; 8];
        sender_1
            .check(Recipients::Some(vec![
                secondary_0.clone(),
                secondary_1.clone(),
            ]))
            .unwrap()
            .send(msg_1.clone(), true);
        assert_eq!(receiver_0.recv().await.unwrap().1, msg_1.as_slice());
        assert_eq!(receiver_1.recv().await.unwrap().1, msg_1.as_slice());

        // Phase 2: a third set (primary only) is tracked, exceeding the window.
        crate::Manager::track(
            &mut manager,
            2,
            TrackedPeers::primary([primary_2.clone()].try_into().unwrap()),
        );
        oracle
            .add_link(primary_2.clone(), secondary_0.clone(), link.clone())
            .await
            .unwrap();
        oracle
            .add_link(primary_2.clone(), secondary_1.clone(), link)
            .await
            .unwrap();
        let (mut sender_2, _) = oracle
            .control(primary_2)
            .register(0, TEST_QUOTA)
            .await
            .unwrap();

        let msg_2 = vec![2u8; 8];
        sender_2
            .check(Recipients::Some(vec![
                secondary_0.clone(),
                secondary_1.clone(),
            ]))
            .unwrap()
            .send(msg_2.clone(), true);

        // `secondary_1` (listed in set 1) must still receive.
        assert_eq!(receiver_1.recv().await.unwrap().1, msg_2.as_slice());

        // Only check for the absence of a delivery to `secondary_0` after virtual
        // time has moved well past the 1ms link latency. Checking immediately after
        // `send` would pass even if the message were still in flight.
        context.sleep(Duration::from_millis(10)).await;
        assert!(receiver_0.recv().now_or_never().is_none());
    });
}
}