pub mod actor;
pub use actor::Actor;
pub mod cache;
pub mod config;
pub use config::Config;
pub mod ingress;
pub use ingress::mailbox::Mailbox;
pub mod resolver;
pub mod store;
use crate::{types::Height, Block};
use commonware_utils::{acknowledgement::Exact, Acknowledgement};
#[derive(Clone, Debug)]
pub enum Update<B: Block, A: Acknowledgement = Exact> {
Tip(Height, B::Commitment),
Block(B, A),
}
#[cfg(test)]
pub mod mocks;
#[cfg(test)]
mod tests {
use super::{
actor,
config::Config,
mocks::{application::Application, block::Block},
resolver::p2p as resolver,
};
use crate::{
application::marshaled::Marshaled,
marshal::ingress::mailbox::{AncestorStream, Identifier},
simplex::{
scheme::bls12381_threshold,
types::{Activity, Context, Finalization, Finalize, Notarization, Notarize, Proposal},
},
types::{Epoch, Epocher, FixedEpocher, Height, Round, View, ViewDelta},
Automaton, Heightable, Reporter, VerifyingApplication,
};
use commonware_broadcast::buffered;
use commonware_cryptography::{
bls12381::primitives::variant::MinPk,
certificate::{mocks::Fixture, ConstantProvider, Scheme as _},
ed25519::PublicKey,
sha256::{Digest as Sha256Digest, Sha256},
Committable, Digestible, Hasher as _,
};
use commonware_macros::test_traced;
use commonware_p2p::{
simulated::{self, Link, Network, Oracle},
Manager,
};
use commonware_parallel::Sequential;
use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Quota, Runner};
use commonware_storage::archive::immutable;
use commonware_utils::{vec::NonEmptyVec, NZUsize, NZU16, NZU64};
use futures::StreamExt;
use rand::{
seq::{IteratorRandom, SliceRandom},
Rng,
};
use std::{
collections::BTreeMap,
num::{NonZeroU16, NonZeroU32, NonZeroU64, NonZeroUsize},
time::{Duration, Instant},
};
use tracing::info;
type D = Sha256Digest;
type B = Block<D>;
type K = PublicKey;
type V = MinPk;
type S = bls12381_threshold::Scheme<K, V>;
type P = ConstantProvider<S, Epoch>;
const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
const NAMESPACE: &[u8] = b"test";
const NUM_VALIDATORS: u32 = 4;
const QUORUM: u32 = 3;
const NUM_BLOCKS: u64 = 160;
const BLOCKS_PER_EPOCH: NonZeroU64 = NZU64!(20);
const LINK: Link = Link {
latency: Duration::from_millis(100),
jitter: Duration::from_millis(1),
success_rate: 1.0,
};
const UNRELIABLE_LINK: Link = Link {
latency: Duration::from_millis(200),
jitter: Duration::from_millis(50),
success_rate: 0.7,
};
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
async fn setup_validator(
context: deterministic::Context,
oracle: &mut Oracle<K, deterministic::Context>,
validator: K,
provider: P,
) -> (
Application<B>,
crate::marshal::ingress::mailbox::Mailbox<S, B>,
Height,
) {
let config = Config {
provider,
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
mailbox_size: 100,
view_retention_timeout: ViewDelta::new(10),
max_repair: NZUsize!(10),
block_codec_config: (),
partition_prefix: format!("validator-{}", validator.clone()),
prunable_items_per_section: NZU64!(10),
replay_buffer: NZUsize!(1024),
key_write_buffer: NZUsize!(1024),
value_write_buffer: NZUsize!(1024),
buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
strategy: Sequential,
};
let control = oracle.control(validator.clone());
let backfill = control.register(1, TEST_QUOTA).await.unwrap();
let resolver_cfg = resolver::Config {
public_key: validator.clone(),
manager: oracle.manager(),
blocker: control.clone(),
mailbox_size: config.mailbox_size,
initial: Duration::from_secs(1),
timeout: Duration::from_secs(2),
fetch_retry_timeout: Duration::from_millis(100),
priority_requests: false,
priority_responses: false,
};
let resolver = resolver::init(&context, resolver_cfg, backfill);
let broadcast_config = buffered::Config {
public_key: validator.clone(),
mailbox_size: config.mailbox_size,
deque_size: 10,
priority: false,
codec_config: (),
};
let (broadcast_engine, buffer) = buffered::Engine::new(context.clone(), broadcast_config);
let network = control.register(2, TEST_QUOTA).await.unwrap();
broadcast_engine.start(network);
let start = Instant::now();
let finalizations_by_height = immutable::Archive::init(
context.with_label("finalizations_by_height"),
immutable::Config {
metadata_partition: format!(
"{}-finalizations-by-height-metadata",
config.partition_prefix
),
freezer_table_partition: format!(
"{}-finalizations-by-height-freezer-table",
config.partition_prefix
),
freezer_table_initial_size: 64,
freezer_table_resize_frequency: 10,
freezer_table_resize_chunk_size: 10,
freezer_key_partition: format!(
"{}-finalizations-by-height-freezer-key",
config.partition_prefix
),
freezer_key_buffer_pool: config.buffer_pool.clone(),
freezer_value_partition: format!(
"{}-finalizations-by-height-freezer-value",
config.partition_prefix
),
freezer_value_target_size: 1024,
freezer_value_compression: None,
ordinal_partition: format!(
"{}-finalizations-by-height-ordinal",
config.partition_prefix
),
items_per_section: NZU64!(10),
codec_config: S::certificate_codec_config_unbounded(),
replay_buffer: config.replay_buffer,
freezer_key_write_buffer: config.key_write_buffer,
freezer_value_write_buffer: config.value_write_buffer,
ordinal_write_buffer: config.key_write_buffer,
},
)
.await
.expect("failed to initialize finalizations by height archive");
info!(elapsed = ?start.elapsed(), "restored finalizations by height archive");
let start = Instant::now();
let finalized_blocks = immutable::Archive::init(
context.with_label("finalized_blocks"),
immutable::Config {
metadata_partition: format!(
"{}-finalized_blocks-metadata",
config.partition_prefix
),
freezer_table_partition: format!(
"{}-finalized_blocks-freezer-table",
config.partition_prefix
),
freezer_table_initial_size: 64,
freezer_table_resize_frequency: 10,
freezer_table_resize_chunk_size: 10,
freezer_key_partition: format!(
"{}-finalized_blocks-freezer-key",
config.partition_prefix
),
freezer_key_buffer_pool: config.buffer_pool.clone(),
freezer_value_partition: format!(
"{}-finalized_blocks-freezer-value",
config.partition_prefix
),
freezer_value_target_size: 1024,
freezer_value_compression: None,
ordinal_partition: format!("{}-finalized_blocks-ordinal", config.partition_prefix),
items_per_section: NZU64!(10),
codec_config: config.block_codec_config,
replay_buffer: config.replay_buffer,
freezer_key_write_buffer: config.key_write_buffer,
freezer_value_write_buffer: config.value_write_buffer,
ordinal_write_buffer: config.key_write_buffer,
},
)
.await
.expect("failed to initialize finalized blocks archive");
info!(elapsed = ?start.elapsed(), "restored finalized blocks archive");
let (actor, mailbox, processed_height) = actor::Actor::init(
context.clone(),
finalizations_by_height,
finalized_blocks,
config,
)
.await;
let application = Application::<B>::default();
actor.start(application.clone(), buffer, resolver);
(application, mailbox, processed_height)
}
fn make_finalization(proposal: Proposal<D>, schemes: &[S], quorum: u32) -> Finalization<S, D> {
let finalizes: Vec<_> = schemes
.iter()
.take(quorum as usize)
.map(|scheme| Finalize::sign(scheme, proposal.clone()).unwrap())
.collect();
Finalization::from_finalizes(&schemes[0], &finalizes, &Sequential).unwrap()
}
fn make_notarization(proposal: Proposal<D>, schemes: &[S], quorum: u32) -> Notarization<S, D> {
let notarizes: Vec<_> = schemes
.iter()
.take(quorum as usize)
.map(|scheme| Notarize::sign(scheme, proposal.clone()).unwrap())
.collect();
Notarization::from_notarizes(&schemes[0], ¬arizes, &Sequential).unwrap()
}
fn setup_network(
context: deterministic::Context,
tracked_peer_sets: Option<usize>,
) -> Oracle<K, deterministic::Context> {
let (network, oracle) = Network::new(
context.with_label("network"),
simulated::Config {
max_size: 1024 * 1024,
disconnect_on_block: true,
tracked_peer_sets,
},
);
network.start();
oracle
}
async fn setup_network_links(
oracle: &mut Oracle<K, deterministic::Context>,
peers: &[K],
link: Link,
) {
for p1 in peers.iter() {
for p2 in peers.iter() {
if p2 == p1 {
continue;
}
let _ = oracle.add_link(p1.clone(), p2.clone(), link.clone()).await;
}
}
}
#[test_traced("WARN")]
fn test_finalize_good_links() {
for seed in 0..5 {
let result1 = finalize(seed, LINK, false);
let result2 = finalize(seed, LINK, false);
assert_eq!(result1, result2);
}
}
#[test_traced("WARN")]
fn test_finalize_bad_links() {
for seed in 0..5 {
let result1 = finalize(seed, UNRELIABLE_LINK, false);
let result2 = finalize(seed, UNRELIABLE_LINK, false);
assert_eq!(result1, result2);
}
}
#[test_traced("WARN")]
fn test_finalize_good_links_quorum_sees_finalization() {
for seed in 0..5 {
let result1 = finalize(seed, LINK, true);
let result2 = finalize(seed, LINK, true);
assert_eq!(result1, result2);
}
}
#[test_traced("DEBUG")]
fn test_finalize_bad_links_quorum_sees_finalization() {
for seed in 0..5 {
let result1 = finalize(seed, UNRELIABLE_LINK, true);
let result2 = finalize(seed, UNRELIABLE_LINK, true);
assert_eq!(result1, result2);
}
}
fn finalize(seed: u64, link: Link, quorum_sees_finalization: bool) -> String {
let runner = deterministic::Runner::new(
deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(600))),
);
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), Some(3));
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut applications = BTreeMap::new();
let mut actors = Vec::new();
let mut manager = oracle.manager();
manager
.update(0, participants.clone().try_into().unwrap())
.await;
for (i, validator) in participants.iter().enumerate() {
let (application, actor, _processed_height) = setup_validator(
context.with_label(&format!("validator_{i}")),
&mut oracle,
validator.clone(),
ConstantProvider::new(schemes[i].clone()),
)
.await;
applications.insert(validator.clone(), application);
actors.push(actor);
}
setup_network_links(&mut oracle, &participants, link.clone()).await;
let mut blocks = Vec::<B>::new();
let mut parent = Sha256::hash(b"");
for i in 1..=NUM_BLOCKS {
let block = B::new::<Sha256>(parent, Height::new(i), i);
parent = block.digest();
blocks.push(block);
}
let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
blocks.shuffle(&mut context);
for block in blocks.iter() {
let height = block.height();
assert!(
!height.is_zero(),
"genesis block should not have been generated"
);
let bounds = epocher.containing(height).unwrap();
let round = Round::new(bounds.epoch(), View::new(height.get()));
let actor_index: usize = (height.get() % (NUM_VALIDATORS as u64)) as usize;
let mut actor = actors[actor_index].clone();
actor.proposed(round, block.clone()).await;
actor.verified(round, block.clone()).await;
context.sleep(link.latency).await;
let proposal = Proposal {
round,
parent: View::new(height.previous().unwrap().get()),
payload: block.digest(),
};
let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
actor
.report(Activity::Notarization(notarization.clone()))
.await;
let fin = make_finalization(proposal, &schemes, QUORUM);
if quorum_sees_finalization {
let do_finalize = context.gen_bool(0.2);
for (i, actor) in actors
.iter_mut()
.choose_multiple(&mut context, NUM_VALIDATORS as usize)
.iter_mut()
.enumerate()
{
if (do_finalize && i < QUORUM as usize)
|| height == Height::new(NUM_BLOCKS)
|| height == bounds.last()
{
actor.report(Activity::Finalization(fin.clone())).await;
}
}
} else {
for actor in actors.iter_mut() {
if context.gen_bool(0.2)
|| height == Height::new(NUM_BLOCKS)
|| height == bounds.last()
{
actor.report(Activity::Finalization(fin.clone())).await;
}
}
}
}
let mut finished = false;
while !finished {
context.sleep(Duration::from_secs(1)).await;
if applications.len() != NUM_VALIDATORS as usize {
continue;
}
finished = true;
for app in applications.values() {
if app.blocks().len() != NUM_BLOCKS as usize {
finished = false;
break;
}
let Some((height, _)) = app.tip() else {
finished = false;
break;
};
if height < Height::new(NUM_BLOCKS) {
finished = false;
break;
}
}
}
context.auditor().state()
})
}
#[test_traced("WARN")]
fn test_sync_height_floor() {
let runner = deterministic::Runner::new(
deterministic::Config::new()
.with_seed(0xFF)
.with_timeout(Some(Duration::from_secs(300))),
);
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), Some(3));
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut applications = BTreeMap::new();
let mut actors = Vec::new();
let mut manager = oracle.manager();
manager
.update(0, participants.clone().try_into().unwrap())
.await;
for (i, validator) in participants.iter().enumerate().skip(1) {
let (application, actor, _processed_height) = setup_validator(
context.with_label(&format!("validator_{i}")),
&mut oracle,
validator.clone(),
ConstantProvider::new(schemes[i].clone()),
)
.await;
applications.insert(validator.clone(), application);
actors.push(actor);
}
setup_network_links(&mut oracle, &participants[1..], LINK).await;
let mut blocks = Vec::<B>::new();
let mut parent = Sha256::hash(b"");
for i in 1..=NUM_BLOCKS {
let block = B::new::<Sha256>(parent, Height::new(i), i);
parent = block.digest();
blocks.push(block);
}
let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
for block in blocks.iter() {
let height = block.height();
assert!(
!height.is_zero(),
"genesis block should not have been generated"
);
let bounds = epocher.containing(height).unwrap();
let round = Round::new(bounds.epoch(), View::new(height.get()));
let actor_index: usize = (height.get() % (applications.len() as u64)) as usize;
let mut actor = actors[actor_index].clone();
actor.proposed(round, block.clone()).await;
actor.verified(round, block.clone()).await;
context.sleep(LINK.latency).await;
let proposal = Proposal {
round,
parent: View::new(height.previous().unwrap().get()),
payload: block.digest(),
};
let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
actor
.report(Activity::Notarization(notarization.clone()))
.await;
let fin = make_finalization(proposal, &schemes, QUORUM);
for actor in actors.iter_mut() {
actor.report(Activity::Finalization(fin.clone())).await;
}
}
let mut finished = false;
while !finished {
context.sleep(Duration::from_secs(1)).await;
finished = true;
for app in applications.values().skip(1) {
if app.blocks().len() != NUM_BLOCKS as usize {
finished = false;
break;
}
let Some((height, _)) = app.tip() else {
finished = false;
break;
};
if height < Height::new(NUM_BLOCKS) {
finished = false;
break;
}
}
}
let validator = participants.first().unwrap();
let (app, mut actor, _processed_height) = setup_validator(
context.with_label("validator_0"),
&mut oracle,
validator.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
setup_network_links(&mut oracle, &participants, LINK).await;
const NEW_SYNC_FLOOR: u64 = 100;
let second_actor = &mut actors[1];
let latest_finalization = second_actor
.get_finalization(Height::new(NUM_BLOCKS))
.await
.unwrap();
actor.set_floor(Height::new(NEW_SYNC_FLOOR)).await;
actor
.report(Activity::Finalization(latest_finalization))
.await;
let mut finished = false;
while !finished {
context.sleep(Duration::from_secs(1)).await;
finished = true;
if app.blocks().len() != (NUM_BLOCKS - NEW_SYNC_FLOOR) as usize {
finished = false;
continue;
}
let Some((height, _)) = app.tip() else {
finished = false;
continue;
};
if height < Height::new(NUM_BLOCKS) {
finished = false;
continue;
}
}
for height in 1..=NUM_BLOCKS {
let block = actor
.get_block(Identifier::Height(Height::new(height)))
.await;
if height <= NEW_SYNC_FLOOR {
assert!(block.is_none());
} else {
assert_eq!(block.unwrap().height(), Height::new(height));
}
}
})
}
#[test_traced("WARN")]
fn test_subscribe_basic_block_delivery() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), None);
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut actors = Vec::new();
for (i, validator) in participants.iter().enumerate() {
let (_application, actor, _processed_height) = setup_validator(
context.with_label(&format!("validator_{i}")),
&mut oracle,
validator.clone(),
ConstantProvider::new(schemes[i].clone()),
)
.await;
actors.push(actor);
}
let mut actor = actors[0].clone();
setup_network_links(&mut oracle, &participants, LINK).await;
let parent = Sha256::hash(b"");
let block = B::new::<Sha256>(parent, Height::new(1), 1);
let commitment = block.digest();
let subscription_rx = actor
.subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment)
.await;
actor
.verified(Round::new(Epoch::new(0), View::new(1)), block.clone())
.await;
let proposal = Proposal {
round: Round::new(Epoch::new(0), View::new(1)),
parent: View::new(0),
payload: commitment,
};
let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
actor.report(Activity::Notarization(notarization)).await;
let finalization = make_finalization(proposal, &schemes, QUORUM);
actor.report(Activity::Finalization(finalization)).await;
let received_block = subscription_rx.await.unwrap();
assert_eq!(received_block.digest(), block.digest());
assert_eq!(received_block.height(), Height::new(1));
})
}
#[test_traced("WARN")]
fn test_subscribe_multiple_subscriptions() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), None);
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut actors = Vec::new();
for (i, validator) in participants.iter().enumerate() {
let (_application, actor, _processed_height) = setup_validator(
context.with_label(&format!("validator_{i}")),
&mut oracle,
validator.clone(),
ConstantProvider::new(schemes[i].clone()),
)
.await;
actors.push(actor);
}
let mut actor = actors[0].clone();
setup_network_links(&mut oracle, &participants, LINK).await;
let parent = Sha256::hash(b"");
let block1 = B::new::<Sha256>(parent, Height::new(1), 1);
let block2 = B::new::<Sha256>(block1.digest(), Height::new(2), 2);
let commitment1 = block1.digest();
let commitment2 = block2.digest();
let sub1_rx = actor
.subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment1)
.await;
let sub2_rx = actor
.subscribe(Some(Round::new(Epoch::new(0), View::new(2))), commitment2)
.await;
let sub3_rx = actor
.subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment1)
.await;
actor
.verified(Round::new(Epoch::new(0), View::new(1)), block1.clone())
.await;
actor
.verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
.await;
for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
let view = View::new(view);
let proposal = Proposal {
round: Round::new(Epoch::zero(), view),
parent: view.previous().unwrap(),
payload: block.digest(),
};
let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
actor.report(Activity::Notarization(notarization)).await;
let finalization = make_finalization(proposal, &schemes, QUORUM);
actor.report(Activity::Finalization(finalization)).await;
}
let received1_sub1 = sub1_rx.await.unwrap();
let received2 = sub2_rx.await.unwrap();
let received1_sub3 = sub3_rx.await.unwrap();
assert_eq!(received1_sub1.digest(), block1.digest());
assert_eq!(received2.digest(), block2.digest());
assert_eq!(received1_sub3.digest(), block1.digest());
assert_eq!(received1_sub1.height(), Height::new(1));
assert_eq!(received2.height(), Height::new(2));
assert_eq!(received1_sub3.height(), Height::new(1));
})
}
#[test_traced("WARN")]
fn test_subscribe_canceled_subscriptions() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), None);
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut actors = Vec::new();
for (i, validator) in participants.iter().enumerate() {
let (_application, actor, _processed_height) = setup_validator(
context.with_label(&format!("validator_{i}")),
&mut oracle,
validator.clone(),
ConstantProvider::new(schemes[i].clone()),
)
.await;
actors.push(actor);
}
let mut actor = actors[0].clone();
setup_network_links(&mut oracle, &participants, LINK).await;
let parent = Sha256::hash(b"");
let block1 = B::new::<Sha256>(parent, Height::new(1), 1);
let block2 = B::new::<Sha256>(block1.digest(), Height::new(2), 2);
let commitment1 = block1.digest();
let commitment2 = block2.digest();
let sub1_rx = actor
.subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment1)
.await;
let sub2_rx = actor
.subscribe(Some(Round::new(Epoch::new(0), View::new(2))), commitment2)
.await;
drop(sub1_rx);
actor
.verified(Round::new(Epoch::new(0), View::new(1)), block1.clone())
.await;
actor
.verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
.await;
for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
let view = View::new(view);
let proposal = Proposal {
round: Round::new(Epoch::zero(), view),
parent: view.previous().unwrap(),
payload: block.digest(),
};
let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
actor.report(Activity::Notarization(notarization)).await;
let finalization = make_finalization(proposal, &schemes, QUORUM);
actor.report(Activity::Finalization(finalization)).await;
}
let received2 = sub2_rx.await.unwrap();
assert_eq!(received2.digest(), block2.digest());
assert_eq!(received2.height(), Height::new(2));
})
}
#[test_traced("WARN")]
fn test_subscribe_blocks_from_different_sources() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), None);
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut actors = Vec::new();
for (i, validator) in participants.iter().enumerate() {
let (_application, actor, _processed_height) = setup_validator(
context.with_label(&format!("validator_{i}")),
&mut oracle,
validator.clone(),
ConstantProvider::new(schemes[i].clone()),
)
.await;
actors.push(actor);
}
let mut actor = actors[0].clone();
setup_network_links(&mut oracle, &participants, LINK).await;
let parent = Sha256::hash(b"");
let block1 = B::new::<Sha256>(parent, Height::new(1), 1);
let block2 = B::new::<Sha256>(block1.digest(), Height::new(2), 2);
let block3 = B::new::<Sha256>(block2.digest(), Height::new(3), 3);
let block4 = B::new::<Sha256>(block3.digest(), Height::new(4), 4);
let block5 = B::new::<Sha256>(block4.digest(), Height::new(5), 5);
let sub1_rx = actor.subscribe(None, block1.digest()).await;
let sub2_rx = actor.subscribe(None, block2.digest()).await;
let sub3_rx = actor.subscribe(None, block3.digest()).await;
let sub4_rx = actor.subscribe(None, block4.digest()).await;
let sub5_rx = actor.subscribe(None, block5.digest()).await;
actor
.proposed(Round::new(Epoch::zero(), View::new(1)), block1.clone())
.await;
context.sleep(Duration::from_millis(20)).await;
let received1 = sub1_rx.await.unwrap();
assert_eq!(received1.digest(), block1.digest());
assert_eq!(received1.height(), Height::new(1));
actor
.verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
.await;
let received2 = sub2_rx.await.unwrap();
assert_eq!(received2.digest(), block2.digest());
assert_eq!(received2.height(), Height::new(2));
let proposal3 = Proposal {
round: Round::new(Epoch::new(0), View::new(3)),
parent: View::new(2),
payload: block3.digest(),
};
let notarization3 = make_notarization(proposal3.clone(), &schemes, QUORUM);
actor.report(Activity::Notarization(notarization3)).await;
actor
.verified(Round::new(Epoch::new(0), View::new(3)), block3.clone())
.await;
let received3 = sub3_rx.await.unwrap();
assert_eq!(received3.digest(), block3.digest());
assert_eq!(received3.height(), Height::new(3));
let finalization4 = make_finalization(
Proposal {
round: Round::new(Epoch::new(0), View::new(4)),
parent: View::new(3),
payload: block4.digest(),
},
&schemes,
QUORUM,
);
actor.report(Activity::Finalization(finalization4)).await;
actor
.verified(Round::new(Epoch::new(0), View::new(4)), block4.clone())
.await;
let received4 = sub4_rx.await.unwrap();
assert_eq!(received4.digest(), block4.digest());
assert_eq!(received4.height(), Height::new(4));
let remote_actor = &mut actors[1].clone();
remote_actor
.proposed(Round::new(Epoch::zero(), View::new(5)), block5.clone())
.await;
context.sleep(Duration::from_millis(20)).await;
let received5 = sub5_rx.await.unwrap();
assert_eq!(received5.digest(), block5.digest());
assert_eq!(received5.height(), Height::new(5));
})
}
#[test_traced("WARN")]
fn test_get_info_basic_queries_present_and_missing() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), None);
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let me = participants[0].clone();
let (_application, mut actor, _processed_height) = setup_validator(
context.with_label("validator_0"),
&mut oracle,
me,
ConstantProvider::new(schemes[0].clone()),
)
.await;
assert!(actor.get_info(Identifier::Latest).await.is_none());
assert!(actor.get_info(Height::new(1)).await.is_none());
let parent = Sha256::hash(b"");
let block = B::new::<Sha256>(parent, Height::new(1), 1);
let digest = block.digest();
let round = Round::new(Epoch::new(0), View::new(1));
actor.verified(round, block.clone()).await;
let proposal = Proposal {
round,
parent: View::new(0),
payload: digest,
};
let finalization = make_finalization(proposal, &schemes, QUORUM);
actor.report(Activity::Finalization(finalization)).await;
assert_eq!(
actor.get_info(Identifier::Latest).await,
Some((Height::new(1), digest))
);
assert_eq!(
actor.get_info(Height::new(1)).await,
Some((Height::new(1), digest))
);
assert_eq!(
actor.get_info(&digest).await,
Some((Height::new(1), digest))
);
assert!(actor.get_info(Height::new(2)).await.is_none());
let missing = Sha256::hash(b"missing");
assert!(actor.get_info(&missing).await.is_none());
})
}
#[test_traced("WARN")]
fn test_get_info_latest_progression_multiple_finalizations() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), None);
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let me = participants[0].clone();
let (_application, mut actor, _processed_height) = setup_validator(
context.with_label("validator_0"),
&mut oracle,
me,
ConstantProvider::new(schemes[0].clone()),
)
.await;
assert!(actor.get_info(Identifier::Latest).await.is_none());
let parent0 = Sha256::hash(b"");
let block1 = B::new::<Sha256>(parent0, Height::new(1), 1);
let d1 = block1.digest();
actor
.verified(Round::new(Epoch::new(0), View::new(1)), block1.clone())
.await;
let f1 = make_finalization(
Proposal {
round: Round::new(Epoch::new(0), View::new(1)),
parent: View::new(0),
payload: d1,
},
&schemes,
QUORUM,
);
actor.report(Activity::Finalization(f1)).await;
let latest = actor.get_info(Identifier::Latest).await;
assert_eq!(latest, Some((Height::new(1), d1)));
let block2 = B::new::<Sha256>(d1, Height::new(2), 2);
let d2 = block2.digest();
actor
.verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
.await;
let f2 = make_finalization(
Proposal {
round: Round::new(Epoch::new(0), View::new(2)),
parent: View::new(1),
payload: d2,
},
&schemes,
QUORUM,
);
actor.report(Activity::Finalization(f2)).await;
let latest = actor.get_info(Identifier::Latest).await;
assert_eq!(latest, Some((Height::new(2), d2)));
let block3 = B::new::<Sha256>(d2, Height::new(3), 3);
let d3 = block3.digest();
actor
.verified(Round::new(Epoch::new(0), View::new(3)), block3.clone())
.await;
let f3 = make_finalization(
Proposal {
round: Round::new(Epoch::new(0), View::new(3)),
parent: View::new(2),
payload: d3,
},
&schemes,
QUORUM,
);
actor.report(Activity::Finalization(f3)).await;
let latest = actor.get_info(Identifier::Latest).await;
assert_eq!(latest, Some((Height::new(3), d3)));
})
}
#[test_traced("WARN")]
fn test_get_block_by_height_and_latest() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), None);
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let me = participants[0].clone();
let (application, mut actor, _processed_height) = setup_validator(
context.with_label("validator_0"),
&mut oracle,
me,
ConstantProvider::new(schemes[0].clone()),
)
.await;
let latest_block = actor.get_block(Identifier::Latest).await;
assert!(latest_block.is_none());
assert!(application.tip().is_none());
let parent = Sha256::hash(b"");
let block = B::new::<Sha256>(parent, Height::new(1), 1);
let commitment = block.digest();
let round = Round::new(Epoch::new(0), View::new(1));
actor.verified(round, block.clone()).await;
let proposal = Proposal {
round,
parent: View::new(0),
payload: commitment,
};
let finalization = make_finalization(proposal, &schemes, QUORUM);
actor.report(Activity::Finalization(finalization)).await;
let by_height = actor
.get_block(Height::new(1))
.await
.expect("missing block by height");
assert_eq!(by_height.height(), Height::new(1));
assert_eq!(by_height.digest(), commitment);
assert_eq!(application.tip(), Some((Height::new(1), commitment)));
let by_latest = actor
.get_block(Identifier::Latest)
.await
.expect("missing block by latest");
assert_eq!(by_latest.height(), Height::new(1));
assert_eq!(by_latest.digest(), commitment);
let by_height = actor.get_block(Height::new(2)).await;
assert!(by_height.is_none());
})
}
#[test_traced("WARN")]
fn test_get_block_by_commitment_from_sources_and_missing() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), None);
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let me = participants[0].clone();
let (_application, mut actor, _processed_height) = setup_validator(
context.with_label("validator_0"),
&mut oracle,
me,
ConstantProvider::new(schemes[0].clone()),
)
.await;
let parent = Sha256::hash(b"");
let ver_block = B::new::<Sha256>(parent, Height::new(1), 1);
let ver_commitment = ver_block.digest();
let round1 = Round::new(Epoch::new(0), View::new(1));
actor.verified(round1, ver_block.clone()).await;
let got = actor
.get_block(&ver_commitment)
.await
.expect("missing block from cache");
assert_eq!(got.digest(), ver_commitment);
let fin_block = B::new::<Sha256>(ver_commitment, Height::new(2), 2);
let fin_commitment = fin_block.digest();
let round2 = Round::new(Epoch::new(0), View::new(2));
actor.verified(round2, fin_block.clone()).await;
let proposal = Proposal {
round: round2,
parent: View::new(1),
payload: fin_commitment,
};
let finalization = make_finalization(proposal, &schemes, QUORUM);
actor.report(Activity::Finalization(finalization)).await;
let got = actor
.get_block(&fin_commitment)
.await
.expect("missing block from finalized archive");
assert_eq!(got.digest(), fin_commitment);
assert_eq!(got.height(), Height::new(2));
let missing = Sha256::hash(b"definitely-missing");
let missing_block = actor.get_block(&missing).await;
assert!(missing_block.is_none());
})
}
#[test_traced("WARN")]
fn test_get_finalization_by_height() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), None);
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let me = participants[0].clone();
let (_application, mut actor, _processed_height) = setup_validator(
context.with_label("validator_0"),
&mut oracle,
me,
ConstantProvider::new(schemes[0].clone()),
)
.await;
let finalization = actor.get_finalization(Height::new(1)).await;
assert!(finalization.is_none());
let parent = Sha256::hash(b"");
let block = B::new::<Sha256>(parent, Height::new(1), 1);
let commitment = block.digest();
let round = Round::new(Epoch::new(0), View::new(1));
actor.verified(round, block.clone()).await;
let proposal = Proposal {
round,
parent: View::new(0),
payload: commitment,
};
let finalization = make_finalization(proposal, &schemes, QUORUM);
actor.report(Activity::Finalization(finalization)).await;
let finalization = actor
.get_finalization(Height::new(1))
.await
.expect("missing finalization by height");
assert_eq!(finalization.proposal.parent, View::new(0));
assert_eq!(
finalization.proposal.round,
Round::new(Epoch::new(0), View::new(1))
);
assert_eq!(finalization.proposal.payload, commitment);
assert!(actor.get_finalization(Height::new(2)).await.is_none());
})
}
#[test_traced("WARN")]
fn test_hint_finalized_triggers_fetch() {
let runner = deterministic::Runner::new(
deterministic::Config::new()
.with_seed(42)
.with_timeout(Some(Duration::from_secs(60))),
);
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), Some(3));
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut manager = oracle.manager();
manager
.update(0, participants.clone().try_into().unwrap())
.await;
let (app0, mut actor0, _) = setup_validator(
context.with_label("validator_0"),
&mut oracle,
participants[0].clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let (_app1, mut actor1, _) = setup_validator(
context.with_label("validator_1"),
&mut oracle,
participants[1].clone(),
ConstantProvider::new(schemes[1].clone()),
)
.await;
setup_network_links(&mut oracle, &participants[..2], LINK).await;
let mut parent = Sha256::hash(b"");
for i in 1..=5u64 {
let block = B::new::<Sha256>(parent, Height::new(i), i);
let commitment = block.digest();
let round = Round::new(Epoch::new(0), View::new(i));
actor0.verified(round, block.clone()).await;
let proposal = Proposal {
round,
parent: View::new(i - 1),
payload: commitment,
};
let finalization = make_finalization(proposal, &schemes, QUORUM);
actor0.report(Activity::Finalization(finalization)).await;
parent = commitment;
}
while app0.blocks().len() < 5 {
context.sleep(Duration::from_millis(10)).await;
}
assert!(actor1.get_finalization(Height::new(5)).await.is_none());
actor1
.hint_finalized(Height::new(5), NonEmptyVec::new(participants[0].clone()))
.await;
while actor1.get_finalization(Height::new(5)).await.is_none() {
context.sleep(Duration::from_millis(10)).await;
}
let finalization = actor1
.get_finalization(Height::new(5))
.await
.expect("finalization should be fetched");
assert_eq!(finalization.proposal.round.view(), View::new(5));
})
}
#[test_traced("WARN")]
fn test_ancestry_stream() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), None);
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let me = participants[0].clone();
let (_application, mut actor, _processed_height) = setup_validator(
context.with_label("validator_0"),
&mut oracle,
me,
ConstantProvider::new(schemes[0].clone()),
)
.await;
let mut parent = Sha256::hash(b"");
for i in 1..=5 {
let block = B::new::<Sha256>(parent, Height::new(i), i);
let commitment = block.digest();
let round = Round::new(Epoch::new(0), View::new(i));
actor.verified(round, block.clone()).await;
let proposal = Proposal {
round,
parent: View::new(i - 1),
payload: commitment,
};
let finalization = make_finalization(proposal, &schemes, QUORUM);
actor.report(Activity::Finalization(finalization)).await;
parent = block.digest();
}
let (_, commitment) = actor.get_info(Identifier::Latest).await.unwrap();
let ancestry = actor.ancestry((None, commitment)).await.unwrap();
let blocks = ancestry.collect::<Vec<_>>().await;
assert_eq!(blocks.len(), 5);
(0..5).for_each(|i| {
assert_eq!(blocks[i].height(), Height::new(5 - i as u64));
});
})
}
#[test_traced("WARN")]
fn test_marshaled_rejects_invalid_ancestry() {
#[derive(Clone)]
struct MockVerifyingApp {
genesis: B,
}
impl crate::Application<deterministic::Context> for MockVerifyingApp {
type Block = B;
type Context = Context<D, K>;
type SigningScheme = S;
async fn genesis(&mut self) -> Self::Block {
self.genesis.clone()
}
async fn propose(
&mut self,
_context: (deterministic::Context, Self::Context),
_ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
) -> Option<Self::Block> {
None
}
}
impl VerifyingApplication<deterministic::Context> for MockVerifyingApp {
async fn verify(
&mut self,
_context: (deterministic::Context, Self::Context),
_ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
) -> bool {
true
}
}
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), None);
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let me = participants[0].clone();
let (_base_app, marshal, _processed_height) = setup_validator(
context.with_label("validator_0"),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let genesis = B::new::<Sha256>(Sha256::hash(b""), Height::zero(), 0);
let mock_app = MockVerifyingApp {
genesis: genesis.clone(),
};
let mut marshaled = Marshaled::new(
context.clone(),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
let honest_parent = B::new::<Sha256>(
genesis.commitment(),
Height::new(BLOCKS_PER_EPOCH.get() + 1),
1000,
);
let parent_commitment = honest_parent.commitment();
let parent_round = Round::new(Epoch::new(1), View::new(21));
marshal
.clone()
.verified(parent_round, honest_parent.clone())
.await;
let malicious_block = B::new::<Sha256>(
parent_commitment,
Height::new(BLOCKS_PER_EPOCH.get() + 15),
2000,
);
let malicious_commitment = malicious_block.commitment();
marshal
.clone()
.proposed(
Round::new(Epoch::new(1), View::new(35)),
malicious_block.clone(),
)
.await;
context.sleep(Duration::from_millis(10)).await;
let byzantine_context = Context {
round: Round::new(Epoch::new(1), View::new(35)),
leader: me.clone(),
parent: (View::new(21), parent_commitment), };
let verify = marshaled
.verify(byzantine_context, malicious_commitment)
.await;
assert!(
!verify.await.unwrap(),
"Byzantine block with non-contiguous heights should be rejected"
);
let malicious_block = B::new::<Sha256>(
genesis.commitment(),
Height::new(BLOCKS_PER_EPOCH.get() + 2),
3000,
);
let malicious_commitment = malicious_block.commitment();
marshal
.clone()
.proposed(
Round::new(Epoch::new(1), View::new(22)),
malicious_block.clone(),
)
.await;
context.sleep(Duration::from_millis(10)).await;
let byzantine_context = Context {
round: Round::new(Epoch::new(1), View::new(22)),
leader: me.clone(),
parent: (View::new(21), parent_commitment), };
let verify = marshaled
.verify(byzantine_context, malicious_commitment)
.await;
assert!(
!verify.await.unwrap(),
"Byzantine block with mismatched parent commitment should be rejected"
);
})
}
#[test_traced("WARN")]
fn test_finalize_same_height_different_views() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), None);
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut actors = Vec::new();
for (i, validator) in participants.iter().enumerate().take(2) {
let (_app, actor, _processed_height) = setup_validator(
context.with_label(&format!("validator_{i}")),
&mut oracle,
validator.clone(),
ConstantProvider::new(schemes[i].clone()),
)
.await;
actors.push(actor);
}
let parent = Sha256::hash(b"");
let block = B::new::<Sha256>(parent, Height::new(1), 1);
let commitment = block.digest();
actors[0]
.verified(Round::new(Epoch::new(0), View::new(1)), block.clone())
.await;
actors[1]
.verified(Round::new(Epoch::new(0), View::new(1)), block.clone())
.await;
let proposal_v1 = Proposal {
round: Round::new(Epoch::new(0), View::new(1)),
parent: View::new(0),
payload: commitment,
};
let notarization_v1 = make_notarization(proposal_v1.clone(), &schemes, QUORUM);
let finalization_v1 = make_finalization(proposal_v1.clone(), &schemes, QUORUM);
actors[0]
.report(Activity::Notarization(notarization_v1.clone()))
.await;
actors[0]
.report(Activity::Finalization(finalization_v1.clone()))
.await;
let proposal_v2 = Proposal {
round: Round::new(Epoch::new(0), View::new(2)), parent: View::new(0),
payload: commitment, };
let notarization_v2 = make_notarization(proposal_v2.clone(), &schemes, QUORUM);
let finalization_v2 = make_finalization(proposal_v2.clone(), &schemes, QUORUM);
actors[1]
.report(Activity::Notarization(notarization_v2.clone()))
.await;
actors[1]
.report(Activity::Finalization(finalization_v2.clone()))
.await;
context.sleep(Duration::from_millis(100)).await;
let block0 = actors[0].get_block(Height::new(1)).await.unwrap();
let block1 = actors[1].get_block(Height::new(1)).await.unwrap();
assert_eq!(block0, block);
assert_eq!(block1, block);
let fin0 = actors[0].get_finalization(Height::new(1)).await.unwrap();
let fin1 = actors[1].get_finalization(Height::new(1)).await.unwrap();
assert_eq!(fin0.proposal.payload, block.commitment());
assert_eq!(fin0.round().view(), View::new(1));
assert_eq!(fin1.proposal.payload, block.commitment());
assert_eq!(fin1.round().view(), View::new(2));
assert_eq!(
actors[0].get_info(Height::new(1)).await,
Some((Height::new(1), commitment))
);
assert_eq!(
actors[1].get_info(Height::new(1)).await,
Some((Height::new(1), commitment))
);
actors[0]
.report(Activity::Finalization(finalization_v2.clone()))
.await;
actors[1]
.report(Activity::Finalization(finalization_v1.clone()))
.await;
context.sleep(Duration::from_millis(100)).await;
let fin0_after = actors[0].get_finalization(Height::new(1)).await.unwrap();
assert_eq!(fin0_after.round().view(), View::new(1));
let fin0_after = actors[1].get_finalization(Height::new(1)).await.unwrap();
assert_eq!(fin0_after.round().view(), View::new(2));
})
}
#[test_traced("WARN")]
fn test_init_processed_height() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), None);
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let me = participants[0].clone();
let (application, mut actor, initial_height) = setup_validator(
context.with_label("validator_0"),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
assert_eq!(initial_height, Height::zero());
let mut parent = Sha256::hash(b"");
let mut blocks = Vec::new();
for i in 1..=3 {
let block = B::new::<Sha256>(parent, Height::new(i), i);
let commitment = block.digest();
let round = Round::new(Epoch::new(0), View::new(i));
actor.verified(round, block.clone()).await;
let proposal = Proposal {
round,
parent: View::new(i - 1),
payload: commitment,
};
let finalization = make_finalization(proposal, &schemes, QUORUM);
actor.report(Activity::Finalization(finalization)).await;
blocks.push(block);
parent = commitment;
}
while application.blocks().len() < 3 {
context.sleep(Duration::from_millis(10)).await;
}
actor.set_floor(Height::new(3)).await;
context.sleep(Duration::from_millis(10)).await;
assert_eq!(application.blocks().len(), 3);
assert_eq!(
application.tip(),
Some((Height::new(3), blocks[2].digest()))
);
let (_restart_application, _restart_actor, restart_height) = setup_validator(
context.with_label("validator_0_restart"),
&mut oracle,
me,
ConstantProvider::new(schemes[0].clone()),
)
.await;
assert_eq!(restart_height, Height::new(3));
})
}
#[test_traced("WARN")]
fn test_marshaled_rejects_unsupported_epoch() {
#[derive(Clone)]
struct MockVerifyingApp {
genesis: B,
}
impl crate::Application<deterministic::Context> for MockVerifyingApp {
type Block = B;
type Context = Context<D, K>;
type SigningScheme = S;
async fn genesis(&mut self) -> Self::Block {
self.genesis.clone()
}
async fn propose(
&mut self,
_context: (deterministic::Context, Self::Context),
_ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
) -> Option<Self::Block> {
None
}
}
impl VerifyingApplication<deterministic::Context> for MockVerifyingApp {
async fn verify(
&mut self,
_context: (deterministic::Context, Self::Context),
_ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
) -> bool {
true
}
}
#[derive(Clone)]
struct LimitedEpocher {
inner: FixedEpocher,
max_epoch: u64,
}
impl Epocher for LimitedEpocher {
fn containing(&self, height: Height) -> Option<crate::types::EpochInfo> {
let bounds = self.inner.containing(height)?;
if bounds.epoch().get() > self.max_epoch {
None
} else {
Some(bounds)
}
}
fn first(&self, epoch: Epoch) -> Option<Height> {
if epoch.get() > self.max_epoch {
None
} else {
self.inner.first(epoch)
}
}
fn last(&self, epoch: Epoch) -> Option<Height> {
if epoch.get() > self.max_epoch {
None
} else {
self.inner.last(epoch)
}
}
}
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), None);
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let me = participants[0].clone();
let (_base_app, marshal, _processed_height) = setup_validator(
context.with_label("validator_0"),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let genesis = B::new::<Sha256>(Sha256::hash(b""), Height::zero(), 0);
let mock_app = MockVerifyingApp {
genesis: genesis.clone(),
};
let limited_epocher = LimitedEpocher {
inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
max_epoch: 0,
};
let mut marshaled =
Marshaled::new(context.clone(), mock_app, marshal.clone(), limited_epocher);
let parent = B::new::<Sha256>(genesis.commitment(), Height::new(19), 1000);
let parent_commitment = parent.commitment();
let parent_round = Round::new(Epoch::new(0), View::new(19));
marshal.clone().verified(parent_round, parent).await;
let block = B::new::<Sha256>(parent_commitment, Height::new(20), 2000);
let block_commitment = block.commitment();
marshal
.clone()
.proposed(Round::new(Epoch::new(1), View::new(20)), block)
.await;
context.sleep(Duration::from_millis(10)).await;
let unsupported_context = Context {
round: Round::new(Epoch::new(1), View::new(20)),
leader: me.clone(),
parent: (View::new(19), parent_commitment),
};
let verify = marshaled
.verify(unsupported_context, block_commitment)
.await;
assert!(
!verify.await.unwrap(),
"Block in unsupported epoch should be rejected"
);
})
}
#[test_traced("INFO")]
fn test_broadcast_caches_block() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let mut oracle = setup_network(context.clone(), None);
let Fixture {
participants,
schemes,
..
} = bls12381_threshold::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let (i, validator) = participants.iter().enumerate().next().unwrap();
let mut actor = setup_validator(
context.with_label(&format!("validator_{i}")),
&mut oracle,
validator.clone(),
ConstantProvider::new(schemes[i].clone()),
)
.await
.1;
let parent = Sha256::hash(b"");
let block = B::new::<Sha256>(parent, Height::new(1), 1);
let commitment = block.digest();
actor
.proposed(Round::new(Epoch::new(0), View::new(1)), block.clone())
.await;
actor
.get_block(&commitment)
.await
.expect("block should be cached after broadcast");
let mut actor = setup_validator(
context.with_label(&format!("validator_{i}")),
&mut oracle,
validator.clone(),
ConstantProvider::new(schemes[i].clone()),
)
.await
.1;
let notarization = make_notarization(
Proposal {
round: Round::new(Epoch::new(0), View::new(1)),
parent: View::new(0),
payload: commitment,
},
&schemes,
QUORUM,
);
actor.report(Activity::Notarization(notarization)).await;
let fetched = actor
.get_block(&commitment)
.await
.expect("block should be cached after broadcast");
assert_eq!(fetched, block);
});
}
}