#![allow(clippy::blocks_in_conditions, unreachable_code)]
use std::{
future::Future,
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};
use agnostic_lite::RuntimeLite;
use bytes::Bytes;
use futures::FutureExt;
use nodecraft::{resolver::AddressResolver, CheapClone, Node};
use smol_str::SmolStr;
use transformable::Transformable;
use crate::{
delegate::{mock::MockDelegate, CompositeDelegate, VoidDelegate},
state::LocalNodeState,
tests::{get_memberlist, next_socket_addr_v4, next_socket_addr_v6, AnyError},
transport::{Ack, Alive, IndirectPing, MaybeResolvedAddress, Message},
types::Epoch,
Member, Memberlist, Options,
};
use super::{Meta, NodeState, Ping, PushNodeState, PushPull, State, Transport};
mod unimplemented;
pub use unimplemented::*;
const TIMEOUT_DURATION: Duration = Duration::from_secs(5);
const WAIT_DURATION: Duration = Duration::from_secs(6);
pub enum AddressKind {
V4,
V6,
}
impl core::fmt::Display for AddressKind {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::V4 => write!(f, "v4"),
Self::V6 => write!(f, "v6"),
}
}
}
impl AddressKind {
pub fn next(&self, network: u8) -> SocketAddr {
match self {
Self::V4 => next_socket_addr_v4(network),
Self::V6 => next_socket_addr_v6(),
}
}
}
pub trait TestPacketStream: Send + Sync + 'static {
fn send_to(&mut self, data: &[u8]) -> impl Future<Output = Result<(), AnyError>> + Send;
fn recv_from(&mut self) -> impl Future<Output = Result<(Bytes, SocketAddr), AnyError>> + Send;
fn finish(&mut self) -> impl Future<Output = Result<(), AnyError>> + Send;
}
pub trait TestPacketConnection: Send + Sync + 'static {
type Stream: TestPacketStream;
fn accept(&self) -> impl Future<Output = Result<Self::Stream, AnyError>> + Send;
fn connect(&self) -> impl Future<Output = Result<Self::Stream, AnyError>> + Send;
}
pub trait TestPacketClient: Sized + Send + Sync + 'static {
type Connection: TestPacketConnection;
fn accept(&mut self) -> impl Future<Output = Result<Self::Connection, AnyError>> + Send;
fn connect(
&self,
addr: SocketAddr,
) -> impl Future<Output = Result<Self::Connection, AnyError>> + Send;
fn local_addr(&self) -> SocketAddr;
fn close(&mut self) -> impl Future<Output = ()> + Send;
}
pub trait TestPromisedStream: Send + Sync + 'static {
fn finish(&mut self) -> impl Future<Output = Result<(), AnyError>> + Send;
}
pub trait TestPromisedConnection: Send + Sync + 'static {
type Stream: TestPromisedStream;
fn accept(&self) -> impl Future<Output = Result<(Self::Stream, SocketAddr), AnyError>> + Send;
fn connect(&self) -> impl Future<Output = Result<Self::Stream, AnyError>> + Send;
}
pub trait TestPromisedClient: Sized + Send + Sync + 'static {
type Stream: TestPromisedStream;
type Connection: TestPromisedConnection<Stream = Self::Stream>;
fn connect(
&self,
addr: SocketAddr,
) -> impl Future<Output = Result<Self::Connection, AnyError>> + Send;
fn accept(&self) -> impl Future<Output = Result<Self::Connection, AnyError>> + Send;
fn local_addr(&self) -> std::io::Result<SocketAddr>;
fn close(&self) -> impl Future<Output = Result<(), AnyError>> + Send;
}
pub async fn handle_ping<A, T, C, R>(trans: T, mut client: C) -> Result<(), AnyError>
where
A: AddressResolver<ResolvedAddress = SocketAddr, Runtime = R>,
T: Transport<Id = SmolStr, Resolver = A, Runtime = R>,
C: TestPacketClient,
R: RuntimeLite,
{
let m = get_memberlist(trans, VoidDelegate::default(), Options::default()).await?;
let source_addr = client.local_addr();
let ping = Ping::new(
42,
Node::new("test".into(), source_addr),
m.advertise_node(),
);
let buf = Message::from(ping).encode_to_vec()?;
let connection = client.connect(*m.advertise_address()).await?;
let mut send_stream = connection.connect().await?;
if let Err(e) = send_stream.send_to(&buf).await {
panic!("failed to send: {}", e);
}
let (tx, rx) = async_channel::bounded(1);
let tx1 = tx.clone();
R::spawn_detach(async move {
futures::select! {
_ = rx.recv().fuse() => {},
_ = R::sleep(TIMEOUT_DURATION).fuse() => {
tx1.close();
}
}
});
let (in_, _) = R::timeout(WAIT_DURATION, async {
let connection = client.accept().await.unwrap();
let mut recv_stream = connection.accept().await.unwrap();
recv_stream.recv_from().await.unwrap()
})
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout"))?;
let ack = Message::<SmolStr, SocketAddr>::decode(&in_).map(|(_, msg)| msg.unwrap_ack())?;
assert_eq!(
ack.sequence_number(),
42,
"bad sequence no: {}",
ack.sequence_number()
);
let res = futures::select! {
res = tx.send(()).fuse() => {
if res.is_err() {
Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout").into())
} else {
Ok(())
}
}
};
let _ = m.shutdown().await;
client.close().await;
res
}
pub async fn handle_compound_ping<A, T, C, E, R>(
trans: T,
mut client: C,
encoder: E,
) -> Result<(), AnyError>
where
A: AddressResolver<ResolvedAddress = SocketAddr, Runtime = R>,
T: Transport<Id = SmolStr, Resolver = A, Runtime = R>,
C: TestPacketClient,
E: FnOnce(&[Message<SmolStr, SocketAddr>]) -> Result<Bytes, AnyError>,
R: RuntimeLite,
{
let m = get_memberlist(trans, VoidDelegate::default(), Options::default()).await?;
let source_addr = client.local_addr();
let ping = Ping::new(
42,
Node::new("test".into(), source_addr),
m.advertise_node(),
);
let msgs = [
Message::from(ping.cheap_clone()),
Message::from(ping.cheap_clone()),
Message::from(ping.cheap_clone()),
];
let buf = encoder(&msgs)?;
let connection = client.connect(*m.advertise_address()).await?;
let mut send_stream = connection.connect().await?;
send_stream.send_to(&buf).await?;
let (tx, rx) = async_channel::bounded(1);
let tx1 = tx.clone();
R::spawn_detach(async move {
futures::select! {
_ = rx.recv().fuse() => {},
_ = R::sleep(TIMEOUT_DURATION).fuse() => {
tx1.close();
}
}
});
let connection = R::timeout(WAIT_DURATION, client.accept()).await??;
for _ in 0..3 {
let (in_, _) = R::timeout(WAIT_DURATION, async {
let mut recv_stream = connection.accept().await?;
recv_stream.recv_from().await
})
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout"))??;
let ack = Message::<SmolStr, SocketAddr>::decode(&in_).map(|(_, msg)| msg.unwrap_ack())?;
assert_eq!(
ack.sequence_number(),
42,
"bad sequence no: {}",
ack.sequence_number()
);
}
let res = futures::select! {
res = tx.send(()).fuse() => {
if res.is_err() {
Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout").into())
} else {
Ok(())
}
}
};
let _ = m.shutdown().await;
client.close().await;
res
}
pub async fn handle_indirect_ping<A, T, C, R>(trans: T, mut client: C) -> Result<(), AnyError>
where
A: AddressResolver<ResolvedAddress = SocketAddr, Runtime = R>,
T: Transport<Id = SmolStr, Resolver = A, Runtime = R>,
C: TestPacketClient,
R: RuntimeLite,
{
let m = get_memberlist(trans, VoidDelegate::default(), Options::default()).await?;
let source_addr = client.local_addr();
let ping = IndirectPing::new(
100,
Node::new("test".into(), source_addr),
m.advertise_node(),
);
let buf = Message::from(ping).encode_to_vec()?;
let connection = client.connect(*m.advertise_address()).await?;
let mut send_stream = connection.connect().await?;
send_stream.send_to(&buf).await?;
let (tx, rx) = async_channel::bounded(1);
let tx1 = tx.clone();
R::spawn_detach(async move {
futures::select! {
_ = rx.recv().fuse() => {},
_ = R::sleep(TIMEOUT_DURATION).fuse() => {
tx1.close();
}
}
});
let (in_, _) = R::timeout(WAIT_DURATION, async {
let connection = client.accept().await?;
let mut recv_stream = connection.accept().await?;
recv_stream.recv_from().await
})
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout"))??;
let ack = Message::<SmolStr, SocketAddr>::decode(&in_).map(|(_, msg)| msg.unwrap_ack())?;
assert_eq!(
ack.sequence_number(),
100,
"bad sequence no: {}",
ack.sequence_number()
);
let res = futures::select! {
res = tx.send(()).fuse() => {
if res.is_err() {
Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout").into())
} else {
Ok(())
}
}
};
let _ = m.shutdown().await;
client.close().await;
res
}
pub async fn handle_ping_wrong_node<A, T, C, R>(trans: T, mut client: C) -> Result<(), AnyError>
where
A: AddressResolver<ResolvedAddress = SocketAddr, Runtime = R>,
T: Transport<Id = SmolStr, Resolver = A, Runtime = R>,
C: TestPacketClient,
R: RuntimeLite,
{
let m = get_memberlist(trans, VoidDelegate::default(), Options::default()).await?;
let source_addr = client.local_addr();
let ping: Ping<SmolStr, _> = Ping::new(
42,
Node::new("test".into(), source_addr),
Node::new("bad".into(), {
let mut a = source_addr;
a.set_port(12345);
a
}),
);
let buf = Message::from(ping).encode_to_vec()?;
let connection = client.connect(*m.advertise_address()).await?;
let mut send_stream = connection.connect().await?;
send_stream.send_to(&buf).await?;
let res = R::timeout(WAIT_DURATION, async {
let connection = client.accept().await?;
let mut recv_stream = connection.accept().await?;
recv_stream.recv_from().await
})
.await;
#[allow(clippy::single_match)]
match res {
Ok(Ok(_)) => {
panic!("should got timeout error");
}
_ => {}
}
let _ = m.shutdown().await;
client.close().await;
Ok(())
}
pub async fn send_packet_piggyback<A, T, C, D, R>(
trans: T,
mut client: C,
decoder: D,
) -> Result<(), AnyError>
where
A: AddressResolver<ResolvedAddress = SocketAddr, Runtime = R>,
T: Transport<Id = SmolStr, Resolver = A, Runtime = R>,
C: TestPacketClient,
D: FnOnce(Bytes) -> Result<[Message<SmolStr, SocketAddr>; 2], AnyError>,
R: RuntimeLite,
{
let m = get_memberlist(trans, VoidDelegate::default(), Options::default()).await?;
let source_addr = client.local_addr();
let n: Node<SmolStr, SocketAddr> = Node::new("rand".into(), *m.advertise_address());
let a = Alive::new(10, n.clone()).with_meta(Meta::empty());
m.broadcast(n.id().clone(), Message::from(a)).await;
let ping = Ping::new(
42,
Node::new("test".into(), source_addr),
m.advertise_node(),
);
let buf = Message::from(ping).encode_to_vec()?;
let connection = client.connect(*m.advertise_address()).await?;
let mut send_stream = connection.connect().await?;
send_stream.send_to(&buf).await?;
let (tx, rx) = async_channel::bounded(1);
let tx1 = tx.clone();
R::spawn_detach(async move {
futures::select! {
_ = rx.recv().fuse() => {},
_ = R::sleep(TIMEOUT_DURATION).fuse() => {
tx1.close();
}
}
});
let (in_, _) = R::timeout(WAIT_DURATION, async {
let connection = client.accept().await?;
let mut recv_stream = connection.accept().await?;
recv_stream.recv_from().await
})
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout"))??;
let parts = decoder(in_)?;
let [m1, m2] = parts;
let ack = m1.unwrap_ack();
assert_eq!(ack.sequence_number(), 42, "bad sequence no");
let alive = m2.unwrap_alive();
assert_eq!(alive.incarnation(), 10);
assert_eq!(alive.node(), &n);
let res = futures::select! {
res = tx.send(()).fuse() => {
if res.is_err() {
Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout").into())
} else {
Ok(())
}
}
};
let _ = m.shutdown().await;
client.close().await;
res
}
macro_rules! unwrap_ok {
($tx:ident.send($expr:expr)) => {
match $expr {
::std::result::Result::Ok(val) => val,
::std::result::Result::Err(e) => {
$tx.send(e).await.unwrap();
return;
}
}
};
}
macro_rules! panic_on_err {
($expr:expr) => {
match $expr {
::std::result::Result::Ok(val) => val,
::std::result::Result::Err(e) => {
panic!("{}", e);
}
}
};
}
pub async fn promised_ping<A, T, P, R>(
trans: T,
promised: P,
kind: AddressKind,
) -> Result<(), AnyError>
where
A: AddressResolver<ResolvedAddress = SocketAddr, Runtime = R>,
T: Transport<Id = SmolStr, Resolver = A, Runtime = R>,
P: TestPromisedClient,
P::Stream: TestPromisedStream + AsMut<T::Stream>,
R: RuntimeLite,
{
let promised_addr = promised.local_addr()?;
let promised = Arc::new(promised);
let m = get_memberlist(trans, VoidDelegate::default(), Options::default()).await?;
let ping_timeout = m.inner.opts.probe_interval + Duration::from_secs(2);
let ping_time_max = m.inner.opts.probe_interval + Duration::from_secs(10);
let node = Node::new("mongo".into(), kind.next(0));
let ping_out = Ping::new(23, m.advertise_node(), node.cheap_clone());
let (ping_err_tx, ping_err_rx) = async_channel::bounded::<AnyError>(1);
let m1 = m.clone();
let node1 = node.cheap_clone();
let p1 = promised.clone();
let ping_err_tx1 = ping_err_tx.clone();
let (tx, rx) = async_channel::bounded(1);
let (tx1, rx1) = async_channel::bounded(1);
let (tx2, rx2) = async_channel::bounded(1);
let (tx3, rx3) = async_channel::bounded(1);
R::spawn_detach(async move {
let mut num_accepted = 0;
let acceptor = unwrap_ok!(ping_err_tx1.send(unwrap_ok!(ping_err_tx1.send(
R::timeout(ping_time_max, p1.accept())
.await
.map_err(Into::into)
))));
loop {
futures::select! {
stream = acceptor.accept().fuse() => {
num_accepted += 1;
match stream {
Ok((mut stream, addr)) if num_accepted == 1 => {
let (_, p) = unwrap_ok!(ping_err_tx1.send(
m1.inner
.transport
.read_message(&addr, stream.as_mut())
.await
.map_err(Into::into)
));
let ping_in = p.unwrap_ping();
assert_eq!(ping_in.sequence_number(), 23);
assert_eq!(ping_in.target(), &node1);
let ack = Ack::new(23);
unwrap_ok!(ping_err_tx1.send(
m1.inner
.transport
.send_message(stream.as_mut(), ack.into())
.await
.map_err(Into::into)
));
let _ = TestPromisedStream::finish(&mut stream).await;
let _ = rx1.recv().await;
},
Ok((mut stream, addr)) if num_accepted == 2 => {
let (_, p) = unwrap_ok!(ping_err_tx1.send(
m1.inner
.transport
.read_message(&addr, stream.as_mut())
.await
.map_err(Into::into)
));
let ping_in = p.unwrap_ping();
let ack = Ack::new(ping_in.sequence_number() + 1);
unwrap_ok!(ping_err_tx1.send(
m1.inner
.transport
.send_message(stream.as_mut(), ack.into())
.await
.map_err(Into::into)
));
let _ = stream.finish().await;
let _ = rx2.recv().await;
},
Ok((mut stream, addr)) if num_accepted == 3 => {
let _ = unwrap_ok!(ping_err_tx1.send(
m1.inner
.transport
.read_message(&addr, stream.as_mut())
.await
.map_err(Into::into)
));
unwrap_ok!(ping_err_tx1.send(
m1.inner
.transport
.send_message(
stream.as_mut(),
IndirectPing::new(0, Node::new("unknown source".into(), kind.next(0)), Node::new("unknown target".into(), kind.next(0)))
.into()
)
.await
.map_err(Into::into)
));
let _ = stream.finish().await;
let _ = rx3.recv().await;
}
Ok(_) => {
let _ = rx.recv().await;
},
Err(e) => {
let _ = ping_err_tx1.send(e).await;
}
}
}
_ = rx.recv().fuse() => {
return;
}
}
}
});
let did_contact = panic_on_err!(
m.send_ping_and_wait_for_ack(
&promised_addr,
ping_out.clone(),
Instant::now() + ping_timeout
)
.await
);
if !did_contact {
return Err(std::io::Error::new(std::io::ErrorKind::Other, "expected successful ping").into());
}
if !ping_err_rx.is_empty() {
return Err(ping_err_rx.recv().await.unwrap());
}
let _ = tx1.send(()).await;
let err = m
.send_ping_and_wait_for_ack(
&promised_addr,
ping_out.clone(),
Instant::now() + ping_timeout,
)
.await
.expect_err("expected failed ping");
if !err
.to_string()
.contains("sequence number mismatch: ping(23), ack(24)")
{
panic!("should catch sequence number mismatch err, but got err: {err}");
}
if !ping_err_rx.is_empty() {
panic!("{}", ping_err_rx.recv().await.unwrap());
}
let _ = tx2.send(()).await;
let err = m
.send_ping_and_wait_for_ack(
&promised_addr,
ping_out.clone(),
Instant::now() + ping_timeout,
)
.await
.expect_err("expected failed ping");
if !err
.to_string()
.contains("unexpected message: expected Ack, got IndirectPing")
{
panic!("should catch unexpected message type err, but got err: {err}");
}
if !ping_err_rx.is_empty() {
panic!("{}", ping_err_rx.recv().await.unwrap());
}
let _ = tx3.send(()).await;
let _ = tx.send(()).await;
let _ = promised.close().await;
drop(promised);
R::sleep(Duration::from_secs(1)).await;
let start_ping = Instant::now();
let did_contact = m
.send_ping_and_wait_for_ack(&promised_addr, ping_out, Instant::now() + ping_timeout)
.await?;
let elapsed = start_ping.elapsed();
assert!(!did_contact, "expected failed ping");
assert!(
elapsed <= ping_time_max,
"elapsed: {:?}, take too long to fail ping",
elapsed
);
let _ = m.shutdown().await;
Ok(())
}
pub async fn promised_push_pull<A, T, P, R>(trans: T, promised: P) -> Result<(), AnyError>
where
A: AddressResolver<ResolvedAddress = SocketAddr, Runtime = R>,
T: Transport<Id = SmolStr, Resolver = A, Runtime = R>,
P: TestPromisedClient,
P::Stream: TestPromisedStream + AsMut<T::Stream>,
R: RuntimeLite,
{
let m = get_memberlist(trans, VoidDelegate::default(), Options::default()).await?;
let bind_addr = *m.advertise_address();
let id0: SmolStr = "Test 0".into();
{
let mut members = m.inner.nodes.write().await;
members.nodes.push(Member {
state: LocalNodeState {
server: Arc::new(NodeState::new(id0.cheap_clone(), bind_addr, State::Alive)),
incarnation: Arc::new(0.into()),
state_change: Epoch::now() - Duration::from_secs(1),
state: State::Suspect,
},
suspicion: None,
});
members.node_map.insert(id0.cheap_clone(), 0);
}
let connector = promised.connect(bind_addr).await?;
let push_pull = PushPull::new(
false,
[
PushNodeState::new(1, id0.cheap_clone(), bind_addr, State::Alive),
PushNodeState::new(1, "Test 1".into(), bind_addr, State::Alive),
PushNodeState::new(1, "Test 2".into(), bind_addr, State::Alive),
]
.into_iter()
.collect(),
);
let mut conn = connector.connect().await?;
m.inner
.transport
.send_message(conn.as_mut(), push_pull.into())
.await?;
let (_, msg) = m
.inner
.transport
.read_message(&bind_addr, conn.as_mut())
.await?;
let readed_push_pull = msg.unwrap_push_pull();
assert!(!readed_push_pull.join());
assert_eq!(readed_push_pull.states().len(), 1);
assert_eq!(readed_push_pull.states()[0].id(), &id0);
assert_eq!(
readed_push_pull.states()[0].incarnation(),
0,
"bad incarnation"
);
assert_eq!(
readed_push_pull.states()[0].state(),
State::Suspect,
"bad state"
);
m.shutdown().await?;
Ok(())
}
pub async fn join<A, T1, T2, R>(trans1: T1::Options, trans2: T2::Options) -> Result<(), AnyError>
where
A: AddressResolver<ResolvedAddress = SocketAddr, Runtime = R>,
T1: Transport<Id = SmolStr, Resolver = A, Runtime = R>,
T2: Transport<Id = SmolStr, Resolver = A, Runtime = R>,
R: RuntimeLite,
{
let m1 = Memberlist::<T1, _>::new(trans1, Options::default())
.await
.map_err(|e| {
tracing::error!("fail to start memberlist node 1: {}", e);
e
})?;
let m2 = Memberlist::<T2, _>::new(trans2, Options::default())
.await
.map_err(|e| {
tracing::error!("fail to start memberlist node 2: {}", e);
e
})?;
m2.join(Node::new(
m1.local_id().cheap_clone(),
MaybeResolvedAddress::resolved(*m1.advertise_address()),
))
.await
.map_err(|e| {
tracing::error!("fail to join: {}", e);
e
})?;
assert_eq!(m2.num_members().await, 2);
assert_eq!(m2.estimate_num_nodes(), 2);
m1.shutdown().await?;
m2.shutdown().await?;
Ok(())
}
pub async fn join_dead_node<A, T, P, R>(trans1: T::Options, promised: P, fake_id: T::Id)
where
A: AddressResolver<ResolvedAddress = SocketAddr, Runtime = R>,
T: Transport<Id = SmolStr, Resolver = A, Runtime = R>,
P: TestPromisedClient,
R: RuntimeLite,
{
let local_addr = promised.local_addr().unwrap();
let m = Memberlist::<T, _>::new(
trans1,
Options::default().with_timeout(Duration::from_millis(50)),
)
.await
.unwrap();
R::spawn_detach(async move {
R::sleep(TIMEOUT_DURATION).await;
panic!("should have timed out by now");
});
let target = Node::new(fake_id, MaybeResolvedAddress::resolved(local_addr));
m.join(target).await.unwrap_err();
m.shutdown().await.unwrap();
promised.close().await.unwrap();
}
pub async fn send<A, T1, T2, R>(trans1: T1::Options, trans2: T2::Options) -> Result<(), AnyError>
where
A: AddressResolver<ResolvedAddress = SocketAddr, Runtime = R>,
T1: Transport<Id = SmolStr, Resolver = A, Runtime = R>,
T2: Transport<Id = SmolStr, Resolver = A, Runtime = R>,
R: RuntimeLite,
{
let m1 = Memberlist::<T1, _>::with_delegate(
CompositeDelegate::new().with_node_delegate(MockDelegate::<SmolStr, SocketAddr>::new()),
trans1,
Options::default(),
)
.await?;
let m2 = Memberlist::<T2, _>::new(trans2, Options::default()).await?;
m2.join(Node::new(
m1.local_id().cheap_clone(),
MaybeResolvedAddress::resolved(*m1.advertise_address()),
))
.await?;
assert_eq!(m2.num_members().await, 2);
assert_eq!(m2.estimate_num_nodes(), 2);
m2.send(m1.advertise_address(), Bytes::from_static(b"send"))
.await
.map_err(|e| {
tracing::error!("fail to send packet {e}");
e
})
.unwrap();
m2.send_reliable(m1.advertise_address(), Bytes::from_static(b"send_reliable"))
.await
.map_err(|e| {
tracing::error!("fail to send message {e}");
e
})
.unwrap();
R::sleep(WAIT_DURATION).await;
let mut msgs1 = m1.delegate().unwrap().node_delegate().get_messages().await;
msgs1.sort();
assert_eq!(msgs1, ["send".as_bytes(), "send_reliable".as_bytes()]);
m1.shutdown().await?;
m2.shutdown().await?;
Ok(())
}