mod config;
pub use config::Config;
mod engine;
pub use engine::Engine;
mod ingress;
pub use ingress::Mailbox;
pub(crate) use ingress::Message;
mod metrics;
#[cfg(test)]
pub mod mocks;
#[cfg(test)]
mod tests {
use super::{mocks::TestMessage, *};
use crate::Broadcaster;
use commonware_codec::RangeCfg;
use commonware_cryptography::{
ed25519::{PrivateKey, PublicKey},
Digestible, Hasher, Sha256, Signer as _,
};
use commonware_macros::test_traced;
use commonware_p2p::{
simulated::{Link, Network, Oracle, Receiver, Sender},
Manager as _, Recipients, Sender as _, TrackedPeers,
};
use commonware_runtime::{
count_running_tasks, deterministic, Clock, Error, IoBuf, Metrics, Quota, Runner,
};
use commonware_utils::NZUsize;
use std::{collections::BTreeMap, num::NonZeroU32, time::Duration};
const CACHE_SIZE: usize = 10;
const A_JIFFY: Duration = Duration::from_millis(10);
const NETWORK_SPEED: Duration = Duration::from_millis(100);
const NETWORK_SPEED_WITH_BUFFER: Duration = Duration::from_millis(200);
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
type Registrations = BTreeMap<
PublicKey,
(
Sender<PublicKey, deterministic::Context>,
Receiver<PublicKey>,
),
>;
async fn initialize_simulation(
context: deterministic::Context,
num_peers: u32,
success_rate: f64,
) -> (
Vec<PublicKey>,
Registrations,
Oracle<PublicKey, deterministic::Context>,
) {
let (network, oracle) = Network::<deterministic::Context, PublicKey>::new(
context.with_label("network"),
commonware_p2p::simulated::Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(1),
},
);
network.start();
let mut schemes = (0..num_peers)
.map(|i| PrivateKey::from_seed(i as u64))
.collect::<Vec<_>>();
schemes.sort_by_key(|s| s.public_key());
let peers: Vec<PublicKey> = schemes.iter().map(|c| c.public_key()).collect();
let mut registrations: Registrations = BTreeMap::new();
for peer in peers.iter() {
let (sender, receiver) = oracle
.control(peer.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
registrations.insert(peer.clone(), (sender, receiver));
}
let link = Link {
latency: NETWORK_SPEED,
jitter: Duration::ZERO,
success_rate,
};
for p1 in peers.iter() {
for p2 in peers.iter() {
if p2 == p1 {
continue;
}
oracle
.add_link(p1.clone(), p2.clone(), link.clone())
.await
.unwrap();
}
}
let all_peers = commonware_utils::ordered::Set::from_iter_dedup(peers.clone());
oracle.manager().track(0, all_peers).await;
(peers, registrations, oracle)
}
async fn spawn_peer_engines(
context: deterministic::Context,
oracle: &Oracle<PublicKey, deterministic::Context>,
registrations: &mut Registrations,
) -> BTreeMap<PublicKey, Mailbox<PublicKey, TestMessage>> {
let mut mailboxes = BTreeMap::new();
while let Some((peer, network)) = registrations.pop_first() {
let context = context.with_label(&format!("peer_{}", peer));
let config = Config {
public_key: peer.clone(),
mailbox_size: 1024,
deque_size: CACHE_SIZE,
priority: false,
codec_config: RangeCfg::from(..),
peer_provider: oracle.manager(),
};
let (engine, engine_mailbox) =
Engine::<_, PublicKey, TestMessage, _>::new(context.clone(), config);
mailboxes.insert(peer.clone(), engine_mailbox);
engine.start(network);
}
context.sleep(A_JIFFY).await;
mailboxes
}
#[test_traced]
fn test_broadcast() {
let runner = deterministic::Runner::timed(Duration::from_secs(5));
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 4, 1.0).await;
let mailboxes = spawn_peer_engines(context.clone(), &oracle, &mut registrations).await;
let message = TestMessage::shared(b"hello world test message");
let first_mailbox = mailboxes.get(peers.first().unwrap()).unwrap().clone();
let result = first_mailbox
.broadcast(Recipients::All, message.clone())
.await;
context.sleep(Duration::from_secs(1)).await;
for peer in peers.iter() {
let mailbox = mailboxes.get(peer).unwrap().clone();
let digest = message.digest();
let receiver = mailbox.subscribe(digest).await;
let received_message = receiver.await.ok();
assert_eq!(received_message.unwrap(), message.clone());
}
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
let message = TestMessage::shared(b"hello world again");
let result = first_mailbox
.broadcast(Recipients::All, message.clone())
.await;
drop(result);
context.sleep(Duration::from_secs(1)).await;
let mut found = 0;
for peer in peers.iter() {
let mailbox = mailboxes.get(peer).unwrap().clone();
let digest = message.digest();
let receiver = mailbox.get(digest).await;
if let Some(receiver) = receiver {
assert_eq!(receiver, message.clone());
found += 1;
}
}
assert!(found > 0, "No peers received the message");
});
}
#[test_traced]
fn test_self_retrieval() {
let runner = deterministic::Runner::timed(Duration::from_secs(5));
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 1, 1.0).await;
let mailboxes = spawn_peer_engines(context.clone(), &oracle, &mut registrations).await;
let mailbox_a = mailboxes.get(&peers[0]).unwrap().clone();
let m1 = TestMessage::shared(b"hello world");
let digest_m1 = m1.digest();
let receiver_before = mailbox_a.get(digest_m1).await;
assert!(receiver_before.is_none());
let receiver_before = mailbox_a.subscribe(digest_m1).await;
let result = mailbox_a.broadcast(Recipients::All, m1.clone()).await;
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
let msg_before = receiver_before
.await
.expect("Pre-broadcast retrieval failed");
assert_eq!(msg_before, m1);
let receiver_after = mailbox_a.get(digest_m1).await;
assert_eq!(receiver_after, Some(m1.clone()));
let receiver_after = mailbox_a.subscribe(digest_m1).await;
let start = context.current();
let msg_after = receiver_after
.await
.expect("Post-broadcast retrieval failed");
let duration = context.current().duration_since(start).unwrap();
assert_eq!(msg_after, m1);
assert!(duration < A_JIFFY, "get not instant");
});
}
#[test_traced]
fn test_packet_loss() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 10, 0.1).await;
let mailboxes = spawn_peer_engines(context.clone(), &oracle, &mut registrations).await;
let message = TestMessage::shared(b"hello world test message");
let first_mailbox = mailboxes.get(peers.first().unwrap()).unwrap().clone();
let digest = message.digest();
for i in 0..100 {
let result = first_mailbox
.broadcast(Recipients::All, message.clone())
.await;
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
let mut all_received = true;
for peer in peers.iter() {
let mailbox = mailboxes.get(peer).unwrap().clone();
let receiver = mailbox.subscribe(digest).await;
let has = match context.timeout(A_JIFFY, receiver).await {
Ok(r) => r.is_ok(),
Err(Error::Timeout) => false,
Err(e) => panic!("unexpected error: {e:?}"),
};
all_received &= has;
}
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
if all_received {
assert!(i > 0, "Message received on first try");
return;
}
}
panic!("Not all peers received the message after retries");
});
}
#[test_traced]
fn test_get_cached() {
let runner = deterministic::Runner::timed(Duration::from_secs(5));
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 2, 1.0).await;
let mailboxes = spawn_peer_engines(context.clone(), &oracle, &mut registrations).await;
let message = TestMessage::shared(b"cached message");
let first_mailbox = mailboxes.get(peers.first().unwrap()).unwrap().clone();
let result = first_mailbox
.broadcast(Recipients::All, message.clone())
.await;
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
let digest = message.digest();
let mailbox = mailboxes.get(peers.last().unwrap()).unwrap().clone();
let receiver = mailbox.subscribe(digest).await;
let start = context.current();
let received = receiver.await.expect("failed to get cached message");
let duration = context.current().duration_since(start).unwrap();
assert_eq!(received, message);
assert!(duration < A_JIFFY, "get not instant",);
});
}
#[test_traced]
fn test_get_nonexistent() {
let runner = deterministic::Runner::timed(Duration::from_secs(5));
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 2, 1.0).await;
let mailboxes = spawn_peer_engines(context.clone(), &oracle, &mut registrations).await;
let message = TestMessage::shared(b"future message");
let digest = message.digest();
let mailbox1 = mailboxes.get(&peers[0]).unwrap().clone();
let mailbox2 = mailboxes.get(&peers[1]).unwrap().clone();
let receiver = mailbox1.subscribe(digest).await;
let dummy1 = mailbox1.subscribe(digest).await;
let dummy2 = mailbox2.subscribe(digest).await;
drop(dummy1);
drop(dummy2);
let result = mailbox1.broadcast(Recipients::All, message.clone()).await;
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
let received = receiver.await.expect("receiver1 should get message");
assert_eq!(received, message);
});
}
#[test_traced]
fn test_cache_eviction_single_peer() {
let runner = deterministic::Runner::timed(Duration::from_secs(5));
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 2, 1.0).await;
let mailboxes = spawn_peer_engines(context.clone(), &oracle, &mut registrations).await;
let mailbox = mailboxes.get(&peers[0]).unwrap().clone();
let mut messages = vec![];
for i in 0..CACHE_SIZE + 1 {
messages.push(TestMessage::shared(format!("message {i}").as_bytes()));
}
for message in messages.iter() {
let result = mailbox.broadcast(Recipients::All, message.clone()).await;
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
}
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
let peer_mailbox = mailboxes.get(&peers[1]).unwrap().clone();
for msg in messages.iter().skip(1) {
let result = peer_mailbox.subscribe(msg.digest()).await.await.unwrap();
assert_eq!(result, msg.clone());
}
let receiver = peer_mailbox.subscribe(messages[0].digest()).await;
match context.timeout(A_JIFFY, receiver).await {
Ok(_) => panic!("receiver should have failed"),
Err(Error::Timeout) => {} Err(e) => panic!("unexpected error: {e:?}"),
}
});
}
#[test_traced]
fn test_cache_eviction_multi_peer() {
let runner = deterministic::Runner::timed(Duration::from_secs(10));
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 3, 1.0).await;
let mailboxes = spawn_peer_engines(context.clone(), &oracle, &mut registrations).await;
let mailbox_a = mailboxes.get(&peers[0]).unwrap().clone();
let mailbox_b = mailboxes.get(&peers[1]).unwrap().clone();
let mailbox_c = mailboxes.get(&peers[2]).unwrap().clone();
let m1 = TestMessage::shared(b"message M1");
let digest_m1 = m1.digest();
let result = mailbox_a.broadcast(Recipients::All, m1.clone()).await;
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
let result = mailbox_c.broadcast(Recipients::All, m1.clone()).await;
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
let mut new_messages_a = Vec::with_capacity(CACHE_SIZE);
for i in 0..CACHE_SIZE {
new_messages_a.push(TestMessage::shared(format!("A{i}").as_bytes()));
}
for msg in &new_messages_a {
let result = mailbox_a.broadcast(Recipients::All, msg.clone()).await;
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
}
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
let receiver = mailbox_b.subscribe(digest_m1).await;
let received = receiver.await.expect("M1 should be retrievable");
assert_eq!(received, m1);
let mut new_messages_c = Vec::with_capacity(CACHE_SIZE);
for i in 0..CACHE_SIZE {
new_messages_c.push(TestMessage::shared(format!("C{i}").as_bytes()));
}
for msg in &new_messages_c {
let result = mailbox_c.broadcast(Recipients::All, msg.clone()).await;
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
}
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
let receiver = mailbox_b.subscribe(digest_m1).await;
match context.timeout(A_JIFFY, receiver).await {
Ok(_) => panic!("M1 should not be retrievable"),
Err(Error::Timeout) => {} Err(e) => panic!("unexpected error: {e:?}"),
}
});
}
#[test_traced]
fn test_selective_recipients() {
let runner = deterministic::Runner::timed(Duration::from_secs(5));
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 4, 1.0).await;
let sender_pk = peers[0].clone();
let target_peer = peers[1].clone();
let non_target_peer = peers[2].clone();
let mailboxes = spawn_peer_engines(context.clone(), &oracle, &mut registrations).await;
let sender_mb = mailboxes.get(&sender_pk).unwrap().clone();
let msg = TestMessage::shared(b"selective-broadcast");
let result = sender_mb
.broadcast(Recipients::One(target_peer.clone()), msg.clone())
.await;
assert_eq!(result.await.unwrap(), vec![target_peer.clone()]);
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
let got_target = mailboxes
.get(&target_peer)
.unwrap()
.clone()
.get(msg.digest())
.await;
assert_eq!(got_target, Some(msg.clone()));
let got_other = mailboxes
.get(&non_target_peer)
.unwrap()
.clone()
.get(msg.digest())
.await;
assert!(got_other.is_none());
});
}
#[test_traced]
fn test_ref_count_across_peers() {
let runner = deterministic::Runner::timed(Duration::from_secs(10));
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 3, 1.0).await;
let mailboxes = spawn_peer_engines(context.clone(), &oracle, &mut registrations).await;
let p0 = peers[0].clone();
let p1 = peers[1].clone();
let observer = peers[2].clone();
let mb0 = mailboxes.get(&p0).unwrap().clone();
let mb1 = mailboxes.get(&p1).unwrap().clone();
let obs = mailboxes.get(&observer).unwrap().clone();
let dup = TestMessage::shared(b"dup");
let digest = dup.digest();
mb0.broadcast(Recipients::All, dup.clone())
.await
.await
.unwrap();
mb1.broadcast(Recipients::All, dup.clone())
.await
.await
.unwrap();
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
assert_eq!(obs.get(digest).await, Some(dup.clone()));
for i in 0..CACHE_SIZE {
let spam = TestMessage::shared(format!("p0-{i}").into_bytes());
mb0.broadcast(Recipients::All, spam).await.await.unwrap();
}
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
assert_eq!(obs.get(digest).await, Some(dup.clone()));
for i in 0..CACHE_SIZE {
let spam = TestMessage::shared(format!("p1-{i}").into_bytes());
mb1.broadcast(Recipients::All, spam).await.await.unwrap();
}
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
assert!(obs.get(digest).await.is_none());
});
}
#[test_traced]
fn test_deterministic_retrieval() {
let run = |seed: u64| {
let config = deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(5)));
let runner = deterministic::Runner::new(config);
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 1, 1.0).await;
let mailboxes =
spawn_peer_engines(context.clone(), &oracle, &mut registrations).await;
let sender1 = peers[0].clone();
let mb1 = mailboxes.get(&sender1).unwrap().clone();
let m1 = TestMessage::shared(b"content-1");
let m2 = TestMessage::shared(b"content-2");
let m3 = TestMessage::shared(b"content-3");
mb1.broadcast(Recipients::All, m1.clone())
.await
.await
.unwrap();
mb1.broadcast(Recipients::All, m2.clone())
.await
.await
.unwrap();
mb1.broadcast(Recipients::All, m3.clone())
.await
.await
.unwrap();
let mut hasher = Sha256::default();
for msg in [&m1, &m2, &m3] {
if let Some(value) = mb1.get(msg.digest()).await {
hasher.update(&value.content);
}
}
hasher.finalize()
})
};
for seed in 0..10 {
let h1 = run(seed);
let h2 = run(seed);
assert_eq!(h1, h2, "Messages returned in different order for {seed}");
}
}
#[test_traced]
fn test_malformed_network_payload_does_not_break_valid_traffic() {
let runner = deterministic::Runner::timed(Duration::from_secs(10));
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 3, 1.0).await;
let attacker = peers[0].clone();
let honest = peers[1].clone();
let victim = peers[2].clone();
let (mut attacker_sender, _) = registrations.remove(&attacker).unwrap();
let mailboxes = spawn_peer_engines(context.clone(), &oracle, &mut registrations).await;
let honest_mailbox = mailboxes.get(&honest).unwrap().clone();
let victim_mailbox = mailboxes.get(&victim).unwrap().clone();
let sent = attacker_sender
.send(
Recipients::One(victim.clone()),
IoBuf::from(vec![0xFF]),
false,
)
.await
.expect("malformed payload send should not fail at transport level");
assert_eq!(sent, vec![victim.clone()]);
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
let message = TestMessage::shared(b"valid-after-malformed");
let result = honest_mailbox
.broadcast(Recipients::One(victim.clone()), message.clone())
.await;
assert_eq!(result.await.unwrap(), vec![victim.clone()]);
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
let received = victim_mailbox
.subscribe(message.digest())
.await
.await
.expect("victim should receive valid message after malformed payload");
assert_eq!(received, message);
});
}
#[test_traced]
fn test_dropped_waiters_for_missing_digest_are_cleaned_up() {
let runner = deterministic::Runner::timed(Duration::from_secs(10));
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 1, 1.0).await;
let peer = peers[0].clone();
let (sender, receiver) = registrations.remove(&peer).unwrap();
let engine_context = context.with_label("waiter_cleanup");
let config = Config {
public_key: peer,
mailbox_size: 1024,
deque_size: CACHE_SIZE,
priority: false,
codec_config: RangeCfg::from(..),
peer_provider: oracle.manager(),
};
let (engine, mailbox) =
Engine::<_, PublicKey, TestMessage, _>::new(engine_context.clone(), config);
engine.start((sender, receiver));
let missing = TestMessage::shared(b"never-arrives");
let missing_digest = missing.digest();
let rx1 = mailbox.subscribe(missing_digest).await;
let rx2 = mailbox.subscribe(missing_digest).await;
let _ = mailbox
.get(TestMessage::shared(b"before-cleanup").digest())
.await;
context.sleep(A_JIFFY).await;
let metrics_before = engine_context.encode();
let waiter_values_before: Vec<f64> = metrics_before
.lines()
.filter(|line| {
line.starts_with("waiters")
|| (line.contains("_waiters")
&& !line.starts_with("# HELP")
&& !line.starts_with("# TYPE"))
})
.filter_map(|line| line.split_whitespace().last())
.filter_map(|value| value.parse::<f64>().ok())
.collect();
assert!(
!waiter_values_before.is_empty(),
"waiters metric not found in output:\n{metrics_before}"
);
assert!(
waiter_values_before.iter().any(|value| *value > 0.0),
"expected positive waiters before cleanup, got:\n{metrics_before}"
);
drop(rx1);
drop(rx2);
let _ = mailbox
.get(TestMessage::shared(b"after-cleanup").digest())
.await;
context.sleep(A_JIFFY).await;
let metrics_after = engine_context.encode();
let waiter_values_after: Vec<f64> = metrics_after
.lines()
.filter(|line| {
line.starts_with("waiters")
|| (line.contains("_waiters")
&& !line.starts_with("# HELP")
&& !line.starts_with("# TYPE"))
})
.filter_map(|line| line.split_whitespace().last())
.filter_map(|value| value.parse::<f64>().ok())
.collect();
assert!(
!waiter_values_after.is_empty(),
"waiters metric not found in output:\n{metrics_after}"
);
assert!(
waiter_values_after.iter().all(|value| *value == 0.0),
"expected zero retained waiters, got:\n{metrics_after}"
);
});
}
#[allow(clippy::type_complexity)]
async fn spawn_peer_engines_with_handles(
context: deterministic::Context,
oracle: &Oracle<PublicKey, deterministic::Context>,
registrations: &mut Registrations,
) -> (
BTreeMap<PublicKey, Mailbox<PublicKey, TestMessage>>,
Vec<commonware_runtime::Handle<()>>,
) {
let mut mailboxes = BTreeMap::new();
let mut handles = Vec::new();
let engine_context = context.with_label("engine");
while let Some((peer, network)) = registrations.pop_first() {
let ctx = engine_context.with_label(&format!("peer_{}", peer));
let config = Config {
public_key: peer.clone(),
mailbox_size: 1024,
deque_size: CACHE_SIZE,
priority: false,
codec_config: RangeCfg::from(..),
peer_provider: oracle.manager(),
};
let (engine, engine_mailbox) =
Engine::<_, PublicKey, TestMessage, _>::new(ctx.clone(), config);
mailboxes.insert(peer.clone(), engine_mailbox);
handles.push(engine.start(network));
}
context.sleep(A_JIFFY).await;
(mailboxes, handles)
}
#[test_traced]
fn test_operations_after_shutdown_do_not_panic() {
let runner = deterministic::Runner::timed(Duration::from_secs(5));
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 2, 1.0).await;
let (mut mailboxes, handles) =
spawn_peer_engines_with_handles(context.clone(), &oracle, &mut registrations).await;
let message = TestMessage::shared(b"test message");
let mailbox = mailboxes.remove(&peers[0]).unwrap();
let result = mailbox
.broadcast(Recipients::All, message.clone())
.await
.await;
assert!(result.is_ok(), "broadcast should succeed before shutdown");
for handle in handles {
handle.abort();
}
context.sleep(Duration::from_millis(100)).await;
let result = mailbox
.broadcast(Recipients::All, message.clone())
.await
.await;
assert!(
result.is_err() || result.unwrap().is_empty(),
"broadcast after shutdown should fail or return empty"
);
let digest = message.digest();
let receiver = mailbox.subscribe(digest).await;
let result = receiver.await;
assert!(
result.is_err(),
"subscribe after shutdown should return Canceled"
);
let result = mailbox.get(digest).await;
assert!(result.is_none(), "get after shutdown should return None");
});
}
fn clean_shutdown(seed: u64) {
let cfg = deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(30)));
let runner = deterministic::Runner::new(cfg);
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 2, 1.0).await;
let (mailboxes, handles) =
spawn_peer_engines_with_handles(context.clone(), &oracle, &mut registrations).await;
context.sleep(Duration::from_millis(100)).await;
let running_before = count_running_tasks(&context, "engine");
assert!(
running_before > 0,
"at least one engine task should be running"
);
let message = TestMessage::shared(b"test message");
let mailbox = mailboxes.get(&peers[0]).unwrap().clone();
let result = mailbox
.broadcast(Recipients::All, message.clone())
.await
.await;
assert!(result.is_ok(), "broadcast should succeed");
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
let peer_mailbox = mailboxes.get(&peers[1]).unwrap().clone();
let received = peer_mailbox.get(message.digest()).await;
assert_eq!(received, Some(message));
for handle in handles {
handle.abort();
}
context.sleep(Duration::from_millis(100)).await;
let running_after = count_running_tasks(&context, "engine");
assert_eq!(
running_after, 0,
"all engine tasks should be stopped, but {running_after} still running"
);
});
}
#[test]
fn test_clean_shutdown() {
for seed in 0..25 {
clean_shutdown(seed);
}
}
#[test_traced]
fn test_peer_set_update_evicts_disconnected_peer_buffers() {
let runner = deterministic::Runner::timed(Duration::from_secs(5));
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 3, 1.0).await;
let peer_a = peers[0].clone();
let peer_b = peers[1].clone();
let peer_c = peers[2].clone();
let network_b = registrations.remove(&peer_b).unwrap();
let config_b = Config {
public_key: peer_b.clone(),
mailbox_size: 1024,
deque_size: CACHE_SIZE,
priority: false,
codec_config: RangeCfg::from(..),
peer_provider: oracle.manager(),
};
let (engine_b, mailbox_b) =
Engine::<_, PublicKey, TestMessage, _>::new(context.with_label("peer_b"), config_b);
engine_b.start(network_b);
let mut mailboxes = BTreeMap::new();
mailboxes.insert(peer_b.clone(), mailbox_b);
for (peer, network) in registrations {
let ctx = context.with_label(&format!("peer_{}", peer));
let config = Config {
public_key: peer.clone(),
mailbox_size: 1024,
deque_size: CACHE_SIZE,
priority: false,
codec_config: RangeCfg::from(..),
peer_provider: oracle.manager(),
};
let (engine, mailbox) = Engine::<_, PublicKey, TestMessage, _>::new(ctx, config);
mailboxes.insert(peer, mailbox);
engine.start(network);
}
context.sleep(A_JIFFY).await;
let msg = TestMessage::shared(b"eviction-test");
let mailbox_a = mailboxes.get(&peer_a).unwrap().clone();
let result = mailbox_a.broadcast(Recipients::All, msg.clone()).await;
assert_eq!(result.await.unwrap().len(), 2);
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
let mailbox_b = mailboxes.get(&peer_b).unwrap().clone();
assert_eq!(
mailbox_b.get(msg.digest()).await,
Some(msg.clone()),
"peer B should have the message before eviction"
);
let remaining = commonware_utils::ordered::Set::from_iter_dedup(vec![peer_b, peer_c]);
oracle.manager().track(1, remaining).await;
context.sleep(A_JIFFY).await;
assert!(
mailbox_b.get(msg.digest()).await.is_none(),
"message should be evicted after peer A left the peer set"
);
});
}
#[test_traced]
fn test_peer_set_update_evicts_peers_not_in_latest_set_even_if_still_in_overlap() {
let runner = deterministic::Runner::timed(Duration::from_secs(5));
runner.start(|context| async move {
let (network, oracle) = Network::<deterministic::Context, PublicKey>::new(
context.with_label("network"),
commonware_p2p::simulated::Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(2),
},
);
network.start();
let mut schemes = (0..3)
.map(|i| PrivateKey::from_seed(i as u64))
.collect::<Vec<_>>();
schemes.sort_by_key(|s| s.public_key());
let peers: Vec<PublicKey> = schemes.iter().map(|c| c.public_key()).collect();
let peer_a = peers[0].clone();
let peer_b = peers[1].clone();
let peer_c = peers[2].clone();
let mut registrations: Registrations = BTreeMap::new();
for peer in peers.iter() {
let (sender, receiver) = oracle
.control(peer.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
registrations.insert(peer.clone(), (sender, receiver));
}
let link = Link {
latency: NETWORK_SPEED,
jitter: Duration::ZERO,
success_rate: 1.0,
};
for p1 in peers.iter() {
for p2 in peers.iter() {
if p2 != p1 {
oracle
.add_link(p1.clone(), p2.clone(), link.clone())
.await
.unwrap();
}
}
}
let all = commonware_utils::ordered::Set::from_iter_dedup(peers.clone());
oracle.manager().track(0, all).await;
let network_b = registrations.remove(&peer_b).unwrap();
let config_b = Config {
public_key: peer_b.clone(),
mailbox_size: 1024,
deque_size: CACHE_SIZE,
priority: false,
codec_config: RangeCfg::from(..),
peer_provider: oracle.manager(),
};
let (engine_b, mailbox_b) =
Engine::<_, PublicKey, TestMessage, _>::new(context.with_label("peer_b"), config_b);
engine_b.start(network_b);
let mut mailboxes = BTreeMap::new();
mailboxes.insert(peer_b.clone(), mailbox_b);
for (peer, network) in registrations {
let ctx = context.with_label(&format!("peer_{}", peer));
let config = Config {
public_key: peer.clone(),
mailbox_size: 1024,
deque_size: CACHE_SIZE,
priority: false,
codec_config: RangeCfg::from(..),
peer_provider: oracle.manager(),
};
let (engine, mailbox) = Engine::<_, PublicKey, TestMessage, _>::new(ctx, config);
mailboxes.insert(peer, mailbox);
engine.start(network);
}
context.sleep(A_JIFFY).await;
let msg = TestMessage::shared(b"eviction-latest-test");
let mailbox_a = mailboxes.get(&peer_a).unwrap().clone();
let result = mailbox_a.broadcast(Recipients::All, msg.clone()).await;
assert_eq!(result.await.unwrap().len(), 2);
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
let mailbox_b = mailboxes.get(&peer_b).unwrap().clone();
assert_eq!(
mailbox_b.get(msg.digest()).await,
Some(msg.clone()),
"peer B should have the message before eviction"
);
let remaining = commonware_utils::ordered::Set::from_iter_dedup(vec![
peer_b.clone(),
peer_c.clone(),
]);
oracle.manager().track(1, remaining).await;
context.sleep(A_JIFFY).await;
assert!(
mailbox_b.get(msg.digest()).await.is_none(),
"message should be evicted: peer A is not in the latest peer set"
);
let fresh = TestMessage::shared(b"post-eviction-latest-test");
let result = mailbox_a.broadcast(Recipients::All, fresh.clone()).await;
assert_eq!(result.await.unwrap().len(), 2);
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
assert!(
mailbox_b.get(fresh.digest()).await.is_none(),
"message should not be rebuffered after peer A left latest.primary"
);
});
}
#[test_traced]
fn test_initial_latest_peer_set_blocks_sender_not_in_latest_primary() {
let runner = deterministic::Runner::timed(Duration::from_secs(5));
runner.start(|context| async move {
let (network, oracle) = Network::<deterministic::Context, PublicKey>::new(
context.with_label("network"),
commonware_p2p::simulated::Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(1),
},
);
network.start();
let mut schemes = (0..3)
.map(|i| PrivateKey::from_seed(i as u64))
.collect::<Vec<_>>();
schemes.sort_by_key(|s| s.public_key());
let peers: Vec<PublicKey> = schemes.iter().map(|c| c.public_key()).collect();
let peer_a = peers[0].clone();
let peer_b = peers[1].clone();
let peer_c = peers[2].clone();
let mut registrations: Registrations = BTreeMap::new();
for peer in &peers {
let (sender, receiver) = oracle
.control(peer.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
registrations.insert(peer.clone(), (sender, receiver));
}
let link = Link {
latency: NETWORK_SPEED,
jitter: Duration::ZERO,
success_rate: 1.0,
};
for p1 in &peers {
for p2 in &peers {
if p1 != p2 {
oracle
.add_link(p1.clone(), p2.clone(), link.clone())
.await
.unwrap();
}
}
}
let latest_primary = commonware_utils::ordered::Set::from_iter_dedup(vec![
peer_b.clone(),
peer_c.clone(),
]);
let latest_secondary =
commonware_utils::ordered::Set::from_iter_dedup(vec![peer_a.clone()]);
oracle
.manager()
.track(0, TrackedPeers::new(latest_primary, latest_secondary))
.await;
let mut mailboxes = BTreeMap::new();
for (peer, network) in registrations {
let ctx = context.with_label(&format!("peer_{}", peer));
let config = Config {
public_key: peer.clone(),
mailbox_size: 1024,
deque_size: CACHE_SIZE,
priority: false,
codec_config: RangeCfg::from(..),
peer_provider: oracle.manager(),
};
let (engine, mailbox) = Engine::<_, PublicKey, TestMessage, _>::new(ctx, config);
mailboxes.insert(peer, mailbox);
engine.start(network);
}
context.sleep(A_JIFFY).await;
let mailbox_a = mailboxes.get(&peer_a).unwrap().clone();
let mailbox_b = mailboxes.get(&peer_b).unwrap().clone();
let msg = TestMessage::shared(b"startup-latest-primary-only");
let result = mailbox_a.broadcast(Recipients::All, msg.clone()).await;
assert_eq!(
result.await.unwrap().len(),
2,
"Recipients::All still delivers to other peers; cache policy is separate"
);
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
assert_eq!(
mailbox_a.get(msg.digest()).await,
None,
"sender not in latest.primary should not buffer, including own broadcasts"
);
assert!(
mailbox_b.get(msg.digest()).await.is_none(),
"peer B should not cache messages from a sender excluded by the initial latest.primary set"
);
});
}
#[test_traced]
fn test_broadcast_queued_before_start_respects_initial_latest_primary() {
let runner = deterministic::Runner::timed(Duration::from_secs(5));
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 1, 1.0).await;
let peer = peers[0].clone();
let network = registrations.remove(&peer).unwrap();
let config = Config {
public_key: peer.clone(),
mailbox_size: 1024,
deque_size: CACHE_SIZE,
priority: false,
codec_config: RangeCfg::from(..),
peer_provider: oracle.manager(),
};
let (engine, mailbox) =
Engine::<_, PublicKey, TestMessage, _>::new(context.with_label("peer"), config);
let msg = TestMessage::shared(b"queued-before-start");
let result = mailbox.broadcast(Recipients::All, msg.clone()).await;
engine.start(network);
assert!(
result.await.unwrap().is_empty(),
"single-peer broadcast should have no recipients"
);
assert_eq!(
mailbox.get(msg.digest()).await,
Some(msg),
"sender is already in the initial latest.primary set, so its local broadcast should be cached"
);
});
}
#[test_traced]
fn test_engine_starts_before_initial_peer_set_and_delivers_after_tracking() {
let runner = deterministic::Runner::timed(Duration::from_secs(5));
runner.start(|context| async move {
let (network, oracle) = Network::<deterministic::Context, PublicKey>::new(
context.with_label("network"),
commonware_p2p::simulated::Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(1),
},
);
network.start();
let mut schemes = (0..2)
.map(|i| PrivateKey::from_seed(i as u64))
.collect::<Vec<_>>();
schemes.sort_by_key(|s| s.public_key());
let peers: Vec<PublicKey> = schemes.iter().map(|c| c.public_key()).collect();
let peer_a = peers[0].clone();
let peer_b = peers[1].clone();
let mut registrations: Registrations = BTreeMap::new();
for peer in &peers {
let (sender, receiver) = oracle
.control(peer.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
registrations.insert(peer.clone(), (sender, receiver));
}
let link = Link {
latency: NETWORK_SPEED,
jitter: Duration::ZERO,
success_rate: 1.0,
};
for p1 in &peers {
for p2 in &peers {
if p1 != p2 {
oracle
.add_link(p1.clone(), p2.clone(), link.clone())
.await
.unwrap();
}
}
}
let mut mailboxes = BTreeMap::new();
for (peer, network) in registrations {
let ctx = context.with_label(&format!("peer_{}", peer));
let config = Config {
public_key: peer.clone(),
mailbox_size: 1024,
deque_size: CACHE_SIZE,
priority: false,
codec_config: RangeCfg::from(..),
peer_provider: oracle.manager(),
};
let (engine, mailbox) = Engine::<_, PublicKey, TestMessage, _>::new(ctx, config);
mailboxes.insert(peer, mailbox);
engine.start(network);
}
let mailbox_a = mailboxes.get(&peer_a).unwrap().clone();
let mailbox_b = mailboxes.get(&peer_b).unwrap().clone();
let before = TestMessage::shared(b"before-tracking");
let result = mailbox_a.broadcast(Recipients::All, before.clone()).await;
assert_eq!(
result.await.unwrap().len(),
0,
"simulated network drops until a peer set is tracked"
);
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
assert_eq!(
mailbox_a.get(before.digest()).await,
None,
"without latest.primary, local broadcasts are not buffered"
);
assert!(
mailbox_b.get(before.digest()).await.is_none(),
"without latest.primary, remote peers do not cache inbound messages"
);
oracle
.manager()
.track(
0,
commonware_utils::ordered::Set::from_iter_dedup(peers.clone()),
)
.await;
context.sleep(A_JIFFY).await;
let after = TestMessage::shared(b"after-tracking");
let result = mailbox_a.broadcast(Recipients::All, after.clone()).await;
assert_eq!(result.await.unwrap().len(), 1);
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
assert_eq!(mailbox_b.get(after.digest()).await, Some(after));
});
}
#[test_traced]
fn test_peer_set_update_preserves_shared_messages() {
let runner = deterministic::Runner::timed(Duration::from_secs(5));
runner.start(|context| async move {
let (peers, mut registrations, oracle) =
initialize_simulation(context.clone(), 3, 1.0).await;
let peer_a = peers[0].clone();
let peer_b = peers[1].clone();
let peer_c = peers[2].clone();
let network_b = registrations.remove(&peer_b).unwrap();
let config_b = Config {
public_key: peer_b.clone(),
mailbox_size: 1024,
deque_size: CACHE_SIZE,
priority: false,
codec_config: RangeCfg::from(..),
peer_provider: oracle.manager(),
};
let (engine_b, mailbox_b) =
Engine::<_, PublicKey, TestMessage, _>::new(context.with_label("peer_b"), config_b);
engine_b.start(network_b);
let mut mailboxes = BTreeMap::new();
mailboxes.insert(peer_b.clone(), mailbox_b);
for (peer, network) in registrations {
let ctx = context.with_label(&format!("peer_{}", peer));
let config = Config {
public_key: peer.clone(),
mailbox_size: 1024,
deque_size: CACHE_SIZE,
priority: false,
codec_config: RangeCfg::from(..),
peer_provider: oracle.manager(),
};
let (engine, mailbox) = Engine::<_, PublicKey, TestMessage, _>::new(ctx, config);
mailboxes.insert(peer, mailbox);
engine.start(network);
}
context.sleep(A_JIFFY).await;
let msg = TestMessage::shared(b"shared-msg");
let mailbox_a = mailboxes.get(&peer_a).unwrap().clone();
let mailbox_c = mailboxes.get(&peer_c).unwrap().clone();
mailbox_a.broadcast(Recipients::All, msg.clone()).await;
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
mailbox_c.broadcast(Recipients::All, msg.clone()).await;
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
let mailbox_b = mailboxes.get(&peer_b).unwrap().clone();
assert_eq!(mailbox_b.get(msg.digest()).await, Some(msg.clone()));
let remaining = commonware_utils::ordered::Set::from_iter_dedup(vec![peer_b, peer_c]);
oracle.manager().track(1, remaining).await;
context.sleep(A_JIFFY).await;
assert_eq!(
mailbox_b.get(msg.digest()).await,
Some(msg.clone()),
"message should survive when another peer in the primary set still references it"
);
});
}
}