pub mod shards;
pub mod types;
pub(crate) mod validation;
mod variant;
pub use variant::Coding;
mod marshaled;
pub use marshaled::{Marshaled, MarshaledConfig};
#[cfg(test)]
mod tests {
use crate::{
marshal::{
ancestry::BlockProvider,
coding::{
shards,
types::{coding_config_for_participants, hash_context, CodedBlock},
Coding, Marshaled, MarshaledConfig,
},
config::{Config, Start},
core,
mocks::{
application::Application,
harness::{
self, default_leader, genesis_commitment, make_coding_block,
setup_network_links, setup_network_with_participants, CodingB, CodingCtx,
CodingHarness, EmptyProvider, TestHarness, BLOCKS_PER_EPOCH, D, K, LINK,
NAMESPACE, NUM_VALIDATORS, QUORUM, S, TEST_QUOTA, UNRELIABLE_LINK, V,
},
verifying::MockVerifyingApp,
},
resolver::handler,
},
simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Proposal},
types::{coding::Commitment, Epoch, Epocher, FixedEpocher, Height, Round, View, ViewDelta},
Automaton, Block, CertifiableAutomaton, CertifiableBlock,
};
use bytes::Bytes;
use commonware_actor::{mailbox, Feedback};
use commonware_codec::{Encode, FixedSize};
use commonware_coding::{CodecConfig, Config as CodingConfig, ReedSolomon};
use commonware_cryptography::{
certificate::{mocks::Fixture, ConstantProvider, Scheme as _},
sha256::Sha256,
Committable, Digestible, Hasher,
};
use commonware_macros::{select, test_group, test_traced};
use commonware_p2p::Recipients;
use commonware_parallel::Sequential;
use commonware_resolver::{Delivery, Fetch, Resolver, TargetedResolver};
use commonware_runtime::{
buffer::paged::CacheRef, deterministic, Clock, Metrics, Runner, Supervisor as _,
};
use commonware_storage::archive::immutable;
use commonware_utils::{
channel::oneshot, sync::Mutex, vec::NonEmptyVec, NZUsize, NZU16, NZU64,
};
use std::{sync::Arc, time::Duration};
type TestCodingVariant = Coding<CodingB, ReedSolomon<Sha256>, Sha256, K>;
type TestCodedBlock = CodedBlock<CodingB, ReedSolomon<Sha256>, Sha256>;
type CodingSendRecord = (Round, TestCodedBlock, Recipients<K>);
const GENESIS_CODING_CONFIG: CodingConfig = CodingConfig {
minimum_shards: NZU16!(1),
extra_shards: NZU16!(1),
};
#[test]
fn mailbox_provides_application_blocks() {
fn assert_provider<P: BlockProvider<Block = CodingB>>() {}
assert_provider::<core::Mailbox<S, TestCodingVariant>>();
}
#[derive(Clone, Default)]
struct RecordingCodingBuffer {
digest_subscriptions: Arc<Mutex<Vec<oneshot::Sender<TestCodedBlock>>>>,
commitment_subscriptions: Arc<Mutex<Vec<oneshot::Sender<TestCodedBlock>>>>,
sends: Arc<Mutex<Vec<CodingSendRecord>>>,
}
impl RecordingCodingBuffer {
fn subscription_count(&self) -> usize {
self.digest_subscriptions.lock().len() + self.commitment_subscriptions.lock().len()
}
fn commitment_subscription_count(&self) -> usize {
self.commitment_subscriptions.lock().len()
}
}
impl core::Buffer<TestCodingVariant> for RecordingCodingBuffer {
type PublicKey = K;
async fn find_by_digest(&self, _digest: D) -> Option<TestCodedBlock> {
None
}
async fn find_by_commitment(&self, _commitment: Commitment) -> Option<TestCodedBlock> {
None
}
fn subscribe_by_digest(&self, _digest: D) -> Option<oneshot::Receiver<TestCodedBlock>> {
let (sender, receiver) = oneshot::channel();
self.digest_subscriptions.lock().push(sender);
Some(receiver)
}
fn subscribe_by_commitment(
&self,
_commitment: Commitment,
) -> Option<oneshot::Receiver<TestCodedBlock>> {
let (sender, receiver) = oneshot::channel();
self.commitment_subscriptions.lock().push(sender);
Some(receiver)
}
fn finalized(&self, _commitment: Commitment) {}
fn send(&self, round: Round, block: TestCodedBlock, recipients: Recipients<K>) {
self.sends.lock().push((round, block, recipients));
}
}
type CodingFetchRecord = Fetch<handler::Key<Commitment>, handler::Annotation>;
type CodingTargetedFetch = (handler::Key<Commitment>, NonEmptyVec<K>);
#[derive(Clone, Default)]
struct RecordingResolver {
fetches: Arc<Mutex<Vec<CodingFetchRecord>>>,
targeted: Arc<Mutex<Vec<CodingTargetedFetch>>>,
auto_delivery: Arc<Mutex<Option<Bytes>>>,
delivery_responses: Arc<Mutex<Vec<oneshot::Receiver<bool>>>>,
sender: Option<mailbox::Sender<handler::Message<Commitment>>>,
}
impl RecordingResolver {
fn holding(metrics: impl Metrics) -> (handler::Receiver<Commitment>, Self) {
let (sender, receiver) = mailbox::new(metrics, NZUsize!(100));
(
handler::Receiver::new(receiver),
Self {
fetches: Arc::new(Mutex::new(Vec::new())),
targeted: Arc::new(Mutex::new(Vec::new())),
auto_delivery: Arc::new(Mutex::new(None)),
delivery_responses: Arc::new(Mutex::new(Vec::new())),
sender: Some(sender),
},
)
}
fn record_fetch(&self, fetch: CodingFetchRecord) {
self.fetches.lock().push(fetch.clone());
let Some(value) = self.auto_delivery.lock().take() else {
return;
};
let Some(sender) = &self.sender else {
return;
};
let (response, response_rx) = oneshot::channel();
self.delivery_responses.lock().push(response_rx);
let _ = sender.enqueue(handler::Message::Deliver {
delivery: Delivery {
key: fetch.key,
subscribers: NonEmptyVec::new(fetch.subscriber),
},
value,
response,
});
}
fn respond_to_next_fetch(&self, value: Bytes) {
let replaced = self.auto_delivery.lock().replace(value);
assert!(
replaced.is_none(),
"recording resolver already has an automatic delivery"
);
}
async fn wait_for_delivery_response(&self) -> bool {
let response = self
.delivery_responses
.lock()
.pop()
.expect("delivery response missing");
response.await.expect("delivery response sender dropped")
}
fn fetches(&self) -> Vec<CodingFetchRecord> {
self.fetches.lock().clone()
}
fn targeted(&self) -> Vec<CodingTargetedFetch> {
self.targeted.lock().clone()
}
}
impl Resolver for RecordingResolver {
type Key = handler::Key<Commitment>;
type Subscriber = handler::Annotation;
fn fetch<F>(&mut self, fetch: F) -> Feedback
where
F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
{
self.record_fetch(fetch.into());
Feedback::Ok
}
fn fetch_all<F>(&mut self, fetches: Vec<F>) -> Feedback
where
F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
{
for fetch in fetches {
self.record_fetch(fetch.into());
}
Feedback::Ok
}
fn retain(
&mut self,
_predicate: impl Fn(&Self::Key, &Self::Subscriber) -> bool + Send + 'static,
) -> Feedback {
Feedback::Ok
}
}
impl TargetedResolver for RecordingResolver {
type PublicKey = K;
fn fetch_targeted(
&mut self,
fetch: impl Into<Fetch<Self::Key, Self::Subscriber>> + Send,
targets: NonEmptyVec<Self::PublicKey>,
) -> Feedback {
self.targeted.lock().push((fetch.into().key, targets));
Feedback::Ok
}
fn fetch_all_targeted<F>(
&mut self,
fetches: Vec<(F, NonEmptyVec<Self::PublicKey>)>,
) -> Feedback
where
F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
{
let mut targeted = self.targeted.lock();
for (fetch, targets) in fetches {
targeted.push((fetch.into().key, targets));
}
Feedback::Ok
}
}
async fn start_coding_actor_with_recording(
context: deterministic::Context,
partition_prefix: &str,
provider: ConstantProvider<S, Epoch>,
buffer: RecordingCodingBuffer,
) -> (
core::Mailbox<S, TestCodingVariant>,
RecordingResolver,
commonware_runtime::Handle<()>,
) {
let config = Config {
provider,
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
start: Start::Genesis(CodingHarness::genesis_block(NUM_VALIDATORS as u16)),
mailbox_size: NZUsize!(100),
view_retention_timeout: ViewDelta::new(10),
max_repair: NZUsize!(10),
max_pending_acks: NZUsize!(1),
block_codec_config: (),
partition_prefix: partition_prefix.to_string(),
prunable_items_per_section: NZU64!(10),
replay_buffer: NZUsize!(1024),
key_write_buffer: NZUsize!(1024),
value_write_buffer: NZUsize!(1024),
page_cache: CacheRef::from_pooler(
&context,
harness::PAGE_SIZE,
harness::PAGE_CACHE_SIZE,
),
strategy: Sequential,
};
let finalizations_by_height = immutable::Archive::init(
context.child("finalizations_by_height"),
immutable::Config {
metadata_partition: format!("{partition_prefix}-finalizations-by-height-metadata"),
freezer_table_partition: format!(
"{partition_prefix}-finalizations-by-height-freezer-table"
),
freezer_table_initial_size: 64,
freezer_table_resize_frequency: 10,
freezer_table_resize_chunk_size: 10,
freezer_key_partition: format!(
"{partition_prefix}-finalizations-by-height-freezer-key"
),
freezer_key_page_cache: config.page_cache.clone(),
freezer_value_partition: format!(
"{partition_prefix}-finalizations-by-height-freezer-value"
),
freezer_value_target_size: 1024,
freezer_value_compression: None,
ordinal_partition: format!("{partition_prefix}-finalizations-by-height-ordinal"),
items_per_section: NZU64!(10),
codec_config: S::certificate_codec_config_unbounded(),
replay_buffer: config.replay_buffer,
freezer_key_write_buffer: config.key_write_buffer,
freezer_value_write_buffer: config.value_write_buffer,
ordinal_write_buffer: config.key_write_buffer,
},
)
.await
.expect("failed to initialize finalizations by height archive");
let finalized_blocks = immutable::Archive::init(
context.child("finalized_blocks"),
immutable::Config {
metadata_partition: format!("{partition_prefix}-finalized_blocks-metadata"),
freezer_table_partition: format!(
"{partition_prefix}-finalized_blocks-freezer-table"
),
freezer_table_initial_size: 64,
freezer_table_resize_frequency: 10,
freezer_table_resize_chunk_size: 10,
freezer_key_partition: format!("{partition_prefix}-finalized_blocks-freezer-key"),
freezer_key_page_cache: config.page_cache.clone(),
freezer_value_partition: format!(
"{partition_prefix}-finalized_blocks-freezer-value"
),
freezer_value_target_size: 1024,
freezer_value_compression: None,
ordinal_partition: format!("{partition_prefix}-finalized_blocks-ordinal"),
items_per_section: NZU64!(10),
codec_config: config.block_codec_config,
replay_buffer: config.replay_buffer,
freezer_key_write_buffer: config.key_write_buffer,
freezer_value_write_buffer: config.value_write_buffer,
ordinal_write_buffer: config.key_write_buffer,
},
)
.await
.expect("failed to initialize finalized blocks archive");
let (actor, mailbox, _) = core::Actor::init(
context.child("actor"),
finalizations_by_height,
finalized_blocks,
config,
)
.await;
let (resolver_rx, resolver) = RecordingResolver::holding(context.child("resolver"));
let actor_handle = actor.start(
Application::<CodingB>::default(),
buffer,
(resolver_rx, resolver.clone()),
);
(mailbox, resolver, actor_handle)
}
async fn start_shard_mailbox(
context: deterministic::Context,
participants: Vec<K>,
provider: ConstantProvider<S, Epoch>,
) -> shards::Mailbox<CodingB, ReedSolomon<Sha256>, Sha256, K> {
let me = participants[0].clone();
let oracle =
setup_network_with_participants(context.child("network"), NZUsize!(1), participants)
.await;
let control = oracle.control(me.clone());
let shard_config: shards::Config<_, _, _, _, _, Sha256, _, _> = shards::Config {
scheme_provider: provider,
blocker: control.clone(),
shard_codec_cfg: CodecConfig {
maximum_shard_size: 1024 * 1024,
},
block_codec_cfg: (),
strategy: Sequential,
mailbox_size: NZUsize!(10),
peer_buffer_size: NZUsize!(64),
background_channel_capacity: NZUsize!(1024),
peer_provider: oracle.manager(),
};
let (shard_engine, shard_mailbox) =
shards::Engine::new(context.child("shards"), shard_config);
let network = control.register(0, TEST_QUOTA).await.unwrap();
shard_engine.start(network);
shard_mailbox
}
fn genesis_block() -> CodingB {
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0)
}
fn genesis_coding_commitment<H: Hasher, B: CertifiableBlock>(block: &B) -> Commitment {
Commitment::from((
block.digest(),
block.digest(),
hash_context::<H, _>(&block.context()),
GENESIS_CODING_CONFIG,
))
}
fn missing_candidate(me: K) -> (CodingCtx, TestCodedBlock) {
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let genesis = genesis_block();
let genesis_parent_commitment = genesis_coding_commitment::<Sha256, _>(&genesis);
let round = Round::new(Epoch::zero(), View::new(1));
let candidate_ctx = CodingCtx {
round,
leader: me,
parent: (View::zero(), genesis_parent_commitment),
};
let candidate =
make_coding_block(candidate_ctx.clone(), genesis.digest(), Height::new(1), 100);
let coded_candidate: TestCodedBlock =
CodedBlock::new(candidate, coding_config, &Sequential);
(candidate_ctx, coded_candidate)
}
#[test_traced("WARN")]
fn test_coding_block_provider_parent_fetches_by_commitment() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let provider = ConstantProvider::new(schemes[0].clone());
let buffer = RecordingCodingBuffer::default();
let (marshal, _resolver, _actor_handle) = start_coding_actor_with_recording(
context.child("actor_stack"),
"coding-provider-parent-commitment",
provider,
buffer.clone(),
)
.await;
let (parent_ctx, parent) = missing_candidate(participants[0].clone());
let child_ctx = CodingCtx {
round: Round::new(Epoch::zero(), View::new(2)),
leader: participants[0].clone(),
parent: (parent_ctx.round.view(), parent.commitment()),
};
let child = make_coding_block(child_ctx, parent.digest(), Height::new(2), 200);
let subscription = marshal.subscribe_parent(&child);
context.sleep(Duration::from_millis(100)).await;
assert_eq!(
buffer.commitment_subscription_count(),
1,
"parent walkback should use the coding parent commitment"
);
drop(subscription);
});
}
#[test_traced("WARN")]
fn test_coding_verify_missing_candidate_waits_without_fetching() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let provider = ConstantProvider::new(schemes[0].clone());
let me = participants[0].clone();
let buffer = RecordingCodingBuffer::default();
let (marshal, resolver, _actor_handle) = start_coding_actor_with_recording(
context.child("actor_stack"),
"coding-verify-missing-candidate",
provider.clone(),
buffer.clone(),
)
.await;
let shards =
start_shard_mailbox(context.child("shard_stack"), participants, provider.clone())
.await;
let (candidate_ctx, candidate) = missing_candidate(me);
let commitment = candidate.commitment();
let cfg = MarshaledConfig {
application: MockVerifyingApp::<CodingB, S>::new(),
marshal,
shards,
scheme_provider: provider,
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let verify_rx = marshaled.verify(candidate_ctx, commitment).await;
context.sleep(Duration::from_millis(100)).await;
assert!(
buffer.subscription_count() > 0,
"missing candidate should register a local buffer wait"
);
assert!(
resolver.fetches().is_empty(),
"missing candidate verify must not fetch from peers"
);
assert!(
resolver.targeted().is_empty(),
"missing candidate verify must not issue targeted fetches"
);
drop(verify_rx);
});
}
#[test_traced("WARN")]
fn test_coding_certify_missing_candidate_fetches_by_round() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let provider = ConstantProvider::new(schemes[0].clone());
let me = participants[0].clone();
let buffer = RecordingCodingBuffer::default();
let (marshal, resolver, _actor_handle) = start_coding_actor_with_recording(
context.child("actor_stack"),
"coding-certify-missing-candidate",
provider.clone(),
buffer.clone(),
)
.await;
let shards =
start_shard_mailbox(context.child("shard_stack"), participants, provider.clone())
.await;
let cfg = MarshaledConfig {
application: MockVerifyingApp::<CodingB, S>::new(),
marshal,
shards,
scheme_provider: provider,
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let (candidate_ctx, candidate) = missing_candidate(me);
let commitment = candidate.commitment();
let round = candidate_ctx.round;
let proposal = Proposal::new(round, View::zero(), commitment);
let notarization = CodingHarness::make_notarization(proposal, &schemes, QUORUM);
resolver.respond_to_next_fetch((notarization, candidate).encode());
let certify_rx = marshaled.certify(round, commitment).await;
let result = certify_rx.await.expect("certify result missing");
assert!(result, "fetched notarized candidate should certify");
assert!(
resolver.wait_for_delivery_response().await,
"notarized delivery should validate"
);
assert!(
resolver.fetches().iter().any(|fetch| matches!(
(&fetch.key, &fetch.subscriber),
(
handler::Key::Notarized { round: request_round },
handler::Annotation::Notarization { round: subscriber_round },
) if *request_round == round && *subscriber_round == round
)),
"certify should fetch notarized block by round"
);
assert!(
buffer.subscription_count() > 0,
"missing candidate should register a local buffer wait"
);
assert!(
resolver.targeted().is_empty(),
"missing candidate certify must not issue targeted fetches"
);
});
}
#[test_traced("WARN")]
fn test_coding_certify_pending_verify_fetches_by_round() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let provider = ConstantProvider::new(schemes[0].clone());
let me = participants[0].clone();
let buffer = RecordingCodingBuffer::default();
let (marshal, resolver, _actor_handle) = start_coding_actor_with_recording(
context.child("actor_stack"),
"coding-certify-pending-verify",
provider.clone(),
buffer,
)
.await;
let shards =
start_shard_mailbox(context.child("shard_stack"), participants, provider.clone())
.await;
let cfg = MarshaledConfig {
application: MockVerifyingApp::<CodingB, S>::new(),
marshal,
shards,
scheme_provider: provider,
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let (candidate_ctx, candidate) = missing_candidate(me);
let commitment = candidate.commitment();
let round = candidate_ctx.round;
let _verify_rx = marshaled.verify(candidate_ctx, commitment).await;
let proposal = Proposal::new(round, View::zero(), commitment);
let notarization = CodingHarness::make_notarization(proposal, &schemes, QUORUM);
resolver.respond_to_next_fetch((notarization, candidate).encode());
let certify_rx = marshaled.certify(round, commitment).await;
let result = certify_rx.await.expect("certify result missing");
assert!(
result,
"pending verify should complete after certification recovery"
);
assert!(
resolver.wait_for_delivery_response().await,
"notarized delivery should validate"
);
assert!(
resolver.fetches().iter().any(|fetch| matches!(
(&fetch.key, &fetch.subscriber),
(
handler::Key::Notarized { round: request_round },
handler::Annotation::Notarization { round: subscriber_round },
) if *request_round == round && *subscriber_round == round
)),
"certify should recover a pending verify by notarized round"
);
assert!(
resolver.targeted().is_empty(),
"certify recovery must not issue targeted fetches"
);
});
}
#[test_group("slow")]
#[test_traced("WARN")]
fn test_coding_finalize_good_links() {
for seed in 0..5 {
let r1 = harness::finalize::<CodingHarness>(seed, LINK, false);
let r2 = harness::finalize::<CodingHarness>(seed, LINK, false);
assert_eq!(r1, r2);
}
}
#[test_group("slow")]
#[test_traced("WARN")]
fn test_coding_finalize_bad_links() {
for seed in 0..5 {
let r1 = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, false);
let r2 = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, false);
assert_eq!(r1, r2);
}
}
#[test_group("slow")]
#[test_traced("WARN")]
fn test_coding_finalize_good_links_quorum_sees_finalization() {
for seed in 0..5 {
let r1 = harness::finalize::<CodingHarness>(seed, LINK, true);
let r2 = harness::finalize::<CodingHarness>(seed, LINK, true);
assert_eq!(r1, r2);
}
}
#[test_group("slow")]
#[test_traced("WARN")]
fn test_coding_finalize_bad_links_quorum_sees_finalization() {
for seed in 0..5 {
let r1 = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, true);
let r2 = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, true);
assert_eq!(r1, r2);
}
}
#[test_group("slow")]
#[test_traced("WARN")]
fn test_coding_hailstorm_restarts() {
for seed in 0..2 {
let r1 = harness::hailstorm::<CodingHarness>(seed, 4, 4, 1, LINK);
let r2 = harness::hailstorm::<CodingHarness>(seed, 4, 4, 1, LINK);
assert_eq!(r1, r2);
}
}
#[test_group("slow")]
#[test_traced("WARN")]
fn test_coding_hailstorm_multi_restarts() {
for seed in 0..2 {
let r1 = harness::hailstorm::<CodingHarness>(seed, 4, 4, 2, LINK);
let r2 = harness::hailstorm::<CodingHarness>(seed, 4, 4, 2, LINK);
assert_eq!(r1, r2);
}
}
#[test_traced("WARN")]
fn test_coding_ack_pipeline_backlog() {
harness::ack_pipeline_backlog::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_ack_pipeline_backlog_persists_on_restart() {
harness::ack_pipeline_backlog_persists_on_restart::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_genesis_emitted_once() {
harness::genesis_emitted_once::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_proposed_success_implies_recoverable_after_restart() {
harness::proposed_success_implies_recoverable_after_restart::<CodingHarness>(0..16);
}
#[test_traced("WARN")]
fn test_coding_verified_success_implies_recoverable_after_restart() {
harness::verified_success_implies_recoverable_after_restart::<CodingHarness>(0..16);
}
#[test_traced("WARN")]
fn test_coding_certified_success_implies_recoverable_after_restart() {
harness::certified_success_implies_recoverable_after_restart::<CodingHarness>(0..16);
}
#[test_traced("WARN")]
fn test_coding_delivery_visibility_implies_recoverable_after_restart() {
harness::delivery_visibility_implies_recoverable_after_restart::<CodingHarness>(0..16);
}
#[test_traced("WARN")]
fn test_coding_sync_height_floor() {
harness::sync_height_floor::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_prune_finalized_archives() {
harness::prune_finalized_archives::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_rejects_block_delivery_below_floor() {
harness::reject_stale_block_delivery_after_floor_update::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_commitment_fetch_height_hint_mismatch_wakes_subscriber() {
harness::commitment_fetch_height_hint_mismatch_wakes_subscriber::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_subscribe_basic_block_delivery() {
harness::subscribe_basic_block_delivery::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_subscribe_multiple_subscriptions() {
harness::subscribe_multiple_subscriptions::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_subscribe_canceled_subscriptions() {
harness::subscribe_canceled_subscriptions::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_subscribe_blocks_from_different_sources() {
harness::subscribe_blocks_from_different_sources::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_get_info_basic_queries_present_and_missing() {
harness::get_info_basic_queries_present_and_missing::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_get_info_latest_progression_multiple_finalizations() {
harness::get_info_latest_progression_multiple_finalizations::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_get_block_by_height_and_latest() {
harness::get_block_by_height_and_latest::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_get_block_by_commitment_from_sources_and_missing() {
harness::get_block_by_commitment_from_sources_and_missing::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_get_finalization_by_height() {
harness::get_finalization_by_height::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_hint_finalized_triggers_fetch() {
harness::hint_finalized_triggers_fetch::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_ancestry_stream() {
harness::ancestry_stream::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_finalize_same_height_different_views() {
harness::finalize_same_height_different_views::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_certify_persists_equivocated_block() {
harness::certify_persists_equivocated_block::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_certify_at_later_view_survives_earlier_view_pruning() {
harness::certify_at_later_view_survives_earlier_view_pruning::<CodingHarness>();
}
#[test_traced("WARN")]
fn test_coding_certify_first_block_fetches_genesis_parent() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let genesis_parent_commitment = genesis_coding_commitment::<Sha256, _>(&genesis);
let round = Round::new(Epoch::zero(), View::new(1));
let block_ctx = CodingCtx {
round,
leader: me.clone(),
parent: (View::zero(), genesis_parent_commitment),
};
let block = make_coding_block(block_ctx.clone(), genesis.digest(), Height::new(1), 100);
let coded_block = CodedBlock::new(block, coding_config, &Sequential);
let commitment = coded_block.commitment();
shards.proposed(round, coded_block);
context.sleep(Duration::from_millis(10)).await;
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
let cfg = MarshaledConfig {
application: mock_app,
marshal,
shards,
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let shard_validity = marshaled
.verify(block_ctx, commitment)
.await
.await
.expect("verify result missing");
assert!(shard_validity, "shard validity should pass");
let certify_result = marshaled
.certify(round, commitment)
.await
.await
.expect("certify result missing");
assert!(
certify_result,
"height-1 block should certify with genesis as parent"
);
});
}
#[test_traced("WARN")]
fn test_coding_store_finalization_does_not_prune_buffer_before_repair() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
participants[0].clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let mut handle = harness::ValidatorHandle::<CodingHarness> {
mailbox: setup.mailbox,
extra: setup.extra,
};
let parent_block = CodingHarness::make_test_block(
Sha256::hash(b""),
CodingHarness::genesis_parent_commitment(NUM_VALIDATORS as u16),
Height::new(1),
1,
NUM_VALIDATORS as u16,
);
let parent_digest = CodingHarness::digest(&parent_block);
let parent_commitment = CodingHarness::commitment(&parent_block);
let descendant_block = CodingHarness::make_test_block(
parent_digest,
parent_commitment,
Height::new(2),
2,
NUM_VALIDATORS as u16,
);
let descendant_commitment = CodingHarness::commitment(&descendant_block);
CodingHarness::propose(
&mut handle,
Round::new(Epoch::new(0), View::new(1)),
&parent_block,
)
.await;
CodingHarness::propose(
&mut handle,
Round::new(Epoch::new(0), View::new(2)),
&descendant_block,
)
.await;
let descendant_proposal = Proposal {
round: Round::new(Epoch::new(0), View::new(2)),
parent: View::new(1),
payload: descendant_commitment,
};
let descendant_finalization =
CodingHarness::make_finalization(descendant_proposal, &schemes, QUORUM);
CodingHarness::report_finalization(&mut handle.mailbox, descendant_finalization).await;
while handle.mailbox.get_block(Height::new(2)).await.is_none() {
context.sleep(Duration::from_millis(10)).await;
}
let parent = handle.mailbox.get_block(Height::new(1)).await;
assert!(
parent.is_some(),
"parent must be archived from shard buffer before height-prune evicts it"
);
});
}
#[test_traced("WARN")]
fn test_coding_init_processed_height() {
harness::init_processed_height::<CodingHarness>();
}
#[test_traced("INFO")]
fn test_coding_broadcast_caches_block() {
harness::broadcast_caches_block::<CodingHarness>();
}
#[test_traced("INFO")]
fn test_certify_lower_view_after_higher_view() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let parent_ctx = CodingCtx {
round: Round::new(Epoch::new(0), View::new(1)),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(1), 100);
let parent_digest = parent.digest();
let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
let parent_commitment = coded_parent.commitment();
shards.proposed(Round::new(Epoch::new(0), View::new(1)), coded_parent);
let round_a = Round::new(Epoch::new(0), View::new(5));
let context_a = CodingCtx {
round: round_a,
leader: me.clone(),
parent: (View::new(1), parent_commitment),
};
let block_a = make_coding_block(context_a.clone(), parent_digest, Height::new(2), 200);
let coded_block_a = CodedBlock::new(block_a.clone(), coding_config, &Sequential);
let commitment_a = coded_block_a.commitment();
shards.proposed(round_a, coded_block_a);
let round_b = Round::new(Epoch::new(0), View::new(10));
let context_b = CodingCtx {
round: round_b,
leader: me.clone(),
parent: (View::new(1), parent_commitment),
};
let block_b = make_coding_block(context_b.clone(), parent_digest, Height::new(2), 300);
let coded_block_b = CodedBlock::new(block_b.clone(), coding_config, &Sequential);
let commitment_b = coded_block_b.commitment();
shards.proposed(round_b, coded_block_b);
context.sleep(Duration::from_millis(10)).await;
let _ = marshaled.verify(context_a, commitment_a).await.await;
let _ = marshaled.verify(context_b, commitment_b).await.await;
let certify_b = marshaled.certify(round_b, commitment_b).await;
assert!(
certify_b.await.unwrap(),
"Block B certification should succeed"
);
let certify_a = marshaled.certify(round_a, commitment_a).await;
select! {
result = certify_a => {
assert!(result.unwrap(), "Block A certification should succeed");
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("Block A certification timed out");
},
}
})
}
#[test_traced("INFO")]
fn test_marshaled_reproposal_validation() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let mut parent = genesis.digest();
let mut last_view = View::zero();
let mut last_commitment = genesis_commitment();
for i in 1..BLOCKS_PER_EPOCH.get() {
let round = Round::new(Epoch::new(0), View::new(i));
let ctx = CodingCtx {
round,
leader: me.clone(),
parent: (last_view, last_commitment),
};
let block = make_coding_block(ctx.clone(), parent, Height::new(i), i * 100);
let coded_block = CodedBlock::new(block.clone(), coding_config, &Sequential);
last_commitment = coded_block.commitment();
shards.proposed(round, coded_block);
parent = block.digest();
last_view = View::new(i);
}
let boundary_height = Height::new(BLOCKS_PER_EPOCH.get() - 1);
let boundary_round = Round::new(Epoch::new(0), View::new(boundary_height.get()));
let boundary_context = CodingCtx {
round: boundary_round,
leader: me.clone(),
parent: (last_view, last_commitment),
};
let boundary_block = make_coding_block(
boundary_context.clone(),
parent,
boundary_height,
boundary_height.get() * 100,
);
let coded_boundary =
CodedBlock::new(boundary_block.clone(), coding_config, &Sequential);
let boundary_commitment = coded_boundary.commitment();
shards.proposed(boundary_round, coded_boundary);
context.sleep(Duration::from_millis(10)).await;
let reproposal_round = Round::new(Epoch::new(0), View::new(20));
let reproposal_context = CodingCtx {
round: reproposal_round,
leader: me.clone(),
parent: (View::new(boundary_height.get()), boundary_commitment), };
let shard_validity = marshaled
.verify(reproposal_context.clone(), boundary_commitment)
.await
.await;
assert!(
shard_validity.unwrap(),
"Re-proposal verify should return true for shard validity"
);
let certify_result = marshaled
.certify(reproposal_round, boundary_commitment)
.await
.await;
assert!(
certify_result.unwrap(),
"Valid re-proposal at epoch boundary should be accepted"
);
let non_boundary_height = Height::new(10);
let non_boundary_round = Round::new(Epoch::new(0), View::new(10));
let non_boundary_context = CodingCtx {
round: non_boundary_round,
leader: me.clone(),
parent: (View::new(9), last_commitment), };
let non_boundary_block = make_coding_block(
non_boundary_context.clone(),
parent,
non_boundary_height,
1000,
);
let coded_non_boundary =
CodedBlock::new(non_boundary_block.clone(), coding_config, &Sequential);
let non_boundary_commitment = coded_non_boundary.commitment();
shards.proposed(non_boundary_round, coded_non_boundary);
context.sleep(Duration::from_millis(10)).await;
let invalid_reproposal_round = Round::new(Epoch::new(0), View::new(15));
let invalid_reproposal_context = CodingCtx {
round: invalid_reproposal_round,
leader: me.clone(),
parent: (View::new(10), non_boundary_commitment),
};
let shard_validity = marshaled
.verify(invalid_reproposal_context, non_boundary_commitment)
.await
.await;
assert!(
!shard_validity.unwrap(),
"Invalid re-proposal verify should return false"
);
let certify_result = marshaled
.certify(invalid_reproposal_round, non_boundary_commitment)
.await
.await;
assert!(
!certify_result.unwrap(),
"Invalid re-proposal (not at epoch boundary) should be rejected"
);
let cross_epoch_reproposal_round = Round::new(Epoch::new(1), View::new(20));
let cross_epoch_reproposal_context = CodingCtx {
round: cross_epoch_reproposal_round,
leader: me.clone(),
parent: (View::new(boundary_height.get()), boundary_commitment),
};
let shard_validity = marshaled
.verify(cross_epoch_reproposal_context.clone(), boundary_commitment)
.await
.await;
assert!(
!shard_validity.unwrap(),
"Cross-epoch re-proposal verify should return false"
);
let certify_result = marshaled
.certify(cross_epoch_reproposal_round, boundary_commitment)
.await
.await;
assert!(
!certify_result.unwrap(),
"Re-proposal with mismatched epoch should be rejected"
);
})
}
#[test_traced("WARN")]
fn test_marshaled_rejects_mismatched_context_digest() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let parent_ctx = CodingCtx {
round: Round::new(Epoch::zero(), View::new(1)),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(1), 100);
let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
let parent_commitment = coded_parent.commitment();
shards.proposed(Round::new(Epoch::zero(), View::new(1)), coded_parent);
let round_a = Round::new(Epoch::zero(), View::new(2));
let context_a = CodingCtx {
round: round_a,
leader: me.clone(),
parent: (View::new(1), parent_commitment),
};
let block_a = make_coding_block(context_a, parent.digest(), Height::new(2), 200);
let coded_block_a: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
CodedBlock::new(block_a, coding_config, &Sequential);
let commitment_a = coded_block_a.commitment();
let round_b = Round::new(Epoch::zero(), View::new(3));
let context_b = CodingCtx {
round: round_b,
leader: participants[1].clone(),
parent: (View::new(1), parent_commitment),
};
let verify_rx = marshaled.verify(context_b, commitment_a).await;
select! {
result = verify_rx => {
assert!(
!result.unwrap(),
"mismatched context digest should be rejected"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("verify should reject mismatched context digest promptly");
},
}
})
}
#[test_traced("WARN")]
fn test_reproposal_certify_recovers_after_verify_receiver_drop() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let boundary_height = Height::new(BLOCKS_PER_EPOCH.get() - 1);
let boundary_round = Round::new(Epoch::zero(), View::new(boundary_height.get()));
let boundary_context = CodingCtx {
round: boundary_round,
leader: me.clone(),
parent: (View::zero(), genesis_commitment()),
};
let boundary_block = make_coding_block(
boundary_context,
genesis.digest(),
boundary_height,
boundary_height.get() * 100,
);
let coded_boundary = CodedBlock::new(boundary_block, coding_config, &Sequential);
let boundary_commitment = coded_boundary.commitment();
let reproposal_round = Round::new(Epoch::zero(), View::new(boundary_height.get() + 1));
let reproposal_context = CodingCtx {
round: reproposal_round,
leader: me,
parent: (View::new(boundary_height.get()), boundary_commitment),
};
let verify_rx = marshaled
.verify(reproposal_context, boundary_commitment)
.await;
drop(verify_rx);
context.sleep(Duration::from_millis(10)).await;
shards.proposed(boundary_round, coded_boundary);
context.sleep(Duration::from_millis(10)).await;
let certify_rx = marshaled
.certify(reproposal_round, boundary_commitment)
.await;
select! {
result = certify_rx => {
assert!(
result.expect("certify result missing"),
"certify should recover after verify receiver drop"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("certify should recover after verify receiver drop");
},
}
})
}
#[test_traced("WARN")]
fn test_reproposal_missing_block_does_not_synthesize_false() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let missing_payload = Commitment::from((
Sha256::hash(b"missing_block"),
Sha256::hash(b"missing_root"),
Sha256::hash(b"missing_context"),
coding_config,
));
let round = Round::new(Epoch::zero(), View::new(1));
let reproposal_context = CodingCtx {
round,
leader: me,
parent: (View::zero(), missing_payload),
};
let verify_rx = marshaled.verify(reproposal_context, missing_payload).await;
context.sleep(Duration::from_millis(100)).await;
shards.prune(missing_payload);
select! {
result = verify_rx => {
assert!(
result.is_err(),
"verify should resolve without explicit false when re-proposal block is unavailable"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("verify should resolve promptly when re-proposal block is unavailable");
},
}
let mut certify_rx = marshaled.certify(round, missing_payload).await;
context.sleep(Duration::from_millis(100)).await;
assert!(
matches!(
certify_rx.try_recv(),
Err(commonware_utils::channel::oneshot::error::TryRecvError::Empty)
),
"certify should remain pending without explicit false or stale cancellation"
);
drop(certify_rx);
})
}
#[test_traced("WARN")]
fn test_core_subscription_closes_when_coding_buffer_prunes_missing_commitment() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
participants[0].clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let missing_commitment = Commitment::from((
Sha256::hash(b"missing_block"),
Sha256::hash(b"missing_root"),
Sha256::hash(b"missing_context"),
coding_config,
));
let round = Round::new(Epoch::zero(), View::new(1));
let block_rx = marshal.subscribe_by_commitment(
missing_commitment,
core::CommitmentFallback::FetchByRound { round },
);
context.sleep(Duration::from_millis(100)).await;
shards.prune(missing_commitment);
select! {
result = block_rx => {
assert!(
result.is_err(),
"core subscription should close when coding buffer drops subscription"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("core subscription should resolve promptly after coding prune");
},
}
})
}
#[test_traced("WARN")]
fn test_marshaled_rejects_unsupported_epoch() {
#[derive(Clone)]
struct LimitedEpocher {
inner: FixedEpocher,
max_epoch: u64,
}
impl Epocher for LimitedEpocher {
fn containing(&self, height: Height) -> Option<crate::types::EpochInfo> {
let bounds = self.inner.containing(height)?;
if bounds.epoch().get() > self.max_epoch {
None
} else {
Some(bounds)
}
}
fn first(&self, epoch: Epoch) -> Option<Height> {
if epoch.get() > self.max_epoch {
None
} else {
self.inner.first(epoch)
}
}
fn last(&self, epoch: Epoch) -> Option<Height> {
if epoch.get() > self.max_epoch {
None
} else {
self.inner.last(epoch)
}
}
}
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
let limited_epocher = LimitedEpocher {
inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
max_epoch: 0,
};
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: limited_epocher,
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let parent_ctx = CodingCtx {
round: Round::new(Epoch::zero(), View::new(19)),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(19), 1000);
let parent_digest = parent.digest();
let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
let parent_commitment = coded_parent.commitment();
shards.proposed(Round::new(Epoch::zero(), View::new(19)), coded_parent);
let block_ctx = CodingCtx {
round: Round::new(Epoch::new(1), View::new(20)),
leader: default_leader(),
parent: (View::new(19), parent_commitment),
};
let block = make_coding_block(block_ctx, parent_digest, Height::new(20), 2000);
let coded_block = CodedBlock::new(block.clone(), coding_config, &Sequential);
let block_commitment = coded_block.commitment();
shards.proposed(Round::new(Epoch::new(1), View::new(20)), coded_block);
context.sleep(Duration::from_millis(10)).await;
let unsupported_round = Round::new(Epoch::new(1), View::new(20));
let unsupported_context = CodingCtx {
round: unsupported_round,
leader: me.clone(),
parent: (View::new(19), parent_commitment),
};
let _shard_validity = marshaled
.verify(unsupported_context, block_commitment)
.await;
let certify_result = marshaled
.certify(unsupported_round, block_commitment)
.await
.await;
assert!(
!certify_result.unwrap(),
"Block in unsupported epoch should be rejected"
);
})
}
#[test_traced("WARN")]
fn test_marshaled_rejects_invalid_ancestry() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let honest_parent_ctx = CodingCtx {
round: Round::new(Epoch::new(1), View::new(21)),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let honest_parent = make_coding_block(
honest_parent_ctx,
genesis.digest(),
Height::new(BLOCKS_PER_EPOCH.get() + 1),
1000,
);
let parent_digest = honest_parent.digest();
let coded_parent = CodedBlock::new(honest_parent.clone(), coding_config, &Sequential);
let parent_commitment = coded_parent.commitment();
shards.proposed(Round::new(Epoch::new(1), View::new(21)), coded_parent);
let byzantine_round = Round::new(Epoch::new(1), View::new(35));
let byzantine_context = CodingCtx {
round: byzantine_round,
leader: me.clone(),
parent: (View::new(21), parent_commitment), };
let malicious_block = make_coding_block(
byzantine_context.clone(),
parent_digest,
Height::new(BLOCKS_PER_EPOCH.get() + 15), 2000,
);
let coded_malicious =
CodedBlock::new(malicious_block.clone(), coding_config, &Sequential);
let malicious_commitment = coded_malicious.commitment();
shards.proposed(byzantine_round, coded_malicious);
context.sleep(Duration::from_millis(10)).await;
let _shard_validity = marshaled
.verify(byzantine_context, malicious_commitment)
.await;
let certify_result = marshaled
.certify(byzantine_round, malicious_commitment)
.await
.await;
assert!(
!certify_result.unwrap(),
"Byzantine block with non-contiguous heights should be rejected"
);
let byzantine_round2 = Round::new(Epoch::new(1), View::new(22));
let byzantine_context2 = CodingCtx {
round: byzantine_round2,
leader: me.clone(),
parent: (View::new(21), parent_commitment), };
let malicious_block2 = make_coding_block(
byzantine_context2.clone(),
genesis.digest(), Height::new(BLOCKS_PER_EPOCH.get() + 2),
3000,
);
let coded_malicious2 =
CodedBlock::new(malicious_block2.clone(), coding_config, &Sequential);
let malicious_commitment2 = coded_malicious2.commitment();
shards.proposed(byzantine_round2, coded_malicious2);
context.sleep(Duration::from_millis(10)).await;
let _shard_validity = marshaled
.verify(byzantine_context2, malicious_commitment2)
.await;
let certify_result = marshaled
.certify(byzantine_round2, malicious_commitment2)
.await
.await;
assert!(
!certify_result.unwrap(),
"Byzantine block with mismatched parent commitment should be rejected"
);
})
}
#[test_traced("WARN")]
fn test_certify_without_prior_verify_crash_recovery() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let parent_round = Round::new(Epoch::zero(), View::new(1));
let parent_ctx = CodingCtx {
round: parent_round,
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(1), 100);
let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
let parent_commitment = coded_parent.commitment();
shards.proposed(parent_round, coded_parent);
let child_round = Round::new(Epoch::zero(), View::new(2));
let child_ctx = CodingCtx {
round: child_round,
leader: me.clone(),
parent: (View::new(1), parent_commitment),
};
let child = make_coding_block(child_ctx, parent.digest(), Height::new(2), 200);
let coded_child = CodedBlock::new(child, coding_config, &Sequential);
let child_commitment = coded_child.commitment();
shards.proposed(child_round, coded_child);
context.sleep(Duration::from_millis(10)).await;
let certify_rx = marshaled.certify(child_round, child_commitment).await;
select! {
result = certify_rx => {
assert!(
result.unwrap(),
"certify without prior verify should succeed for valid block"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("certify should complete within timeout");
},
}
})
}
#[test_traced("WARN")]
fn test_malformed_commitment_config_rejected_at_deserialization() {
use commonware_codec::{Encode, ReadExt};
let malformed_bytes = [0u8; Commitment::SIZE];
let result = Commitment::read(&mut &malformed_bytes[..]);
assert!(
result.is_err(),
"deserialization of Commitment with zeroed CodingConfig must fail"
);
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let valid = Commitment::from((
Sha256::hash(b"block"),
Sha256::hash(b"root"),
Sha256::hash(b"context"),
coding_config,
));
let encoded = valid.encode();
let decoded =
Commitment::read(&mut &encoded[..]).expect("valid Commitment must deserialize");
assert_eq!(valid, decoded);
}
#[test_traced("WARN")]
fn test_certify_propagates_application_verify_failure() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<CodingB, S> =
MockVerifyingApp::with_verify_result(false);
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let parent_round = Round::new(Epoch::zero(), View::new(1));
let parent_context = CodingCtx {
round: parent_round,
leader: me.clone(),
parent: (View::zero(), genesis_commitment()),
};
let parent = make_coding_block(parent_context, genesis.digest(), Height::new(1), 100);
let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
let parent_commitment = coded_parent.commitment();
shards.proposed(parent_round, coded_parent);
let round = Round::new(Epoch::zero(), View::new(2));
let verify_context = CodingCtx {
round,
leader: me,
parent: (View::new(1), parent_commitment),
};
let block =
make_coding_block(verify_context.clone(), parent.digest(), Height::new(2), 200);
let coded_block = CodedBlock::new(block, coding_config, &Sequential);
let commitment = coded_block.commitment();
shards.proposed(round, coded_block);
context.sleep(Duration::from_millis(10)).await;
let optimistic = marshaled.verify(verify_context, commitment).await;
assert!(
optimistic.await.expect("verify result missing"),
"optimistic verify should pass pre-checks and schedule deferred verification"
);
let certify = marshaled.certify(round, commitment).await;
assert!(
!certify.await.expect("certify result missing"),
"certify should propagate deferred application verify failure"
);
})
}
#[test_traced("WARN")]
fn test_backfill_block_mismatched_commitment() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants[..2].iter().cloned(),
)
.await;
let coding_config_a = coding_config_for_participants(NUM_VALIDATORS as u16);
let coding_config_b = commonware_coding::Config {
minimum_shards: coding_config_a.minimum_shards.checked_add(1).unwrap(),
extra_shards: NZU16!(coding_config_a.extra_shards.get() - 1),
};
let v0_setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
participants[0].clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let v1_setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 1),
&mut oracle,
participants[1].clone(),
ConstantProvider::new(schemes[1].clone()),
)
.await;
setup_network_links(&mut oracle, &participants[..2], LINK).await;
let mut v0_mailbox = v0_setup.mailbox;
let v1_mailbox = v1_setup.mailbox;
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let round1 = Round::new(Epoch::zero(), View::new(1));
let block1_ctx = CodingCtx {
round: round1,
leader: participants[0].clone(),
parent: (View::zero(), genesis_commitment()),
};
let block1 = make_coding_block(block1_ctx, genesis.digest(), Height::new(1), 100);
let coded_block_a: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
CodedBlock::new(block1.clone(), coding_config_a, &Sequential);
let commitment_a = coded_block_a.commitment();
let coded_block_b: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
CodedBlock::new(block1.clone(), coding_config_b, &Sequential);
let commitment_b = coded_block_b.commitment();
assert_eq!(coded_block_a.digest(), coded_block_b.digest());
assert_ne!(commitment_a, commitment_b);
assert!(v1_mailbox.verified(round1, coded_block_b.clone()).await);
context.sleep(Duration::from_millis(100)).await;
let proposal: Proposal<Commitment> = Proposal {
round: round1,
parent: View::zero(),
payload: commitment_a,
};
let finalization = CodingHarness::make_finalization(proposal.clone(), &schemes, QUORUM);
CodingHarness::report_finalization(&mut v0_mailbox, finalization).await;
context.sleep(Duration::from_secs(5)).await;
let stored = v0_mailbox.get_block(Height::new(1)).await;
assert!(
stored.is_none(),
"v0 should reject backfilled block with mismatched commitment"
);
let stored_finalization = v0_mailbox.get_finalization(Height::new(1)).await;
assert!(
stored_finalization.is_none(),
"finalization should not be archived until matching block is available"
);
})
}
#[test_traced("WARN")]
#[should_panic(expected = "floor block parent commitment mismatch")]
fn test_coding_floor_anchor_panics_on_parent_commitment_mismatch() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let (mailbox, resolver, _actor_handle) = start_coding_actor_with_recording(
context.child("validator"),
"floor-parent-commitment-mismatch",
ConstantProvider::new(schemes[0].clone()),
RecordingCodingBuffer::default(),
)
.await;
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let parent_round = Round::new(Epoch::zero(), View::new(1));
let parent_context = CodingCtx {
round: parent_round,
leader: participants[0].clone(),
parent: (View::zero(), genesis_commitment()),
};
let parent = make_coding_block(parent_context, Sha256::hash(b""), Height::new(1), 100);
let floor_round = Round::new(Epoch::zero(), View::new(2));
let bad_context = CodingCtx {
round: floor_round,
leader: participants[0].clone(),
parent: (View::new(1), genesis_commitment()),
};
let floor_block = make_coding_block(bad_context, parent.digest(), Height::new(2), 200);
let coded_floor = CodedBlock::new(floor_block, coding_config, &Sequential);
assert_ne!(
coded_floor.parent(),
coded_floor.context().parent.1.block::<D>()
);
let finalization = CodingHarness::make_finalization(
Proposal::new(
floor_round,
View::new(1),
CodingHarness::commitment(&coded_floor),
),
&schemes,
QUORUM,
);
resolver.respond_to_next_fetch(coded_floor.encode());
mailbox.set_floor(finalization);
context.sleep(Duration::from_secs(5)).await;
})
}
#[test_traced("WARN")]
fn test_marshaled_missing_scheme_skips_propose_and_verify() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
let cfg = MarshaledConfig {
application: mock_app,
marshal: setup.mailbox,
shards: setup.extra,
scheme_provider: EmptyProvider,
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let ctx = CodingCtx {
round: Round::new(Epoch::zero(), View::new(1)),
leader: me.clone(),
parent: (View::zero(), genesis_commitment()),
};
let rx = marshaled.propose(ctx.clone()).await;
assert!(rx.await.is_err());
let rx = marshaled.verify(ctx, genesis_commitment()).await;
assert!(rx.await.is_err());
});
}
#[test_traced("WARN")]
fn test_marshaled_certify_persists_block_before_resolving() {
for seed in 0u64..16 {
certify_persists_block_before_resolving_at(seed);
}
}
fn certify_persists_block_before_resolving_at(seed: u64) {
let runner = deterministic::Runner::new(
deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(60))),
);
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let marshal_actor_handle = setup.actor_handle;
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let parent_round = Round::new(Epoch::zero(), View::new(1));
let parent_ctx = CodingCtx {
round: parent_round,
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(1), 100);
let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
let parent_commitment = coded_parent.commitment();
shards.proposed(parent_round, coded_parent);
let child_round = Round::new(Epoch::zero(), View::new(2));
let child_ctx = CodingCtx {
round: child_round,
leader: me.clone(),
parent: (View::new(1), parent_commitment),
};
let child = make_coding_block(child_ctx.clone(), parent.digest(), Height::new(2), 200);
let coded_child = CodedBlock::new(child.clone(), coding_config, &Sequential);
let child_commitment = coded_child.commitment();
let child_digest = coded_child.digest();
shards.proposed(child_round, coded_child);
context.sleep(Duration::from_millis(10)).await;
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let shard_validity = marshaled
.verify(child_ctx, child_commitment)
.await
.await
.expect("verify result missing");
assert!(shard_validity, "shard validity should pass");
let certify_result = marshaled
.certify(child_round, child_commitment)
.await
.await
.expect("certify result missing");
assert!(certify_result, "certify should succeed");
marshal_actor_handle.abort();
drop(marshaled);
drop(marshal);
drop(shards);
let setup2 = CodingHarness::setup_validator(
context
.child("validator_restart")
.with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal2 = setup2.mailbox;
let post_restart = marshal2.get_block(&child_digest).await;
assert!(
post_restart.is_some(),
"certify resolved true ⟹ block must be durably persisted"
);
});
}
#[test_traced("WARN")]
fn test_marshaled_proposed_block_persists_across_restart() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let marshal_actor_handle = setup.actor_handle;
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let genesis_parent_commitment = genesis_coding_commitment::<Sha256, _>(&genesis);
let propose_round = Round::new(Epoch::zero(), View::new(1));
let propose_context = CodingCtx {
round: propose_round,
leader: me.clone(),
parent: (View::zero(), genesis_parent_commitment),
};
let block_to_propose = make_coding_block(
propose_context.clone(),
genesis.digest(),
Height::new(1),
100,
);
let block_digest = block_to_propose.digest();
let expected_commitment = CodedBlock::<_, ReedSolomon<Sha256>, Sha256>::new(
block_to_propose.clone(),
coding_config,
&Sequential,
)
.commitment();
let mock_app: MockVerifyingApp<CodingB, S> =
MockVerifyingApp::new().with_propose_result(block_to_propose);
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let commitment = marshaled
.propose(propose_context)
.await
.await
.expect("propose should produce a commitment");
assert_eq!(commitment, expected_commitment);
marshal_actor_handle.abort();
drop(marshaled);
drop(marshal);
drop(shards);
let setup2 = CodingHarness::setup_validator(
context
.child("validator_restart")
.with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal2 = setup2.mailbox;
let post_restart = marshal2.get_block(&block_digest).await;
assert!(
post_restart.is_some(),
"proposer should recover its own block after restart"
);
});
}
#[test_traced("WARN")]
fn test_propose_reuses_verified_block_on_restart() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let genesis_parent_commitment = genesis_coding_commitment::<Sha256, _>(&genesis);
let round = Round::new(Epoch::zero(), View::new(1));
let ctx = CodingCtx {
round,
leader: me.clone(),
parent: (View::zero(), genesis_parent_commitment),
};
let block_a = make_coding_block(ctx.clone(), genesis.digest(), Height::new(1), 100);
let coded_a: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
CodedBlock::new(block_a.clone(), coding_config, &Sequential);
let commitment_a = coded_a.commitment();
assert!(marshal.verified(round, coded_a).await);
let block_b = make_coding_block(ctx.clone(), genesis.digest(), Height::new(1), 200);
let coded_b: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
CodedBlock::new(block_b.clone(), coding_config, &Sequential);
let commitment_b = coded_b.commitment();
assert_ne!(
commitment_a, commitment_b,
"test requires distinct commitments"
);
let mock_app: MockVerifyingApp<CodingB, S> =
MockVerifyingApp::new().with_propose_result(block_b);
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let commitment = marshaled
.propose(ctx)
.await
.await
.expect("propose must return a commitment");
assert_eq!(
commitment, commitment_a,
"propose must reuse the block marshal already persisted for this round"
);
});
}
#[test_traced("WARN")]
fn test_propose_skips_when_verified_block_context_changed() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let genesis_parent_commitment = genesis_coding_commitment::<Sha256, _>(&genesis);
let round = Round::new(Epoch::zero(), View::new(2));
let stale_ctx = CodingCtx {
round,
leader: me.clone(),
parent: (View::zero(), genesis_parent_commitment),
};
let stale_block = make_coding_block(stale_ctx, genesis.digest(), Height::new(1), 100);
let stale_coded: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
CodedBlock::new(stale_block, coding_config, &Sequential);
assert!(marshal.verified(round, stale_coded).await);
let new_parent_commitment = Commitment::from((
Sha256::hash(b"different-parent-block"),
Sha256::hash(b"different-parent-inner"),
Sha256::hash(b"different-parent-ctx"),
coding_config,
));
let new_ctx = CodingCtx {
round,
leader: me.clone(),
parent: (View::new(1), new_parent_commitment),
};
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new();
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.child("marshaled"), cfg);
let commitment_rx = marshaled.propose(new_ctx).await;
assert!(
commitment_rx.await.is_err(),
"propose must drop the receiver when the cached block's context no longer matches"
);
});
}
}