use crate::{
marshal::{
coding::{
shards,
types::{coding_config_for_participants, CodedBlock},
Coding,
},
config::Config,
core::{Actor, Mailbox},
mocks::{application::Application, block::Block},
resolver::p2p as resolver,
standard::Standard,
Identifier,
},
simplex::{
scheme::bls12381_threshold::vrf as bls12381_threshold_vrf,
types::{Activity, Context, Finalization, Finalize, Notarization, Notarize, Proposal},
},
types::{coding::Commitment, Epoch, Epocher, FixedEpocher, Height, Round, View, ViewDelta},
Heightable, Reporter,
};
use commonware_broadcast::buffered;
use commonware_coding::{CodecConfig, ReedSolomon};
use commonware_cryptography::{
bls12381::primitives::variant::MinPk,
certificate::{mocks::Fixture, ConstantProvider, Provider, Scheme as _},
ed25519::{PrivateKey, PublicKey},
sha256::{Digest as Sha256Digest, Sha256},
Committable, Digest as DigestTrait, Digestible, Hasher as _, Signer,
};
use commonware_p2p::simulated::{self, Link, Network, Oracle};
use commonware_parallel::Sequential;
use commonware_runtime::{buffer::paged::CacheRef, deterministic, Clock, Metrics, Quota, Runner};
use commonware_storage::{
archive::{immutable, prunable},
translator::EightCap,
};
use commonware_utils::{vec::NonEmptyVec, NZUsize, NZU16, NZU64};
use futures::StreamExt;
use rand::{
seq::{IteratorRandom, SliceRandom},
Rng,
};
use std::{
collections::BTreeMap,
future::Future,
num::{NonZeroU16, NonZeroU32, NonZeroU64, NonZeroUsize},
time::{Duration, Instant},
};
use tracing::info;
/// Digest type used throughout the harness (SHA-256).
pub type D = Sha256Digest;
/// Public key type identifying participants (ed25519).
pub type K = PublicKey;
/// Consensus context for the standard variant, parameterized by digest [`D`] and key [`K`].
pub type Ctx = Context<D, K>;
/// Mock block carrying a [`Ctx`], used by the standard variant.
pub type B = Block<D, Ctx>;
/// BLS12-381 variant plugged into the signing scheme.
pub type V = MinPk;
/// Signing scheme: BLS12-381 threshold VRF over [`K`] and [`V`].
pub type S = bls12381_threshold_vrf::Scheme<K, V>;
/// `ConstantProvider` over scheme [`S`], parameterized by [`Epoch`].
pub type P = ConstantProvider<S, Epoch>;
/// Consensus context for the coding variant; the parent is referenced by [`Commitment`].
pub type CodingCtx = Context<Commitment, K>;
/// Mock block carrying a [`CodingCtx`], used by the coding variant.
pub type CodingB = Block<D, CodingCtx>;
/// Page size handed to `CacheRef::from_pooler` when a validator's page cache is created.
pub const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
/// Page cache capacity handed to `CacheRef::from_pooler` alongside [`PAGE_SIZE`].
pub const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
/// Namespace bytes for tests (not referenced in the visible part of this file).
pub const NAMESPACE: &[u8] = b"test";
/// Number of validators (presumably the default participant count for tests; confirm at call sites).
pub const NUM_VALIDATORS: u32 = 4;
/// Quorum size (presumably paired with [`NUM_VALIDATORS`]; confirm at call sites).
pub const QUORUM: u32 = 3;
/// Number of blocks (presumably how many blocks tests produce; confirm at call sites).
pub const NUM_BLOCKS: u64 = 160;
/// Epoch length passed to `FixedEpocher::new` in every validator configuration.
pub const BLOCKS_PER_EPOCH: NonZeroU64 = NZU64!(20);
/// Link with 100ms latency, 1ms jitter and a success rate of 1.0.
pub const LINK: Link = Link {
latency: Duration::from_millis(100),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
/// Link with 200ms latency, 50ms jitter and a success rate of 0.7.
pub const UNRELIABLE_LINK: Link = Link {
latency: Duration::from_millis(200),
jitter: Duration::from_millis(50),
success_rate: 0.7,
};
/// Per-second quota of `NonZeroU32::MAX`, used when registering network channels.
pub const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
/// A [`Provider`] that never yields a scheme, whatever scope is requested.
#[derive(Clone)]
pub struct EmptyProvider;
impl Provider for EmptyProvider {
type Scope = Epoch;
type Scheme = S;
/// Always returns `None`; the scope argument is ignored.
fn scoped(&self, _scope: Epoch) -> Option<std::sync::Arc<S>> {
None
}
}
/// Returns the public key derived from the private key seeded with `0`.
///
/// Every block context built in this file uses this key as its leader.
pub fn default_leader() -> K {
    let signer = PrivateKey::from_seed(0);
    signer.public_key()
}
/// Builds a standard mock block at `height` on top of `parent`.
///
/// The context's round is epoch zero with a view equal to `height`. The parent view is the
/// previous height, or view zero when `height` has no predecessor. The leader is always
/// [`default_leader`].
pub fn make_raw_block(parent: D, height: Height, timestamp: u64) -> B {
    let parent_view = match height.previous() {
        Some(prev) => View::new(prev.get()),
        None => View::zero(),
    };
    let round = Round::new(Epoch::zero(), View::new(height.get()));
    let context = Ctx {
        round,
        leader: default_leader(),
        parent: (parent_view, parent),
    };
    B::new::<Sha256>(context, parent, height, timestamp)
}
/// Creates and starts a simulated network seeded with `participants`, returning its oracle.
///
/// The network runs under the `"network"` label of `context`, with a 1 MiB maximum message
/// size and `disconnect_on_block` enabled.
pub async fn setup_network_with_participants<I>(
    context: deterministic::Context,
    tracked_peer_sets: NonZeroUsize,
    participants: I,
) -> Oracle<K, deterministic::Context>
where
    I: IntoIterator<Item = K>,
{
    let network_context = context.with_label("network");
    let network_config = simulated::Config {
        max_size: 1024 * 1024,
        disconnect_on_block: true,
        tracked_peer_sets,
    };
    let (network, oracle) =
        Network::new_with_peers(network_context, network_config, participants).await;
    network.start();
    oracle
}
/// Adds a directed `link` between every ordered pair of distinct `peers`.
///
/// The result of each `add_link` call is deliberately discarded.
pub async fn setup_network_links(
    oracle: &mut Oracle<K, deterministic::Context>,
    peers: &[K],
    link: Link,
) {
    for sender in peers {
        for receiver in peers {
            // Never link a peer to itself.
            if sender != receiver {
                let _ = oracle
                    .add_link(sender.clone(), receiver.clone(), link.clone())
                    .await;
            }
        }
    }
}
/// Everything produced by setting up a validator through a [`TestHarness`].
pub struct ValidatorSetup<H: TestHarness> {
/// Application instance the marshal actor was started with.
pub application: Application<H::ApplicationBlock>,
/// Mailbox returned by `Actor::init`.
pub mailbox: Mailbox<S, H::Variant>,
/// Harness-specific extra state (`()` for the standard harness, the shards mailbox for coding).
pub extra: H::ValidatorExtra,
/// Height returned by `Actor::init` alongside the mailbox.
pub height: Height,
}
/// Lightweight handle to a running validator: its marshal mailbox plus harness-specific state.
pub struct ValidatorHandle<H: TestHarness> {
    /// Mailbox used to talk to the validator's marshal actor.
    pub mailbox: Mailbox<S, H::Variant>,
    /// Harness-specific extra state (see [`TestHarness::ValidatorExtra`]).
    pub extra: H::ValidatorExtra,
}

// Implemented by hand: a derive would add an `H: Clone` bound, which the harness marker types
// do not satisfy and which is not needed to clone the fields.
impl<H: TestHarness> Clone for ValidatorHandle<H> {
    fn clone(&self) -> Self {
        let Self { mailbox, extra } = self;
        Self {
            mailbox: mailbox.clone(),
            extra: extra.clone(),
        }
    }
}
/// Abstraction over the marshal variants exercised by the shared tests.
///
/// An implementation supplies its block and commitment types plus the glue needed to start a
/// validator, build test blocks, and feed proposals and certificates to the marshal mailbox.
pub trait TestHarness: 'static + Sized {
/// Block type handled by the mock [`Application`].
type ApplicationBlock: crate::Block + Digestible<Digest = D> + Clone + Send + 'static;
/// Marshal variant under test; its block and commitment types must match this harness.
type Variant: crate::marshal::core::Variant<
ApplicationBlock = Self::ApplicationBlock,
Commitment = Self::Commitment,
>;
/// Block type produced by [`Self::make_test_block`] and passed to propose/verify.
type TestBlock: Heightable + Clone + Send;
/// Extra per-validator state returned by setup and carried in [`ValidatorHandle`].
type ValidatorExtra: Clone + Send;
/// Commitment type used in proposals and certificates.
type Commitment: DigestTrait;
/// Sets up a validator with default settings.
///
/// The implementations in this file end up calling [`Self::setup_validator_with`] with
/// `max_pending_acks = 1` and a default application.
fn setup_validator(
context: deterministic::Context,
oracle: &mut Oracle<K, deterministic::Context>,
validator: K,
provider: P,
) -> impl Future<Output = ValidatorSetup<Self>> + Send;
/// Sets up a validator with an explicit `max_pending_acks` and `application`.
fn setup_validator_with(
context: deterministic::Context,
oracle: &mut Oracle<K, deterministic::Context>,
validator: K,
provider: P,
max_pending_acks: NonZeroUsize,
application: Application<Self::ApplicationBlock>,
) -> impl Future<Output = ValidatorSetup<Self>> + Send;
/// Returns the commitment to use as the parent of the first block.
///
/// The implementations in this file ignore `num_participants`.
fn genesis_parent_commitment(num_participants: u16) -> Self::Commitment;
/// Builds a test block at `height` with the given parent digest and parent commitment.
fn make_test_block(
parent: D,
parent_commitment: Self::Commitment,
height: Height,
timestamp: u64,
num_participants: u16,
) -> Self::TestBlock;
/// Returns the commitment of `block`.
fn commitment(block: &Self::TestBlock) -> Self::Commitment;
/// Returns the digest of `block`.
fn digest(block: &Self::TestBlock) -> D;
/// Returns the height of `block`.
fn height(block: &Self::TestBlock) -> Height;
/// Notifies the validator behind `handle` that `block` was proposed in `round`.
fn propose(
handle: &mut ValidatorHandle<Self>,
round: Round,
block: &Self::TestBlock,
) -> impl Future<Output = ()> + Send;
/// Notifies the validator behind `handle` that `block` was verified in `round`.
///
/// `all_handles` is unused by every implementation visible in this file.
fn verify(
handle: &mut ValidatorHandle<Self>,
round: Round,
block: &Self::TestBlock,
all_handles: &mut [ValidatorHandle<Self>],
) -> impl Future<Output = ()> + Send;
/// Builds a finalization for `proposal` from the first `quorum` entries of `schemes`.
fn make_finalization(
proposal: Proposal<Self::Commitment>,
schemes: &[S],
quorum: u32,
) -> Finalization<S, Self::Commitment>;
/// Builds a notarization for `proposal` from the first `quorum` entries of `schemes`.
fn make_notarization(
proposal: Proposal<Self::Commitment>,
schemes: &[S],
quorum: u32,
) -> Notarization<S, Self::Commitment>;
/// Reports `finalization` to `mailbox`.
fn report_finalization(
mailbox: &mut Mailbox<S, Self::Variant>,
finalization: Finalization<S, Self::Commitment>,
) -> impl Future<Output = ()> + Send;
/// Reports `notarization` to `mailbox`.
fn report_notarization(
mailbox: &mut Mailbox<S, Self::Variant>,
notarization: Notarization<S, Self::Commitment>,
) -> impl Future<Output = ()> + Send;
/// Timeout for this harness (presumably how long tests wait for finalization; confirm at
/// call sites).
fn finalize_timeout() -> Duration;
/// Sets up a validator backed by prunable archives under `partition_prefix`.
///
/// Returns the marshal mailbox, the harness-specific extra state, and the application.
#[allow(clippy::type_complexity)]
fn setup_prunable_validator(
context: deterministic::Context,
oracle: &Oracle<K, deterministic::Context>,
validator: K,
schemes: &[S],
partition_prefix: &str,
page_cache: CacheRef,
) -> impl Future<
Output = (
Mailbox<S, Self::Variant>,
Self::ValidatorExtra,
Application<Self::ApplicationBlock>,
),
> + Send;
/// Verification step used by the pruning tests; takes no `all_handles` argument.
fn verify_for_prune(
handle: &mut ValidatorHandle<Self>,
round: Round,
block: &Self::TestBlock,
) -> impl Future<Output = ()> + Send;
}
/// Harness for the [`Standard`] marshal variant, where a block's commitment is its digest.
pub struct StandardHarness;

impl TestHarness for StandardHarness {
    type ApplicationBlock = B;
    type Variant = Standard<B>;
    type TestBlock = B;
    type ValidatorExtra = ();
    type Commitment = D;

    /// Sets up a validator with `max_pending_acks = 1` and a default application.
    async fn setup_validator(
        context: deterministic::Context,
        oracle: &mut Oracle<K, deterministic::Context>,
        validator: K,
        provider: P,
    ) -> ValidatorSetup<Self> {
        Self::setup_validator_with(
            context,
            oracle,
            validator,
            provider,
            NZUsize!(1),
            Application::default(),
        )
        .await
    }

    /// Sets up a validator backed by immutable archives.
    ///
    /// Registers channel 1 for the resolver (backfill) and channel 2 for the buffered
    /// broadcast engine, initializes both archives under the `validator-<key>` partition
    /// prefix, then starts the marshal actor with `application`.
    ///
    /// # Panics
    ///
    /// Panics if channel registration or archive initialization fails.
    async fn setup_validator_with(
        context: deterministic::Context,
        oracle: &mut Oracle<K, deterministic::Context>,
        validator: K,
        provider: P,
        max_pending_acks: NonZeroUsize,
        application: Application<Self::ApplicationBlock>,
    ) -> ValidatorSetup<Self> {
        let config = Config {
            provider,
            epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
            mailbox_size: 100,
            view_retention_timeout: ViewDelta::new(10),
            max_repair: NZUsize!(10),
            max_pending_acks,
            block_codec_config: (),
            partition_prefix: format!("validator-{validator}"),
            prunable_items_per_section: NZU64!(10),
            replay_buffer: NZUsize!(1024),
            key_write_buffer: NZUsize!(1024),
            value_write_buffer: NZUsize!(1024),
            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
            strategy: Sequential,
        };

        // Resolver used to fetch missing data from peers (channel 1).
        let control = oracle.control(validator.clone());
        let backfill = control.register(1, TEST_QUOTA).await.unwrap();
        let resolver_cfg = resolver::Config {
            public_key: validator.clone(),
            peer_provider: oracle.manager(),
            blocker: oracle.control(validator.clone()),
            mailbox_size: config.mailbox_size,
            initial: Duration::from_secs(1),
            timeout: Duration::from_secs(2),
            fetch_retry_timeout: Duration::from_millis(100),
            priority_requests: false,
            priority_responses: false,
        };
        let resolver = resolver::init(&context, resolver_cfg, backfill);

        // Buffered broadcast engine (channel 2).
        let broadcast_config = buffered::Config {
            public_key: validator.clone(),
            mailbox_size: config.mailbox_size,
            deque_size: 10,
            priority: false,
            codec_config: (),
            peer_provider: oracle.manager(),
        };
        let (broadcast_engine, buffer) = buffered::Engine::new(context.clone(), broadcast_config);
        let network = control.register(2, TEST_QUOTA).await.unwrap();
        broadcast_engine.start(network);

        // Archive of finalization certificates.
        let start = Instant::now();
        let finalizations_by_height = immutable::Archive::init(
            context.with_label("finalizations_by_height"),
            immutable::Config {
                metadata_partition: format!(
                    "{}-finalizations-by-height-metadata",
                    config.partition_prefix
                ),
                freezer_table_partition: format!(
                    "{}-finalizations-by-height-freezer-table",
                    config.partition_prefix
                ),
                freezer_table_initial_size: 64,
                freezer_table_resize_frequency: 10,
                freezer_table_resize_chunk_size: 10,
                freezer_key_partition: format!(
                    "{}-finalizations-by-height-freezer-key",
                    config.partition_prefix
                ),
                freezer_key_page_cache: config.page_cache.clone(),
                freezer_value_partition: format!(
                    "{}-finalizations-by-height-freezer-value",
                    config.partition_prefix
                ),
                freezer_value_target_size: 1024,
                freezer_value_compression: None,
                ordinal_partition: format!(
                    "{}-finalizations-by-height-ordinal",
                    config.partition_prefix
                ),
                items_per_section: NZU64!(10),
                codec_config: S::certificate_codec_config_unbounded(),
                replay_buffer: config.replay_buffer,
                freezer_key_write_buffer: config.key_write_buffer,
                freezer_value_write_buffer: config.value_write_buffer,
                ordinal_write_buffer: config.key_write_buffer,
            },
        )
        .await
        .expect("failed to initialize finalizations by height archive");
        info!(elapsed = ?start.elapsed(), "restored finalizations by height archive");

        // Archive of finalized blocks.
        let start = Instant::now();
        let finalized_blocks = immutable::Archive::init(
            context.with_label("finalized_blocks"),
            immutable::Config {
                metadata_partition: format!(
                    "{}-finalized_blocks-metadata",
                    config.partition_prefix
                ),
                freezer_table_partition: format!(
                    "{}-finalized_blocks-freezer-table",
                    config.partition_prefix
                ),
                freezer_table_initial_size: 64,
                freezer_table_resize_frequency: 10,
                freezer_table_resize_chunk_size: 10,
                freezer_key_partition: format!(
                    "{}-finalized_blocks-freezer-key",
                    config.partition_prefix
                ),
                freezer_key_page_cache: config.page_cache.clone(),
                freezer_value_partition: format!(
                    "{}-finalized_blocks-freezer-value",
                    config.partition_prefix
                ),
                freezer_value_target_size: 1024,
                freezer_value_compression: None,
                ordinal_partition: format!("{}-finalized_blocks-ordinal", config.partition_prefix),
                items_per_section: NZU64!(10),
                codec_config: config.block_codec_config,
                replay_buffer: config.replay_buffer,
                freezer_key_write_buffer: config.key_write_buffer,
                freezer_value_write_buffer: config.value_write_buffer,
                ordinal_write_buffer: config.key_write_buffer,
            },
        )
        .await
        .expect("failed to initialize finalized blocks archive");
        info!(elapsed = ?start.elapsed(), "restored finalized blocks archive");

        let (actor, mailbox, height) = Actor::init(
            context.clone(),
            finalizations_by_height,
            finalized_blocks,
            config,
        )
        .await;
        actor.start(application.clone(), buffer, resolver);

        ValidatorSetup {
            application,
            mailbox,
            extra: (),
            height,
        }
    }

    /// Returns the SHA-256 hash of the empty byte string; `_num_participants` is ignored.
    fn genesis_parent_commitment(_num_participants: u16) -> D {
        Sha256::hash(b"")
    }

    /// Builds a block via [`make_raw_block`]; the parent commitment and participant count
    /// are ignored.
    fn make_test_block(
        parent: D,
        _parent_commitment: D,
        height: Height,
        timestamp: u64,
        _num_participants: u16,
    ) -> B {
        make_raw_block(parent, height, timestamp)
    }

    /// Returns the block's digest, which doubles as its commitment in this variant.
    fn commitment(block: &B) -> D {
        block.digest()
    }

    /// Returns the block's digest.
    fn digest(block: &B) -> D {
        block.digest()
    }

    /// Returns the block's height.
    fn height(block: &B) -> Height {
        block.height()
    }

    /// Tells the marshal that `block` was proposed in `round`.
    async fn propose(handle: &mut ValidatorHandle<Self>, round: Round, block: &B) {
        handle.mailbox.proposed(round, block.clone()).await;
    }

    /// Tells the marshal that `block` was verified in `round`; `_all_handles` is unused.
    async fn verify(
        handle: &mut ValidatorHandle<Self>,
        round: Round,
        block: &B,
        _all_handles: &mut [ValidatorHandle<Self>],
    ) {
        handle.mailbox.verified(round, block.clone()).await;
    }

    /// Signs `proposal` with the first `quorum` schemes and assembles a finalization.
    ///
    /// # Panics
    ///
    /// Panics if `schemes` is empty, if signing fails, or if the certificate cannot be
    /// assembled.
    fn make_finalization(proposal: Proposal<D>, schemes: &[S], quorum: u32) -> Finalization<S, D> {
        let finalizes: Vec<_> = schemes
            .iter()
            .take(quorum as usize)
            .map(|scheme| Finalize::sign(scheme, proposal.clone()).unwrap())
            .collect();
        Finalization::from_finalizes(&schemes[0], &finalizes, &Sequential).unwrap()
    }

    /// Signs `proposal` with the first `quorum` schemes and assembles a notarization.
    ///
    /// # Panics
    ///
    /// Panics if `schemes` is empty, if signing fails, or if the certificate cannot be
    /// assembled.
    fn make_notarization(proposal: Proposal<D>, schemes: &[S], quorum: u32) -> Notarization<S, D> {
        let notarizes: Vec<_> = schemes
            .iter()
            .take(quorum as usize)
            .map(|scheme| Notarize::sign(scheme, proposal.clone()).unwrap())
            .collect();
        // Pass the collected votes by reference, mirroring `make_finalization`.
        Notarization::from_notarizes(&schemes[0], &notarizes, &Sequential).unwrap()
    }

    /// Reports `finalization` to the marshal as an [`Activity::Finalization`].
    async fn report_finalization(
        mailbox: &mut Mailbox<S, Self::Variant>,
        finalization: Finalization<S, D>,
    ) {
        mailbox.report(Activity::Finalization(finalization)).await;
    }

    /// Reports `notarization` to the marshal as an [`Activity::Notarization`].
    async fn report_notarization(
        mailbox: &mut Mailbox<S, Self::Variant>,
        notarization: Notarization<S, D>,
    ) {
        mailbox.report(Activity::Notarization(notarization)).await;
    }

    /// Returns a 600-second timeout.
    fn finalize_timeout() -> Duration {
        Duration::from_secs(600)
    }

    /// Sets up a validator backed by prunable archives under `partition_prefix`.
    ///
    /// Uses `schemes[0]` as the constant scheme, registers channel 0 for the resolver and
    /// channel 1 for the broadcast engine, and starts the actor with a default application.
    ///
    /// # Panics
    ///
    /// Panics if `schemes` is empty or if channel registration or archive initialization
    /// fails.
    async fn setup_prunable_validator(
        context: deterministic::Context,
        oracle: &Oracle<K, deterministic::Context>,
        validator: K,
        schemes: &[S],
        partition_prefix: &str,
        page_cache: CacheRef,
    ) -> (
        Mailbox<S, Self::Variant>,
        Self::ValidatorExtra,
        Application<B>,
    ) {
        let control = oracle.control(validator.clone());
        let provider = ConstantProvider::new(schemes[0].clone());
        let config = Config {
            provider,
            epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
            mailbox_size: 100,
            view_retention_timeout: ViewDelta::new(10),
            max_repair: NZUsize!(10),
            max_pending_acks: NZUsize!(1),
            block_codec_config: (),
            partition_prefix: partition_prefix.to_string(),
            prunable_items_per_section: NZU64!(10),
            replay_buffer: NZUsize!(1024),
            key_write_buffer: NZUsize!(1024),
            value_write_buffer: NZUsize!(1024),
            page_cache: page_cache.clone(),
            strategy: Sequential,
        };

        // Resolver (channel 0).
        let backfill = control.register(0, TEST_QUOTA).await.unwrap();
        let resolver_cfg = resolver::Config {
            public_key: validator.clone(),
            peer_provider: oracle.manager(),
            blocker: control.clone(),
            mailbox_size: config.mailbox_size,
            initial: Duration::from_secs(1),
            timeout: Duration::from_secs(2),
            fetch_retry_timeout: Duration::from_millis(100),
            priority_requests: false,
            priority_responses: false,
        };
        let resolver = resolver::init(&context, resolver_cfg, backfill);

        // Buffered broadcast engine (channel 1).
        let broadcast_config = buffered::Config {
            public_key: validator.clone(),
            mailbox_size: config.mailbox_size,
            deque_size: 10,
            priority: false,
            codec_config: (),
            peer_provider: oracle.manager(),
        };
        let (broadcast_engine, buffer) = buffered::Engine::new(context.clone(), broadcast_config);
        let network = control.register(1, TEST_QUOTA).await.unwrap();
        broadcast_engine.start(network);

        // Prunable archive of finalization certificates.
        let finalizations_by_height = prunable::Archive::init(
            context.with_label("finalizations_by_height"),
            prunable::Config {
                translator: EightCap,
                key_partition: format!("{}-finalizations-by-height-key", partition_prefix),
                key_page_cache: page_cache.clone(),
                value_partition: format!("{}-finalizations-by-height-value", partition_prefix),
                compression: None,
                codec_config: S::certificate_codec_config_unbounded(),
                items_per_section: NZU64!(10),
                key_write_buffer: config.key_write_buffer,
                value_write_buffer: config.value_write_buffer,
                replay_buffer: config.replay_buffer,
            },
        )
        .await
        .expect("failed to initialize finalizations by height archive");

        // Prunable archive of finalized blocks.
        let finalized_blocks = prunable::Archive::init(
            context.with_label("finalized_blocks"),
            prunable::Config {
                translator: EightCap,
                key_partition: format!("{}-finalized-blocks-key", partition_prefix),
                key_page_cache: page_cache.clone(),
                value_partition: format!("{}-finalized-blocks-value", partition_prefix),
                compression: None,
                codec_config: config.block_codec_config,
                items_per_section: NZU64!(10),
                key_write_buffer: config.key_write_buffer,
                value_write_buffer: config.value_write_buffer,
                replay_buffer: config.replay_buffer,
            },
        )
        .await
        .expect("failed to initialize finalized blocks archive");

        let (actor, mailbox, _) = Actor::init(
            context.clone(),
            finalizations_by_height,
            finalized_blocks,
            config,
        )
        .await;
        let application = Application::<B>::default();
        actor.start(application.clone(), buffer, resolver);
        (mailbox, (), application)
    }

    /// Tells the marshal that `block` was verified in `round` (pruning tests).
    async fn verify_for_prune(handle: &mut ValidatorHandle<Self>, round: Round, block: &B) {
        handle.mailbox.verified(round, block.clone()).await;
    }
}
/// Harness that shares every associated type with [`StandardHarness`] and forwards every
/// operation to it.
pub struct InlineHarness;

impl TestHarness for InlineHarness {
    type ApplicationBlock = <StandardHarness as TestHarness>::ApplicationBlock;
    type Variant = <StandardHarness as TestHarness>::Variant;
    type TestBlock = <StandardHarness as TestHarness>::TestBlock;
    type ValidatorExtra = <StandardHarness as TestHarness>::ValidatorExtra;
    type Commitment = <StandardHarness as TestHarness>::Commitment;

    /// Forwards to [`StandardHarness::setup_validator`] and re-tags the result for this harness.
    async fn setup_validator(
        context: deterministic::Context,
        oracle: &mut Oracle<K, deterministic::Context>,
        validator: K,
        provider: P,
    ) -> ValidatorSetup<Self> {
        let ValidatorSetup {
            application,
            mailbox,
            extra,
            height,
        } = StandardHarness::setup_validator(context, oracle, validator, provider).await;
        ValidatorSetup {
            application,
            mailbox,
            extra,
            height,
        }
    }

    /// Forwards to [`StandardHarness::setup_validator_with`] and re-tags the result.
    async fn setup_validator_with(
        context: deterministic::Context,
        oracle: &mut Oracle<K, deterministic::Context>,
        validator: K,
        provider: P,
        max_pending_acks: NonZeroUsize,
        application: Application<Self::ApplicationBlock>,
    ) -> ValidatorSetup<Self> {
        let ValidatorSetup {
            application,
            mailbox,
            extra,
            height,
        } = StandardHarness::setup_validator_with(
            context,
            oracle,
            validator,
            provider,
            max_pending_acks,
            application,
        )
        .await;
        ValidatorSetup {
            application,
            mailbox,
            extra,
            height,
        }
    }

    /// Forwards to [`StandardHarness::genesis_parent_commitment`].
    fn genesis_parent_commitment(num_participants: u16) -> Self::Commitment {
        StandardHarness::genesis_parent_commitment(num_participants)
    }

    /// Forwards to [`StandardHarness::make_test_block`].
    fn make_test_block(
        parent: D,
        parent_commitment: Self::Commitment,
        height: Height,
        timestamp: u64,
        num_participants: u16,
    ) -> Self::TestBlock {
        StandardHarness::make_test_block(
            parent,
            parent_commitment,
            height,
            timestamp,
            num_participants,
        )
    }

    /// Forwards to [`StandardHarness::commitment`].
    fn commitment(block: &Self::TestBlock) -> Self::Commitment {
        StandardHarness::commitment(block)
    }

    /// Forwards to [`StandardHarness::digest`].
    fn digest(block: &Self::TestBlock) -> D {
        StandardHarness::digest(block)
    }

    /// Forwards to [`StandardHarness::height`].
    fn height(block: &Self::TestBlock) -> Height {
        StandardHarness::height(block)
    }

    /// Forwards to [`StandardHarness::propose`] through a handle that clones this mailbox.
    async fn propose(handle: &mut ValidatorHandle<Self>, round: Round, block: &Self::TestBlock) {
        let mut standard = ValidatorHandle::<StandardHarness> {
            mailbox: handle.mailbox.clone(),
            extra: handle.extra,
        };
        StandardHarness::propose(&mut standard, round, block).await;
    }

    /// Forwards to [`StandardHarness::verify`] with an empty handle slice.
    async fn verify(
        handle: &mut ValidatorHandle<Self>,
        round: Round,
        block: &Self::TestBlock,
        _all_handles: &mut [ValidatorHandle<Self>],
    ) {
        let mut standard = ValidatorHandle::<StandardHarness> {
            mailbox: handle.mailbox.clone(),
            extra: handle.extra,
        };
        StandardHarness::verify(&mut standard, round, block, &mut []).await;
    }

    /// Forwards to [`StandardHarness::make_finalization`].
    fn make_finalization(
        proposal: Proposal<Self::Commitment>,
        schemes: &[S],
        quorum: u32,
    ) -> Finalization<S, Self::Commitment> {
        StandardHarness::make_finalization(proposal, schemes, quorum)
    }

    /// Forwards to [`StandardHarness::make_notarization`].
    fn make_notarization(
        proposal: Proposal<Self::Commitment>,
        schemes: &[S],
        quorum: u32,
    ) -> Notarization<S, Self::Commitment> {
        StandardHarness::make_notarization(proposal, schemes, quorum)
    }

    /// Forwards to [`StandardHarness::report_finalization`].
    async fn report_finalization(
        mailbox: &mut Mailbox<S, Self::Variant>,
        finalization: Finalization<S, Self::Commitment>,
    ) {
        StandardHarness::report_finalization(mailbox, finalization).await;
    }

    /// Forwards to [`StandardHarness::report_notarization`].
    async fn report_notarization(
        mailbox: &mut Mailbox<S, Self::Variant>,
        notarization: Notarization<S, Self::Commitment>,
    ) {
        StandardHarness::report_notarization(mailbox, notarization).await;
    }

    /// Forwards to [`StandardHarness::finalize_timeout`].
    fn finalize_timeout() -> Duration {
        StandardHarness::finalize_timeout()
    }

    /// Forwards to [`StandardHarness::setup_prunable_validator`].
    async fn setup_prunable_validator(
        context: deterministic::Context,
        oracle: &Oracle<K, deterministic::Context>,
        validator: K,
        schemes: &[S],
        partition_prefix: &str,
        page_cache: CacheRef,
    ) -> (
        Mailbox<S, Self::Variant>,
        Self::ValidatorExtra,
        Application<Self::ApplicationBlock>,
    ) {
        StandardHarness::setup_prunable_validator(
            context,
            oracle,
            validator,
            schemes,
            partition_prefix,
            page_cache,
        )
        .await
    }

    /// Forwards to [`StandardHarness::verify_for_prune`] through a handle that clones this
    /// mailbox.
    async fn verify_for_prune(
        handle: &mut ValidatorHandle<Self>,
        round: Round,
        block: &Self::TestBlock,
    ) {
        let mut standard = ValidatorHandle::<StandardHarness> {
            mailbox: handle.mailbox.clone(),
            extra: handle.extra,
        };
        StandardHarness::verify_for_prune(&mut standard, round, block).await;
    }
}
/// Harness that shares every associated type with [`InlineHarness`] and forwards every
/// operation to it.
pub struct DeferredHarness;

impl TestHarness for DeferredHarness {
    type ApplicationBlock = <InlineHarness as TestHarness>::ApplicationBlock;
    type Variant = <InlineHarness as TestHarness>::Variant;
    type TestBlock = <InlineHarness as TestHarness>::TestBlock;
    type ValidatorExtra = <InlineHarness as TestHarness>::ValidatorExtra;
    type Commitment = <InlineHarness as TestHarness>::Commitment;

    /// Forwards to [`InlineHarness::setup_validator`] and re-tags the result for this harness.
    async fn setup_validator(
        context: deterministic::Context,
        oracle: &mut Oracle<K, deterministic::Context>,
        validator: K,
        provider: P,
    ) -> ValidatorSetup<Self> {
        let ValidatorSetup {
            application,
            mailbox,
            extra,
            height,
        } = InlineHarness::setup_validator(context, oracle, validator, provider).await;
        ValidatorSetup {
            application,
            mailbox,
            extra,
            height,
        }
    }

    /// Forwards to [`InlineHarness::setup_validator_with`] and re-tags the result.
    async fn setup_validator_with(
        context: deterministic::Context,
        oracle: &mut Oracle<K, deterministic::Context>,
        validator: K,
        provider: P,
        max_pending_acks: NonZeroUsize,
        application: Application<Self::ApplicationBlock>,
    ) -> ValidatorSetup<Self> {
        let ValidatorSetup {
            application,
            mailbox,
            extra,
            height,
        } = InlineHarness::setup_validator_with(
            context,
            oracle,
            validator,
            provider,
            max_pending_acks,
            application,
        )
        .await;
        ValidatorSetup {
            application,
            mailbox,
            extra,
            height,
        }
    }

    /// Forwards to [`InlineHarness::genesis_parent_commitment`].
    fn genesis_parent_commitment(num_participants: u16) -> Self::Commitment {
        InlineHarness::genesis_parent_commitment(num_participants)
    }

    /// Forwards to [`InlineHarness::make_test_block`].
    fn make_test_block(
        parent: D,
        parent_commitment: Self::Commitment,
        height: Height,
        timestamp: u64,
        num_participants: u16,
    ) -> Self::TestBlock {
        InlineHarness::make_test_block(
            parent,
            parent_commitment,
            height,
            timestamp,
            num_participants,
        )
    }

    /// Forwards to [`InlineHarness::commitment`].
    fn commitment(block: &Self::TestBlock) -> Self::Commitment {
        InlineHarness::commitment(block)
    }

    /// Forwards to [`InlineHarness::digest`].
    fn digest(block: &Self::TestBlock) -> D {
        InlineHarness::digest(block)
    }

    /// Forwards to [`InlineHarness::height`].
    fn height(block: &Self::TestBlock) -> Height {
        InlineHarness::height(block)
    }

    /// Forwards to [`InlineHarness::propose`] through a handle that clones this mailbox.
    async fn propose(handle: &mut ValidatorHandle<Self>, round: Round, block: &Self::TestBlock) {
        let mut inline = ValidatorHandle::<InlineHarness> {
            mailbox: handle.mailbox.clone(),
            extra: handle.extra,
        };
        InlineHarness::propose(&mut inline, round, block).await;
    }

    /// Forwards to [`InlineHarness::verify`] with an empty handle slice.
    async fn verify(
        handle: &mut ValidatorHandle<Self>,
        round: Round,
        block: &Self::TestBlock,
        _all_handles: &mut [ValidatorHandle<Self>],
    ) {
        let mut inline = ValidatorHandle::<InlineHarness> {
            mailbox: handle.mailbox.clone(),
            extra: handle.extra,
        };
        InlineHarness::verify(&mut inline, round, block, &mut []).await;
    }

    /// Forwards to [`InlineHarness::make_finalization`].
    fn make_finalization(
        proposal: Proposal<Self::Commitment>,
        schemes: &[S],
        quorum: u32,
    ) -> Finalization<S, Self::Commitment> {
        InlineHarness::make_finalization(proposal, schemes, quorum)
    }

    /// Forwards to [`InlineHarness::make_notarization`].
    fn make_notarization(
        proposal: Proposal<Self::Commitment>,
        schemes: &[S],
        quorum: u32,
    ) -> Notarization<S, Self::Commitment> {
        InlineHarness::make_notarization(proposal, schemes, quorum)
    }

    /// Forwards to [`InlineHarness::report_finalization`].
    async fn report_finalization(
        mailbox: &mut Mailbox<S, Self::Variant>,
        finalization: Finalization<S, Self::Commitment>,
    ) {
        InlineHarness::report_finalization(mailbox, finalization).await;
    }

    /// Forwards to [`InlineHarness::report_notarization`].
    async fn report_notarization(
        mailbox: &mut Mailbox<S, Self::Variant>,
        notarization: Notarization<S, Self::Commitment>,
    ) {
        InlineHarness::report_notarization(mailbox, notarization).await;
    }

    /// Forwards to [`InlineHarness::finalize_timeout`].
    fn finalize_timeout() -> Duration {
        InlineHarness::finalize_timeout()
    }

    /// Forwards to [`InlineHarness::setup_prunable_validator`].
    async fn setup_prunable_validator(
        context: deterministic::Context,
        oracle: &Oracle<K, deterministic::Context>,
        validator: K,
        schemes: &[S],
        partition_prefix: &str,
        page_cache: CacheRef,
    ) -> (
        Mailbox<S, Self::Variant>,
        Self::ValidatorExtra,
        Application<Self::ApplicationBlock>,
    ) {
        InlineHarness::setup_prunable_validator(
            context,
            oracle,
            validator,
            schemes,
            partition_prefix,
            page_cache,
        )
        .await
    }

    /// Forwards to [`InlineHarness::verify_for_prune`] through a handle that clones this
    /// mailbox.
    async fn verify_for_prune(
        handle: &mut ValidatorHandle<Self>,
        round: Round,
        block: &Self::TestBlock,
    ) {
        let mut inline = ValidatorHandle::<InlineHarness> {
            mailbox: handle.mailbox.clone(),
            extra: handle.extra,
        };
        InlineHarness::verify_for_prune(&mut inline, round, block).await;
    }
}
/// Harness for the [`Coding`] marshal variant, which works with coded blocks and a shards engine.
pub struct CodingHarness;
/// Marshal variant used by [`CodingHarness`]: [`CodingB`] blocks coded with Reed-Solomon over SHA-256.
type CodingVariant = Coding<CodingB, ReedSolomon<Sha256>, Sha256, K>;
/// Mailbox of the shards engine, returned as the harness's extra validator state.
type ShardsMailbox = shards::Mailbox<CodingB, ReedSolomon<Sha256>, Sha256, K>;
/// Coding configuration embedded in [`genesis_commitment`]: one minimum shard and one extra shard.
pub const GENESIS_CODING_CONFIG: commonware_coding::Config = commonware_coding::Config {
minimum_shards: NZU16!(1),
extra_shards: NZU16!(1),
};
/// Returns the commitment used as the parent of the first coded block.
///
/// It is built from three empty digests together with [`GENESIS_CODING_CONFIG`].
pub fn genesis_commitment() -> Commitment {
    let parts = (
        D::EMPTY,
        D::EMPTY,
        Sha256Digest::EMPTY,
        GENESIS_CODING_CONFIG,
    );
    Commitment::from(parts)
}
pub fn make_coding_block(context: CodingCtx, parent: D, height: Height, timestamp: u64) -> CodingB {
CodingB::new::<Sha256>(context, parent, height, timestamp)
}
impl TestHarness for CodingHarness {
type ApplicationBlock = CodingB;
type Variant = CodingVariant;
type TestBlock = CodedBlock<CodingB, ReedSolomon<Sha256>, Sha256>;
type ValidatorExtra = ShardsMailbox;
type Commitment = Commitment;
/// Sets up a validator with `max_pending_acks = 1` and a default application.
async fn setup_validator(
    context: deterministic::Context,
    oracle: &mut Oracle<K, deterministic::Context>,
    validator: K,
    provider: P,
) -> ValidatorSetup<Self> {
    let max_pending_acks = NZUsize!(1);
    let application = Application::default();
    Self::setup_validator_with(
        context,
        oracle,
        validator,
        provider,
        max_pending_acks,
        application,
    )
    .await
}
/// Sets up a coding-variant validator backed by immutable archives.
///
/// Registers channel 1 for the resolver (backfill) and channel 2 for the shards engine,
/// initializes both archives under the `validator-<key>` partition prefix, then starts the
/// marshal actor with `application` and the shards mailbox. The shards mailbox is returned as
/// the `extra` field of the setup.
///
/// Panics if channel registration or archive initialization fails.
async fn setup_validator_with(
context: deterministic::Context,
oracle: &mut Oracle<K, deterministic::Context>,
validator: K,
provider: P,
max_pending_acks: NonZeroUsize,
application: Application<Self::ApplicationBlock>,
) -> ValidatorSetup<Self> {
// The provider is cloned because the shards engine below needs its own copy.
let config = Config {
provider: provider.clone(),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
mailbox_size: 100,
view_retention_timeout: ViewDelta::new(10),
max_repair: NZUsize!(10),
max_pending_acks,
block_codec_config: (),
partition_prefix: format!("validator-{}", validator.clone()),
prunable_items_per_section: NZU64!(10),
replay_buffer: NZUsize!(1024),
key_write_buffer: NZUsize!(1024),
value_write_buffer: NZUsize!(1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
strategy: Sequential,
};
// Resolver used to fetch missing data from peers (channel 1).
let control = oracle.control(validator.clone());
let backfill = control.register(1, TEST_QUOTA).await.unwrap();
let resolver_cfg = resolver::Config {
public_key: validator.clone(),
peer_provider: oracle.manager(),
blocker: oracle.control(validator.clone()),
mailbox_size: config.mailbox_size,
initial: Duration::from_secs(1),
timeout: Duration::from_secs(2),
fetch_retry_timeout: Duration::from_millis(100),
priority_requests: false,
priority_responses: false,
};
let resolver = resolver::init(&context, resolver_cfg, backfill);
// Archive of finalization certificates.
let start = Instant::now();
let finalizations_by_height = immutable::Archive::init(
context.with_label("finalizations_by_height"),
immutable::Config {
metadata_partition: format!(
"{}-finalizations-by-height-metadata",
config.partition_prefix
),
freezer_table_partition: format!(
"{}-finalizations-by-height-freezer-table",
config.partition_prefix
),
freezer_table_initial_size: 64,
freezer_table_resize_frequency: 10,
freezer_table_resize_chunk_size: 10,
freezer_key_partition: format!(
"{}-finalizations-by-height-freezer-key",
config.partition_prefix
),
freezer_key_page_cache: config.page_cache.clone(),
freezer_value_partition: format!(
"{}-finalizations-by-height-freezer-value",
config.partition_prefix
),
freezer_value_target_size: 1024,
freezer_value_compression: None,
ordinal_partition: format!(
"{}-finalizations-by-height-ordinal",
config.partition_prefix
),
items_per_section: NZU64!(10),
codec_config: S::certificate_codec_config_unbounded(),
replay_buffer: config.replay_buffer,
freezer_key_write_buffer: config.key_write_buffer,
freezer_value_write_buffer: config.value_write_buffer,
ordinal_write_buffer: config.key_write_buffer,
},
)
.await
.expect("failed to initialize finalizations by height archive");
info!(elapsed = ?start.elapsed(), "restored finalizations by height archive");
// Archive of finalized blocks.
let start = Instant::now();
let finalized_blocks = immutable::Archive::init(
context.with_label("finalized_blocks"),
immutable::Config {
metadata_partition: format!(
"{}-finalized_blocks-metadata",
config.partition_prefix
),
freezer_table_partition: format!(
"{}-finalized_blocks-freezer-table",
config.partition_prefix
),
freezer_table_initial_size: 64,
freezer_table_resize_frequency: 10,
freezer_table_resize_chunk_size: 10,
freezer_key_partition: format!(
"{}-finalized_blocks-freezer-key",
config.partition_prefix
),
freezer_key_page_cache: config.page_cache.clone(),
freezer_value_partition: format!(
"{}-finalized_blocks-freezer-value",
config.partition_prefix
),
freezer_value_target_size: 1024,
freezer_value_compression: None,
ordinal_partition: format!("{}-finalized_blocks-ordinal", config.partition_prefix),
items_per_section: NZU64!(10),
codec_config: config.block_codec_config,
replay_buffer: config.replay_buffer,
freezer_key_write_buffer: config.key_write_buffer,
freezer_value_write_buffer: config.value_write_buffer,
ordinal_write_buffer: config.key_write_buffer,
},
)
.await
.expect("failed to initialize finalized blocks archive");
info!(elapsed = ?start.elapsed(), "restored finalized blocks archive");
// Shards engine (channel 2); its mailbox is what the actor is started with below.
let shard_config: shards::Config<_, _, _, _, _, Sha256, _, _> = shards::Config {
scheme_provider: provider.clone(),
blocker: oracle.control(validator.clone()),
shard_codec_cfg: CodecConfig {
maximum_shard_size: 1024 * 1024,
},
block_codec_cfg: (),
strategy: Sequential,
mailbox_size: 10,
peer_buffer_size: NZUsize!(64),
background_channel_capacity: 1024,
peer_provider: oracle.manager(),
};
let (shard_engine, shard_mailbox) = shards::Engine::new(context.clone(), shard_config);
let network = control.register(2, TEST_QUOTA).await.unwrap();
shard_engine.start(network);
let (actor, mailbox, height) = Actor::init(
context.clone(),
finalizations_by_height,
finalized_blocks,
config,
)
.await;
actor.start(application.clone(), shard_mailbox.clone(), resolver);
ValidatorSetup {
application,
mailbox,
extra: shard_mailbox,
height,
}
}
/// Builds a coded test block at `height`.
///
/// The context's round is epoch zero with a view equal to `height`; the parent is referenced
/// by the previous height's view (view zero when there is no predecessor) and
/// `parent_commitment`. The raw block is then coded with the configuration derived from
/// `num_participants`.
fn make_test_block(
    parent: D,
    parent_commitment: Commitment,
    height: Height,
    timestamp: u64,
    num_participants: u16,
) -> CodedBlock<CodingB, ReedSolomon<Sha256>, Sha256> {
    let parent_view = match height.previous() {
        Some(prev) => View::new(prev.get()),
        None => View::zero(),
    };
    let round = Round::new(Epoch::zero(), View::new(height.get()));
    let context = CodingCtx {
        round,
        leader: default_leader(),
        parent: (parent_view, parent_commitment),
    };
    let raw = CodingB::new::<Sha256>(context, parent, height, timestamp);
    let coding_config = coding_config_for_participants(num_participants);
    CodedBlock::new(raw, coding_config, &Sequential)
}
/// Returns [`genesis_commitment`]; `_num_participants` is ignored.
fn genesis_parent_commitment(_num_participants: u16) -> Commitment {
genesis_commitment()
}
/// Returns the coded block's commitment.
fn commitment(block: &CodedBlock<CodingB, ReedSolomon<Sha256>, Sha256>) -> Commitment {
block.commitment()
}
/// Returns the coded block's digest.
fn digest(block: &CodedBlock<CodingB, ReedSolomon<Sha256>, Sha256>) -> D {
block.digest()
}
/// Returns the coded block's height.
fn height(block: &CodedBlock<CodingB, ReedSolomon<Sha256>, Sha256>) -> Height {
block.height()
}
/// Tells the marshal that a clone of `block` was proposed in `round`.
async fn propose(
    handle: &mut ValidatorHandle<Self>,
    round: Round,
    block: &CodedBlock<CodingB, ReedSolomon<Sha256>, Sha256>,
) {
    let proposed = block.clone();
    handle.mailbox.proposed(round, proposed).await;
}
/// Tells the marshal that a clone of `block` was verified in `round`; `_all_handles` is unused.
async fn verify(
    handle: &mut ValidatorHandle<Self>,
    round: Round,
    block: &CodedBlock<CodingB, ReedSolomon<Sha256>, Sha256>,
    _all_handles: &mut [ValidatorHandle<Self>],
) {
    let verified = block.clone();
    handle.mailbox.verified(round, verified).await;
}
/// Signs `proposal` with the first `quorum` schemes and assembles a finalization.
///
/// Panics if `schemes` is empty, if signing fails, or if the certificate cannot be assembled.
fn make_finalization(
    proposal: Proposal<Commitment>,
    schemes: &[S],
    quorum: u32,
) -> Finalization<S, Commitment> {
    let mut finalizes = Vec::new();
    for signer in schemes.iter().take(quorum as usize) {
        finalizes.push(Finalize::sign(signer, proposal.clone()).unwrap());
    }
    let verifier = &schemes[0];
    Finalization::from_finalizes(verifier, &finalizes, &Sequential).unwrap()
}
/// Builds a notarization certificate for `proposal`.
///
/// Each of the first `quorum` entries of `schemes` signs a `Notarize` vote; the votes are
/// then combined using `schemes[0]`.
///
/// # Panics
///
/// Panics if `schemes` is empty, if any signer fails to produce a vote, or if the votes
/// cannot be assembled into a certificate.
fn make_notarization(
    proposal: Proposal<Commitment>,
    schemes: &[S],
    quorum: u32,
) -> Notarization<S, Commitment> {
    let notarizes: Vec<_> = schemes
        .iter()
        .take(quorum as usize)
        .map(|scheme| Notarize::sign(scheme, proposal.clone()).unwrap())
        .collect();
    // Pass the votes by reference, mirroring `make_finalization`.
    Notarization::from_notarizes(&schemes[0], &notarizes, &Sequential).unwrap()
}
/// Reports `finalization` to the marshal by wrapping it in `Activity::Finalization`.
async fn report_finalization(
mailbox: &mut Mailbox<S, Self::Variant>,
finalization: Finalization<S, Commitment>,
) {
mailbox.report(Activity::Finalization(finalization)).await;
}
/// Reports `notarization` to the marshal by wrapping it in `Activity::Notarization`.
async fn report_notarization(
mailbox: &mut Mailbox<S, Self::Variant>,
notarization: Notarization<S, Commitment>,
) {
mailbox.report(Activity::Notarization(notarization)).await;
}
/// Timeout for this harness (900 seconds); the `finalize` test passes it to the
/// deterministic runner.
fn finalize_timeout() -> Duration {
Duration::from_secs(900)
}
/// Starts a marshal actor for `validator` whose finalization and block archives are both
/// prunable.
///
/// Network channel 0 is registered for the backfill resolver and channel 1 for the shard
/// engine. Both archives live under `partition_prefix` and share `page_cache`. The certificate
/// provider is built from `schemes[0]`.
///
/// Returns the marshal mailbox, the shard engine mailbox and the (default) application the
/// actor was started with.
///
/// # Panics
///
/// Panics if `schemes` is empty, if a channel cannot be registered, or if either archive
/// fails to initialize.
async fn setup_prunable_validator(
    context: deterministic::Context,
    oracle: &Oracle<K, deterministic::Context>,
    validator: K,
    schemes: &[S],
    partition_prefix: &str,
    page_cache: CacheRef,
) -> (
    Mailbox<S, Self::Variant>,
    Self::ValidatorExtra,
    Application<CodingB>,
) {
    let control = oracle.control(validator.clone());
    let provider = ConstantProvider::new(schemes[0].clone());
    let config = Config {
        provider: provider.clone(),
        epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
        mailbox_size: 100,
        view_retention_timeout: ViewDelta::new(10),
        max_repair: NZUsize!(10),
        max_pending_acks: NZUsize!(1),
        block_codec_config: (),
        partition_prefix: partition_prefix.to_string(),
        prunable_items_per_section: NZU64!(10),
        replay_buffer: NZUsize!(1024),
        key_write_buffer: NZUsize!(1024),
        value_write_buffer: NZUsize!(1024),
        page_cache: page_cache.clone(),
        strategy: Sequential,
    };

    // Backfill resolver on channel 0.
    let backfill_channel = control.register(0, TEST_QUOTA).await.unwrap();
    let block_resolver = resolver::init(
        &context,
        resolver::Config {
            public_key: validator.clone(),
            peer_provider: oracle.manager(),
            blocker: control.clone(),
            mailbox_size: config.mailbox_size,
            initial: Duration::from_secs(1),
            timeout: Duration::from_secs(2),
            fetch_retry_timeout: Duration::from_millis(100),
            priority_requests: false,
            priority_responses: false,
        },
        backfill_channel,
    );

    // Shard engine on channel 1.
    let shard_config: shards::Config<_, _, _, _, _, Sha256, _, _> = shards::Config {
        scheme_provider: provider.clone(),
        blocker: oracle.control(validator.clone()),
        shard_codec_cfg: CodecConfig {
            maximum_shard_size: 1024 * 1024,
        },
        block_codec_cfg: (),
        strategy: Sequential,
        mailbox_size: 10,
        peer_buffer_size: NZUsize!(64),
        background_channel_capacity: 1024,
        peer_provider: oracle.manager(),
    };
    let (shard_engine, shard_mailbox) = shards::Engine::new(context.clone(), shard_config);
    let shard_channel = control.register(1, TEST_QUOTA).await.unwrap();
    shard_engine.start(shard_channel);

    // Prunable archive holding finalization certificates indexed by height.
    let finalizations_cfg = prunable::Config {
        translator: EightCap,
        key_partition: format!("{}-finalizations-by-height-key", partition_prefix),
        key_page_cache: page_cache.clone(),
        value_partition: format!("{}-finalizations-by-height-value", partition_prefix),
        compression: None,
        codec_config: S::certificate_codec_config_unbounded(),
        items_per_section: NZU64!(10),
        key_write_buffer: config.key_write_buffer,
        value_write_buffer: config.value_write_buffer,
        replay_buffer: config.replay_buffer,
    };
    let finalizations_by_height = prunable::Archive::init(
        context.with_label("finalizations_by_height"),
        finalizations_cfg,
    )
    .await
    .expect("failed to initialize finalizations by height archive");

    // Prunable archive holding the finalized blocks themselves.
    let blocks_cfg = prunable::Config {
        translator: EightCap,
        key_partition: format!("{}-finalized-blocks-key", partition_prefix),
        key_page_cache: page_cache.clone(),
        value_partition: format!("{}-finalized-blocks-value", partition_prefix),
        compression: None,
        codec_config: config.block_codec_config,
        items_per_section: NZU64!(10),
        key_write_buffer: config.key_write_buffer,
        value_write_buffer: config.value_write_buffer,
        replay_buffer: config.replay_buffer,
    };
    let finalized_blocks =
        prunable::Archive::init(context.with_label("finalized_blocks"), blocks_cfg)
            .await
            .expect("failed to initialize finalized blocks archive");

    // The processed height reported by `init` is not needed here.
    let (actor, mailbox, _) = Actor::init(
        context.clone(),
        finalizations_by_height,
        finalized_blocks,
        config,
    )
    .await;
    let application = Application::<CodingB>::default();
    actor.start(application.clone(), shard_mailbox.clone(), block_resolver);
    (mailbox, shard_mailbox, application)
}
/// Reports `block` as verified in `round` through the validator's marshal mailbox.
///
/// Performs the same mailbox call as `verify`, without the extra handles parameter.
async fn verify_for_prune(
handle: &mut ValidatorHandle<Self>,
round: Round,
block: &CodedBlock<CodingB, ReedSolomon<Sha256>, Sha256>,
) {
handle.mailbox.verified(round, block.clone()).await;
}
}
pub fn finalize<H: TestHarness>(seed: u64, link: Link, quorum_sees_finalization: bool) -> String {
let runner = deterministic::Runner::new(
deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(H::finalize_timeout())),
);
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(3), participants.clone())
.await;
let mut applications = BTreeMap::new();
let mut handles = Vec::new();
for (i, validator) in participants.iter().enumerate() {
let setup = H::setup_validator(
context.with_label(&format!("validator_{i}")),
&mut oracle,
validator.clone(),
ConstantProvider::new(schemes[i].clone()),
)
.await;
applications.insert(validator.clone(), setup.application);
handles.push(ValidatorHandle {
mailbox: setup.mailbox,
extra: setup.extra,
});
}
setup_network_links(&mut oracle, &participants, link.clone()).await;
let mut blocks = Vec::new();
let mut parent = Sha256::hash(b"");
let mut parent_commitment = H::genesis_parent_commitment(participants.len() as u16);
for i in 1..=NUM_BLOCKS {
let block = H::make_test_block(
parent,
parent_commitment,
Height::new(i),
i,
participants.len() as u16,
);
parent = H::digest(&block);
parent_commitment = H::commitment(&block);
blocks.push(block);
}
let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
blocks.shuffle(&mut context);
for block in blocks.iter() {
let height = H::height(block);
assert!(
!height.is_zero(),
"genesis block should not have been generated"
);
let bounds = epocher.containing(height).unwrap();
let round = Round::new(bounds.epoch(), View::new(height.get()));
let actor_index: usize = (height.get() % (NUM_VALIDATORS as u64)) as usize;
let mut handle = handles[actor_index].clone();
H::propose(&mut handle, round, block).await;
H::verify(&mut handle, round, block, &mut handles).await;
context.sleep(link.latency).await;
let proposal = Proposal {
round,
parent: View::new(height.previous().unwrap().get()),
payload: H::commitment(block),
};
let notarization = H::make_notarization(proposal.clone(), &schemes, QUORUM);
H::report_notarization(&mut handle.mailbox, notarization).await;
let fin = H::make_finalization(proposal, &schemes, QUORUM);
if quorum_sees_finalization {
let do_finalize = context.gen_bool(0.2);
for (i, h) in handles
.iter_mut()
.choose_multiple(&mut context, NUM_VALIDATORS as usize)
.iter_mut()
.enumerate()
{
if (do_finalize && i < QUORUM as usize)
|| height.get() == NUM_BLOCKS
|| height == bounds.last()
{
H::report_finalization(&mut h.mailbox, fin.clone()).await;
}
}
} else {
for h in handles.iter_mut() {
if context.gen_bool(0.2)
|| height.get() == NUM_BLOCKS
|| height == bounds.last()
{
H::report_finalization(&mut h.mailbox, fin.clone()).await;
}
}
}
}
let mut finished = false;
while !finished {
context.sleep(Duration::from_secs(1)).await;
if applications.len() != NUM_VALIDATORS as usize {
continue;
}
finished = true;
for app in applications.values() {
if app.blocks().len() != NUM_BLOCKS as usize {
finished = false;
break;
}
let Some((height, _)) = app.tip() else {
finished = false;
break;
};
if height.get() < NUM_BLOCKS {
finished = false;
break;
}
}
}
context.auditor().state()
})
}
pub fn ack_pipeline_backlog<H: TestHarness>() {
let runner = deterministic::Runner::new(
deterministic::Config::new()
.with_seed(0xA11CE)
.with_timeout(Some(Duration::from_secs(120))),
);
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;
let validator = participants[0].clone();
let application = Application::<H::ApplicationBlock>::manual_ack();
let setup = H::setup_validator_with(
context.with_label("validator_0"),
&mut oracle,
validator,
ConstantProvider::new(schemes[0].clone()),
NZUsize!(3),
application,
)
.await;
let application = setup.application;
let mut handles = vec![ValidatorHandle {
mailbox: setup.mailbox,
extra: setup.extra,
}];
let mut handle = handles[0].clone();
let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
let mut parent = Sha256::hash(b"");
let mut parent_commitment = H::genesis_parent_commitment(NUM_VALIDATORS as u16);
for i in 1..=5 {
let block = H::make_test_block(
parent,
parent_commitment,
Height::new(i),
i,
NUM_VALIDATORS as u16,
);
let commitment = H::commitment(&block);
parent = H::digest(&block);
parent_commitment = commitment;
let round = Round::new(
epocher.containing(H::height(&block)).unwrap().epoch(),
View::new(i),
);
H::verify(&mut handle, round, &block, &mut handles).await;
let proposal = Proposal {
round,
parent: View::new(i.saturating_sub(1)),
payload: commitment,
};
let finalization = H::make_finalization(proposal, &schemes, QUORUM);
H::report_finalization(&mut handle.mailbox, finalization).await;
}
while application.blocks().len() < 3 || application.pending_ack_heights().len() < 3 {
context.sleep(Duration::from_millis(10)).await;
}
assert_eq!(
application.pending_ack_heights(),
vec![Height::new(1), Height::new(2), Height::new(3)]
);
assert!(!application.blocks().contains_key(&Height::new(4)));
assert!(!application.blocks().contains_key(&Height::new(5)));
for expected in 1..=5 {
let expected = Height::new(expected);
while application.pending_ack_heights().first().copied() != Some(expected) {
context.sleep(Duration::from_millis(10)).await;
}
let acknowledged = application
.acknowledge_next()
.expect("pending ack should be present");
assert_eq!(acknowledged, expected);
}
while application.blocks().len() < 5 || !application.pending_ack_heights().is_empty() {
context.sleep(Duration::from_millis(10)).await;
}
});
}
pub fn ack_pipeline_backlog_persists_on_restart<H: TestHarness>() {
let runner = deterministic::Runner::new(
deterministic::Config::new()
.with_seed(0xA11CF)
.with_timeout(Some(Duration::from_secs(120))),
);
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;
let validator = participants[0].clone();
let application = Application::<H::ApplicationBlock>::manual_ack();
let setup = H::setup_validator_with(
context.with_label("validator_0"),
&mut oracle,
validator.clone(),
ConstantProvider::new(schemes[0].clone()),
NZUsize!(3),
application,
)
.await;
let application = setup.application;
let mut handles = vec![ValidatorHandle {
mailbox: setup.mailbox,
extra: setup.extra,
}];
let mut handle = handles[0].clone();
let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
let mut parent = Sha256::hash(b"");
let mut parent_commitment = H::genesis_parent_commitment(NUM_VALIDATORS as u16);
for i in 1..=3 {
let block = H::make_test_block(
parent,
parent_commitment,
Height::new(i),
i,
NUM_VALIDATORS as u16,
);
let commitment = H::commitment(&block);
parent = H::digest(&block);
parent_commitment = commitment;
let round = Round::new(
epocher.containing(H::height(&block)).unwrap().epoch(),
View::new(i),
);
H::verify(&mut handle, round, &block, &mut handles).await;
let proposal = Proposal {
round,
parent: View::new(i.saturating_sub(1)),
payload: commitment,
};
let finalization = H::make_finalization(proposal, &schemes, QUORUM);
H::report_finalization(&mut handle.mailbox, finalization).await;
}
while application.pending_ack_heights().len() < 3 {
context.sleep(Duration::from_millis(10)).await;
}
assert_eq!(
application.pending_ack_heights(),
vec![Height::new(1), Height::new(2), Height::new(3)]
);
assert_eq!(application.acknowledge_next(), Some(Height::new(1)));
assert_eq!(application.acknowledge_next(), Some(Height::new(2)));
assert_eq!(application.acknowledge_next(), Some(Height::new(3)));
context.sleep(Duration::from_millis(10)).await;
assert_eq!(
application.tip().map(|(height, _)| height),
Some(Height::new(3))
);
let restart = H::setup_validator_with(
context.with_label("validator_0_restart"),
&mut oracle,
validator,
ConstantProvider::new(schemes[0].clone()),
NZUsize!(3),
Application::manual_ack(),
)
.await;
assert_eq!(restart.height, Height::new(3));
});
}
pub fn sync_height_floor<H: TestHarness>() {
let runner = deterministic::Runner::new(
deterministic::Config::new()
.with_seed(0xFF)
.with_timeout(Some(Duration::from_secs(300))),
);
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(3), participants.clone())
.await;
let mut applications = BTreeMap::new();
let mut handles = Vec::new();
for (i, validator) in participants.iter().enumerate().skip(1) {
let setup = H::setup_validator(
context.with_label(&format!("validator_{i}")),
&mut oracle,
validator.clone(),
ConstantProvider::new(schemes[i].clone()),
)
.await;
applications.insert(validator.clone(), setup.application);
handles.push(ValidatorHandle {
mailbox: setup.mailbox,
extra: setup.extra,
});
}
setup_network_links(&mut oracle, &participants[1..], LINK).await;
let mut blocks = Vec::new();
let mut parent = Sha256::hash(b"");
let mut parent_commitment = H::genesis_parent_commitment(participants.len() as u16);
for i in 1..=NUM_BLOCKS {
let block = H::make_test_block(
parent,
parent_commitment,
Height::new(i),
i,
participants.len() as u16,
);
parent = H::digest(&block);
parent_commitment = H::commitment(&block);
blocks.push(block);
}
let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
for block in blocks.iter() {
let height = H::height(block);
assert!(
!height.is_zero(),
"genesis block should not have been generated"
);
let bounds = epocher.containing(height).unwrap();
let round = Round::new(bounds.epoch(), View::new(height.get()));
let actor_index: usize = (height.get() % (applications.len() as u64)) as usize;
let mut handle = handles[actor_index].clone();
H::propose(&mut handle, round, block).await;
H::verify(&mut handle, round, block, &mut handles).await;
context.sleep(LINK.latency).await;
let proposal = Proposal {
round,
parent: View::new(height.previous().unwrap().get()),
payload: H::commitment(block),
};
let notarization = H::make_notarization(proposal.clone(), &schemes, QUORUM);
H::report_notarization(&mut handle.mailbox, notarization).await;
let fin = H::make_finalization(proposal, &schemes, QUORUM);
for h in handles.iter_mut() {
H::report_finalization(&mut h.mailbox, fin.clone()).await;
}
}
let mut finished = false;
while !finished {
context.sleep(Duration::from_secs(1)).await;
finished = true;
for app in applications.values().skip(1) {
if app.blocks().len() != NUM_BLOCKS as usize {
finished = false;
break;
}
let Some((height, _)) = app.tip() else {
finished = false;
break;
};
if height.get() < NUM_BLOCKS {
finished = false;
break;
}
}
}
let validator = participants.first().unwrap();
let setup = H::setup_validator(
context.with_label("validator_0"),
&mut oracle,
validator.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let app = setup.application;
let mut mailbox = setup.mailbox;
setup_network_links(&mut oracle, &participants, LINK).await;
const NEW_SYNC_FLOOR: u64 = 100;
let second_handle = &mut handles[1];
let latest_finalization = second_handle
.mailbox
.get_finalization(Height::new(NUM_BLOCKS))
.await
.unwrap();
mailbox.set_floor(Height::new(NEW_SYNC_FLOOR)).await;
H::report_finalization(&mut mailbox, latest_finalization).await;
let mut finished = false;
while !finished {
context.sleep(Duration::from_secs(1)).await;
finished = true;
if app.blocks().len() != (NUM_BLOCKS - NEW_SYNC_FLOOR) as usize {
finished = false;
continue;
}
let Some((height, _)) = app.tip() else {
finished = false;
continue;
};
if height.get() < NUM_BLOCKS {
finished = false;
continue;
}
}
for height in 1..=NUM_BLOCKS {
let block = mailbox
.get_block(Identifier::Height(Height::new(height)))
.await;
if height <= NEW_SYNC_FLOOR {
assert!(block.is_none());
} else {
assert_eq!(block.unwrap().height().get(), height);
}
}
})
}
/// Exercises pruning of the finalized-block and finalization archives on a single validator
/// set up through `H::setup_prunable_validator`.
///
/// Twenty blocks are verified and finalized. The test then checks that:
/// * pruning to height 25 (above everything stored) leaves all blocks retrievable,
/// * pruning to height 10 removes heights 1..10 and keeps 10..=20,
/// * pruning to height 20 removes heights 10..20 and keeps 20,
/// * after setting the validator up again with the same partition prefix, heights 1..20 are
///   still absent and height 20 is still present.
pub fn prune_finalized_archives<H: TestHarness>() {
let runner = deterministic::Runner::new(
deterministic::Config::new().with_timeout(Some(Duration::from_secs(120))),
);
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;
let validator = participants[0].clone();
let partition_prefix = format!("prune-test-{}", validator.clone());
let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE);
// Factory used for both the initial start and the restart; each call clones its inputs so
// the same validator, schemes, partition prefix and page cache are reused.
let init_marshal = |ctx: deterministic::Context| {
let validator = validator.clone();
let schemes = schemes.clone();
let partition_prefix = partition_prefix.clone();
let page_cache = page_cache.clone();
let oracle = &oracle;
async move {
H::setup_prunable_validator(
ctx,
oracle,
validator,
&schemes,
&partition_prefix,
page_cache,
)
.await
}
};
let (mut mailbox, extra, application) = init_marshal(context.with_label("init")).await;
// The wildcard pattern does not move `extra`; it is still cloned and dropped below.
let _ = extra;
let mut parent = Sha256::hash(b"");
let mut parent_commitment = H::genesis_parent_commitment(NUM_VALIDATORS as u16);
let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
// Verify and finalize heights 1..=20.
for i in 1..=20u64 {
let block = H::make_test_block(
parent,
parent_commitment,
Height::new(i),
i,
NUM_VALIDATORS as u16,
);
let commitment = H::commitment(&block);
parent = H::digest(&block);
parent_commitment = commitment;
let bounds = epocher.containing(Height::new(i)).unwrap();
let round = Round::new(bounds.epoch(), View::new(i));
let mut handle = ValidatorHandle {
mailbox: mailbox.clone(),
extra: extra.clone(),
};
H::verify_for_prune(&mut handle, round, &block).await;
context.sleep(LINK.latency).await;
let proposal = Proposal {
round,
parent: View::new(i - 1),
payload: commitment,
};
let finalization = H::make_finalization(proposal, &schemes, QUORUM);
H::report_finalization(&mut mailbox, finalization).await;
}
// Wait until the application has received all 20 blocks.
while application.blocks().len() < 20 {
context.sleep(Duration::from_millis(10)).await;
}
// Baseline: everything is retrievable before any pruning.
for i in 1..=20u64 {
assert!(
mailbox.get_block(Height::new(i)).await.is_some(),
"block {i} should exist before pruning"
);
assert!(
mailbox.get_finalization(Height::new(i)).await.is_some(),
"finalization {i} should exist before pruning"
);
}
// Pruning to a height beyond the stored range must not remove any block.
mailbox.prune(Height::new(25)).await;
// Fixed delay before checking the effect of the prune request.
context.sleep(Duration::from_millis(50)).await;
for i in 1..=20u64 {
assert!(
mailbox.get_block(Height::new(i)).await.is_some(),
"block {i} should still exist after pruning above floor"
);
}
// Prune to height 10: heights below 10 disappear, 10 and above remain.
mailbox.prune(Height::new(10)).await;
context.sleep(Duration::from_millis(100)).await;
for i in 1..10u64 {
assert!(
mailbox.get_block(Height::new(i)).await.is_none(),
"block {i} should be pruned"
);
assert!(
mailbox.get_finalization(Height::new(i)).await.is_none(),
"finalization {i} should be pruned"
);
}
for i in 10..=20u64 {
assert!(
mailbox.get_block(Height::new(i)).await.is_some(),
"block {i} should still exist after pruning"
);
assert!(
mailbox.get_finalization(Height::new(i)).await.is_some(),
"finalization {i} should still exist after pruning"
);
}
// Prune to height 20: only height 20 remains.
mailbox.prune(Height::new(20)).await;
context.sleep(Duration::from_millis(100)).await;
for i in 10..20u64 {
assert!(
mailbox.get_block(Height::new(i)).await.is_none(),
"block {i} should be pruned after second prune"
);
assert!(
mailbox.get_finalization(Height::new(i)).await.is_none(),
"finalization {i} should be pruned after second prune"
);
}
assert!(
mailbox.get_block(Height::new(20)).await.is_some(),
"block 20 should still exist"
);
assert!(
mailbox.get_finalization(Height::new(20)).await.is_some(),
"finalization 20 should still exist"
);
// Drop the handles to the first instance, then set the validator up again on the same
// partitions to confirm the pruned state is what gets reloaded.
drop(mailbox);
drop(extra);
let (mailbox, _extra, _application) = init_marshal(context.with_label("restart")).await;
for i in 1..20u64 {
assert!(
mailbox.get_block(Height::new(i)).await.is_none(),
"block {i} should still be pruned after restart"
);
assert!(
mailbox.get_finalization(Height::new(i)).await.is_none(),
"finalization {i} should still be pruned after restart"
);
}
assert!(
mailbox.get_block(Height::new(20)).await.is_some(),
"block 20 should still exist after restart"
);
assert!(
mailbox.get_finalization(Height::new(20)).await.is_some(),
"finalization 20 should still exist after restart"
);
})
}
/// Checks that a block delivered for a height below the victim's floor is neither persisted
/// nor held against the peer that served it.
///
/// Sequence:
/// 1. A victim and an attacker are set up and linked, then the attacker -> victim link is
///    removed.
/// 2. The attacker proposes and verifies a block at height 5; the victim is handed a
///    finalization for that block.
/// 3. After 500ms the victim's floor is raised to height 10.
/// 4. The attacker -> victim link is restored and the test waits three seconds.
/// 5. The victim must not have blocked the attacker, and must hold neither the block nor the
///    finalization for height 5.
pub fn reject_stale_block_delivery_after_floor_update<H: TestHarness>() {
let runner = deterministic::Runner::new(
deterministic::Config::new()
.with_seed(0xBADC0DE)
.with_timeout(Some(Duration::from_secs(120))),
);
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
// Only the first two participants take part in the network.
let victim = participants[0].clone();
let attacker = participants[1].clone();
let peers = vec![victim.clone(), attacker.clone()];
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), peers.clone()).await;
let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE);
let (mut victim_mailbox, victim_extra, _victim_application) = H::setup_prunable_validator(
context.with_label("victim"),
&oracle,
victim.clone(),
&schemes,
&format!("stale-floor-victim-{}", victim),
page_cache.clone(),
)
.await;
let (attacker_mailbox, attacker_extra, _attacker_application) =
H::setup_prunable_validator(
context.with_label("attacker"),
&oracle,
attacker.clone(),
&schemes,
&format!("stale-floor-attacker-{}", attacker),
page_cache,
)
.await;
// The victim's extra handle is not used by this test.
let _ = victim_extra;
// Link both peers, then cut the attacker -> victim direction until the floor has moved.
setup_network_links(&mut oracle, &peers, LINK).await;
oracle
.remove_link(attacker.clone(), victim.clone())
.await
.unwrap();
// Build a block at height 5 whose parent digest is unrelated to any real chain.
let stale_height = Height::new(5);
let round = Round::new(Epoch::zero(), View::new(stale_height.get()));
let stale_block = H::make_test_block(
Sha256::hash(b"stale-parent"),
H::genesis_parent_commitment(NUM_VALIDATORS as u16),
stale_height,
stale_height.get(),
NUM_VALIDATORS as u16,
);
let commitment = H::commitment(&stale_block);
let mut attacker_handle = ValidatorHandle {
mailbox: attacker_mailbox,
extra: attacker_extra,
};
// Only the attacker learns about the block locally.
H::propose(&mut attacker_handle, round, &stale_block).await;
let mut no_handles: Vec<ValidatorHandle<H>> = Vec::new();
H::verify(
&mut attacker_handle,
round,
&stale_block,
no_handles.as_mut_slice(),
)
.await;
let proposal = Proposal {
round,
parent: View::new(stale_height.get() - 1),
payload: commitment,
};
// The victim only receives the finalization, not the block itself.
let finalization = H::make_finalization(proposal, &schemes, QUORUM);
H::report_finalization(&mut victim_mailbox, finalization).await;
context.sleep(Duration::from_millis(500)).await;
// Raise the floor above the stale height.
let floor = Height::new(10);
victim_mailbox.set_floor(floor).await;
// NOTE(review): the result is discarded; presumably this round-trip ensures the floor
// update has been processed before the link is restored - confirm.
let _ = victim_mailbox.get_finalization(floor).await;
// Restore the attacker -> victim link and give deliveries time to arrive.
oracle
.add_link(attacker.clone(), victim.clone(), LINK)
.await
.unwrap();
context.sleep(Duration::from_secs(3)).await;
let blocked = oracle.blocked().await.unwrap();
assert!(
!blocked
.iter()
.any(|(blocker, blocked)| blocker == &victim && blocked == &attacker),
"stale delivery below floor must not block the serving peer"
);
assert!(
victim_mailbox.get_block(stale_height).await.is_none(),
"stale block below floor must not be persisted"
);
assert!(
victim_mailbox
.get_finalization(stale_height)
.await
.is_none(),
"stale finalization below floor must not be persisted"
);
});
}
/// Checks that a subscription by digest resolves once the block has been proposed, verified,
/// notarized and finalized.
///
/// A subscription for the height-1 block is registered on the first validator before the
/// block is introduced; the delivered block must carry the expected digest and height.
pub fn subscribe_basic_block_delivery<H: TestHarness>() {
    deterministic::Runner::timed(Duration::from_secs(60)).start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                .await;

        // Bring up one marshal per participant.
        let mut handles = Vec::new();
        for (idx, validator) in participants.iter().enumerate() {
            let setup = H::setup_validator(
                context.with_label(&format!("validator_{idx}")),
                &mut oracle,
                validator.clone(),
                ConstantProvider::new(schemes[idx].clone()),
            )
            .await;
            handles.push(ValidatorHandle {
                mailbox: setup.mailbox,
                extra: setup.extra,
            });
        }
        let mut handle = handles[0].clone();
        setup_network_links(&mut oracle, &participants, LINK).await;

        // Single block at height 1 on top of genesis.
        let participant_count = participants.len() as u16;
        let block = H::make_test_block(
            Sha256::hash(b""),
            H::genesis_parent_commitment(participant_count),
            Height::new(1),
            1,
            participant_count,
        );
        let digest = H::digest(&block);
        let commitment = H::commitment(&block);
        let round = Round::new(Epoch::zero(), View::new(1));

        // Subscribe before the marshal knows about the block.
        let subscription_rx = handle
            .mailbox
            .subscribe_by_digest(Some(round), digest)
            .await;

        H::propose(&mut handle, round, &block).await;
        H::verify(&mut handle, round, &block, &mut handles).await;
        let proposal = Proposal {
            round,
            parent: View::zero(),
            payload: commitment,
        };
        let notarization = H::make_notarization(proposal.clone(), &schemes, QUORUM);
        H::report_notarization(&mut handle.mailbox, notarization).await;
        let finalization = H::make_finalization(proposal, &schemes, QUORUM);
        H::report_finalization(&mut handle.mailbox, finalization).await;

        let delivered = subscription_rx.await.unwrap();
        assert_eq!(delivered.digest(), digest);
        assert_eq!(delivered.height().get(), 1);
    })
}
pub fn subscribe_multiple_subscriptions<H: TestHarness>() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;
let mut handles = Vec::new();
for (i, validator) in participants.iter().enumerate() {
let setup = H::setup_validator(
context.with_label(&format!("validator_{i}")),
&mut oracle,
validator.clone(),
ConstantProvider::new(schemes[i].clone()),
)
.await;
handles.push(ValidatorHandle {
mailbox: setup.mailbox,
extra: setup.extra,
});
}
let mut handle = handles[0].clone();
setup_network_links(&mut oracle, &participants, LINK).await;
let parent = Sha256::hash(b"");
let parent_commitment = H::genesis_parent_commitment(participants.len() as u16);
let block1 = H::make_test_block(
parent,
parent_commitment,
Height::new(1),
1,
participants.len() as u16,
);
let block2 = H::make_test_block(
H::digest(&block1),
H::commitment(&block1),
Height::new(2),
2,
participants.len() as u16,
);
let digest1 = H::digest(&block1);
let digest2 = H::digest(&block2);
let sub1_rx = handle
.mailbox
.subscribe_by_digest(Some(Round::new(Epoch::zero(), View::new(1))), digest1)
.await;
let sub2_rx = handle
.mailbox
.subscribe_by_digest(Some(Round::new(Epoch::zero(), View::new(2))), digest2)
.await;
let sub3_rx = handle
.mailbox
.subscribe_by_digest(Some(Round::new(Epoch::zero(), View::new(1))), digest1)
.await;
for (view, block) in [(1u64, &block1), (2, &block2)] {
let round = Round::new(Epoch::zero(), View::new(view));
H::propose(&mut handle, round, block).await;
H::verify(&mut handle, round, block, &mut handles).await;
let proposal = Proposal {
round,
parent: View::new(view.checked_sub(1).unwrap()),
payload: H::commitment(block),
};
let notarization = H::make_notarization(proposal.clone(), &schemes, QUORUM);
H::report_notarization(&mut handle.mailbox, notarization).await;
let finalization = H::make_finalization(proposal, &schemes, QUORUM);
H::report_finalization(&mut handle.mailbox, finalization).await;
}
let received1_sub1 = sub1_rx.await.unwrap();
let received2 = sub2_rx.await.unwrap();
let received1_sub3 = sub3_rx.await.unwrap();
assert_eq!(received1_sub1.digest(), digest1);
assert_eq!(received2.digest(), digest2);
assert_eq!(received1_sub3.digest(), digest1);
assert_eq!(received1_sub1.height().get(), 1);
assert_eq!(received2.height().get(), 2);
assert_eq!(received1_sub3.height().get(), 1);
})
}
/// Checks that dropping one subscription does not prevent another from resolving.
///
/// Subscriptions are registered for blocks 1 and 2, the receiver for block 1 is dropped, both
/// blocks are then proposed, verified, notarized and finalized, and the remaining subscription
/// must still deliver block 2.
pub fn subscribe_canceled_subscriptions<H: TestHarness>() {
    deterministic::Runner::timed(Duration::from_secs(60)).start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                .await;

        // Bring up one marshal per participant.
        let mut handles = Vec::new();
        for (idx, validator) in participants.iter().enumerate() {
            let setup = H::setup_validator(
                context.with_label(&format!("validator_{idx}")),
                &mut oracle,
                validator.clone(),
                ConstantProvider::new(schemes[idx].clone()),
            )
            .await;
            handles.push(ValidatorHandle {
                mailbox: setup.mailbox,
                extra: setup.extra,
            });
        }
        let mut handle = handles[0].clone();
        setup_network_links(&mut oracle, &participants, LINK).await;

        // Two chained blocks at heights 1 and 2.
        let participant_count = participants.len() as u16;
        let block1 = H::make_test_block(
            Sha256::hash(b""),
            H::genesis_parent_commitment(participant_count),
            Height::new(1),
            1,
            participant_count,
        );
        let block2 = H::make_test_block(
            H::digest(&block1),
            H::commitment(&block1),
            Height::new(2),
            2,
            participant_count,
        );
        let digest1 = H::digest(&block1);
        let digest2 = H::digest(&block2);

        let sub1_rx = handle
            .mailbox
            .subscribe_by_digest(Some(Round::new(Epoch::zero(), View::new(1))), digest1)
            .await;
        let sub2_rx = handle
            .mailbox
            .subscribe_by_digest(Some(Round::new(Epoch::zero(), View::new(2))), digest2)
            .await;

        // Cancel the first subscription by dropping its receiver.
        drop(sub1_rx);

        for (view, block) in [(1u64, &block1), (2, &block2)] {
            let round = Round::new(Epoch::zero(), View::new(view));
            H::propose(&mut handle, round, block).await;
            H::verify(&mut handle, round, block, &mut handles).await;
            let proposal = Proposal {
                round,
                parent: View::new(view.checked_sub(1).unwrap()),
                payload: H::commitment(block),
            };
            let notarization = H::make_notarization(proposal.clone(), &schemes, QUORUM);
            H::report_notarization(&mut handle.mailbox, notarization).await;
            let finalization = H::make_finalization(proposal, &schemes, QUORUM);
            H::report_finalization(&mut handle.mailbox, finalization).await;
        }

        let delivered = sub2_rx.await.unwrap();
        assert_eq!(delivered.digest(), digest2);
        assert_eq!(delivered.height().get(), 2);
    })
}
/// Checks that digest subscriptions on validator 0 resolve for a chain of five blocks, where
/// each block is handed to the harness through a different sequence of calls.
///
/// All five subscriptions are registered before any block is proposed. The scenarios are:
/// 1. `H::propose` alone, followed by a short sleep,
/// 2. `H::propose` then `H::verify`,
/// 3. a reported notarization, then `H::propose` and `H::verify`,
/// 4. a reported finalization, then `H::propose` and `H::verify`,
/// 5. a reported notarization and finalization, then `H::propose` and `H::verify`.
pub fn subscribe_blocks_from_different_sources<H: TestHarness>() {
    let runner = deterministic::Runner::timed(Duration::from_secs(60));
    runner.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                .await;

        // Bring up every participant; validator 0 is the one being observed.
        let mut handles = Vec::new();
        for (i, validator) in participants.iter().enumerate() {
            let setup = H::setup_validator(
                context.with_label(&format!("validator_{i}")),
                &mut oracle,
                validator.clone(),
                ConstantProvider::new(schemes[i].clone()),
            )
            .await;
            handles.push(ValidatorHandle {
                mailbox: setup.mailbox,
                extra: setup.extra,
            });
        }
        let mut observer = handles[0].clone();
        setup_network_links(&mut oracle, &participants, LINK).await;

        // Build the chain 1..=5; every block references its predecessor's digest and commitment.
        let n = participants.len() as u16;
        let block1 = H::make_test_block(
            Sha256::hash(b""),
            H::genesis_parent_commitment(n),
            Height::new(1),
            1,
            n,
        );
        let digest1 = H::digest(&block1);
        let block2 = H::make_test_block(digest1, H::commitment(&block1), Height::new(2), 2, n);
        let digest2 = H::digest(&block2);
        let block3 = H::make_test_block(digest2, H::commitment(&block2), Height::new(3), 3, n);
        let digest3 = H::digest(&block3);
        let block4 = H::make_test_block(digest3, H::commitment(&block3), Height::new(4), 4, n);
        let digest4 = H::digest(&block4);
        let block5 = H::make_test_block(digest4, H::commitment(&block4), Height::new(5), 5, n);
        let digest5 = H::digest(&block5);

        // Register every subscription up front, before any block is known to the validator.
        let sub1_rx = observer.mailbox.subscribe_by_digest(None, digest1).await;
        let sub2_rx = observer.mailbox.subscribe_by_digest(None, digest2).await;
        let sub3_rx = observer.mailbox.subscribe_by_digest(None, digest3).await;
        let sub4_rx = observer.mailbox.subscribe_by_digest(None, digest4).await;
        let sub5_rx = observer.mailbox.subscribe_by_digest(None, digest5).await;

        // Scenario 1: proposal only.
        let round1 = Round::new(Epoch::zero(), View::new(1));
        H::propose(&mut observer, round1, &block1).await;
        context.sleep(Duration::from_millis(20)).await;
        let received1 = sub1_rx.await.unwrap();
        assert_eq!(received1.digest(), digest1);
        assert_eq!(received1.height().get(), 1);

        // Scenario 2: proposal followed by verification.
        let round2 = Round::new(Epoch::zero(), View::new(2));
        H::propose(&mut observer, round2, &block2).await;
        H::verify(&mut observer, round2, &block2, &mut handles).await;
        let received2 = sub2_rx.await.unwrap();
        assert_eq!(received2.digest(), digest2);
        assert_eq!(received2.height().get(), 2);

        // Scenario 3: the notarization is reported before the block is proposed and verified.
        let round3 = Round::new(Epoch::zero(), View::new(3));
        let proposal3 = Proposal {
            round: round3,
            parent: View::new(2),
            payload: H::commitment(&block3),
        };
        let notarization3 = H::make_notarization(proposal3.clone(), &schemes, QUORUM);
        H::report_notarization(&mut observer.mailbox, notarization3).await;
        H::propose(&mut observer, round3, &block3).await;
        H::verify(&mut observer, round3, &block3, &mut handles).await;
        let received3 = sub3_rx.await.unwrap();
        assert_eq!(received3.digest(), digest3);
        assert_eq!(received3.height().get(), 3);

        // Scenario 4: the finalization is reported before the block is proposed and verified.
        let round4 = Round::new(Epoch::zero(), View::new(4));
        let finalization4 = H::make_finalization(
            Proposal {
                round: round4,
                parent: View::new(3),
                payload: H::commitment(&block4),
            },
            &schemes,
            QUORUM,
        );
        H::report_finalization(&mut observer.mailbox, finalization4).await;
        H::propose(&mut observer, round4, &block4).await;
        H::verify(&mut observer, round4, &block4, &mut handles).await;
        let received4 = sub4_rx.await.unwrap();
        assert_eq!(received4.digest(), digest4);
        assert_eq!(received4.height().get(), 4);

        // Scenario 5: both certificates are reported before the block is proposed and verified.
        let round5 = Round::new(Epoch::zero(), View::new(5));
        let proposal5 = Proposal {
            round: round5,
            parent: View::new(4),
            payload: H::commitment(&block5),
        };
        let notarization5 = H::make_notarization(proposal5.clone(), &schemes, QUORUM);
        H::report_notarization(&mut observer.mailbox, notarization5).await;
        let finalization5 = H::make_finalization(proposal5, &schemes, QUORUM);
        H::report_finalization(&mut observer.mailbox, finalization5).await;
        H::propose(&mut observer, round5, &block5).await;
        H::verify(&mut observer, round5, &block5, &mut handles).await;
        let received5 = sub5_rx.await.unwrap();
        assert_eq!(received5.digest(), digest5);
        assert_eq!(received5.height().get(), 5);
    })
}
/// Exercises `get_info` on a single validator before and after one block is finalized.
///
/// Before anything is reported, lookups by `Identifier::Latest` and by height 1 return `None`.
/// Once block 1 has been proposed and its finalization reported, lookups by latest, by height
/// and by digest all return `(Height 1, digest)`, while height 2 and an unrelated digest are
/// still `None`.
pub fn get_info_basic_queries_present_and_missing<H: TestHarness>() {
    let runner = deterministic::Runner::timed(Duration::from_secs(60));
    runner.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                .await;
        let setup = H::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            participants[0].clone(),
            ConstantProvider::new(schemes[0].clone()),
        )
        .await;
        let mut handle = ValidatorHandle {
            mailbox: setup.mailbox,
            extra: setup.extra,
        };

        // Nothing has been finalized yet, so both queries must come back empty.
        assert!(handle.mailbox.get_info(Identifier::Latest).await.is_none());
        assert!(handle.mailbox.get_info(Height::new(1)).await.is_none());

        // Propose block 1 and report its finalization.
        let n = participants.len() as u16;
        let block = H::make_test_block(
            Sha256::hash(b""),
            H::genesis_parent_commitment(n),
            Height::new(1),
            1,
            n,
        );
        let digest = H::digest(&block);
        let commitment = H::commitment(&block);
        let round = Round::new(Epoch::zero(), View::new(1));
        H::propose(&mut handle, round, &block).await;
        context.sleep(LINK.latency).await;
        let finalization = H::make_finalization(
            Proposal {
                round,
                parent: View::zero(),
                payload: commitment,
            },
            &schemes,
            QUORUM,
        );
        H::report_finalization(&mut handle.mailbox, finalization).await;

        // The same (height, digest) pair is expected regardless of how the block is identified.
        let expected = Some((Height::new(1), digest));
        assert_eq!(handle.mailbox.get_info(Identifier::Latest).await, expected);
        assert_eq!(handle.mailbox.get_info(Height::new(1)).await, expected);
        assert_eq!(handle.mailbox.get_info(&digest).await, expected);

        // Unknown height and unknown digest remain absent.
        assert!(handle.mailbox.get_info(Height::new(2)).await.is_none());
        let unknown = Sha256::hash(b"missing");
        assert!(handle.mailbox.get_info(&unknown).await.is_none());
    })
}
/// Finalizes blocks 1..=5 one at a time on a single validator and checks `get_info`.
///
/// After each finalization report, `get_info(Identifier::Latest)` must return the height and
/// digest of the block just finalized. Once all five are reported, each height 1..=5 must
/// still map to the digest recorded for it.
pub fn get_info_latest_progression_multiple_finalizations<H: TestHarness>() {
    let runner = deterministic::Runner::timed(Duration::from_secs(60));
    runner.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                .await;
        let setup = H::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            participants[0].clone(),
            ConstantProvider::new(schemes[0].clone()),
        )
        .await;
        let mut handle = ValidatorHandle {
            mailbox: setup.mailbox,
            extra: setup.extra,
        };

        let n = participants.len() as u16;
        let mut parent = Sha256::hash(b"");
        let mut parent_commitment = H::genesis_parent_commitment(n);
        let mut digests = Vec::new();
        for h in 1..=5u64 {
            let block = H::make_test_block(parent, parent_commitment, Height::new(h), h, n);
            let digest = H::digest(&block);
            let commitment = H::commitment(&block);
            let round = Round::new(Epoch::zero(), View::new(h));
            H::propose(&mut handle, round, &block).await;
            context.sleep(LINK.latency).await;
            let finalization = H::make_finalization(
                Proposal {
                    round,
                    parent: View::new(h - 1),
                    payload: commitment,
                },
                &schemes,
                QUORUM,
            );
            H::report_finalization(&mut handle.mailbox, finalization).await;

            // "Latest" must track the block that was just finalized.
            assert_eq!(
                handle.mailbox.get_info(Identifier::Latest).await,
                Some((Height::new(h), digest))
            );

            // The next block builds on this one.
            parent = digest;
            parent_commitment = commitment;
            digests.push(digest);
        }

        // Heights start at 1, so pair each recorded digest with 1, 2, 3, ...
        for (digest, h) in digests.iter().zip(1u64..) {
            let height = Height::new(h);
            assert_eq!(
                handle.mailbox.get_info(height).await,
                Some((height, *digest))
            );
        }
    })
}
/// Exercises `get_block` by height and by `Identifier::Latest` on a single validator.
///
/// Both lookups return `None` before anything is finalized. After blocks 1..=3 are proposed
/// and finalized, each height returns the block with the matching digest, `Latest` returns
/// block 3, and height 10 is still `None`.
pub fn get_block_by_height_and_latest<H: TestHarness>() {
    let runner = deterministic::Runner::timed(Duration::from_secs(60));
    runner.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                .await;
        let setup = H::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            participants[0].clone(),
            ConstantProvider::new(schemes[0].clone()),
        )
        .await;
        let mut handle = ValidatorHandle {
            mailbox: setup.mailbox,
            extra: setup.extra,
        };

        // Empty state: neither a specific height nor "latest" resolves.
        let before_by_height = handle
            .mailbox
            .get_block(Identifier::Height(Height::new(1)))
            .await;
        assert!(before_by_height.is_none());
        assert!(handle.mailbox.get_block(Identifier::Latest).await.is_none());

        // Propose and finalize a three-block chain, remembering each digest.
        let n = participants.len() as u16;
        let mut parent = Sha256::hash(b"");
        let mut parent_commitment = H::genesis_parent_commitment(n);
        let mut blocks = Vec::new();
        for h in 1..=3u64 {
            let block = H::make_test_block(parent, parent_commitment, Height::new(h), h, n);
            let digest = H::digest(&block);
            let commitment = H::commitment(&block);
            let round = Round::new(Epoch::zero(), View::new(h));
            H::propose(&mut handle, round, &block).await;
            context.sleep(LINK.latency).await;
            let finalization = H::make_finalization(
                Proposal {
                    round,
                    parent: View::new(h - 1),
                    payload: commitment,
                },
                &schemes,
                QUORUM,
            );
            H::report_finalization(&mut handle.mailbox, finalization).await;
            parent = digest;
            parent_commitment = commitment;
            blocks.push((digest, block));
        }

        // Every finalized height must return the block recorded for it.
        for ((digest, _), h) in blocks.iter().zip(1u64..) {
            let height = Height::new(h);
            let fetched = handle
                .mailbox
                .get_block(Identifier::Height(height))
                .await
                .unwrap();
            assert_eq!(fetched.digest(), *digest);
            assert_eq!(fetched.height(), height);
        }

        // "Latest" is the third (last) block.
        let latest = handle.mailbox.get_block(Identifier::Latest).await.unwrap();
        assert_eq!(latest.digest(), blocks[2].0);
        assert_eq!(latest.height(), Height::new(3));

        // A height that was never finalized stays absent.
        let beyond_tip = handle
            .mailbox
            .get_block(Identifier::Height(Height::new(10)))
            .await;
        assert!(beyond_tip.is_none());
    })
}
/// Exercises `get_block` by digest on a single validator.
///
/// After block 1 is proposed and its finalization reported, looking the block up by its digest
/// must return it at height 1; looking up an unrelated digest must return `None`.
pub fn get_block_by_commitment_from_sources_and_missing<H: TestHarness>() {
    let runner = deterministic::Runner::timed(Duration::from_secs(60));
    runner.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                .await;
        let setup = H::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            participants[0].clone(),
            ConstantProvider::new(schemes[0].clone()),
        )
        .await;
        let mut handle = ValidatorHandle {
            mailbox: setup.mailbox,
            extra: setup.extra,
        };

        // Propose block 1 and report its finalization.
        let n = participants.len() as u16;
        let block = H::make_test_block(
            Sha256::hash(b""),
            H::genesis_parent_commitment(n),
            Height::new(1),
            1,
            n,
        );
        let digest = H::digest(&block);
        let commitment = H::commitment(&block);
        let round = Round::new(Epoch::zero(), View::new(1));
        H::propose(&mut handle, round, &block).await;
        context.sleep(LINK.latency).await;
        let finalization = H::make_finalization(
            Proposal {
                round,
                parent: View::zero(),
                payload: commitment,
            },
            &schemes,
            QUORUM,
        );
        H::report_finalization(&mut handle.mailbox, finalization).await;

        // Known digest resolves to the block.
        let fetched = handle.mailbox.get_block(&digest).await.unwrap();
        assert_eq!(fetched.digest(), digest);
        assert_eq!(fetched.height(), Height::new(1));

        // Unknown digest does not.
        let unknown = Sha256::hash(b"missing");
        assert!(handle.mailbox.get_block(&unknown).await.is_none());
    })
}
/// Exercises `get_finalization` by height on a single validator.
///
/// The lookup returns `None` for height 1 before anything is reported. For each of blocks
/// 1..=3, after the block is proposed and its finalization reported, the lookup at that height
/// must return a finalization whose payload is the block's commitment and whose view equals
/// the height. Height 10, which is never finalized, must stay `None`.
pub fn get_finalization_by_height<H: TestHarness>() {
    let runner = deterministic::Runner::timed(Duration::from_secs(60));
    runner.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                .await;
        let me = participants[0].clone();
        let setup = H::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            me,
            ConstantProvider::new(schemes[0].clone()),
        )
        .await;
        let mut handle = ValidatorHandle {
            mailbox: setup.mailbox,
            extra: setup.extra,
        };

        // No finalization exists before anything is reported.
        assert!(handle
            .mailbox
            .get_finalization(Height::new(1))
            .await
            .is_none());

        let n = participants.len() as u16;
        let mut parent = Sha256::hash(b"");
        let mut parent_commitment = H::genesis_parent_commitment(n);
        for i in 1..=3u64 {
            let block = H::make_test_block(parent, parent_commitment, Height::new(i), i, n);
            let digest = H::digest(&block);
            let commitment = H::commitment(&block);
            let round = Round::new(Epoch::zero(), View::new(i));
            H::propose(&mut handle, round, &block).await;
            context.sleep(LINK.latency).await;
            let proposal = Proposal {
                round,
                parent: View::new(i - 1),
                payload: commitment,
            };
            // `proposal` is not needed afterwards, so it is moved rather than cloned.
            let finalization = H::make_finalization(proposal, &schemes, QUORUM);
            H::report_finalization(&mut handle.mailbox, finalization).await;

            // The stored finalization must describe the block that was just finalized.
            let fin = handle
                .mailbox
                .get_finalization(Height::new(i))
                .await
                .unwrap();
            assert_eq!(fin.proposal.payload, commitment);
            assert_eq!(fin.round().view(), View::new(i));

            parent = digest;
            parent_commitment = commitment;
        }

        // A height that was never finalized has no finalization.
        assert!(handle
            .mailbox
            .get_finalization(Height::new(10))
            .await
            .is_none());
    })
}
/// Checks that `hint_finalized` makes a validator obtain a finalization it does not have.
///
/// Validators 0 and 1 are started and linked to each other (only the first two participants
/// are linked). Five blocks are proposed and finalized on validator 0 only. Validator 1 must
/// have no finalization for height 5 at that point; after `hint_finalized` is called on it
/// with height 5 and validator 0's key, the test polls until validator 1 returns a
/// finalization for height 5 and asserts that it is for view 5.
///
/// The runner uses a fixed seed (42) and a 60 second timeout.
pub fn hint_finalized_triggers_fetch<H: TestHarness>() {
    let runner = deterministic::Runner::new(
        deterministic::Config::new()
            .with_seed(42)
            .with_timeout(Some(Duration::from_secs(60))),
    );
    runner.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(3), participants.clone())
                .await;

        // Validator 0: receives every block and finalization directly.
        let setup0 = H::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            participants[0].clone(),
            ConstantProvider::new(schemes[0].clone()),
        )
        .await;
        let app0 = setup0.application;
        let mut handle0 = ValidatorHandle {
            mailbox: setup0.mailbox,
            extra: setup0.extra,
        };

        // Validator 1: is told nothing until the hint below.
        let setup1 = H::setup_validator(
            context.with_label("validator_1"),
            &mut oracle,
            participants[1].clone(),
            ConstantProvider::new(schemes[1].clone()),
        )
        .await;
        let handle1: ValidatorHandle<H> = ValidatorHandle {
            mailbox: setup1.mailbox,
            extra: setup1.extra,
        };
        setup_network_links(&mut oracle, &participants[..2], LINK).await;

        // Propose and finalize blocks 1..=5 on validator 0.
        let n = participants.len() as u16;
        let mut parent = Sha256::hash(b"");
        let mut parent_commitment = H::genesis_parent_commitment(n);
        for h in 1..=5u64 {
            let block = H::make_test_block(parent, parent_commitment, Height::new(h), h, n);
            let digest = H::digest(&block);
            let commitment = H::commitment(&block);
            let round = Round::new(Epoch::new(0), View::new(h));
            H::propose(&mut handle0, round, &block).await;
            context.sleep(LINK.latency).await;
            let finalization = H::make_finalization(
                Proposal {
                    round,
                    parent: View::new(h - 1),
                    payload: commitment,
                },
                &schemes,
                QUORUM,
            );
            H::report_finalization(&mut handle0.mailbox, finalization).await;
            parent = digest;
            parent_commitment = commitment;
        }

        // Wait until validator 0's application has seen all five blocks.
        loop {
            if app0.blocks().len() >= 5 {
                break;
            }
            context.sleep(Duration::from_millis(10)).await;
        }

        // Validator 1 must not know the finalization before it is hinted.
        let target = Height::new(5);
        assert!(handle1.mailbox.get_finalization(target).await.is_none());
        handle1
            .mailbox
            .hint_finalized(target, NonEmptyVec::new(participants[0].clone()))
            .await;

        // Poll until the finalization shows up on validator 1 (bounded by the runner timeout).
        loop {
            if handle1.mailbox.get_finalization(target).await.is_some() {
                break;
            }
            context.sleep(Duration::from_millis(10)).await;
        }
        let finalization = handle1
            .mailbox
            .get_finalization(target)
            .await
            .expect("finalization should be fetched");
        assert_eq!(finalization.proposal.round.view(), View::new(5));
    })
}
/// Checks the stream returned by `ancestry` on a single validator.
///
/// Blocks 1..=5 are proposed and finalized. The stream is then requested starting from the
/// identifier returned by `get_info(Identifier::Latest)`; it must yield exactly five blocks
/// with heights 5, 4, 3, 2, 1 in that order.
pub fn ancestry_stream<H: TestHarness>() {
    let runner = deterministic::Runner::timed(Duration::from_secs(60));
    runner.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                .await;
        let setup = H::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            participants[0].clone(),
            ConstantProvider::new(schemes[0].clone()),
        )
        .await;
        let mut handle = ValidatorHandle {
            mailbox: setup.mailbox,
            extra: setup.extra,
        };

        // Propose and finalize a five-block chain.
        let n = participants.len() as u16;
        let mut parent = Sha256::hash(b"");
        let mut parent_commitment = H::genesis_parent_commitment(n);
        for h in 1..=5u64 {
            let block = H::make_test_block(parent, parent_commitment, Height::new(h), h, n);
            let digest = H::digest(&block);
            let commitment = H::commitment(&block);
            let round = Round::new(Epoch::zero(), View::new(h));
            H::propose(&mut handle, round, &block).await;
            context.sleep(LINK.latency).await;
            let finalization = H::make_finalization(
                Proposal {
                    round,
                    parent: View::new(h - 1),
                    payload: commitment,
                },
                &schemes,
                QUORUM,
            );
            H::report_finalization(&mut handle.mailbox, finalization).await;
            parent = digest;
            parent_commitment = commitment;
        }

        // Start the ancestry walk from whatever the validator reports as its latest block.
        let (_, tip) = handle.mailbox.get_info(Identifier::Latest).await.unwrap();
        let stream = handle.mailbox.ancestry((None, tip)).await.unwrap();
        let blocks = stream.collect::<Vec<_>>().await;
        assert_eq!(blocks.len(), 5);

        // Position 0 holds height 5, position 1 holds height 4, and so on.
        for (idx, block) in blocks.iter().enumerate() {
            assert_eq!(block.height().get(), 5 - idx as u64);
        }
    })
}
/// Checks how two validators behave when the same block (height 1) is finalized in two
/// different views.
///
/// Both validators are given the block via `H::propose`. Validator 0 is then told about the
/// notarization and finalization for view 1, validator 1 about those for view 2 (same
/// payload). The test asserts that:
/// - both validators return the block and `(Height 1, digest)` for height 1,
/// - validator 0 reports the view-1 finalization and validator 1 the view-2 finalization,
/// - after each validator is additionally sent the *other* view's finalization, the
///   finalization each one returns for height 1 is unchanged.
pub fn finalize_same_height_different_views<H: TestHarness>() {
    let runner = deterministic::Runner::timed(Duration::from_secs(60));
    runner.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                .await;

        // Only the first two participants run a validator in this test.
        let mut handles = Vec::new();
        for (i, validator) in participants.iter().enumerate().take(2) {
            let setup = H::setup_validator(
                context.with_label(&format!("validator_{i}")),
                &mut oracle,
                validator.clone(),
                ConstantProvider::new(schemes[i].clone()),
            )
            .await;
            handles.push(ValidatorHandle {
                mailbox: setup.mailbox,
                extra: setup.extra,
            });
        }

        // A single block at height 1, handed to both validators.
        let n = participants.len() as u16;
        let block = H::make_test_block(
            Sha256::hash(b""),
            H::genesis_parent_commitment(n),
            Height::new(1),
            1,
            n,
        );
        let digest = H::digest(&block);
        let commitment = H::commitment(&block);
        for handle in handles.iter_mut() {
            H::propose(handle, Round::new(Epoch::new(0), View::new(1)), &block).await;
        }
        context.sleep(LINK.latency).await;

        // Validator 0 learns about the block through view 1.
        let proposal_v1 = Proposal {
            round: Round::new(Epoch::new(0), View::new(1)),
            parent: View::new(0),
            payload: commitment,
        };
        let notarization_v1 = H::make_notarization(proposal_v1.clone(), &schemes, QUORUM);
        let finalization_v1 = H::make_finalization(proposal_v1, &schemes, QUORUM);
        H::report_notarization(&mut handles[0].mailbox, notarization_v1).await;
        // Cloned because the same finalization is sent to validator 1 further down.
        H::report_finalization(&mut handles[0].mailbox, finalization_v1.clone()).await;

        // Validator 1 learns about the same payload through view 2.
        let proposal_v2 = Proposal {
            round: Round::new(Epoch::new(0), View::new(2)),
            parent: View::new(0),
            payload: commitment,
        };
        let notarization_v2 = H::make_notarization(proposal_v2.clone(), &schemes, QUORUM);
        let finalization_v2 = H::make_finalization(proposal_v2, &schemes, QUORUM);
        H::report_notarization(&mut handles[1].mailbox, notarization_v2).await;
        // Cloned because the same finalization is sent to validator 0 further down.
        H::report_finalization(&mut handles[1].mailbox, finalization_v2.clone()).await;
        context.sleep(Duration::from_millis(100)).await;

        // Both validators hold the same block at height 1.
        let block0 = handles[0].mailbox.get_block(Height::new(1)).await.unwrap();
        let block1 = handles[1].mailbox.get_block(Height::new(1)).await.unwrap();
        assert_eq!(block0.digest(), digest);
        assert_eq!(block1.digest(), digest);

        // Each validator reports the finalization from the view it was told about.
        let fin0 = handles[0]
            .mailbox
            .get_finalization(Height::new(1))
            .await
            .unwrap();
        let fin1 = handles[1]
            .mailbox
            .get_finalization(Height::new(1))
            .await
            .unwrap();
        assert_eq!(fin0.proposal.payload, commitment);
        assert_eq!(fin0.round().view(), View::new(1));
        assert_eq!(fin1.proposal.payload, commitment);
        assert_eq!(fin1.round().view(), View::new(2));
        assert_eq!(
            handles[0].mailbox.get_info(Height::new(1)).await,
            Some((Height::new(1), digest))
        );
        assert_eq!(
            handles[1].mailbox.get_info(Height::new(1)).await,
            Some((Height::new(1), digest))
        );

        // Cross-deliver the other view's finalization; these are the last uses, so both move.
        H::report_finalization(&mut handles[0].mailbox, finalization_v2).await;
        H::report_finalization(&mut handles[1].mailbox, finalization_v1).await;
        context.sleep(Duration::from_millis(100)).await;

        // The finalization returned for height 1 must not have changed on either validator.
        let fin0_after = handles[0]
            .mailbox
            .get_finalization(Height::new(1))
            .await
            .unwrap();
        assert_eq!(fin0_after.round().view(), View::new(1));
        let fin1_after = handles[1]
            .mailbox
            .get_finalization(Height::new(1))
            .await
            .unwrap();
        assert_eq!(fin1_after.round().view(), View::new(2));
    })
}
/// Checks the `height` value returned by `H::setup_validator` across two setups of the same
/// participant.
///
/// The first setup must report `Height::zero()`. Five blocks are then proposed and finalized,
/// the test waits until the application reports five blocks, and the handle is dropped. A
/// second setup for the same participant (label `validator_0_restart`) must report height 5.
pub fn init_processed_height<H: TestHarness>() {
    let runner = deterministic::Runner::timed(Duration::from_secs(60));
    runner.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                .await;
        let validator = participants[0].clone();

        // First start: nothing has been processed yet.
        let setup = H::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            validator.clone(),
            ConstantProvider::new(schemes[0].clone()),
        )
        .await;
        let app = setup.application;
        let mut handle = ValidatorHandle {
            mailbox: setup.mailbox,
            extra: setup.extra,
        };
        assert_eq!(setup.height, Height::zero());

        // Propose and finalize blocks 1..=5.
        let n = participants.len() as u16;
        let mut parent = Sha256::hash(b"");
        let mut parent_commitment = H::genesis_parent_commitment(n);
        for h in 1..=5u64 {
            let block = H::make_test_block(parent, parent_commitment, Height::new(h), h, n);
            let digest = H::digest(&block);
            let commitment = H::commitment(&block);
            let round = Round::new(Epoch::zero(), View::new(h));
            H::propose(&mut handle, round, &block).await;
            context.sleep(LINK.latency).await;
            let finalization = H::make_finalization(
                Proposal {
                    round,
                    parent: View::new(h - 1),
                    payload: commitment,
                },
                &schemes,
                QUORUM,
            );
            H::report_finalization(&mut handle.mailbox, finalization).await;
            parent = digest;
            parent_commitment = commitment;
        }

        // Wait until the application has seen all five blocks before tearing down.
        loop {
            if app.blocks().len() >= 5 {
                break;
            }
            context.sleep(Duration::from_millis(10)).await;
        }
        drop(handle);

        // Second start for the same participant: the reported height must be 5.
        let setup2 = H::setup_validator(
            context.with_label("validator_0_restart"),
            &mut oracle,
            validator.clone(),
            ConstantProvider::new(schemes[0].clone()),
        )
        .await;
        assert_eq!(setup2.height, Height::new(5));
    })
}
/// Checks that a proposed block can be fetched by digest, both immediately and from a second
/// validator instance set up for the same participant.
///
/// After `H::propose`, `get_block` by digest must succeed on the first handle. A second
/// validator is then set up for the same participant (label `validator_0_restart`), a
/// notarization for the block's proposal is reported to it, and `get_block` by digest must
/// succeed there as well.
pub fn broadcast_caches_block<H: TestHarness>() {
    let runner = deterministic::Runner::timed(Duration::from_secs(60));
    runner.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                .await;
        let validator = participants[0].clone();
        let setup = H::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            validator.clone(),
            ConstantProvider::new(schemes[0].clone()),
        )
        .await;
        let mut handle = ValidatorHandle {
            mailbox: setup.mailbox,
            extra: setup.extra,
        };

        // Propose block 1 in view 1 and confirm it is retrievable right away.
        let n = participants.len() as u16;
        let block = H::make_test_block(
            Sha256::hash(b""),
            H::genesis_parent_commitment(n),
            Height::new(1),
            1,
            n,
        );
        let digest = H::digest(&block);
        let commitment = H::commitment(&block);
        let round = Round::new(Epoch::new(0), View::new(1));
        H::propose(&mut handle, round, &block).await;
        handle
            .mailbox
            .get_block(&digest)
            .await
            .expect("block should be cached after broadcast");

        // Set up the same participant a second time under a new label.
        let setup2 = H::setup_validator(
            context.with_label("validator_0_restart"),
            &mut oracle,
            validator.clone(),
            ConstantProvider::new(schemes[0].clone()),
        )
        .await;
        let mut handle2: ValidatorHandle<H> = ValidatorHandle {
            mailbox: setup2.mailbox,
            extra: setup2.extra,
        };

        // Report a notarization for the block to the new instance, then fetch the block from it.
        let notarization = H::make_notarization(
            Proposal {
                round: Round::new(Epoch::new(0), View::new(1)),
                parent: View::new(0),
                payload: commitment,
            },
            &schemes,
            QUORUM,
        );
        H::report_notarization(&mut handle2.mailbox, notarization).await;
        handle2
            .mailbox
            .get_block(&digest)
            .await
            .expect("block should be cached after broadcast");
    })
}