use crate::{Channel, CheckedSender, LimitedSender, Message, Receiver, Recipients, Sender};
use commonware_codec::{varint::UInt, Encode, Error as CodecError, ReadExt};
use commonware_macros::select_loop;
use commonware_runtime::{spawn_cell, ContextCell, Handle, IoBuf, IoBufs, Spawner};
use commonware_utils::channel::{
fallible::FallibleExt,
mpsc::{self, error::TrySendError},
oneshot,
};
use std::{collections::HashMap, fmt::Debug, time::SystemTime};
use thiserror::Error;
use tracing::debug;
/// Errors returned by the muxer handle and by subchannel receivers.
#[derive(Error, Debug)]
pub enum Error {
/// No receiver was handed back for the requested subchannel. `MuxHandle::register`
/// reports this whenever the muxer drops the reply channel without answering.
#[error("subchannel already registered: {0}")]
AlreadyRegistered(Channel),
/// The control channel to the muxer did not accept the request.
#[error("muxer is closed")]
Closed,
/// The subchannel's message queue yielded no further message.
#[error("recv failed")]
RecvFailed,
}
/// Reads the varint subchannel identifier from the front of `buf`.
///
/// Returns the decoded subchannel together with `buf` after the read has consumed the
/// prefix, or the codec error produced while decoding it.
pub fn parse(mut buf: IoBuf) -> Result<(Channel, IoBuf), CodecError> {
    let prefix = UInt::read(&mut buf)?;
    let subchannel: Channel = prefix.into();
    Ok((subchannel, buf))
}
/// Requests sent to the muxer over its control channel.
enum Control<R: Receiver> {
/// Ask the muxer to create a route for `subchannel`.
Register {
/// Subchannel to register.
subchannel: Channel,
/// One-shot reply channel on which the muxer hands back the receiving half of the
/// new subchannel queue.
sender: oneshot::Sender<mpsc::Receiver<Message<R::PublicKey>>>,
},
/// Ask the muxer to remove the route for `subchannel`.
Deregister {
/// Subchannel to deregister.
subchannel: Channel,
},
}
/// Maps each registered subchannel to the sending half of its message queue.
type Routes<P> = HashMap<Channel, mpsc::Sender<Message<P>>>;
/// Item delivered on the backup channel: the subchannel a message was addressed to,
/// followed by the message itself.
type BackupResponse<P> = (Channel, Message<P>);
/// Routes messages from one underlying receiver into per-subchannel queues, keyed by the
/// subchannel prefix carried at the front of each message.
pub struct Muxer<E: Spawner, S: Sender, R: Receiver> {
/// Context handed to `spawn_cell!` and `select_loop!`.
context: ContextCell<E>,
/// Underlying sender; cloned into the handle and into any global sender.
sender: S,
/// Underlying receiver from which inbound messages are read.
receiver: R,
/// Capacity used for each subchannel queue and for the backup queue.
mailbox_size: usize,
/// Receives `Register` / `Deregister` requests.
control_rx: mpsc::UnboundedReceiver<Control<R>>,
/// Currently registered subchannels.
routes: Routes<R::PublicKey>,
/// Optional queue that receives messages addressed to a subchannel with no route.
backup: Option<mpsc::Sender<BackupResponse<R::PublicKey>>>,
}
impl<E: Spawner, S: Sender, R: Receiver> Muxer<E, S, R> {
    /// Creates a muxer without optional features, together with the [`MuxHandle`] used to
    /// register subchannels on it.
    ///
    /// `mailbox_size` is the capacity of every per-subchannel message queue.
    pub fn new(context: E, sender: S, receiver: R, mailbox_size: usize) -> (Self, MuxHandle<S, R>) {
        Self::builder(context, sender, receiver, mailbox_size).build()
    }

    /// Creates a [`MuxerBuilder`], through which a backup channel and/or a
    /// [`GlobalSender`] can be requested before the muxer is built.
    pub fn builder(
        context: E,
        sender: S,
        receiver: R,
        mailbox_size: usize,
    ) -> MuxerBuilder<E, S, R> {
        let (control_tx, control_rx) = mpsc::unbounded_channel();
        let mux = Self {
            context: ContextCell::new(context),
            sender,
            receiver,
            mailbox_size,
            control_rx,
            routes: HashMap::new(),
            backup: None,
        };
        // The handle keeps its own clone of the underlying sender so that it can mint
        // subchannel senders.
        let mux_handle = MuxHandle {
            sender: mux.sender.clone(),
            control_tx,
        };
        MuxerBuilder { mux, mux_handle }
    }

    /// Hands [`Muxer::run`] to `spawn_cell!` with the muxer's context and returns the
    /// resulting task handle.
    pub fn start(mut self) -> Handle<Result<(), R::Error>> {
        spawn_cell!(self.context, self.run().await)
    }

    /// Serves control requests and routes inbound messages.
    ///
    /// Returns `Ok(())` once the control channel yields no more requests or the loop
    /// ends, and the underlying receiver's error if receiving a message fails.
    ///
    /// Inbound messages are never awaited on: a message is dropped when its subchannel
    /// prefix cannot be parsed, when its subchannel queue is full, or when it has no route
    /// and no backup channel can take it.
    pub async fn run(mut self) -> Result<(), R::Error> {
        select_loop! {
            self.context,
            // Arm run by `select_loop!` for the context's stop signal (presumably ending
            // the loop -- confirm against the macro).
            on_stopped => {
                debug!("context shutdown, stopping muxer");
            },
            Some(control) = self.control_rx.recv() else {
                // Every control sender is gone: nobody can register anymore.
                return Ok(());
            } => match control {
                Control::Register { subchannel, sender } => {
                    // Dropping `sender` without replying is how the caller learns that
                    // the subchannel is already taken.
                    if self.routes.contains_key(&subchannel) {
                        continue;
                    }
                    let (tx, rx) = mpsc::channel(self.mailbox_size);
                    // Install the route only if the caller is still waiting for the
                    // receiver. Otherwise the receiver is dropped right here, and a route
                    // to it would reject later registrations of this subchannel (and hide
                    // its messages from the backup channel) until an inbound message
                    // happened to expose the closed queue.
                    if sender.send(rx).is_ok() {
                        self.routes.insert(subchannel, tx);
                    }
                }
                Control::Deregister { subchannel } => {
                    self.routes.remove(&subchannel);
                }
            },
            message = self.receiver.recv() => {
                let (pk, bytes) = message?;
                let (subchannel, bytes) = match parse(bytes) {
                    Ok(parsed) => parsed,
                    Err(_) => {
                        debug!(?pk, "invalid message: missing subchannel");
                        continue;
                    }
                };
                let Some(sender) = self.routes.get_mut(&subchannel) else {
                    // No route: offer the message to the backup channel, if any, without
                    // waiting for capacity.
                    if let Some(backup) = &mut self.backup {
                        if let Err(e) = backup.try_send((subchannel, (pk, bytes))) {
                            debug!(?subchannel, ?e, "failed to send message to backup channel");
                        }
                    }
                    continue;
                };
                if let Err(e) = sender.try_send((pk, bytes)) {
                    if matches!(e, TrySendError::Closed(_)) {
                        // The receiving half is gone; forget the route.
                        self.routes.remove(&subchannel);
                        debug!(?subchannel, "subchannel receiver dropped, removing route");
                    } else {
                        debug!(?subchannel, "subchannel full, dropping message");
                    }
                }
            },
        }
        Ok(())
    }
}
/// Cloneable handle used to register subchannels on a muxer.
#[derive(Clone)]
pub struct MuxHandle<S: Sender, R: Receiver> {
/// Clone of the underlying sender, copied into every subchannel sender.
sender: S,
/// Sending half of the muxer's control channel.
control_tx: mpsc::UnboundedSender<Control<R>>,
}
impl<S: Sender, R: Receiver> MuxHandle<S, R> {
pub async fn register(
&mut self,
subchannel: Channel,
) -> Result<(SubSender<S>, SubReceiver<R>), Error> {
let (tx, rx) = oneshot::channel();
self.control_tx
.send(Control::Register {
subchannel,
sender: tx,
})
.map_err(|_| Error::Closed)?;
let receiver = rx.await.map_err(|_| Error::AlreadyRegistered(subchannel))?;
Ok((
SubSender {
subchannel,
inner: GlobalSender::new(self.sender.clone()),
},
SubReceiver {
receiver,
control_tx: Some(self.control_tx.clone()),
subchannel,
},
))
}
}
/// Sender bound to a single subchannel.
#[derive(Clone, Debug)]
pub struct SubSender<S: Sender> {
/// Sender that does the actual sending once the subchannel has been attached.
inner: GlobalSender<S>,
/// Subchannel attached to every checked sender produced by this value.
subchannel: Channel,
}
impl<S: Sender> LimitedSender for SubSender<S> {
    type PublicKey = S::PublicKey;
    type Checked<'a> = CheckedGlobalSender<'a, S>;

    /// Runs the inner sender's check for `recipients` and, on success, attaches this
    /// sender's subchannel to the checked sender. A failed check is returned unchanged.
    async fn check(
        &mut self,
        recipients: Recipients<Self::PublicKey>,
    ) -> Result<Self::Checked<'_>, SystemTime> {
        let subchannel = self.subchannel;
        let checked = self.inner.check(recipients).await?;
        Ok(checked.with_subchannel(subchannel))
    }
}
/// Receiver for the messages routed to a single subchannel.
pub struct SubReceiver<R: Receiver> {
/// Receiving half of the subchannel queue filled by the muxer.
receiver: mpsc::Receiver<Message<R::PublicKey>>,
/// Control channel used on drop to deregister the subchannel; taken in `drop`.
control_tx: Option<mpsc::UnboundedSender<Control<R>>>,
/// Subchannel this receiver was registered for.
subchannel: Channel,
}
impl<R: Receiver> Receiver for SubReceiver<R> {
type Error = Error;
type PublicKey = R::PublicKey;
async fn recv(&mut self) -> Result<Message<Self::PublicKey>, Self::Error> {
self.receiver.recv().await.ok_or(Error::RecvFailed)
}
}
impl<R: Receiver> Debug for SubReceiver<R> {
    /// Formats the receiver as `SubReceiver(<subchannel>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("SubReceiver({})", self.subchannel))
    }
}
impl<R: Receiver> Drop for SubReceiver<R> {
    /// Asks the muxer to remove this subchannel's route.
    fn drop(&mut self) {
        // `control_tx` is only ever taken here, so it is normally present. `None` is
        // tolerated rather than treated as fatal: a panic raised from `drop` while the
        // thread is already unwinding aborts the process.
        if let Some(control_tx) = self.control_tx.take() {
            control_tx.send_lossy(Control::Deregister {
                subchannel: self.subchannel,
            });
        }
    }
}
/// Sender that is not bound to a subchannel; the subchannel is supplied per send.
#[derive(Clone, Debug)]
pub struct GlobalSender<S: Sender> {
/// Underlying sender.
inner: S,
}
impl<S: Sender> GlobalSender<S> {
    /// Wraps `inner` in a global sender.
    pub const fn new(inner: S) -> Self {
        Self { inner }
    }

    /// Sends `payload` to `recipients` on `subchannel`.
    ///
    /// If the recipient check fails, nothing is sent and an empty list is returned.
    /// Otherwise the result of the checked send is returned as is.
    pub async fn send(
        &mut self,
        subchannel: Channel,
        recipients: Recipients<S::PublicKey>,
        payload: impl Into<IoBufs> + Send,
        priority: bool,
    ) -> Result<Vec<S::PublicKey>, <S::Checked<'_> as CheckedSender>::Error> {
        let Ok(checked) = self.check(recipients).await else {
            return Ok(Vec::new());
        };
        checked
            .with_subchannel(subchannel)
            .send(payload, priority)
            .await
    }
}
impl<S: Sender> LimitedSender for GlobalSender<S> {
    type PublicKey = S::PublicKey;
    type Checked<'a> = CheckedGlobalSender<'a, S>;

    /// Runs the underlying sender's check for `recipients` and wraps a successful result
    /// in a [`CheckedGlobalSender`] that has no subchannel set yet.
    async fn check(
        &mut self,
        recipients: Recipients<Self::PublicKey>,
    ) -> Result<Self::Checked<'_>, SystemTime> {
        let inner = self.inner.check(recipients).await?;
        Ok(CheckedGlobalSender {
            subchannel: None,
            inner,
        })
    }
}
/// Checked sender that prepends a subchannel identifier to the message it sends.
pub struct CheckedGlobalSender<'a, S: Sender> {
/// Subchannel to prepend; must be set before `send` is called, which panics otherwise.
subchannel: Option<Channel>,
/// Checked sender of the underlying sender.
inner: S::Checked<'a>,
}
impl<'a, S: Sender> CheckedGlobalSender<'a, S> {
/// Sets the subchannel whose identifier is prepended to the message on send,
/// replacing any subchannel set earlier.
pub const fn with_subchannel(mut self, subchannel: Channel) -> Self {
self.subchannel = Some(subchannel);
self
}
}
impl<'a, S: Sender> CheckedSender for CheckedGlobalSender<'a, S> {
    type PublicKey = S::PublicKey;
    type Error = <S::Checked<'a> as CheckedSender>::Error;

    /// Prepends the varint-encoded subchannel to `message` and sends the result through
    /// the inner checked sender.
    ///
    /// # Panics
    ///
    /// Panics if no subchannel has been set.
    async fn send(
        self,
        message: impl Into<IoBufs> + Send,
        priority: bool,
    ) -> Result<Vec<Self::PublicKey>, Self::Error> {
        let Self { subchannel, inner } = self;
        let channel = subchannel.expect("subchannel not set");
        let mut framed: IoBufs = message.into();
        let prefix = UInt(channel).encode();
        framed.prepend(prefix.into());
        inner.send(framed, priority).await
    }
}
/// Final step shared by all muxer builders.
pub trait Builder {
/// What `build` produces; each builder returns a different tuple.
type Output;
/// Consumes the builder and returns its output.
fn build(self) -> Self::Output;
}
/// Builder for a muxer with no optional features selected yet.
pub struct MuxerBuilder<E: Spawner, S: Sender, R: Receiver> {
/// Muxer under construction.
mux: Muxer<E, S, R>,
/// Handle paired with `mux`.
mux_handle: MuxHandle<S, R>,
}
impl<E: Spawner, S: Sender, R: Receiver> Builder for MuxerBuilder<E, S, R> {
    type Output = (Muxer<E, S, R>, MuxHandle<S, R>);

    /// Returns the muxer and its handle.
    fn build(self) -> Self::Output {
        let Self { mux, mux_handle } = self;
        (mux, mux_handle)
    }
}
impl<E: Spawner, S: Sender, R: Receiver> MuxerBuilder<E, S, R> {
    /// Adds a backup channel, sized by the muxer's mailbox size, that receives messages
    /// addressed to subchannels without a route.
    pub fn with_backup(self) -> MuxerBuilderWithBackup<E, S, R> {
        let Self {
            mut mux,
            mux_handle,
        } = self;
        let (backup_tx, backup_rx) = mpsc::channel(mux.mailbox_size);
        mux.backup = Some(backup_tx);
        MuxerBuilderWithBackup {
            mux,
            mux_handle,
            backup_rx,
        }
    }

    /// Adds a [`GlobalSender`] built from a clone of the muxer's underlying sender.
    pub fn with_global_sender(self) -> MuxerBuilderWithGlobalSender<E, S, R> {
        let Self { mux, mux_handle } = self;
        let global_sender = GlobalSender::new(mux.sender.clone());
        MuxerBuilderWithGlobalSender {
            mux,
            mux_handle,
            global_sender,
        }
    }
}
/// Builder for a muxer with a backup channel.
pub struct MuxerBuilderWithBackup<E: Spawner, S: Sender, R: Receiver> {
/// Muxer under construction.
mux: Muxer<E, S, R>,
/// Handle paired with `mux`.
mux_handle: MuxHandle<S, R>,
/// Receiving half of the backup channel.
backup_rx: mpsc::Receiver<BackupResponse<R::PublicKey>>,
}
impl<E: Spawner, S: Sender, R: Receiver> MuxerBuilderWithBackup<E, S, R> {
    /// Adds a [`GlobalSender`] built from a clone of the muxer's underlying sender.
    pub fn with_global_sender(self) -> MuxerBuilderAllOpts<E, S, R> {
        let Self {
            mux,
            mux_handle,
            backup_rx,
        } = self;
        let global_sender = GlobalSender::new(mux.sender.clone());
        MuxerBuilderAllOpts {
            mux,
            mux_handle,
            backup_rx,
            global_sender,
        }
    }
}
impl<E: Spawner, S: Sender, R: Receiver> Builder for MuxerBuilderWithBackup<E, S, R> {
    type Output = (
        Muxer<E, S, R>,
        MuxHandle<S, R>,
        mpsc::Receiver<BackupResponse<R::PublicKey>>,
    );

    /// Returns the muxer, its handle, and the backup receiver.
    fn build(self) -> Self::Output {
        let Self {
            mux,
            mux_handle,
            backup_rx,
        } = self;
        (mux, mux_handle, backup_rx)
    }
}
/// Builder for a muxer with a global sender.
pub struct MuxerBuilderWithGlobalSender<E: Spawner, S: Sender, R: Receiver> {
/// Muxer under construction.
mux: Muxer<E, S, R>,
/// Handle paired with `mux`.
mux_handle: MuxHandle<S, R>,
/// Sender that is not bound to a subchannel.
global_sender: GlobalSender<S>,
}
impl<E: Spawner, S: Sender, R: Receiver> MuxerBuilderWithGlobalSender<E, S, R> {
    /// Adds a backup channel, sized by the muxer's mailbox size, that receives messages
    /// addressed to subchannels without a route.
    pub fn with_backup(self) -> MuxerBuilderAllOpts<E, S, R> {
        let Self {
            mut mux,
            mux_handle,
            global_sender,
        } = self;
        let (backup_tx, backup_rx) = mpsc::channel(mux.mailbox_size);
        mux.backup = Some(backup_tx);
        MuxerBuilderAllOpts {
            mux,
            mux_handle,
            backup_rx,
            global_sender,
        }
    }
}
impl<E: Spawner, S: Sender, R: Receiver> Builder for MuxerBuilderWithGlobalSender<E, S, R> {
    type Output = (Muxer<E, S, R>, MuxHandle<S, R>, GlobalSender<S>);

    /// Returns the muxer, its handle, and the global sender.
    fn build(self) -> Self::Output {
        let Self {
            mux,
            mux_handle,
            global_sender,
        } = self;
        (mux, mux_handle, global_sender)
    }
}
/// Builder for a muxer with both a backup channel and a global sender.
pub struct MuxerBuilderAllOpts<E: Spawner, S: Sender, R: Receiver> {
/// Muxer under construction.
mux: Muxer<E, S, R>,
/// Handle paired with `mux`.
mux_handle: MuxHandle<S, R>,
/// Receiving half of the backup channel.
backup_rx: mpsc::Receiver<BackupResponse<R::PublicKey>>,
/// Sender that is not bound to a subchannel.
global_sender: GlobalSender<S>,
}
impl<E: Spawner, S: Sender, R: Receiver> Builder for MuxerBuilderAllOpts<E, S, R> {
    type Output = (
        Muxer<E, S, R>,
        MuxHandle<S, R>,
        mpsc::Receiver<BackupResponse<R::PublicKey>>,
        GlobalSender<S>,
    );

    /// Returns the muxer, its handle, the backup receiver, and the global sender.
    fn build(self) -> Self::Output {
        let Self {
            mux,
            mux_handle,
            backup_rx,
            global_sender,
        } = self;
        (mux, mux_handle, backup_rx, global_sender)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
simulated::{self, Link, Network, Oracle},
Manager as _, Provider as _, Recipients,
};
use commonware_cryptography::{
ed25519::{PrivateKey, PublicKey},
Signer,
};
use commonware_macros::{select, test_traced};
use commonware_runtime::{deterministic, IoBuf, Metrics, Quota, Runner};
use commonware_utils::{ordered::Set, NZUsize};
use std::{num::NonZeroU32, time::Duration};
/// Link installed between test peers: zero latency, zero jitter, success rate of 1.0.
const LINK: Link = Link {
latency: Duration::from_millis(0),
jitter: Duration::from_millis(0),
success_rate: 1.0,
};
/// Mailbox size given to every muxer created by the test helpers.
const CAPACITY: usize = 5usize;
/// Quota used when registering each test peer on the simulated network, built from
/// `NonZeroU32::MAX` per second.
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
/// Starts a simulated network labelled `network` on `context` and returns its oracle.
fn start_network(context: deterministic::Context) -> Oracle<PublicKey, deterministic::Context> {
    let config = simulated::Config {
        max_size: 1024 * 1024,
        disconnect_on_block: true,
        tracked_peer_sets: NZUsize!(1),
    };
    let (network, oracle) = Network::new(context.with_label("network"), config);
    network.start();
    oracle
}
/// Returns the public key of the private key derived from `seed`.
fn pk(seed: u64) -> PublicKey {
    let private_key = PrivateKey::from_seed(seed);
    private_key.public_key()
}
/// Adds `a` and `b` to the primary peers tracked under peer set 0 and links them in both
/// directions using [`LINK`].
async fn link_bidirectional(
    oracle: &mut Oracle<PublicKey, deterministic::Context>,
    a: PublicKey,
    b: PublicKey,
) {
    let mut manager = oracle.manager();
    let existing = manager.peer_set(0).await.unwrap_or_default();
    let members = existing
        .primary
        .iter()
        .cloned()
        .chain([a.clone(), b.clone()]);
    manager.track(0, Set::from_iter_dedup(members)).await;

    oracle.add_link(a.clone(), b.clone(), LINK).await.unwrap();
    oracle.add_link(b, a, LINK).await.unwrap();
}
/// Registers the peer derived from `seed` on channel 0 of the simulated network, starts a
/// plain muxer on top of it, and returns the peer's public key with the muxer handle.
async fn create_peer(
    context: &deterministic::Context,
    oracle: &mut Oracle<PublicKey, deterministic::Context>,
    seed: u64,
) -> (
    PublicKey,
    MuxHandle<impl Sender<PublicKey = PublicKey>, impl Receiver<PublicKey = PublicKey>>,
) {
    let public_key = pk(seed);
    let (net_tx, net_rx) = oracle
        .control(public_key.clone())
        .register(0, TEST_QUOTA)
        .await
        .unwrap();
    let (muxer, mux_handle) = Muxer::new(context.with_label("mux"), net_tx, net_rx, CAPACITY);
    muxer.start();
    (public_key, mux_handle)
}
/// Like `create_peer`, but builds the muxer with a backup channel and a global sender and
/// returns both alongside the public key and the muxer handle.
async fn create_peer_with_backup_and_global_sender(
    context: &deterministic::Context,
    oracle: &mut Oracle<PublicKey, deterministic::Context>,
    seed: u64,
) -> (
    PublicKey,
    MuxHandle<impl Sender<PublicKey = PublicKey>, impl Receiver<PublicKey = PublicKey>>,
    mpsc::Receiver<BackupResponse<PublicKey>>,
    GlobalSender<simulated::Sender<PublicKey, deterministic::Context>>,
) {
    let public_key = pk(seed);
    let (net_tx, net_rx) = oracle
        .control(public_key.clone())
        .register(0, TEST_QUOTA)
        .await
        .unwrap();
    let builder = Muxer::builder(context.with_label("mux"), net_tx, net_rx, CAPACITY)
        .with_backup()
        .with_global_sender();
    let (muxer, mux_handle, backup_rx, global_sender) = builder.build();
    muxer.start();
    (public_key, mux_handle, backup_rx, global_sender)
}
/// Sends `count` rounds of messages: in round `i`, every sender in `txs` sends the single
/// byte `i as u8` to all recipients at normal priority. Panics if a send fails.
async fn send_burst<S: Sender>(txs: &mut [SubSender<S>], count: usize) {
    for round in 0..count {
        let payload = IoBuf::from(vec![round as u8]);
        for sub_tx in txs.iter_mut() {
            let sent = sub_tx.send(Recipients::All, payload.clone(), false).await;
            let _ = sent.unwrap();
        }
    }
}
/// Waits until exactly `n` messages have been received on `rx`, panicking if a receive
/// fails. With `n == 0` nothing is awaited.
///
/// This does not check that no further messages are queued afterwards.
async fn expect_n_messages(
    rx: &mut SubReceiver<impl Receiver<PublicKey = PublicKey>>,
    n: usize,
) {
    // A plain counted loop replaces the former single-arm `select!`, which added nothing
    // and made the helper wait for a message even when none was expected.
    for _ in 0..n {
        rx.recv().await.expect("should have received message");
    }
}
/// Receives from `rx` and `backup_rx` until at least `n` routed and `n_backup` backup
/// messages have arrived, then asserts that both counts match exactly. Panics if either
/// receive fails.
async fn expect_n_messages_with_backup(
    rx: &mut SubReceiver<impl Receiver<PublicKey = PublicKey>>,
    backup_rx: &mut mpsc::Receiver<BackupResponse<PublicKey>>,
    n: usize,
    n_backup: usize,
) {
    let (mut routed, mut backed_up) = (0usize, 0usize);
    loop {
        select! {
            res = rx.recv() => {
                res.expect("should have received message");
                routed += 1;
            },
            res = backup_rx.recv() => {
                res.expect("should have received message");
                backed_up += 1;
            },
        }
        let satisfied = routed >= n && backed_up >= n_backup;
        if satisfied {
            break;
        }
    }
    assert_eq!(n, routed);
    assert_eq!(n_backup, backed_up);
}
/// A message sent on subchannel 7 by one peer arrives, unchanged and attributed to the
/// sender, on the other peer's subchannel 7 receiver.
#[test]
fn test_basic_routing() {
    deterministic::Runner::default().start(|ctx| async move {
        let mut oracle = start_network(ctx.clone());
        let (dst_pk, mut dst_handle) = create_peer(&ctx, &mut oracle, 0).await;
        let (src_pk, mut src_handle) = create_peer(&ctx, &mut oracle, 1).await;
        link_bidirectional(&mut oracle, dst_pk.clone(), src_pk.clone()).await;

        let (_, mut inbox) = dst_handle.register(7).await.unwrap();
        let (mut outbox, _) = src_handle.register(7).await.unwrap();

        let hello = IoBuf::from(b"hello");
        let _ = outbox
            .send(Recipients::One(dst_pk.clone()), hello.clone(), false)
            .await
            .unwrap();

        let (origin, received) = inbox.recv().await.unwrap();
        assert_eq!(origin, src_pk);
        assert_eq!(received, hello);
    });
}
/// Messages sent on subchannels 10 and 20 each arrive on the receiver registered for the
/// matching subchannel.
#[test]
fn test_multiple_routes() {
    deterministic::Runner::default().start(|ctx| async move {
        let mut oracle = start_network(ctx.clone());
        let (dst_pk, mut dst_handle) = create_peer(&ctx, &mut oracle, 0).await;
        let (src_pk, mut src_handle) = create_peer(&ctx, &mut oracle, 1).await;
        link_bidirectional(&mut oracle, dst_pk.clone(), src_pk.clone()).await;

        let (_, mut inbox_10) = dst_handle.register(10).await.unwrap();
        let (_, mut inbox_20) = dst_handle.register(20).await.unwrap();
        let (mut outbox_10, _) = src_handle.register(10).await.unwrap();
        let (mut outbox_20, _) = src_handle.register(20).await.unwrap();

        let first = IoBuf::from(b"A");
        let second = IoBuf::from(b"B");
        let _ = outbox_10
            .send(Recipients::One(dst_pk.clone()), first.clone(), false)
            .await
            .unwrap();
        let _ = outbox_20
            .send(Recipients::One(dst_pk.clone()), second.clone(), false)
            .await
            .unwrap();

        let (origin_10, received_10) = inbox_10.recv().await.unwrap();
        assert_eq!(origin_10, src_pk);
        assert_eq!(received_10, first);

        let (origin_20, received_20) = inbox_20.recv().await.unwrap();
        assert_eq!(origin_20, src_pk);
        assert_eq!(received_20, second);
    });
}
/// Sends `CAPACITY * 2` messages on each of two subchannels before reading anything, then
/// waits for `CAPACITY` messages on each receiver.
#[test_traced]
fn test_mailbox_capacity_drops_when_full() {
    deterministic::Runner::default().start(|ctx| async move {
        let mut oracle = start_network(ctx.clone());
        let (src_pk, mut src_handle) = create_peer(&ctx, &mut oracle, 0).await;
        let (dst_pk, mut dst_handle) = create_peer(&ctx, &mut oracle, 1).await;
        link_bidirectional(&mut oracle, src_pk.clone(), dst_pk.clone()).await;

        let (outbox_99, _) = src_handle.register(99).await.unwrap();
        let (outbox_100, _) = src_handle.register(100).await.unwrap();
        let (_, mut inbox_99) = dst_handle.register(99).await.unwrap();
        let (_, mut inbox_100) = dst_handle.register(100).await.unwrap();

        send_burst(&mut [outbox_99, outbox_100], CAPACITY * 2).await;

        expect_n_messages(&mut inbox_99, CAPACITY).await;
        expect_n_messages(&mut inbox_100, CAPACITY).await;
    });
}
/// After the receiver for subchannel 99 is dropped, traffic on subchannel 100 is still
/// delivered.
#[test]
fn test_drop_subchannel_receiver_deregisters_route() {
    deterministic::Runner::default().start(|ctx| async move {
        let mut oracle = start_network(ctx.clone());
        let (src_pk, mut src_handle) = create_peer(&ctx, &mut oracle, 0).await;
        let (dst_pk, mut dst_handle) = create_peer(&ctx, &mut oracle, 1).await;
        link_bidirectional(&mut oracle, src_pk.clone(), dst_pk.clone()).await;

        let (outbox_99, _) = src_handle.register(99).await.unwrap();
        let (outbox_100, _) = src_handle.register(100).await.unwrap();
        let (_, dropped_inbox) = dst_handle.register(99).await.unwrap();
        let (_, mut live_inbox) = dst_handle.register(100).await.unwrap();

        drop(dropped_inbox);

        send_burst(&mut [outbox_99, outbox_100], CAPACITY).await;
        expect_n_messages(&mut live_inbox, CAPACITY).await;
    });
}
/// The receiving peer never registers subchannel 1; messages on subchannel 2 are still
/// delivered while subchannel 1 traffic is being sent.
#[test]
fn test_drop_messages_for_unregistered_subchannel() {
    deterministic::Runner::default().start(|ctx| async move {
        let mut oracle = start_network(ctx.clone());
        let (src_pk, mut src_handle) = create_peer(&ctx, &mut oracle, 0).await;
        let (dst_pk, mut dst_handle) = create_peer(&ctx, &mut oracle, 1).await;
        link_bidirectional(&mut oracle, src_pk.clone(), dst_pk.clone()).await;

        let (outbox_1, _) = src_handle.register(1).await.unwrap();
        let (outbox_2, _) = src_handle.register(2).await.unwrap();
        let (_, mut inbox_2) = dst_handle.register(2).await.unwrap();

        send_burst(&mut [outbox_1, outbox_2], CAPACITY).await;
        expect_n_messages(&mut inbox_2, CAPACITY).await;
    });
}
/// With a backup channel configured, messages for the unregistered subchannel 1 arrive on
/// the backup receiver while subchannel 2 messages arrive on their own receiver.
#[test]
fn test_backup_for_unregistered_subchannel() {
    deterministic::Runner::default().start(|ctx| async move {
        let mut oracle = start_network(ctx.clone());
        let (src_pk, mut src_handle) = create_peer(&ctx, &mut oracle, 0).await;
        let (dst_pk, mut dst_handle, mut dst_backup, _) =
            create_peer_with_backup_and_global_sender(&ctx, &mut oracle, 1).await;
        link_bidirectional(&mut oracle, src_pk.clone(), dst_pk.clone()).await;

        let (outbox_1, _) = src_handle.register(1).await.unwrap();
        let (outbox_2, _) = src_handle.register(2).await.unwrap();
        let (_, mut inbox_2) = dst_handle.register(2).await.unwrap();

        send_burst(&mut [outbox_1, outbox_2], CAPACITY).await;
        expect_n_messages_with_backup(&mut inbox_2, &mut dst_backup, CAPACITY, CAPACITY).await;
    });
}
/// A message received through the backup channel can be answered on the same subchannel
/// with the global sender, and the reply reaches the original sender's receiver.
#[test]
fn test_backup_for_unregistered_subchannel_response() {
    deterministic::Runner::default().start(|ctx| async move {
        let mut oracle = start_network(ctx.clone());
        let (asker_pk, mut asker_handle) = create_peer(&ctx, &mut oracle, 0).await;
        let (replier_pk, _replier_handle, mut replier_backup, mut replier_global) =
            create_peer_with_backup_and_global_sender(&ctx, &mut oracle, 1).await;
        link_bidirectional(&mut oracle, asker_pk.clone(), replier_pk.clone()).await;

        let (asker_tx, mut asker_rx) = asker_handle.register(1).await.unwrap();
        send_burst(&mut [asker_tx], 1).await;

        let (subchannel, (origin, _)) = replier_backup.recv().await.unwrap();
        assert_eq!(subchannel, 1);
        assert_eq!(origin, asker_pk);

        replier_global
            .send(subchannel, Recipients::One(asker_pk), b"TEST", true)
            .await
            .unwrap();

        let (reply_origin, reply_bytes) = asker_rx.recv().await.unwrap();
        assert_eq!(reply_origin, replier_pk);
        assert_eq!(reply_bytes, b"TEST");
    });
}
/// After the queue behind subchannel 1's receiver is closed, further traffic on both
/// subchannels still results in subchannel 2's messages being delivered.
#[test]
fn test_message_dropped_for_closed_subchannel() {
    deterministic::Runner::default().start(|ctx| async move {
        let mut oracle = start_network(ctx.clone());
        let (src_pk, mut src_handle) = create_peer(&ctx, &mut oracle, 0).await;
        let (dst_pk, mut dst_handle) = create_peer(&ctx, &mut oracle, 1).await;
        link_bidirectional(&mut oracle, src_pk.clone(), dst_pk.clone()).await;

        let (outbox_1, _) = src_handle.register(1).await.unwrap();
        let (outbox_2, _) = src_handle.register(2).await.unwrap();
        let (_, mut inbox_1) = dst_handle.register(1).await.unwrap();
        let (_, mut inbox_2) = dst_handle.register(2).await.unwrap();

        // Both subchannels deliver while their queues are open.
        send_burst(&mut [outbox_1.clone()], CAPACITY).await;
        expect_n_messages(&mut inbox_1, CAPACITY).await;
        send_burst(&mut [outbox_2.clone()], CAPACITY).await;
        expect_n_messages(&mut inbox_2, CAPACITY).await;

        // Close the queue without dropping the receiver.
        inbox_1.receiver.close();

        send_burst(&mut [outbox_1, outbox_2], CAPACITY).await;
        expect_n_messages(&mut inbox_2, CAPACITY).await;
    });
}
/// With the backup receiver dropped, messages for an unregistered subchannel do not keep
/// subchannel 2's messages from being delivered.
#[test]
fn test_dropped_backup_channel_doesnt_block() {
    deterministic::Runner::default().start(|ctx| async move {
        let mut oracle = start_network(ctx.clone());
        let (src_pk, mut src_handle) = create_peer(&ctx, &mut oracle, 0).await;
        let (dst_pk, mut dst_handle, dst_backup, _) =
            create_peer_with_backup_and_global_sender(&ctx, &mut oracle, 1).await;
        link_bidirectional(&mut oracle, src_pk.clone(), dst_pk.clone()).await;

        drop(dst_backup);

        let (outbox_1, _) = src_handle.register(1).await.unwrap();
        let (outbox_2, _) = src_handle.register(2).await.unwrap();
        let (_, mut inbox_2) = dst_handle.register(2).await.unwrap();

        send_burst(&mut [outbox_1, outbox_2], CAPACITY).await;
        expect_n_messages(&mut inbox_2, CAPACITY).await;
    });
}
/// Registering subchannel 7 a second time while its receiver is still alive fails with
/// `Error::AlreadyRegistered`.
#[test]
fn test_duplicate_registration() {
    deterministic::Runner::default().start(|ctx| async move {
        let mut oracle = start_network(ctx.clone());
        let (_peer_pk, mut peer_handle) = create_peer(&ctx, &mut oracle, 0).await;

        let (_, _live_rx) = peer_handle.register(7).await.unwrap();

        let second_attempt = peer_handle.register(7).await;
        assert!(matches!(second_attempt, Err(Error::AlreadyRegistered(_))));
    });
}
/// Subchannel 7 can be registered again after its previous receiver has been dropped.
#[test]
fn test_register_after_deregister() {
    deterministic::Runner::default().start(|ctx| async move {
        let mut oracle = start_network(ctx.clone());
        let (_, mut peer_handle) = create_peer(&ctx, &mut oracle, 0).await;

        let (_, first_rx) = peer_handle.register(7).await.unwrap();
        drop(first_rx);

        peer_handle.register(7).await.unwrap();
    });
}
}