use crate::{Handler, Monitor};
mod engine;
use commonware_p2p::Blocker;
pub use engine::Engine;
mod ingress;
pub use ingress::{Mailbox, Message};
#[cfg(test)]
mod mocks;
/// Configuration handed to [`Engine::new`] when constructing a collector engine.
///
/// `RqC` and `RsC` are the types of the request and response codec settings; the
/// tests in this module pass `()` for both.
#[derive(Clone)]
pub struct Config<B, M, H, RqC, RsC>
where
    B: Blocker,
    M: Monitor,
    H: Handler,
{
    /// The [`Blocker`] implementation supplied to the engine.
    pub blocker: B,
    /// The [`Monitor`] implementation supplied to the engine.
    pub monitor: M,
    /// The [`Handler`] implementation supplied to the engine.
    pub handler: H,
    /// Size of the engine's mailbox.
    // NOTE(review): presumably the capacity of the channel behind `Mailbox` — confirm in `engine`.
    pub mailbox_size: usize,
    /// Priority flag for requests.
    // NOTE(review): presumably forwarded as the priority of outgoing request messages — confirm.
    pub priority_request: bool,
    /// Codec configuration for requests.
    pub request_codec: RqC,
    /// Priority flag for responses.
    // NOTE(review): presumably forwarded as the priority of outgoing response messages — confirm.
    pub priority_response: bool,
    /// Codec configuration for responses.
    pub response_codec: RsC,
}
#[cfg(test)]
mod tests {
use super::{
mocks::{
handler::Handler as MockHandler,
monitor::Monitor as MockMonitor,
types::{Request, Response},
},
Config, Engine, Mailbox,
};
use crate::{Error, Handler, Monitor, Originator};
use commonware_codec::Encode;
use commonware_cryptography::{
ed25519::{PrivateKey, PublicKey},
Committable, Signer,
};
use commonware_macros::{select, test_traced};
use commonware_p2p::{
simulated::{Link, Network, Oracle, Receiver, Sender},
Blocker, Manager as _, Recipients, Sender as _,
};
use commonware_runtime::{count_running_tasks, deterministic, Clock, Metrics, Quota, Runner};
use commonware_utils::{ordered::Set, NZUsize, NZU32};
use std::time::Duration;
/// Quota passed to every channel registration in these tests (1,000,000 per second).
const TEST_QUOTA: Quota = Quota::per_second(NZU32!(1_000_000));
/// Mailbox size used for every engine spawned by these tests.
const MAILBOX_SIZE: usize = 1024;
/// Default link between peers: 10ms latency, 1ms jitter, no message loss.
const LINK: Link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
/// Slow link (1s latency, 1ms jitter, no message loss), used by the cancellation test.
const LINK_SLOW: Link = Link {
latency: Duration::from_secs(1),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
/// Starts a simulated network and registers one peer per entry in `peer_seeds`.
///
/// Each peer's key is derived from its seed with `PrivateKey::from_seed`. Every peer
/// registers channels `0` and `1` with [`TEST_QUOTA`], and the full peer list is then
/// tracked under index `0`.
///
/// Returns, in seed order: the network oracle, the private keys, the matching public
/// keys, and for each peer the `(sender, receiver)` pairs of channel `0` and channel `1`.
async fn setup_network_and_peers(
    context: &deterministic::Context,
    peer_seeds: &[u64],
) -> (
    Oracle<PublicKey, deterministic::Context>,
    Vec<PrivateKey>,
    Vec<PublicKey>,
    Vec<(
        (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
        (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
    )>,
) {
    // Bring up the simulated network.
    let network_cfg = commonware_p2p::simulated::Config {
        max_size: 1024 * 1024,
        disconnect_on_block: true,
        tracked_peer_sets: NZUsize!(1),
    };
    let (network, oracle) = Network::new(context.with_label("network"), network_cfg);
    network.start();

    // Derive one key pair per seed, keeping both lists in seed order.
    let mut schemes = Vec::with_capacity(peer_seeds.len());
    let mut peers = Vec::with_capacity(peer_seeds.len());
    for &seed in peer_seeds {
        let scheme = PrivateKey::from_seed(seed);
        peers.push(scheme.public_key());
        schemes.push(scheme);
    }

    // Register channel 0 and channel 1 for every peer.
    let mut connections = Vec::with_capacity(peers.len());
    for peer in &peers {
        let control = oracle.control(peer.clone());
        let channel0 = control.register(0, TEST_QUOTA).await.unwrap();
        let channel1 = control.register(1, TEST_QUOTA).await.unwrap();
        connections.push((channel0, channel1));
    }

    // Track all peers as a single peer set.
    let peer_set = Set::from_iter_dedup(peers.clone());
    oracle.manager().track(0, peer_set).await;

    (oracle, schemes, peers, connections)
}
/// Adds `link` between `peers[from]` and `peers[to]` in both directions.
///
/// The `from -> to` direction is added first, then `to -> from`. Panics if either
/// `add_link` call on the oracle fails or if an index is out of bounds.
async fn add_link(
    oracle: &mut Oracle<PublicKey, deterministic::Context>,
    link: Link,
    peers: &[PublicKey],
    from: usize,
    to: usize,
) {
    for (src, dst) in [(from, to), (to, from)] {
        oracle
            .add_link(peers[src].clone(), peers[dst].clone(), link.clone())
            .await
            .unwrap();
    }
}
/// Builds an [`Engine`] for the peer identified by `signer`, starts it, and returns its mailbox.
///
/// The engine's context is labelled `engine_<public key>`. `connection.0` and
/// `connection.1` are passed to `Engine::start` as its first and second channel.
/// Neither message kind is prioritized and both codec configurations are `()`.
/// The handle returned by `Engine::start` is dropped.
#[allow(clippy::type_complexity)]
fn setup_and_spawn_engine(
    context: &deterministic::Context,
    blocker: impl Blocker<PublicKey = PublicKey>,
    signer: impl Signer<PublicKey = PublicKey>,
    connection: (
        (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
        (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
    ),
    monitor: impl Monitor<PublicKey = PublicKey, Response = Response>,
    handler: impl Handler<PublicKey = PublicKey, Request = Request, Response = Response>,
) -> Mailbox<PublicKey, Request> {
    let (first_channel, second_channel) = connection;
    let label = format!("engine_{}", signer.public_key());
    let cfg = Config {
        blocker,
        monitor,
        handler,
        mailbox_size: MAILBOX_SIZE,
        priority_request: false,
        request_codec: (),
        priority_response: false,
        response_codec: (),
    };

    let (engine, mailbox) = Engine::new(context.with_label(&label), cfg);
    engine.start(first_channel, second_channel);
    mailbox
}
/// A request sent to one peer reaches that peer's handler, and the reply is reported
/// to the requester's monitor.
#[test_traced]
fn test_send_and_collect_response() {
    let runner = deterministic::Runner::timed(Duration::from_secs(10));
    runner.start(|context| async move {
        let (mut oracle, schemes, peers, connections) =
            setup_network_and_peers(&context, &[0, 1]).await;
        let mut signers = schemes.into_iter();
        let mut channels = connections.into_iter();

        // Connect the two peers in both directions.
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        // Peer 0: originates the request and observes collected responses.
        let requester = signers.next().unwrap();
        let requester_channels = channels.next().unwrap();
        let (monitor, mut monitor_events) = MockMonitor::new();
        let mut requester_mailbox = setup_and_spawn_engine(
            &context,
            oracle.control(requester.public_key()),
            requester,
            requester_channels,
            monitor,
            MockHandler::dummy(),
        );

        // Peer 1: handles the request.
        let responder = signers.next().unwrap();
        let responder_channels = channels.next().unwrap();
        let (handler, mut handler_events) = MockHandler::new(true);
        let _mailbox = setup_and_spawn_engine(
            &context,
            oracle.control(responder.public_key()),
            responder,
            responder_channels,
            MockMonitor::dummy(),
            handler,
        );

        // Send the request and check who it was sent to.
        let request = Request { id: 1, data: 1 };
        let target = Recipients::One(peers[1].clone());
        let recipients = requester_mailbox
            .send(target, request.clone())
            .await
            .expect("send failed");
        assert_eq!(recipients, vec![peers[1].clone()]);

        // The handler on peer 1 saw the request from peer 0 and responded.
        let processed = handler_events.recv().await.unwrap();
        assert_eq!(processed.origin, peers[0]);
        assert_eq!(processed.request, request);
        assert!(processed.responded);

        // The monitor on peer 0 collected exactly that response.
        let collected = monitor_events.recv().await.unwrap();
        assert_eq!(collected.handler, peers[1]);
        assert_eq!(collected.response.id, 1);
        assert_eq!(collected.response.result, 2);
        assert_eq!(collected.count, 1);
    });
}
/// Cancelling a request right after sending it means no response ever reaches the monitor.
///
/// The peers are joined by [`LINK_SLOW`], and the monitor is watched for 5 seconds.
#[test_traced]
fn test_cancel_request() {
    let runner = deterministic::Runner::timed(Duration::from_secs(10));
    runner.start(|context| async move {
        let (mut oracle, schemes, peers, connections) =
            setup_network_and_peers(&context, &[0, 1]).await;
        let mut signers = schemes.into_iter();
        let mut channels = connections.into_iter();

        // Connect the peers with the slow link.
        add_link(&mut oracle, LINK_SLOW.clone(), &peers, 0, 1).await;

        // Peer 0: originates (and then cancels) the request.
        let requester = signers.next().unwrap();
        let requester_channels = channels.next().unwrap();
        let (monitor, mut monitor_events) = MockMonitor::new();
        let mut mailbox = setup_and_spawn_engine(
            &context,
            oracle.control(requester.public_key()),
            requester,
            requester_channels,
            monitor,
            MockHandler::dummy(),
        );

        // Peer 1: handles requests; its handler output is not observed.
        let responder = signers.next().unwrap();
        let responder_channels = channels.next().unwrap();
        let (handler, _) = MockHandler::new(true);
        let _mailbox = setup_and_spawn_engine(
            &context,
            oracle.control(responder.public_key()),
            responder,
            responder_channels,
            MockMonitor::dummy(),
            handler,
        );

        // Send the request, remembering its commitment so it can be cancelled.
        let request = Request { id: 1, data: 1 };
        let commitment = request.commitment();
        let target = Recipients::One(peers[1].clone());
        let recipients = mailbox
            .send(target, request.clone())
            .await
            .expect("send failed");
        assert_eq!(recipients, vec![peers[1].clone()]);

        // Cancel immediately.
        mailbox.cancel(commitment).await;

        // Nothing may reach the monitor within the next 5 seconds.
        select! {
            _ = monitor_events.recv() => {
                panic!("Should not receive any monitor events");
            },
            _ = context.sleep(Duration::from_millis(5_000)) => {
            },
        }
    });
}
/// A request sent with `Recipients::All` reaches both connected peers, and both
/// responses are collected with an increasing count.
#[test_traced]
fn test_broadcast_request() {
    let runner = deterministic::Runner::timed(Duration::from_secs(10));
    runner.start(|context| async move {
        let (mut oracle, schemes, peers, connections) =
            setup_network_and_peers(&context, &[0, 1, 2]).await;
        let mut signers = schemes.into_iter();
        let mut channels = connections.into_iter();

        // Peer 0 is linked to both other peers.
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

        // Peer 0: originator with an observable monitor.
        let originator = signers.next().unwrap();
        let originator_channels = channels.next().unwrap();
        let (monitor, mut monitor_events) = MockMonitor::new();
        let mut originator_mailbox = setup_and_spawn_engine(
            &context,
            oracle.control(originator.public_key()),
            originator,
            originator_channels,
            monitor,
            MockHandler::dummy(),
        );

        // Peer 1: first responder.
        let responder_a = signers.next().unwrap();
        let responder_a_channels = channels.next().unwrap();
        let (handler_a, _) = MockHandler::new(true);
        let _mailbox2 = setup_and_spawn_engine(
            &context,
            oracle.control(responder_a.public_key()),
            responder_a,
            responder_a_channels,
            MockMonitor::dummy(),
            handler_a,
        );

        // Peer 2: second responder.
        let responder_b = signers.next().unwrap();
        let responder_b_channels = channels.next().unwrap();
        let (handler_b, _) = MockHandler::new(true);
        let _mailbox3 = setup_and_spawn_engine(
            &context,
            oracle.control(responder_b.public_key()),
            responder_b,
            responder_b_channels,
            MockMonitor::dummy(),
            handler_b,
        );

        // Broadcast and verify both peers are reported as recipients.
        let request = Request { id: 3, data: 3 };
        let recipients = originator_mailbox
            .send(Recipients::All, request.clone())
            .await
            .expect("send failed");
        assert_eq!(recipients.len(), 2);
        assert!(recipients.contains(&peers[1]));
        assert!(recipients.contains(&peers[2]));

        // Collect two responses; the order in which the peers answer is not fixed.
        let mut seen = 0;
        let mut from_peer1 = false;
        let mut from_peer2 = false;
        for _ in 0..2 {
            let collected = monitor_events.recv().await.unwrap();
            assert_eq!(collected.response.id, 3);
            assert_eq!(collected.response.result, 6);
            seen += 1;
            assert_eq!(collected.count, seen);
            from_peer1 |= collected.handler == peers[1];
            from_peer2 |= collected.handler == peers[2];
        }
        assert!(from_peer1);
        assert!(from_peer2);
    });
}
/// Sending the same request three times to one peer yields exactly one monitor event.
#[test_traced]
fn test_duplicate_response_ignored() {
    let runner = deterministic::Runner::timed(Duration::from_secs(10));
    runner.start(|context| async move {
        let (mut oracle, schemes, peers, connections) =
            setup_network_and_peers(&context, &[0, 1]).await;
        let mut signers = schemes.into_iter();
        let mut channels = connections.into_iter();
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        // Peer 0: originator with an observable monitor.
        let originator = signers.next().unwrap();
        let originator_channels = channels.next().unwrap();
        let (monitor, mut monitor_events) = MockMonitor::new();
        let mut originator_mailbox = setup_and_spawn_engine(
            &context,
            oracle.control(originator.public_key()),
            originator,
            originator_channels,
            monitor,
            MockHandler::dummy(),
        );

        // Peer 1: responder.
        let responder = signers.next().unwrap();
        let responder_channels = channels.next().unwrap();
        let (handler, _) = MockHandler::new(true);
        let _mailbox2 = setup_and_spawn_engine(
            &context,
            oracle.control(responder.public_key()),
            responder,
            responder_channels,
            MockMonitor::dummy(),
            handler,
        );

        // Send the identical request three times in a row.
        let request = Request { id: 5, data: 5 };
        let mut attempt = 0;
        while attempt < 3 {
            let recipients = originator_mailbox
                .send(Recipients::One(peers[1].clone()), request.clone())
                .await
                .expect("send failed");
            assert_eq!(recipients, vec![peers[1].clone()]);
            attempt += 1;
        }

        // The first response is collected...
        let collected = monitor_events.recv().await.unwrap();
        assert_eq!(collected.handler, peers[1]);
        assert_eq!(collected.response.id, 5);
        assert_eq!(collected.count, 1);

        // ...and nothing else arrives during the following 5 seconds.
        select! {
            _ = monitor_events.recv() => {
                panic!("Should not receive duplicate responses");
            },
            _ = context.sleep(Duration::from_millis(5_000)) => {
            },
        }
    });
}
/// Two different requests sent back to back to one peer are each answered and
/// collected independently, each with a count of one.
#[test_traced]
fn test_concurrent_requests() {
    let runner = deterministic::Runner::timed(Duration::from_secs(10));
    runner.start(|context| async move {
        let (mut oracle, schemes, peers, connections) =
            setup_network_and_peers(&context, &[0, 1]).await;
        let mut signers = schemes.into_iter();
        let mut channels = connections.into_iter();
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        // Peer 0: originator with an observable monitor.
        let originator = signers.next().unwrap();
        let originator_channels = channels.next().unwrap();
        let (monitor, mut monitor_events) = MockMonitor::new();
        let mut originator_mailbox = setup_and_spawn_engine(
            &context,
            oracle.control(originator.public_key()),
            originator,
            originator_channels,
            monitor,
            MockHandler::dummy(),
        );

        // Peer 1: handler built with `false`, with explicit responses registered for ids 10 and 20.
        let responder = signers.next().unwrap();
        let responder_channels = channels.next().unwrap();
        let (mut handler, _) = MockHandler::new(false);
        handler.set_response(10, Response { id: 10, result: 20 });
        handler.set_response(20, Response { id: 20, result: 40 });
        let _mailbox2 = setup_and_spawn_engine(
            &context,
            oracle.control(responder.public_key()),
            responder,
            responder_channels,
            MockMonitor::dummy(),
            handler,
        );

        // Fire both requests without waiting for a response in between.
        let first = Request { id: 10, data: 10 };
        let second = Request { id: 20, data: 20 };
        originator_mailbox
            .send(Recipients::One(peers[1].clone()), first)
            .await
            .expect("send failed");
        originator_mailbox
            .send(Recipients::One(peers[1].clone()), second)
            .await
            .expect("send failed");

        // Both responses must arrive, in either order.
        let mut got_10 = false;
        let mut got_20 = false;
        for _ in 0..2 {
            let collected = monitor_events.recv().await.unwrap();
            assert_eq!(collected.handler, peers[1]);
            assert_eq!(collected.count, 1);
            let id = collected.response.id;
            if id == 10 {
                assert_eq!(collected.response.result, 20);
                got_10 = true;
            } else if id == 20 {
                assert_eq!(collected.response.result, 40);
                got_20 = true;
            } else {
                panic!("Unexpected response ID");
            }
        }
        assert!(got_10);
        assert!(got_20);
    });
}
/// When the handler processes a request without responding, the originator's
/// monitor receives nothing.
#[test_traced]
fn test_handler_no_response() {
    let runner = deterministic::Runner::timed(Duration::from_secs(10));
    runner.start(|context| async move {
        let (mut oracle, schemes, peers, connections) =
            setup_network_and_peers(&context, &[0, 1]).await;
        let mut signers = schemes.into_iter();
        let mut channels = connections.into_iter();
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        // Peer 0: originator with an observable monitor.
        let originator = signers.next().unwrap();
        let originator_channels = channels.next().unwrap();
        let (monitor, mut monitor_events) = MockMonitor::new();
        let mut originator_mailbox = setup_and_spawn_engine(
            &context,
            oracle.control(originator.public_key()),
            originator,
            originator_channels,
            monitor,
            MockHandler::dummy(),
        );

        // Peer 1: handler built with `false` and no registered responses.
        let responder = signers.next().unwrap();
        let responder_channels = channels.next().unwrap();
        let (handler, mut handler_events) = MockHandler::new(false);
        let _mailbox2 = setup_and_spawn_engine(
            &context,
            oracle.control(responder.public_key()),
            responder,
            responder_channels,
            MockMonitor::dummy(),
            handler,
        );

        // Send the request.
        let request = Request { id: 100, data: 100 };
        let target = Recipients::One(peers[1].clone());
        let recipients = originator_mailbox
            .send(target, request.clone())
            .await
            .expect("send failed");
        assert_eq!(recipients, vec![peers[1].clone()]);

        // The handler saw the request but reports that it did not respond.
        let processed = handler_events.recv().await.unwrap();
        assert_eq!(processed.origin, peers[0]);
        assert_eq!(processed.request, request);
        assert!(!processed.responded);

        // The monitor stays silent for one second.
        select! {
            _ = monitor_events.recv() => {
                panic!("Should not receive any monitor events");
            },
            _ = context.sleep(Duration::from_millis(1_000)) => {
            },
        }
    });
}
/// With a single peer in the network, broadcasting succeeds with an empty recipient
/// list and produces no monitor events.
#[test_traced]
fn test_empty_recipients() {
    let runner = deterministic::Runner::timed(Duration::from_secs(10));
    runner.start(|context| async move {
        let (oracle, schemes, _, connections) = setup_network_and_peers(&context, &[0]).await;
        let mut signers = schemes.into_iter();
        let mut channels = connections.into_iter();

        // The only peer in the network.
        let only_peer = signers.next().unwrap();
        let only_peer_channels = channels.next().unwrap();
        let (monitor, mut monitor_events) = MockMonitor::new();
        let mut mailbox = setup_and_spawn_engine(
            &context,
            oracle.control(only_peer.public_key()),
            only_peer,
            only_peer_channels,
            monitor,
            MockHandler::dummy(),
        );

        // Broadcasting reports no recipients.
        let request = Request { id: 1, data: 1 };
        let recipients = mailbox
            .send(Recipients::All, request.clone())
            .await
            .expect("send failed");
        assert_eq!(recipients, Vec::<PublicKey>::new());

        // No monitor event shows up within one second.
        select! {
            _ = monitor_events.recv() => {
                panic!("Should not receive any monitor events");
            },
            _ = context.sleep(Duration::from_millis(1_000)) => {
            },
        }
    });
}
/// When the engine's first channel uses the `Failing` mock sender, `send` returns
/// `Error::SendFailed`.
#[test_traced]
fn test_send_fails_with_network_error() {
    let runner = deterministic::Runner::timed(Duration::from_secs(10));
    runner.start(|context| async move {
        let (oracle, schemes, peers, connections) =
            setup_network_and_peers(&context, &[0, 1]).await;
        let mut signers = schemes.into_iter();
        let mut channels = connections.into_iter();
        let signer = signers.next().unwrap();
        let conn = channels.next().unwrap();

        // Keep the real receiver of the first channel but pair it with the failing mock sender.
        // The real sender is not bound and stays inside `conn`.
        let (_, request_receiver) = conn.0;
        let failing_sender = super::mocks::sender::Failing::<PublicKey>::new();
        let (response_sender, response_receiver) = conn.1;

        // Build and start the engine directly so the mock sender can be injected.
        let label = format!("engine_{}", signer.public_key());
        let cfg = Config {
            blocker: oracle.control(signer.public_key()),
            monitor: MockMonitor::dummy(),
            handler: MockHandler::dummy(),
            mailbox_size: MAILBOX_SIZE,
            priority_request: false,
            request_codec: (),
            priority_response: false,
            response_codec: (),
        };
        let (engine, mut mailbox) = Engine::new(context.with_label(&label), cfg);
        engine.start(
            (failing_sender, request_receiver),
            (response_sender, response_receiver),
        );

        // The send must surface the failure.
        let request = Request { id: 1, data: 1 };
        let err = mailbox
            .send(Recipients::One(peers[1].clone()), request.clone())
            .await
            .unwrap_err();
        assert!(matches!(err, Error::SendFailed(_)));
    });
}
/// After the engine task has been aborted, `send` returns `Error::Canceled`.
#[test_traced]
fn test_send_fails_with_canceled() {
    let runner = deterministic::Runner::timed(Duration::from_secs(10));
    runner.start(|context| async move {
        let (oracle, schemes, peers, connections) =
            setup_network_and_peers(&context, &[0, 1]).await;
        let mut signers = schemes.into_iter();
        let mut channels = connections.into_iter();
        let signer = signers.next().unwrap();
        let (first_channel, second_channel) = channels.next().unwrap();

        // Build the engine directly so its task handle is available.
        let label = format!("engine_{}", signer.public_key());
        let cfg = Config {
            blocker: oracle.control(signer.public_key()),
            monitor: MockMonitor::dummy(),
            handler: MockHandler::dummy(),
            mailbox_size: MAILBOX_SIZE,
            priority_request: false,
            request_codec: (),
            priority_response: false,
            response_codec: (),
        };
        let (engine, mut mailbox) = Engine::new(context.with_label(&label), cfg);

        // Start the engine and abort it right away.
        let engine_handle = engine.start(first_channel, second_channel);
        engine_handle.abort();

        // Sending through the mailbox of the aborted engine fails.
        let request = Request { id: 1, data: 1 };
        let err = mailbox
            .send(Recipients::One(peers[1].clone()), request.clone())
            .await
            .unwrap_err();
        assert!(matches!(err, Error::Canceled));
    });
}
/// A response sent by a peer that was never asked is not reported to the monitor;
/// only the response from the requested peer is collected.
#[test_traced]
fn test_response_from_unknown_peer() {
    let runner = deterministic::Runner::timed(Duration::from_secs(10));
    runner.start(|context| async move {
        let (mut oracle, schemes, peers, connections) =
            setup_network_and_peers(&context, &[0, 1, 2]).await;
        let mut signers = schemes.into_iter();
        let mut channels = connections.into_iter();

        // Fully connect the three peers.
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
        add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;

        // Peer 0: originator with an observable monitor.
        let originator = signers.next().unwrap();
        let originator_channels = channels.next().unwrap();
        let (monitor, mut monitor_events) = MockMonitor::new();
        let mut originator_mailbox = setup_and_spawn_engine(
            &context,
            oracle.control(originator.public_key()),
            originator,
            originator_channels,
            monitor,
            MockHandler::dummy(),
        );

        // Peer 1: the peer that is actually asked.
        let responder = signers.next().unwrap();
        let responder_channels = channels.next().unwrap();
        let (handler, _) = MockHandler::new(true);
        let _mailbox2 = setup_and_spawn_engine(
            &context,
            oracle.control(responder.public_key()),
            responder,
            responder_channels,
            MockMonitor::dummy(),
            handler,
        );

        // Peer 2 runs no engine; its second channel is used to send by hand. `intruder_conn`
        // stays bound so the first channel is not dropped before the end of the test.
        let intruder_conn = channels.next().unwrap();
        let mut intruder_response_channel = intruder_conn.1;

        // Ask peer 1 only.
        let request = Request { id: 42, data: 42 };
        let recipients = originator_mailbox
            .send(Recipients::One(peers[1].clone()), request.clone())
            .await
            .expect("send failed");
        assert_eq!(recipients, vec![peers[1].clone()]);

        // Peer 2 sends an unsolicited response for the same id, with a different result, to peer 0.
        let unsolicited = Response { id: 42, result: 72 };
        intruder_response_channel
            .0
            .send(Recipients::One(peers[0].clone()), unsolicited.encode(), true)
            .await
            .unwrap();

        // Give the network a second to deliver everything.
        context.sleep(Duration::from_millis(1_000)).await;

        // The collected response is the one from peer 1.
        let collected = monitor_events.recv().await.unwrap();
        assert_eq!(collected.handler, peers[1]);
        assert_eq!(collected.response.id, 42);
        assert_eq!(collected.response.result, 84);
        assert_eq!(collected.count, 1);

        // No second event arrives for peer 2's message.
        select! {
            _ = monitor_events.recv() => {
                panic!("Should not receive response from unknown peer");
            },
            _ = context.sleep(Duration::from_millis(1_000)) => {
            },
        }
    });
}
/// Spawns one engine per `(scheme, connection)` pair and returns the mailboxes and
/// the task handles, both in input order.
///
/// Each engine runs under the context label `engine` / `peer_<index>`, so the tasks
/// can be found under the `engine` prefix. The receiving ends of the mock monitor
/// and mock handler are discarded.
#[allow(clippy::type_complexity)]
fn spawn_engines_with_handles(
    context: deterministic::Context,
    oracle: &Oracle<PublicKey, deterministic::Context>,
    schemes: Vec<PrivateKey>,
    connections: Vec<(
        (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
        (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
    )>,
) -> (
    Vec<Mailbox<PublicKey, Request>>,
    Vec<commonware_runtime::Handle<()>>,
) {
    let engine_context = context.with_label("engine");
    schemes
        .into_iter()
        .zip(connections)
        .enumerate()
        .map(|(idx, (scheme, (first_channel, second_channel)))| {
            let peer_context = engine_context.with_label(&format!("peer_{idx}"));
            let (monitor, _) = MockMonitor::new();
            let (handler, _) = MockHandler::new(true);
            let cfg = Config {
                blocker: oracle.control(scheme.public_key()),
                monitor,
                handler,
                mailbox_size: MAILBOX_SIZE,
                priority_request: false,
                request_codec: (),
                priority_response: false,
                response_codec: (),
            };
            let (engine, mailbox) = Engine::new(peer_context, cfg);
            let handle = engine.start(first_channel, second_channel);
            (mailbox, handle)
        })
        .unzip()
}
/// After every engine has been aborted, `send` returns an error and `cancel`
/// completes without panicking.
#[test_traced]
fn test_operations_after_shutdown_do_not_panic() {
    let runner = deterministic::Runner::timed(Duration::from_secs(10));
    runner.start(|context| async move {
        let (mut oracle, schemes, peers, connections) =
            setup_network_and_peers(&context, &[0, 1]).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

        // Spawn both engines, then abort them all.
        let (mut mailboxes, handles) =
            spawn_engines_with_handles(context.clone(), &oracle, schemes, connections);
        for engine_handle in handles {
            engine_handle.abort();
        }

        // Sending must fail gracefully.
        let request = Request { id: 1, data: 1 };
        let target = Recipients::One(peers[1].clone());
        let outcome = mailboxes[0].send(target, request.clone()).await;
        assert!(outcome.is_err(), "send after shutdown should return error");

        // Cancelling must simply return.
        mailboxes[0].cancel(request.commitment()).await;
    });
}
/// Runs one shutdown scenario under the deterministic runtime seeded with `seed`.
///
/// Two engines are spawned and a request is sent between them. All engine handles
/// are then aborted, and after 100ms no task under the `engine` label may still be
/// running. The runtime times out after 30 seconds.
fn clean_shutdown(seed: u64) {
    let runner = deterministic::Runner::new(
        deterministic::Config::default()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(30))),
    );
    runner.start(|context| async move {
        let (mut oracle, schemes, peers, connections) =
            setup_network_and_peers(&context, &[0, 1]).await;
        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
        let (mut mailboxes, handles) =
            spawn_engines_with_handles(context.clone(), &oracle, schemes, connections);

        // Let the engine tasks start, then confirm that some are running.
        context.sleep(Duration::from_millis(100)).await;
        let running_before = count_running_tasks(&context, "engine");
        assert!(
            running_before > 0,
            "at least one engine task should be running"
        );

        // Send a request between the two peers before shutting down.
        let request = Request { id: 1, data: 1 };
        let target = Recipients::One(peers[1].clone());
        let recipients = mailboxes[0]
            .send(target, request.clone())
            .await
            .expect("send failed");
        assert_eq!(recipients, vec![peers[1].clone()]);

        // Abort every engine and give the aborts time to take effect.
        for engine_handle in handles {
            engine_handle.abort();
        }
        context.sleep(Duration::from_millis(100)).await;

        // Nothing under the `engine` label may still be running.
        let running_after = count_running_tasks(&context, "engine");
        assert_eq!(
            running_after, 0,
            "all engine tasks should be stopped, but {running_after} still running"
        );
    });
}
/// Runs the clean-shutdown scenario for seeds `0` through `24`.
#[test]
fn test_clean_shutdown() {
    (0..25).for_each(clean_shutdown);
}
}