pub mod scheme;
pub mod types;
cfg_if::cfg_if! {
if #[cfg(not(target_arch = "wasm32"))] {
mod ack_manager;
use ack_manager::AckManager;
mod config;
pub use config::Config;
mod engine;
pub use engine::Engine;
mod metrics;
mod tip_manager;
use tip_manager::TipManager;
}
}
#[cfg(test)]
pub mod mocks;
#[cfg(test)]
mod tests {
use super::{
mocks,
types::{ChunkSigner, ChunkVerifier},
Config, Engine,
};
use crate::{
ordered_broadcast::scheme::{
bls12381_multisig, bls12381_threshold, ed25519, secp256r1, Scheme,
},
types::{Epoch, EpochDelta, Height, HeightDelta},
};
use commonware_cryptography::{
bls12381::primitives::variant::{MinPk, MinSig},
certificate::{self, mocks::Fixture},
ed25519::{PrivateKey, PublicKey},
sha256::Digest as Sha256Digest,
Signer as _,
};
use commonware_macros::{select, test_group, test_traced};
use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
use commonware_parallel::Sequential;
use commonware_runtime::{
buffer::paged::CacheRef,
deterministic::{self, Context},
Clock, Metrics, Quota, Runner, Spawner,
};
use commonware_utils::{
channel::{fallible::OneshotExt, oneshot},
NZUsize, NZU16, NZU64,
};
use futures::future::join_all;
use std::{
collections::{BTreeMap, HashMap},
num::{NonZeroU16, NonZeroU32, NonZeroUsize},
time::Duration,
};
use tracing::debug;
const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
const TEST_NAMESPACE: &[u8] = b"ordered_broadcast_test";
type Registrations<P> = BTreeMap<
P,
(
(Sender<P, deterministic::Context>, Receiver<P>),
(Sender<P, deterministic::Context>, Receiver<P>),
),
>;
async fn register_participants(
oracle: &mut Oracle<PublicKey, deterministic::Context>,
participants: &[PublicKey],
) -> Registrations<PublicKey> {
let mut registrations = BTreeMap::new();
for participant in participants.iter() {
let control = oracle.control(participant.clone());
let (a1, a2) = control.register(0, TEST_QUOTA).await.unwrap();
let (b1, b2) = control.register(1, TEST_QUOTA).await.unwrap();
registrations.insert(participant.clone(), ((a1, a2), (b1, b2)));
}
registrations
}
enum Action {
Link(Link),
Update(Link),
Unlink,
}
async fn link_participants(
oracle: &mut Oracle<PublicKey, deterministic::Context>,
participants: &[PublicKey],
action: Action,
restrict_to: Option<fn(usize, usize, usize) -> bool>,
) {
for (i1, v1) in participants.iter().enumerate() {
for (i2, v2) in participants.iter().enumerate() {
if v2 == v1 {
continue;
}
if let Some(f) = restrict_to {
if !f(participants.len(), i1, i2) {
continue;
}
}
if matches!(action, Action::Update(_) | Action::Unlink) {
oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
}
if let Action::Link(ref link) | Action::Update(ref link) = action {
oracle
.add_link(v1.clone(), v2.clone(), link.clone())
.await
.unwrap();
}
}
}
}
const RELIABLE_LINK: Link = Link {
latency: Duration::from_millis(10),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
async fn initialize_simulation<S: certificate::Scheme>(
context: Context,
fixture: &Fixture<S>,
link: Link,
) -> (
Oracle<PublicKey, deterministic::Context>,
Registrations<PublicKey>,
) {
let (network, mut oracle) = Network::new_with_peers(
context.with_label("network"),
commonware_p2p::simulated::Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(1),
},
fixture.participants.clone(),
)
.await;
network.start();
let registrations = register_participants(&mut oracle, &fixture.participants).await;
link_participants(&mut oracle, &fixture.participants, Action::Link(link), None).await;
(oracle, registrations)
}
#[allow(clippy::too_many_arguments)]
fn spawn_validator_engines<S>(
context: Context,
fixture: &Fixture<S>,
sequencer_pks: &[PublicKey],
registrations: &mut Registrations<PublicKey>,
rebroadcast_timeout: Duration,
invalid_when: fn(Height) -> bool,
misses_allowed: Option<usize>,
epoch: Epoch,
) -> BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, S, Sha256Digest>>
where
S: Scheme<PublicKey, Sha256Digest>,
{
let mut reporters = BTreeMap::new();
let namespace = b"my testing namespace";
for (idx, validator) in fixture.participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{validator}"));
let monitor = mocks::Monitor::new(epoch);
let sequencers = mocks::Sequencers::<PublicKey>::new(sequencer_pks.to_vec());
let validators_provider = mocks::Provider::new();
assert!(validators_provider.register(epoch, fixture.schemes[idx].clone()));
let automaton = mocks::Automaton::<PublicKey>::new(invalid_when);
let chunk_verifier = ChunkVerifier::new(namespace);
let (reporter, reporter_mailbox) = mocks::Reporter::new(
context.clone(),
chunk_verifier.clone(),
fixture.verifier.clone(),
misses_allowed,
);
context.with_label("reporter").spawn(|_| reporter.run());
reporters.insert(validator.clone(), reporter_mailbox);
let engine = Engine::new(
context.with_label("engine"),
Config {
sequencer_signer: Some(ChunkSigner::new(
namespace,
fixture.private_keys[idx].clone(),
)),
chunk_verifier,
sequencers_provider: sequencers,
validators_provider,
automaton: automaton.clone(),
relay: automaton.clone(),
reporter: reporters.get(validator).unwrap().clone(),
monitor,
priority_proposals: false,
priority_acks: false,
rebroadcast_timeout,
epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
height_bound: HeightDelta::new(2),
journal_heights_per_section: NZU64!(10),
journal_replay_buffer: NZUsize!(4096),
journal_write_buffer: NZUsize!(4096),
journal_name_prefix: format!("ordered-broadcast-seq-{validator}-"),
journal_compression: Some(3),
journal_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
strategy: Sequential,
},
);
let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
engine.start((a1, a2), (b1, b2));
}
reporters
}
async fn await_reporters<S>(
context: Context,
sequencers: Vec<PublicKey>,
reporters: &BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, S, Sha256Digest>>,
threshold: (Height, Epoch, bool),
) where
S: certificate::Scheme,
{
let (threshold_height, threshold_epoch, require_contiguous) =
(threshold.0, threshold.1, threshold.2);
let mut receivers = Vec::new();
for (reporter, mailbox) in reporters.iter() {
for sequencer in sequencers.iter() {
let (tx, rx) = oneshot::channel();
receivers.push(rx);
context.with_label("reporter_watcher").spawn({
let reporter = reporter.clone();
let sequencer = sequencer.clone();
let mut mailbox = mailbox.clone();
move |context| async move {
loop {
let (height, epoch) = mailbox
.get_tip(sequencer.clone())
.await
.unwrap_or((Height::zero(), Epoch::zero()));
debug!(height = %height, epoch = %epoch, ?sequencer, ?reporter, "reporter");
let contiguous_height = mailbox
.get_contiguous_tip(sequencer.clone())
.await
.unwrap_or(Height::zero());
if height >= threshold_height
&& epoch >= threshold_epoch
&& (!require_contiguous || contiguous_height >= threshold_height)
{
tx.send_lossy(sequencer.clone());
break;
}
context.sleep(Duration::from_millis(100)).await;
}
}
});
}
}
let results = join_all(receivers).await;
assert_eq!(results.len(), sequencers.len() * reporters.len());
for result in results {
assert!(result.is_ok(), "reporter was cancelled");
}
}
async fn get_max_height<S: certificate::Scheme>(
reporters: &mut BTreeMap<PublicKey, mocks::ReporterMailbox<PublicKey, S, Sha256Digest>>,
) -> Height {
let mut max_height = Height::zero();
for (sequencer, mailbox) in reporters.iter_mut() {
let (height, _) = mailbox
.get_tip(sequencer.clone())
.await
.unwrap_or((Height::zero(), Epoch::zero()));
if height > max_height {
max_height = height;
}
}
max_height
}
fn all_online<S, F>(fixture: F)
where
S: Scheme<PublicKey, Sha256Digest>,
F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let runner = deterministic::Runner::timed(Duration::from_secs(120));
runner.start(|mut context| async move {
let epoch = Epoch::new(111);
let num_validators = 4;
let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
let (_oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
let reporters = spawn_validator_engines(
context.with_label("validators"),
&fixture,
&fixture.participants,
&mut registrations,
Duration::from_secs(5),
|_| false,
Some(5),
epoch,
);
await_reporters(
context.with_label("reporter"),
reporters.keys().cloned().collect::<Vec<_>>(),
&reporters,
(Height::new(100), epoch, true),
)
.await;
});
}
#[test_group("slow")]
#[test_traced]
fn test_all_online() {
all_online(bls12381_threshold::fixture::<MinPk, _>);
all_online(bls12381_threshold::fixture::<MinSig, _>);
all_online(bls12381_multisig::fixture::<MinPk, _>);
all_online(bls12381_multisig::fixture::<MinSig, _>);
all_online(ed25519::fixture);
all_online(secp256r1::fixture);
}
fn unclean_shutdown<S, F>(fixture: F)
where
S: Scheme<PublicKey, Sha256Digest>,
F: Fn(&mut deterministic::Context, &[u8], u32) -> Fixture<S> + Clone,
{
let mut prev_checkpoint = None;
let epoch = Epoch::new(111);
let num_validators = 4;
let crash_after = Duration::from_secs(5);
let target_height = Height::new(30);
loop {
let fixture = fixture.clone();
let f = |mut context: deterministic::Context| async move {
let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
let (network, mut oracle) = Network::new_with_peers(
context.with_label("network"),
commonware_p2p::simulated::Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(1),
},
fixture.participants.clone(),
)
.await;
network.start();
let mut registrations =
register_participants(&mut oracle, &fixture.participants).await;
link_participants(
&mut oracle,
&fixture.participants,
Action::Link(RELIABLE_LINK),
None,
)
.await;
let reporters = spawn_validator_engines(
context.with_label("validator"),
&fixture,
&fixture.participants,
&mut registrations,
Duration::from_secs(5),
|_| false,
None,
epoch,
);
let crash = context.sleep(crash_after);
let run = await_reporters(
context.with_label("reporter"),
reporters.keys().cloned().collect::<Vec<_>>(),
&reporters,
(target_height, epoch, true),
);
select! {
_ = crash => false,
_ = run => true,
}
};
let (complete, checkpoint) = prev_checkpoint
.map_or_else(
|| deterministic::Runner::timed(Duration::from_secs(180)),
deterministic::Runner::from,
)
.start_and_recover(f);
if complete {
break;
}
prev_checkpoint = Some(checkpoint);
}
}
#[test_group("slow")]
#[test_traced]
fn test_unclean_shutdown() {
unclean_shutdown(bls12381_threshold::fixture::<MinPk, _>);
unclean_shutdown(bls12381_threshold::fixture::<MinSig, _>);
unclean_shutdown(bls12381_multisig::fixture::<MinPk, _>);
unclean_shutdown(bls12381_multisig::fixture::<MinSig, _>);
unclean_shutdown(ed25519::fixture);
unclean_shutdown(secp256r1::fixture);
}
fn network_partition<S, F>(fixture: F)
where
S: Scheme<PublicKey, Sha256Digest>,
F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let epoch = Epoch::new(111);
let num_validators = 4;
let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
let (mut oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
let mut reporters = spawn_validator_engines(
context.with_label("validators"),
&fixture,
&fixture.participants,
&mut registrations,
Duration::from_secs(1),
|_| false,
None,
epoch,
);
link_participants(&mut oracle, &fixture.participants, Action::Unlink, None).await;
context.sleep(Duration::from_secs(30)).await;
let max_height = get_max_height(&mut reporters).await;
link_participants(
&mut oracle,
&fixture.participants,
Action::Link(RELIABLE_LINK),
None,
)
.await;
await_reporters(
context.with_label("reporter"),
reporters.keys().cloned().collect::<Vec<_>>(),
&reporters,
(
max_height.saturating_add(HeightDelta::new(100)),
epoch,
false,
),
)
.await;
});
}
#[test_group("slow")]
#[test_traced]
fn test_network_partition() {
network_partition(bls12381_threshold::fixture::<MinPk, _>);
network_partition(bls12381_threshold::fixture::<MinSig, _>);
network_partition(bls12381_multisig::fixture::<MinPk, _>);
network_partition(bls12381_multisig::fixture::<MinSig, _>);
network_partition(ed25519::fixture);
network_partition(secp256r1::fixture);
}
fn slow_and_lossy_links<S, F>(fixture: F, seed: u64) -> String
where
S: Scheme<PublicKey, Sha256Digest>,
F: Fn(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let cfg = deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(40)));
let runner = deterministic::Runner::new(cfg);
runner.start(|mut context| async move {
let epoch = Epoch::new(111);
let num_validators = 4;
let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
let (mut oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
let delayed_link = Link {
latency: Duration::from_millis(50),
jitter: Duration::from_millis(40),
success_rate: 0.5,
};
link_participants(
&mut oracle,
&fixture.participants,
Action::Update(delayed_link),
None,
)
.await;
let reporters = spawn_validator_engines(
context.with_label("validators"),
&fixture,
&fixture.participants,
&mut registrations,
Duration::from_millis(150),
|_| false,
None,
epoch,
);
await_reporters(
context.with_label("reporter"),
reporters.keys().cloned().collect::<Vec<_>>(),
&reporters,
(Height::new(40), epoch, false),
)
.await;
context.auditor().state()
})
}
#[test_group("slow")]
#[test_traced]
fn test_slow_and_lossy_links() {
slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, 0);
slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, 0);
slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, 0);
slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, 0);
slow_and_lossy_links(ed25519::fixture, 0);
slow_and_lossy_links(secp256r1::fixture, 0);
}
#[test_group("slow")]
#[test_traced]
fn test_determinism() {
for seed in 1..6 {
let ts_pk_state_1 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
let ts_pk_state_2 = slow_and_lossy_links(bls12381_threshold::fixture::<MinPk, _>, seed);
assert_eq!(ts_pk_state_1, ts_pk_state_2);
let ts_sig_state_1 =
slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
let ts_sig_state_2 =
slow_and_lossy_links(bls12381_threshold::fixture::<MinSig, _>, seed);
assert_eq!(ts_sig_state_1, ts_sig_state_2);
let ed_state_1 = slow_and_lossy_links(ed25519::fixture, seed);
let ed_state_2 = slow_and_lossy_links(ed25519::fixture, seed);
assert_eq!(ed_state_1, ed_state_2);
let secp_state_1 = slow_and_lossy_links(secp256r1::fixture, seed);
let secp_state_2 = slow_and_lossy_links(secp256r1::fixture, seed);
assert_eq!(secp_state_1, secp_state_2);
let ms_pk_state_1 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
let ms_pk_state_2 = slow_and_lossy_links(bls12381_multisig::fixture::<MinPk, _>, seed);
assert_eq!(ms_pk_state_1, ms_pk_state_2);
let ms_sig_state_1 =
slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
let ms_sig_state_2 =
slow_and_lossy_links(bls12381_multisig::fixture::<MinSig, _>, seed);
assert_eq!(ms_sig_state_1, ms_sig_state_2);
let states = [
("threshold-minpk", ts_pk_state_1),
("threshold-minsig", ts_sig_state_1),
("multisig-minpk", ms_pk_state_1),
("multisig-minsig", ms_sig_state_1),
("ed25519", ed_state_1),
("secp256r1", secp_state_1),
];
for pair in states.windows(2) {
assert_ne!(
pair[0].1, pair[1].1,
"state {} equals state {}",
pair[0].0, pair[1].0
);
}
}
}
fn invalid_signature_injection<S, F>(fixture: F)
where
S: Scheme<PublicKey, Sha256Digest>,
F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let epoch = Epoch::new(111);
let num_validators = 4;
let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
let (_oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
let reporters = spawn_validator_engines(
context.with_label("validators"),
&fixture,
&fixture.participants,
&mut registrations,
Duration::from_secs(5),
|i| i.get() % 10 == 0,
None,
epoch,
);
await_reporters(
context.with_label("reporter"),
reporters.keys().cloned().collect::<Vec<_>>(),
&reporters,
(Height::new(100), epoch, true),
)
.await;
});
}
#[test_group("slow")]
#[test_traced]
fn test_invalid_signature_injection() {
invalid_signature_injection(bls12381_threshold::fixture::<MinPk, _>);
invalid_signature_injection(bls12381_threshold::fixture::<MinSig, _>);
invalid_signature_injection(bls12381_multisig::fixture::<MinPk, _>);
invalid_signature_injection(bls12381_multisig::fixture::<MinSig, _>);
invalid_signature_injection(ed25519::fixture);
invalid_signature_injection(secp256r1::fixture);
}
fn updated_epoch<S, F>(fixture: F)
where
S: Scheme<PublicKey, Sha256Digest>,
F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let epoch = Epoch::new(111);
let num_validators = 4;
let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
let (mut oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
let mut reporters = BTreeMap::new();
let mut validators_providers = HashMap::new();
let mut monitors = HashMap::new();
let namespace = b"my testing namespace";
for (idx, validator) in fixture.participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{validator}"));
let monitor = mocks::Monitor::new(epoch);
monitors.insert(validator.clone(), monitor.clone());
let sequencers = mocks::Sequencers::<PublicKey>::new(fixture.participants.clone());
let validators_provider = mocks::Provider::new();
assert!(validators_provider.register(epoch, fixture.schemes[idx].clone()));
validators_providers.insert(validator.clone(), validators_provider.clone());
let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
let chunk_verifier = ChunkVerifier::new(namespace);
let (reporter, reporter_mailbox) = mocks::Reporter::new(
context.clone(),
chunk_verifier.clone(),
fixture.verifier.clone(),
Some(5),
);
context.with_label("reporter").spawn(|_| reporter.run());
reporters.insert(validator.clone(), reporter_mailbox);
let engine = Engine::new(
context.with_label("engine"),
Config {
sequencer_signer: Some(ChunkSigner::new(
namespace,
fixture.private_keys[idx].clone(),
)),
chunk_verifier,
sequencers_provider: sequencers,
validators_provider,
relay: automaton.clone(),
automaton: automaton.clone(),
reporter: reporters.get(validator).unwrap().clone(),
monitor,
epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
height_bound: HeightDelta::new(2),
rebroadcast_timeout: Duration::from_secs(1),
priority_acks: false,
priority_proposals: false,
journal_heights_per_section: NZU64!(10),
journal_replay_buffer: NZUsize!(4096),
journal_write_buffer: NZUsize!(4096),
journal_name_prefix: format!("ordered-broadcast-seq-{validator}-"),
journal_compression: Some(3),
journal_page_cache: CacheRef::from_pooler(
&context,
PAGE_SIZE,
PAGE_CACHE_SIZE,
),
strategy: Sequential,
},
);
let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
engine.start((a1, a2), (b1, b2));
}
await_reporters(
context.with_label("reporter"),
reporters.keys().cloned().collect::<Vec<_>>(),
&reporters,
(Height::new(100), epoch, true),
)
.await;
link_participants(&mut oracle, &fixture.participants, Action::Unlink, None).await;
context.sleep(Duration::from_secs(30)).await;
let max_height = get_max_height(&mut reporters).await;
let next_epoch = epoch.next();
for (validator, monitor) in monitors.iter() {
monitor.update(next_epoch);
let idx = fixture
.participants
.iter()
.position(|v| v == validator)
.unwrap();
let validators_provider = validators_providers.get(validator).unwrap();
assert!(validators_provider.register(next_epoch, fixture.schemes[idx].clone()));
}
link_participants(
&mut oracle,
&fixture.participants,
Action::Link(RELIABLE_LINK),
None,
)
.await;
await_reporters(
context.with_label("reporter"),
reporters.keys().cloned().collect::<Vec<_>>(),
&reporters,
(
max_height.saturating_add(HeightDelta::new(100)),
next_epoch,
true,
),
)
.await;
});
}
#[test_group("slow")]
#[test_traced]
fn test_updated_epoch() {
updated_epoch(bls12381_threshold::fixture::<MinPk, _>);
updated_epoch(bls12381_threshold::fixture::<MinSig, _>);
updated_epoch(bls12381_multisig::fixture::<MinPk, _>);
updated_epoch(bls12381_multisig::fixture::<MinSig, _>);
updated_epoch(ed25519::fixture);
updated_epoch(secp256r1::fixture);
}
fn external_sequencer<S, F>(fixture: F)
where
S: Scheme<PublicKey, Sha256Digest>,
F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let epoch = Epoch::new(111);
let num_validators = 4;
let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
let sequencer = PrivateKey::from_seed(u64::MAX);
let mut participants = fixture.participants.clone();
participants.push(sequencer.public_key());
let (network, mut oracle) = Network::new_with_peers(
context.with_label("network"),
commonware_p2p::simulated::Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets: NZUsize!(1),
},
participants.clone(),
)
.await;
network.start();
let mut registrations = register_participants(&mut oracle, &participants).await;
link_participants(
&mut oracle,
&participants,
Action::Link(RELIABLE_LINK),
None,
)
.await;
let mut reporters = BTreeMap::new();
let namespace = b"my testing namespace";
for (idx, validator) in fixture.participants.iter().enumerate() {
let context = context.with_label(&format!("validator_{validator}"));
let monitor = mocks::Monitor::new(epoch);
let sequencers = mocks::Sequencers::<PublicKey>::new(vec![sequencer.public_key()]);
let validators_provider = mocks::Provider::new();
assert!(validators_provider.register(epoch, fixture.schemes[idx].clone()));
let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
let chunk_verifier = ChunkVerifier::new(namespace);
let (reporter, reporter_mailbox) = mocks::Reporter::new(
context.clone(),
chunk_verifier.clone(),
fixture.verifier.clone(),
Some(5),
);
context.with_label("reporter").spawn(|_| reporter.run());
reporters.insert(validator.clone(), reporter_mailbox);
let engine = Engine::new(
context.with_label("engine"),
Config {
sequencer_signer: None::<ChunkSigner<PrivateKey>>, chunk_verifier,
sequencers_provider: sequencers,
validators_provider,
relay: automaton.clone(),
automaton: automaton.clone(),
reporter: reporters.get(validator).unwrap().clone(),
monitor,
epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
height_bound: HeightDelta::new(2),
rebroadcast_timeout: Duration::from_secs(5),
priority_acks: false,
priority_proposals: false,
journal_heights_per_section: NZU64!(10),
journal_replay_buffer: NZUsize!(4096),
journal_write_buffer: NZUsize!(4096),
journal_name_prefix: format!("ordered-broadcast-seq-{validator}-"),
journal_compression: Some(3),
journal_page_cache: CacheRef::from_pooler(
&context,
PAGE_SIZE,
PAGE_CACHE_SIZE,
),
strategy: Sequential,
},
);
let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
engine.start((a1, a2), (b1, b2));
}
{
let context = context.with_label("sequencer");
let automaton = mocks::Automaton::<PublicKey>::new(|_| false);
let chunk_verifier = ChunkVerifier::new(namespace);
let (reporter, reporter_mailbox) = mocks::Reporter::new(
context.clone(),
chunk_verifier.clone(),
fixture.verifier.clone(),
Some(5),
);
context.with_label("reporter").spawn(|_| reporter.run());
reporters.insert(sequencer.public_key(), reporter_mailbox);
let validators_provider = mocks::Provider::new();
assert!(validators_provider.register(epoch, fixture.verifier.clone()));
let engine = Engine::new(
context.with_label("engine"),
Config {
sequencer_signer: Some(ChunkSigner::new(namespace, sequencer.clone())),
chunk_verifier,
sequencers_provider: mocks::Sequencers::<PublicKey>::new(vec![
sequencer.public_key()
]),
validators_provider,
relay: automaton.clone(),
automaton,
reporter: reporters.get(&sequencer.public_key()).unwrap().clone(),
monitor: mocks::Monitor::new(epoch),
epoch_bounds: (EpochDelta::new(1), EpochDelta::new(1)),
height_bound: HeightDelta::new(2),
rebroadcast_timeout: Duration::from_secs(5),
priority_acks: false,
priority_proposals: false,
journal_heights_per_section: NZU64!(10),
journal_replay_buffer: NZUsize!(4096),
journal_write_buffer: NZUsize!(4096),
journal_name_prefix: format!(
"ordered-broadcast-seq-{}-",
sequencer.public_key()
),
journal_compression: Some(3),
journal_page_cache: CacheRef::from_pooler(
&context,
PAGE_SIZE,
PAGE_CACHE_SIZE,
),
strategy: Sequential,
},
);
let ((a1, a2), (b1, b2)) = registrations.remove(&sequencer.public_key()).unwrap();
engine.start((a1, a2), (b1, b2));
}
await_reporters(
context.with_label("reporter"),
vec![sequencer.public_key()],
&reporters,
(Height::new(100), epoch, true),
)
.await;
});
}
#[test_group("slow")]
#[test_traced]
fn test_external_sequencer() {
external_sequencer(bls12381_threshold::fixture::<MinPk, _>);
external_sequencer(bls12381_threshold::fixture::<MinSig, _>);
external_sequencer(bls12381_multisig::fixture::<MinPk, _>);
external_sequencer(bls12381_multisig::fixture::<MinSig, _>);
external_sequencer(ed25519::fixture);
external_sequencer(secp256r1::fixture);
}
fn run_1k<S, F>(fixture: F)
where
S: Scheme<PublicKey, Sha256Digest>,
F: FnOnce(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let cfg = deterministic::Config::new();
let runner = deterministic::Runner::new(cfg);
runner.start(|mut context| async move {
let epoch = Epoch::new(111);
let num_validators = 10;
let fixture = fixture(&mut context, TEST_NAMESPACE, num_validators);
let delayed_link = Link {
latency: Duration::from_millis(80),
jitter: Duration::from_millis(10),
success_rate: 0.98,
};
let (mut oracle, mut registrations) =
initialize_simulation(context.with_label("simulation"), &fixture, RELIABLE_LINK)
.await;
link_participants(
&mut oracle,
&fixture.participants,
Action::Update(delayed_link),
None,
)
.await;
let sequencers: Vec<PublicKey> =
fixture.participants[0..num_validators as usize / 2].to_vec();
let reporters = spawn_validator_engines(
context.with_label("validators"),
&fixture,
&sequencers,
&mut registrations,
Duration::from_millis(150),
|_| false,
None,
epoch,
);
await_reporters(
context.with_label("reporter"),
sequencers,
&reporters,
(Height::new(1_000), epoch, false),
)
.await;
})
}
#[test_group("slow")]
#[test_traced]
fn test_1k_bls12381_threshold_min_pk() {
run_1k(bls12381_threshold::fixture::<MinPk, _>);
}
#[test_group("slow")]
#[test_traced]
fn test_1k_bls12381_threshold_min_sig() {
run_1k(bls12381_threshold::fixture::<MinSig, _>);
}
#[test_group("slow")]
#[test_traced]
fn test_1k_bls12381_multisig_min_pk() {
run_1k(bls12381_multisig::fixture::<MinPk, _>);
}
#[test_group("slow")]
#[test_traced]
fn test_1k_bls12381_multisig_min_sig() {
run_1k(bls12381_multisig::fixture::<MinSig, _>);
}
#[test_group("slow")]
#[test_traced]
fn test_1k_ed25519() {
run_1k(ed25519::fixture);
}
#[test_group("slow")]
#[test_traced]
fn test_1k_secp256r1() {
run_1k(secp256r1::fixture);
}
}