// Submodules of this module. The tests below import this module's items through
// `crate::marshal::coding`.
pub mod shards;
pub mod types;
pub(crate) mod validation;
// `variant` is declared private; its `Coding` item is re-exported here.
mod variant;
pub use variant::Coding;
// `marshaled` is declared private; `Marshaled` and `MarshaledConfig` are re-exported here.
mod marshaled;
pub use marshaled::{Marshaled, MarshaledConfig};
#[cfg(test)]
mod tests {
use crate::{
marshal::{
coding::{
types::{coding_config_for_participants, CodedBlock},
Marshaled, MarshaledConfig,
},
mocks::{
harness::{
self, default_leader, genesis_commitment, make_coding_block,
setup_network_links, setup_network_with_participants, CodingB, CodingCtx,
CodingHarness, EmptyProvider, TestHarness, BLOCKS_PER_EPOCH, LINK, NAMESPACE,
NUM_VALIDATORS, QUORUM, S, UNRELIABLE_LINK, V,
},
verifying::MockVerifyingApp,
},
},
simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Proposal},
types::{coding::Commitment, Epoch, Epocher, FixedEpocher, Height, Round, View},
Automaton, CertifiableAutomaton,
};
use commonware_codec::FixedSize;
use commonware_coding::ReedSolomon;
use commonware_cryptography::{
certificate::{mocks::Fixture, ConstantProvider},
sha256::Sha256,
Committable, Digestible, Hasher as _,
};
use commonware_macros::{select, test_group, test_traced};
use commonware_parallel::Sequential;
use commonware_runtime::{deterministic, Clock, Metrics, Runner};
use commonware_utils::{NZUsize, NZU16};
use std::time::Duration;
// Determinism check for `harness::finalize` with `CodingHarness` over `LINK`.
//
// For each seed in `0..5` the scenario is executed twice with identical arguments and the
// two results are required to be equal. The failure message names the seed so a divergent
// run can be reproduced directly.
#[test_group("slow")]
#[test_traced("WARN")]
fn test_coding_finalize_good_links() {
    for seed in 0..5 {
        let first = harness::finalize::<CodingHarness>(seed, LINK, false);
        let second = harness::finalize::<CodingHarness>(seed, LINK, false);
        assert_eq!(
            first, second,
            "finalize over LINK diverged between identical runs (seed {})",
            seed
        );
    }
}
// Determinism check for `harness::finalize` with `CodingHarness` over `UNRELIABLE_LINK`.
//
// For each seed in `0..5` the scenario is executed twice with identical arguments and the
// two results are required to be equal. The failure message names the seed so a divergent
// run can be reproduced directly.
#[test_group("slow")]
#[test_traced("WARN")]
fn test_coding_finalize_bad_links() {
    for seed in 0..5 {
        let first = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, false);
        let second = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, false);
        assert_eq!(
            first, second,
            "finalize over UNRELIABLE_LINK diverged between identical runs (seed {})",
            seed
        );
    }
}
// Determinism check for `harness::finalize` with `CodingHarness` over `LINK`, passing
// `true` as the final argument.
// NOTE(review): per the test name that flag presumably selects the "quorum sees
// finalization" mode — confirm against `harness::finalize`.
//
// For each seed in `0..5` the scenario is executed twice with identical arguments and the
// two results are required to be equal. The failure message names the seed so a divergent
// run can be reproduced directly.
#[test_group("slow")]
#[test_traced("WARN")]
fn test_coding_finalize_good_links_quorum_sees_finalization() {
    for seed in 0..5 {
        let first = harness::finalize::<CodingHarness>(seed, LINK, true);
        let second = harness::finalize::<CodingHarness>(seed, LINK, true);
        assert_eq!(
            first, second,
            "finalize over LINK (flag = true) diverged between identical runs (seed {})",
            seed
        );
    }
}
// Determinism check for `harness::finalize` with `CodingHarness` over `UNRELIABLE_LINK`,
// passing `true` as the final argument.
// NOTE(review): per the test name that flag presumably selects the "quorum sees
// finalization" mode — confirm against `harness::finalize`.
//
// For each seed in `0..5` the scenario is executed twice with identical arguments and the
// two results are required to be equal. The failure message names the seed so a divergent
// run can be reproduced directly.
#[test_group("slow")]
#[test_traced("WARN")]
fn test_coding_finalize_bad_links_quorum_sees_finalization() {
    for seed in 0..5 {
        let first = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, true);
        let second = harness::finalize::<CodingHarness>(seed, UNRELIABLE_LINK, true);
        assert_eq!(
            first, second,
            "finalize over UNRELIABLE_LINK (flag = true) diverged between identical runs (seed {})",
            seed
        );
    }
}
// Runs the shared `harness::ack_pipeline_backlog` scenario with `CodingHarness` as the
// harness type. The scenario body lives in `crate::marshal::mocks::harness`, not here.
#[test_traced("WARN")]
fn test_coding_ack_pipeline_backlog() {
harness::ack_pipeline_backlog::<CodingHarness>();
}
// Runs the shared `harness::ack_pipeline_backlog_persists_on_restart` scenario with
// `CodingHarness` as the harness type. The scenario body lives in
// `crate::marshal::mocks::harness`, not here.
#[test_traced("WARN")]
fn test_coding_ack_pipeline_backlog_persists_on_restart() {
harness::ack_pipeline_backlog_persists_on_restart::<CodingHarness>();
}
// Runs the shared `harness::sync_height_floor` scenario with `CodingHarness` as the
// harness type. The scenario body lives in `crate::marshal::mocks::harness`, not here.
#[test_traced("WARN")]
fn test_coding_sync_height_floor() {
harness::sync_height_floor::<CodingHarness>();
}
// Runs the shared `harness::prune_finalized_archives` scenario with `CodingHarness` as the
// harness type. The scenario body lives in `crate::marshal::mocks::harness`, not here.
#[test_traced("WARN")]
fn test_coding_prune_finalized_archives() {
harness::prune_finalized_archives::<CodingHarness>();
}
// Runs the shared `harness::reject_stale_block_delivery_after_floor_update` scenario with
// `CodingHarness` as the harness type. Note that the test name and the harness function
// name differ; the scenario body lives in `crate::marshal::mocks::harness`, not here.
#[test_traced("WARN")]
fn test_coding_rejects_block_delivery_below_floor() {
harness::reject_stale_block_delivery_after_floor_update::<CodingHarness>();
}
// Runs the shared `harness::subscribe_basic_block_delivery` scenario with `CodingHarness`
// as the harness type. The scenario body lives in `crate::marshal::mocks::harness`.
#[test_traced("WARN")]
fn test_coding_subscribe_basic_block_delivery() {
harness::subscribe_basic_block_delivery::<CodingHarness>();
}
// Runs the shared `harness::subscribe_multiple_subscriptions` scenario with `CodingHarness`
// as the harness type. The scenario body lives in `crate::marshal::mocks::harness`.
#[test_traced("WARN")]
fn test_coding_subscribe_multiple_subscriptions() {
harness::subscribe_multiple_subscriptions::<CodingHarness>();
}
// Runs the shared `harness::subscribe_canceled_subscriptions` scenario with `CodingHarness`
// as the harness type. The scenario body lives in `crate::marshal::mocks::harness`.
#[test_traced("WARN")]
fn test_coding_subscribe_canceled_subscriptions() {
harness::subscribe_canceled_subscriptions::<CodingHarness>();
}
// Runs the shared `harness::subscribe_blocks_from_different_sources` scenario with
// `CodingHarness` as the harness type. The scenario body lives in
// `crate::marshal::mocks::harness`, not here.
#[test_traced("WARN")]
fn test_coding_subscribe_blocks_from_different_sources() {
harness::subscribe_blocks_from_different_sources::<CodingHarness>();
}
// Runs the shared `harness::get_info_basic_queries_present_and_missing` scenario with
// `CodingHarness` as the harness type. The scenario body lives in
// `crate::marshal::mocks::harness`, not here.
#[test_traced("WARN")]
fn test_coding_get_info_basic_queries_present_and_missing() {
harness::get_info_basic_queries_present_and_missing::<CodingHarness>();
}
// Runs the shared `harness::get_info_latest_progression_multiple_finalizations` scenario
// with `CodingHarness` as the harness type. The scenario body lives in
// `crate::marshal::mocks::harness`, not here.
#[test_traced("WARN")]
fn test_coding_get_info_latest_progression_multiple_finalizations() {
harness::get_info_latest_progression_multiple_finalizations::<CodingHarness>();
}
// Runs the shared `harness::get_block_by_height_and_latest` scenario with `CodingHarness`
// as the harness type. The scenario body lives in `crate::marshal::mocks::harness`.
#[test_traced("WARN")]
fn test_coding_get_block_by_height_and_latest() {
harness::get_block_by_height_and_latest::<CodingHarness>();
}
// Runs the shared `harness::get_block_by_commitment_from_sources_and_missing` scenario
// with `CodingHarness` as the harness type. The scenario body lives in
// `crate::marshal::mocks::harness`, not here.
#[test_traced("WARN")]
fn test_coding_get_block_by_commitment_from_sources_and_missing() {
harness::get_block_by_commitment_from_sources_and_missing::<CodingHarness>();
}
// Runs the shared `harness::get_finalization_by_height` scenario with `CodingHarness` as
// the harness type. The scenario body lives in `crate::marshal::mocks::harness`.
#[test_traced("WARN")]
fn test_coding_get_finalization_by_height() {
harness::get_finalization_by_height::<CodingHarness>();
}
// Runs the shared `harness::hint_finalized_triggers_fetch` scenario with `CodingHarness`
// as the harness type. The scenario body lives in `crate::marshal::mocks::harness`.
#[test_traced("WARN")]
fn test_coding_hint_finalized_triggers_fetch() {
harness::hint_finalized_triggers_fetch::<CodingHarness>();
}
// Runs the shared `harness::ancestry_stream` scenario with `CodingHarness` as the harness
// type. The scenario body lives in `crate::marshal::mocks::harness`, not here.
#[test_traced("WARN")]
fn test_coding_ancestry_stream() {
harness::ancestry_stream::<CodingHarness>();
}
// Runs the shared `harness::finalize_same_height_different_views` scenario with
// `CodingHarness` as the harness type. The scenario body lives in
// `crate::marshal::mocks::harness`, not here.
#[test_traced("WARN")]
fn test_coding_finalize_same_height_different_views() {
harness::finalize_same_height_different_views::<CodingHarness>();
}
// Runs the shared `harness::init_processed_height` scenario with `CodingHarness` as the
// harness type. The scenario body lives in `crate::marshal::mocks::harness`, not here.
#[test_traced("WARN")]
fn test_coding_init_processed_height() {
harness::init_processed_height::<CodingHarness>();
}
// Runs the shared `harness::broadcast_caches_block` scenario with `CodingHarness` as the
// harness type. The scenario body lives in `crate::marshal::mocks::harness`, not here.
// Unlike most wrappers in this module, this one passes "INFO" to `test_traced`.
#[test_traced("INFO")]
fn test_coding_broadcast_caches_block() {
harness::broadcast_caches_block::<CodingHarness>();
}
// Certifies two blocks out of view order: block B (view 10) first, then block A (view 5).
// Both blocks are built at height 2 on the same parent (height 1, view 1). The test asserts
// that B's certification succeeds and that A's certification afterwards also resolves
// successfully before a 5-second `context.sleep` fires, i.e. it does not hang.
#[test_traced("INFO")]
fn test_certify_lower_view_after_higher_view() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
// Signing fixture, network for the participants, and validator 0 (the only one started).
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.with_label("validator_0"),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
// `mailbox` is used as the marshal handle and `extra` as the shards handle below.
let marshal = setup.mailbox;
let shards = setup.extra;
// Genesis block, used to seed the mock application.
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
// Wrap the mock application in `Marshaled`, the type under test.
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.clone(), cfg);
// Parent block: height 1, view 1, child of genesis. Hand it to shards as proposed.
let parent_ctx = CodingCtx {
round: Round::new(Epoch::new(0), View::new(1)),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(1), 100);
let parent_digest = parent.digest();
let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
let parent_commitment = coded_parent.commitment();
shards
.clone()
.proposed(Round::new(Epoch::new(0), View::new(1)), coded_parent)
.await;
// Block A: height 2 at view 5, child of the parent block.
let round_a = Round::new(Epoch::new(0), View::new(5));
let context_a = CodingCtx {
round: round_a,
leader: me.clone(),
parent: (View::new(1), parent_commitment),
};
let block_a = make_coding_block(context_a.clone(), parent_digest, Height::new(2), 200);
let coded_block_a = CodedBlock::new(block_a.clone(), coding_config, &Sequential);
let commitment_a = coded_block_a.commitment();
shards.clone().proposed(round_a, coded_block_a).await;
// Block B: also height 2 on the same parent, but at the higher view 10 and with a
// different final argument to `make_coding_block` (300 instead of 200).
let round_b = Round::new(Epoch::new(0), View::new(10));
let context_b = CodingCtx {
round: round_b,
leader: me.clone(),
parent: (View::new(1), parent_commitment),
};
let block_b = make_coding_block(context_b.clone(), parent_digest, Height::new(2), 300);
let coded_block_b = CodedBlock::new(block_b.clone(), coding_config, &Sequential);
let commitment_b = coded_block_b.commitment();
shards.clone().proposed(round_b, coded_block_b).await;
// Presumably gives the background actors time to process the proposals — TODO confirm.
context.sleep(Duration::from_millis(10)).await;
// Verify both blocks to completion; the verify verdicts themselves are ignored here.
let _ = marshaled.verify(context_a, commitment_a).await.await;
let _ = marshaled.verify(context_b, commitment_b).await.await;
// Certify the higher-view block first.
let certify_b = marshaled.certify(round_b, commitment_b).await;
assert!(
certify_b.await.unwrap(),
"Block B certification should succeed"
);
// Then certify the lower-view block; it must resolve (successfully) before the timeout.
let certify_a = marshaled.certify(round_a, commitment_a).await;
select! {
result = certify_a => {
assert!(result.unwrap(), "Block A certification should succeed");
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("Block A certification timed out");
},
}
})
}
// Exercises `Marshaled::verify` / `Marshaled::certify` for re-proposals, which this test
// expresses as a context whose parent commitment equals the payload being verified.
// Three cases are asserted:
//   1. payload is the block at height `BLOCKS_PER_EPOCH - 1`, same epoch -> accepted;
//   2. payload is a block at height 10 -> rejected ("not at epoch boundary");
//   3. payload is the boundary block but the round names epoch 1 -> rejected.
#[test_traced("INFO")]
fn test_marshaled_reproposal_validation() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
// Signing fixture, network for the participants, and validator 0 (the only one started).
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.with_label("validator_0"),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
// Genesis block, used to seed the mock application.
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.clone(), cfg);
// Build and propose a chain on top of genesis: one block per height/view `i` for
// `i` in `1..BLOCKS_PER_EPOCH`, each chained to the previous block's digest/commitment.
let mut parent = genesis.digest();
let mut last_view = View::zero();
let mut last_commitment = genesis_commitment();
for i in 1..BLOCKS_PER_EPOCH.get() {
let round = Round::new(Epoch::new(0), View::new(i));
let ctx = CodingCtx {
round,
leader: me.clone(),
parent: (last_view, last_commitment),
};
let block = make_coding_block(ctx.clone(), parent, Height::new(i), i * 100);
let coded_block = CodedBlock::new(block.clone(), coding_config, &Sequential);
last_commitment = coded_block.commitment();
shards.clone().proposed(round, coded_block).await;
parent = block.digest();
last_view = View::new(i);
}
// Boundary block at height `BLOCKS_PER_EPOCH - 1`.
// NOTE(review): the loop above already proposed a block at this same height and view;
// this block reuses both but chains off the loop's last block (`parent`,
// `last_commitment`) — confirm that this shape is intended.
let boundary_height = Height::new(BLOCKS_PER_EPOCH.get() - 1);
let boundary_round = Round::new(Epoch::new(0), View::new(boundary_height.get()));
let boundary_context = CodingCtx {
round: boundary_round,
leader: me.clone(),
parent: (last_view, last_commitment),
};
let boundary_block = make_coding_block(
boundary_context.clone(),
parent,
boundary_height,
boundary_height.get() * 100,
);
let coded_boundary =
CodedBlock::new(boundary_block.clone(), coding_config, &Sequential);
let boundary_commitment = coded_boundary.commitment();
shards
.clone()
.proposed(boundary_round, coded_boundary)
.await;
// Presumably gives the background actors time to process the proposals — TODO confirm.
context.sleep(Duration::from_millis(10)).await;
// Case 1: re-propose the boundary block at view 20 in the same epoch. The context's
// parent commitment and the payload are both `boundary_commitment`.
let reproposal_round = Round::new(Epoch::new(0), View::new(20));
let reproposal_context = CodingCtx {
round: reproposal_round,
leader: me.clone(),
parent: (View::new(boundary_height.get()), boundary_commitment), };
let shard_validity = marshaled
.verify(reproposal_context.clone(), boundary_commitment)
.await
.await;
assert!(
shard_validity.unwrap(),
"Re-proposal verify should return true for shard validity"
);
let certify_result = marshaled
.certify(reproposal_round, boundary_commitment)
.await
.await;
assert!(
certify_result.unwrap(),
"Valid re-proposal at epoch boundary should be accepted"
);
// Case 2: a block at height 10, then an attempt to re-propose it at view 15.
// Assumes height 10 is not an epoch boundary for `BLOCKS_PER_EPOCH` — TODO confirm.
let non_boundary_height = Height::new(10);
let non_boundary_round = Round::new(Epoch::new(0), View::new(10));
let non_boundary_context = CodingCtx {
round: non_boundary_round,
leader: me.clone(),
parent: (View::new(9), last_commitment), };
let non_boundary_block = make_coding_block(
non_boundary_context.clone(),
parent,
non_boundary_height,
1000,
);
let coded_non_boundary =
CodedBlock::new(non_boundary_block.clone(), coding_config, &Sequential);
let non_boundary_commitment = coded_non_boundary.commitment();
shards
.clone()
.proposed(non_boundary_round, coded_non_boundary)
.await;
context.sleep(Duration::from_millis(10)).await;
let invalid_reproposal_round = Round::new(Epoch::new(0), View::new(15));
let invalid_reproposal_context = CodingCtx {
round: invalid_reproposal_round,
leader: me.clone(),
parent: (View::new(10), non_boundary_commitment),
};
let shard_validity = marshaled
.verify(invalid_reproposal_context, non_boundary_commitment)
.await
.await;
assert!(
!shard_validity.unwrap(),
"Invalid re-proposal verify should return false"
);
let certify_result = marshaled
.certify(invalid_reproposal_round, non_boundary_commitment)
.await
.await;
assert!(
!certify_result.unwrap(),
"Invalid re-proposal (not at epoch boundary) should be rejected"
);
// Case 3: re-propose the boundary block again, but with a round that names epoch 1.
let cross_epoch_reproposal_round = Round::new(Epoch::new(1), View::new(20));
let cross_epoch_reproposal_context = CodingCtx {
round: cross_epoch_reproposal_round,
leader: me.clone(),
parent: (View::new(boundary_height.get()), boundary_commitment),
};
let shard_validity = marshaled
.verify(cross_epoch_reproposal_context.clone(), boundary_commitment)
.await
.await;
assert!(
!shard_validity.unwrap(),
"Cross-epoch re-proposal verify should return false"
);
let certify_result = marshaled
.certify(cross_epoch_reproposal_round, boundary_commitment)
.await
.await;
assert!(
!certify_result.unwrap(),
"Re-proposal with mismatched epoch should be rejected"
);
})
}
// Calls `Marshaled::verify` with a commitment that was produced for one consensus context
// (context A) while supplying a different context (context B). The test requires an
// explicit `false` verdict before a 5-second `context.sleep` fires.
#[test_traced("WARN")]
fn test_marshaled_rejects_mismatched_context_digest() {
    let executor = deterministic::Runner::timed(Duration::from_secs(30));
    executor.start(|mut context| async move {
        // Signing fixture, network, and the single validator under test.
        let Fixture {
            participants: validators,
            schemes: signers,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), validators.clone())
                .await;
        let local = validators[0].clone();
        let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
        let validator = CodingHarness::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            local.clone(),
            ConstantProvider::new(signers[0].clone()),
        )
        .await;
        let (marshal, shards) = (validator.mailbox, validator.extra);

        // Genesis block seeds the mock application wrapped by `Marshaled`.
        let genesis = make_coding_block(
            CodingCtx {
                round: Round::zero(),
                leader: default_leader(),
                parent: (View::zero(), genesis_commitment()),
            },
            Sha256::hash(b""),
            Height::zero(),
            0,
        );
        let application: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
        let config = MarshaledConfig {
            application,
            marshal: marshal.clone(),
            shards: shards.clone(),
            scheme_provider: ConstantProvider::new(signers[0].clone()),
            epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
            strategy: Sequential,
        };
        let mut marshaled = Marshaled::new(context.clone(), config);

        // Parent block at height 1 / view 1, handed to the shards mailbox as proposed.
        let parent_round = Round::new(Epoch::zero(), View::new(1));
        let parent = make_coding_block(
            CodingCtx {
                round: parent_round,
                leader: default_leader(),
                parent: (View::zero(), genesis_commitment()),
            },
            genesis.digest(),
            Height::new(1),
            100,
        );
        let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
        let parent_commitment = coded_parent.commitment();
        shards.clone().proposed(parent_round, coded_parent).await;

        // Block A is built under context A (view 2, led by the local validator). It is
        // encoded only to obtain `commitment_a`; it is never passed to `shards.proposed`.
        let context_a = CodingCtx {
            round: Round::new(Epoch::zero(), View::new(2)),
            leader: local.clone(),
            parent: (View::new(1), parent_commitment),
        };
        let block_a = make_coding_block(context_a, parent.digest(), Height::new(2), 200);
        let coded_block_a: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
            CodedBlock::new(block_a, coding_config, &Sequential);
        let commitment_a = coded_block_a.commitment();

        // Context B differs from context A in both view (3) and leader (participant 1).
        let context_b = CodingCtx {
            round: Round::new(Epoch::zero(), View::new(3)),
            leader: validators[1].clone(),
            parent: (View::new(1), parent_commitment),
        };

        // Verifying A's commitment under context B must yield `false` promptly.
        let pending_verdict = marshaled.verify(context_b, commitment_a).await;
        select! {
            verdict = pending_verdict => {
                assert!(
                    !verdict.unwrap(),
                    "mismatched context digest should be rejected"
                );
            },
            _ = context.sleep(Duration::from_secs(5)) => {
                panic!("verify should reject mismatched context digest promptly");
            },
        }
    })
}
#[test_traced("WARN")]
fn test_reproposal_verify_receiver_drop_does_not_synthesize_false() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone()).await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.with_label("validator_0"),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.clone(), cfg);
let missing_payload = Commitment::from((
Sha256::hash(b"missing_block"),
Sha256::hash(b"missing_root"),
Sha256::hash(b"missing_context"),
coding_config,
));
let round = Round::new(Epoch::zero(), View::new(1));
let reproposal_context = CodingCtx {
round,
leader: me,
parent: (View::zero(), missing_payload),
};
let verify_rx = marshaled.verify(reproposal_context, missing_payload).await;
drop(verify_rx);
let certify_rx = marshaled.certify(round, missing_payload).await;
select! {
result = certify_rx => {
assert!(
result.is_err(),
"certify should resolve without an explicit verdict when verify receiver is dropped"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("certify task should resolve promptly after verify receiver drop");
},
}
})
}
// Starts a re-proposal `verify` for a commitment that was never proposed, then prunes that
// commitment from the shards mailbox. Both the pending verify receiver and a subsequent
// `certify` receiver must resolve to an error (rather than an explicit `false`) before
// their 5-second `context.sleep` timeouts fire.
#[test_traced("WARN")]
fn test_reproposal_missing_block_does_not_synthesize_false() {
    let executor = deterministic::Runner::timed(Duration::from_secs(30));
    executor.start(|mut context| async move {
        // Signing fixture, network, and the single validator under test.
        let Fixture {
            participants: validators,
            schemes: signers,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), validators.clone())
                .await;
        let local = validators[0].clone();
        let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
        let validator = CodingHarness::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            local.clone(),
            ConstantProvider::new(signers[0].clone()),
        )
        .await;
        let (marshal, shards) = (validator.mailbox, validator.extra);

        // Genesis block seeds the mock application wrapped by `Marshaled`.
        let genesis = make_coding_block(
            CodingCtx {
                round: Round::zero(),
                leader: default_leader(),
                parent: (View::zero(), genesis_commitment()),
            },
            Sha256::hash(b""),
            Height::zero(),
            0,
        );
        let application: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
        let config = MarshaledConfig {
            application,
            marshal: marshal.clone(),
            shards: shards.clone(),
            scheme_provider: ConstantProvider::new(signers[0].clone()),
            epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
            strategy: Sequential,
        };
        let mut marshaled = Marshaled::new(context.clone(), config);

        // A commitment assembled from arbitrary hashes; no block behind it is ever proposed.
        let missing_payload = Commitment::from((
            Sha256::hash(b"missing_block"),
            Sha256::hash(b"missing_root"),
            Sha256::hash(b"missing_context"),
            coding_config,
        ));

        // Re-proposal shape: the context's parent commitment equals the payload.
        let round = Round::new(Epoch::zero(), View::new(1));
        let reproposal_context = CodingCtx {
            round,
            leader: local,
            parent: (View::zero(), missing_payload),
        };

        // Start verify, let 100 ms of runtime time pass, then prune the commitment.
        let pending_verify = marshaled.verify(reproposal_context, missing_payload).await;
        context.sleep(Duration::from_millis(100)).await;
        shards.prune(missing_payload).await;

        // The outstanding verify must settle with an error, not a verdict.
        select! {
            outcome = pending_verify => {
                assert!(
                    outcome.is_err(),
                    "verify should resolve without explicit false when re-proposal block is unavailable"
                );
            },
            _ = context.sleep(Duration::from_secs(5)) => {
                panic!("verify should resolve promptly when re-proposal block is unavailable");
            },
        }

        // The same holds for a certify issued afterwards.
        let pending_certify = marshaled.certify(round, missing_payload).await;
        select! {
            outcome = pending_certify => {
                assert!(
                    outcome.is_err(),
                    "certify should resolve without explicit false when re-proposal block is unavailable"
                );
            },
            _ = context.sleep(Duration::from_secs(5)) => {
                panic!("certify should resolve promptly when re-proposal block is unavailable");
            },
        }
    })
}
// Subscribes through the marshal mailbox to a commitment that was never proposed, then
// prunes that commitment from the shards mailbox. The subscription receiver must resolve
// to an error before a 5-second `context.sleep` fires.
#[test_traced("WARN")]
fn test_core_subscription_closes_when_coding_buffer_prunes_missing_commitment() {
    let executor = deterministic::Runner::timed(Duration::from_secs(30));
    executor.start(|mut context| async move {
        // Signing fixture, network, and the single validator under test.
        let Fixture {
            participants: validators,
            schemes: signers,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), validators.clone())
                .await;
        let validator = CodingHarness::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            validators[0].clone(),
            ConstantProvider::new(signers[0].clone()),
        )
        .await;
        let (marshal, shards) = (validator.mailbox, validator.extra);

        // A commitment assembled from arbitrary hashes; no block behind it is ever proposed.
        let missing_commitment = Commitment::from((
            Sha256::hash(b"missing_block"),
            Sha256::hash(b"missing_root"),
            Sha256::hash(b"missing_context"),
            coding_config_for_participants(NUM_VALIDATORS as u16),
        ));

        // Subscribe, let 100 ms of runtime time pass, then prune the commitment.
        let subscription = marshal
            .subscribe_by_commitment(
                Some(Round::new(Epoch::zero(), View::new(1))),
                missing_commitment,
            )
            .await;
        context.sleep(Duration::from_millis(100)).await;
        shards.prune(missing_commitment).await;

        // The subscription must close rather than stay pending.
        select! {
            outcome = subscription => {
                assert!(
                    outcome.is_err(),
                    "core subscription should close when coding buffer drops subscription"
                );
            },
            _ = context.sleep(Duration::from_secs(5)) => {
                panic!("core subscription should resolve promptly after coding prune");
            },
        }
    })
}
// Configures `Marshaled` with an epocher that only answers for epoch 0, then asks it to
// certify a block at height 20 whose round names epoch 1. The test asserts that
// certification resolves to `false`.
#[test_traced("WARN")]
fn test_marshaled_rejects_unsupported_epoch() {
// Test-local epocher: delegates to a `FixedEpocher` but returns `None` from every query
// whose epoch is greater than `max_epoch`.
#[derive(Clone)]
struct LimitedEpocher {
inner: FixedEpocher,
max_epoch: u64,
}
impl Epocher for LimitedEpocher {
// `None` when the epoch containing `height` exceeds `max_epoch` (or the inner epocher
// itself returns `None`).
fn containing(&self, height: Height) -> Option<crate::types::EpochInfo> {
let bounds = self.inner.containing(height)?;
if bounds.epoch().get() > self.max_epoch {
None
} else {
Some(bounds)
}
}
// `None` when `epoch` exceeds `max_epoch`; otherwise the inner epocher's answer.
fn first(&self, epoch: Epoch) -> Option<Height> {
if epoch.get() > self.max_epoch {
None
} else {
self.inner.first(epoch)
}
}
// `None` when `epoch` exceeds `max_epoch`; otherwise the inner epocher's answer.
fn last(&self, epoch: Epoch) -> Option<Height> {
if epoch.get() > self.max_epoch {
None
} else {
self.inner.last(epoch)
}
}
}
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
// Signing fixture, network for the participants, and validator 0 (the only one started).
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.with_label("validator_0"),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
// Genesis block, used to seed the mock application.
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
// Only epoch 0 is answered for; anything in epoch 1 or later yields `None`.
let limited_epocher = LimitedEpocher {
inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
max_epoch: 0,
};
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: limited_epocher,
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.clone(), cfg);
// Parent block at height 19 / view 19 in epoch 0, built directly on genesis.
// Assumes `BLOCKS_PER_EPOCH` is 20 so that height 19 closes epoch 0 — TODO confirm.
let parent_ctx = CodingCtx {
round: Round::new(Epoch::zero(), View::new(19)),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(19), 1000);
let parent_digest = parent.digest();
let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
let parent_commitment = coded_parent.commitment();
shards
.clone()
.proposed(Round::new(Epoch::zero(), View::new(19)), coded_parent)
.await;
// Block under test: height 20, round in epoch 1 (beyond `max_epoch`).
let block_ctx = CodingCtx {
round: Round::new(Epoch::new(1), View::new(20)),
leader: default_leader(),
parent: (View::new(19), parent_commitment),
};
let block = make_coding_block(block_ctx, parent_digest, Height::new(20), 2000);
let coded_block = CodedBlock::new(block.clone(), coding_config, &Sequential);
let block_commitment = coded_block.commitment();
shards
.clone()
.proposed(Round::new(Epoch::new(1), View::new(20)), coded_block)
.await;
// Presumably gives the background actors time to process the proposals — TODO confirm.
context.sleep(Duration::from_millis(10)).await;
// NOTE(review): the block above was built with `default_leader()` as leader, while the
// context passed to `verify` names `me`. If those keys differ, this context is not the
// one the block was built from — confirm the rejection asserted below is really caused
// by the unsupported epoch and not by that difference.
let unsupported_round = Round::new(Epoch::new(1), View::new(20));
let unsupported_context = CodingCtx {
round: unsupported_round,
leader: me.clone(),
parent: (View::new(19), parent_commitment),
};
// The verify receiver is kept alive but not awaited; the verdict is read via `certify`.
let _shard_validity = marshaled
.verify(unsupported_context, block_commitment)
.await;
let certify_result = marshaled
.certify(unsupported_round, block_commitment)
.await
.await;
assert!(
!certify_result.unwrap(),
"Block in unsupported epoch should be rejected"
);
})
}
// Proposes two deliberately malformed children of an honest parent block and asserts that
// `Marshaled::certify` resolves to `false` for each:
//   1. a child whose height is not parent height + 1;
//   2. a child whose embedded parent digest is genesis rather than the honest parent.
#[test_traced("WARN")]
fn test_marshaled_rejects_invalid_ancestry() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
// Signing fixture, network for the participants, and validator 0 (the only one started).
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;
let me = participants[0].clone();
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
let setup = CodingHarness::setup_validator(
context.with_label("validator_0"),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let shards = setup.extra;
// Genesis block, used to seed the mock application.
let genesis_ctx = CodingCtx {
round: Round::zero(),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
let cfg = MarshaledConfig {
application: mock_app,
marshal: marshal.clone(),
shards: shards.clone(),
scheme_provider: ConstantProvider::new(schemes[0].clone()),
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
strategy: Sequential,
};
let mut marshaled = Marshaled::new(context.clone(), cfg);
// Honest parent: height `BLOCKS_PER_EPOCH + 1`, round (epoch 1, view 21).
let honest_parent_ctx = CodingCtx {
round: Round::new(Epoch::new(1), View::new(21)),
leader: default_leader(),
parent: (View::zero(), genesis_commitment()),
};
let honest_parent = make_coding_block(
honest_parent_ctx,
genesis.digest(),
Height::new(BLOCKS_PER_EPOCH.get() + 1),
1000,
);
let parent_digest = honest_parent.digest();
let coded_parent = CodedBlock::new(honest_parent.clone(), coding_config, &Sequential);
let parent_commitment = coded_parent.commitment();
shards
.clone()
.proposed(Round::new(Epoch::new(1), View::new(21)), coded_parent)
.await;
// Malformed child 1: references the honest parent correctly (digest and commitment) but
// claims height `BLOCKS_PER_EPOCH + 15`, skipping well past parent height + 1.
let byzantine_round = Round::new(Epoch::new(1), View::new(35));
let byzantine_context = CodingCtx {
round: byzantine_round,
leader: me.clone(),
parent: (View::new(21), parent_commitment), };
let malicious_block = make_coding_block(
byzantine_context.clone(),
parent_digest,
Height::new(BLOCKS_PER_EPOCH.get() + 15), 2000,
);
let coded_malicious =
CodedBlock::new(malicious_block.clone(), coding_config, &Sequential);
let malicious_commitment = coded_malicious.commitment();
shards
.clone()
.proposed(byzantine_round, coded_malicious)
.await;
// Presumably gives the background actors time to process the proposals — TODO confirm.
context.sleep(Duration::from_millis(10)).await;
// The verify receiver is kept alive but not awaited; the verdict is read via `certify`.
let _shard_validity = marshaled
.verify(byzantine_context, malicious_commitment)
.await;
let certify_result = marshaled
.certify(byzantine_round, malicious_commitment)
.await
.await;
assert!(
!certify_result.unwrap(),
"Byzantine block with non-contiguous heights should be rejected"
);
// Malformed child 2: height is parent height + 1 and the context names the honest
// parent's commitment, but the block itself is built with `genesis.digest()` as its
// parent digest.
let byzantine_round2 = Round::new(Epoch::new(1), View::new(22));
let byzantine_context2 = CodingCtx {
round: byzantine_round2,
leader: me.clone(),
parent: (View::new(21), parent_commitment), };
let malicious_block2 = make_coding_block(
byzantine_context2.clone(),
genesis.digest(), Height::new(BLOCKS_PER_EPOCH.get() + 2),
3000,
);
let coded_malicious2 =
CodedBlock::new(malicious_block2.clone(), coding_config, &Sequential);
let malicious_commitment2 = coded_malicious2.commitment();
shards
.clone()
.proposed(byzantine_round2, coded_malicious2)
.await;
context.sleep(Duration::from_millis(10)).await;
let _shard_validity = marshaled
.verify(byzantine_context2, malicious_commitment2)
.await;
let certify_result = marshaled
.certify(byzantine_round2, malicious_commitment2)
.await
.await;
assert!(
!certify_result.unwrap(),
"Byzantine block with mismatched parent commitment should be rejected"
);
})
}
// Calls `Marshaled::certify` for a valid child block without ever calling `verify` for it
// first. The certify receiver must resolve to `true` before a 5-second `context.sleep`
// fires.
#[test_traced("WARN")]
fn test_certify_without_prior_verify_crash_recovery() {
    let executor = deterministic::Runner::timed(Duration::from_secs(30));
    executor.start(|mut context| async move {
        // Signing fixture, network, and the single validator under test.
        let Fixture {
            participants: validators,
            schemes: signers,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), validators.clone())
                .await;
        let local = validators[0].clone();
        let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
        let validator = CodingHarness::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            local.clone(),
            ConstantProvider::new(signers[0].clone()),
        )
        .await;
        let (marshal, shards) = (validator.mailbox, validator.extra);

        // Genesis block seeds the mock application wrapped by `Marshaled`.
        let genesis = make_coding_block(
            CodingCtx {
                round: Round::zero(),
                leader: default_leader(),
                parent: (View::zero(), genesis_commitment()),
            },
            Sha256::hash(b""),
            Height::zero(),
            0,
        );
        let application: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis.clone());
        let config = MarshaledConfig {
            application,
            marshal: marshal.clone(),
            shards: shards.clone(),
            scheme_provider: ConstantProvider::new(signers[0].clone()),
            epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
            strategy: Sequential,
        };
        let mut marshaled = Marshaled::new(context.clone(), config);

        // Parent block: height 1 at view 1, child of genesis.
        let parent_round = Round::new(Epoch::zero(), View::new(1));
        let parent = make_coding_block(
            CodingCtx {
                round: parent_round,
                leader: default_leader(),
                parent: (View::zero(), genesis_commitment()),
            },
            genesis.digest(),
            Height::new(1),
            100,
        );
        let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential);
        let parent_commitment = coded_parent.commitment();
        shards.clone().proposed(parent_round, coded_parent).await;

        // Child block: height 2 at view 2, led by the local validator.
        let child_round = Round::new(Epoch::zero(), View::new(2));
        let child = make_coding_block(
            CodingCtx {
                round: child_round,
                leader: local.clone(),
                parent: (View::new(1), parent_commitment),
            },
            parent.digest(),
            Height::new(2),
            200,
        );
        let coded_child = CodedBlock::new(child, coding_config, &Sequential);
        let child_commitment = coded_child.commitment();
        shards.clone().proposed(child_round, coded_child).await;

        context.sleep(Duration::from_millis(10)).await;

        // Go straight to certify; `verify` is intentionally never called for the child.
        let pending_certify = marshaled.certify(child_round, child_commitment).await;
        select! {
            outcome = pending_certify => {
                assert!(
                    outcome.unwrap(),
                    "certify without prior verify should succeed for valid block"
                );
            },
            _ = context.sleep(Duration::from_secs(5)) => {
                panic!("certify should complete within timeout");
            },
        }
    })
}
// Codec checks for `Commitment`:
//   * an all-zero buffer of exactly `Commitment::SIZE` bytes must fail to decode (the
//     assertion message attributes this to the zeroed `CodingConfig`);
//   * a commitment built from real hashes and a real coding config must survive an
//     encode/decode round trip unchanged.
#[test_traced("WARN")]
fn test_malformed_commitment_config_rejected_at_deserialization() {
    use commonware_codec::{Encode, ReadExt};

    // Negative case: zeroed bytes.
    let zeroed = [0u8; Commitment::SIZE];
    let mut zeroed_reader = &zeroed[..];
    assert!(
        Commitment::read(&mut zeroed_reader).is_err(),
        "deserialization of Commitment with zeroed CodingConfig must fail"
    );

    // Positive case: encode, decode, compare.
    let original = Commitment::from((
        Sha256::hash(b"block"),
        Sha256::hash(b"root"),
        Sha256::hash(b"context"),
        coding_config_for_participants(NUM_VALIDATORS as u16),
    ));
    let bytes = original.encode();
    let mut reader = &bytes[..];
    let roundtrip = Commitment::read(&mut reader).expect("valid Commitment must deserialize");
    assert_eq!(original, roundtrip);
}
/// Checks that a failing application verification surfaces through `certify`.
///
/// The mock application is built with `with_verify_result(.., false)`. A parent
/// (view 1) and a child (view 2) are handed to the shard mailbox as proposed
/// blocks. The test then expects:
///
/// 1. `verify` on the child to resolve to `true` (the optimistic pre-check
///    path named in the assertion message), and
/// 2. `certify` for the same round and commitment to resolve to `false`.
#[test_traced("WARN")]
fn test_certify_propagates_application_verify_failure() {
    deterministic::Runner::timed(Duration::from_secs(30)).start(|mut context| async move {
        let Fixture {
            participants: validators,
            schemes: signers,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), validators.clone())
                .await;

        // Bring up a single validator; its mailboxes back the `Marshaled` wrapper.
        let local = validators[0].clone();
        let shard_config = coding_config_for_participants(NUM_VALIDATORS as u16);
        let validator = CodingHarness::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            local.clone(),
            ConstantProvider::new(signers[0].clone()),
        )
        .await;
        let marshal_mailbox = validator.mailbox;
        let shard_mailbox = validator.extra;

        let genesis = make_coding_block(
            CodingCtx {
                round: Round::zero(),
                leader: default_leader(),
                parent: (View::zero(), genesis_commitment()),
            },
            Sha256::hash(b""),
            Height::zero(),
            0,
        );

        // Application whose verification result is fixed to `false`.
        let rejecting_app: MockVerifyingApp<CodingB, S> =
            MockVerifyingApp::with_verify_result(genesis.clone(), false);
        let config = MarshaledConfig {
            application: rejecting_app,
            marshal: marshal_mailbox.clone(),
            shards: shard_mailbox.clone(),
            scheme_provider: ConstantProvider::new(signers[0].clone()),
            epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
            strategy: Sequential,
        };
        let mut marshaled = Marshaled::new(context.clone(), config);

        // Parent block at height 1 / view 1, built on genesis.
        let parent_round = Round::new(Epoch::zero(), View::new(1));
        let parent = make_coding_block(
            CodingCtx {
                round: parent_round,
                leader: local.clone(),
                parent: (View::zero(), genesis_commitment()),
            },
            genesis.digest(),
            Height::new(1),
            100,
        );
        let coded_parent = CodedBlock::new(parent.clone(), shard_config, &Sequential);
        let parent_commitment = coded_parent.commitment();
        shard_mailbox
            .clone()
            .proposed(parent_round, coded_parent)
            .await;

        // Child block at height 2 / view 2, built on the parent.
        let child_round = Round::new(Epoch::zero(), View::new(2));
        let child_ctx = CodingCtx {
            round: child_round,
            leader: local,
            parent: (View::new(1), parent_commitment),
        };
        let child = make_coding_block(child_ctx.clone(), parent.digest(), Height::new(2), 200);
        let coded_child = CodedBlock::new(child, shard_config, &Sequential);
        let child_commitment = coded_child.commitment();
        shard_mailbox
            .clone()
            .proposed(child_round, coded_child)
            .await;

        // Short pause before verifying (presumably lets the proposals be processed).
        context.sleep(Duration::from_millis(10)).await;

        let verify_rx = marshaled.verify(child_ctx, child_commitment).await;
        let verify_verdict = verify_rx.await.expect("verify result missing");
        assert!(
            verify_verdict,
            "optimistic verify should pass pre-checks and schedule deferred verification"
        );

        let certify_rx = marshaled.certify(child_round, child_commitment).await;
        let certify_verdict = certify_rx.await.expect("certify result missing");
        assert!(
            !certify_verdict,
            "certify should propagate deferred application verify failure"
        );
    })
}
/// Two validators, one block, two coding configs.
///
/// The same height-1 block is coded under config A and under config B (one
/// more minimum shard, one fewer extra shard), yielding equal digests but
/// different commitments. Validator 1 is handed the B-coded block as its
/// proposal, while validator 0 is told about a finalization whose payload is
/// commitment A. After waiting, validator 0 must hold neither a block nor a
/// finalization at height 1.
#[test_traced("WARN")]
fn test_backfill_block_mismatched_commitment() {
    deterministic::Runner::timed(Duration::from_secs(30)).start(|mut context| async move {
        let Fixture {
            participants: validators,
            schemes: signers,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);

        // Only the first two validators are registered on the network.
        let mut oracle = setup_network_with_participants(
            context.clone(),
            NZUsize!(1),
            validators[..2].iter().cloned(),
        )
        .await;

        // Config B moves one shard from "extra" to "minimum" relative to config A.
        let config_a = coding_config_for_participants(NUM_VALIDATORS as u16);
        let bumped_minimum = config_a.minimum_shards.checked_add(1).unwrap();
        let reduced_extra = NZU16!(config_a.extra_shards.get() - 1);
        let config_b = commonware_coding::Config {
            minimum_shards: bumped_minimum,
            extra_shards: reduced_extra,
        };

        let setup_v0 = CodingHarness::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            validators[0].clone(),
            ConstantProvider::new(signers[0].clone()),
        )
        .await;
        let setup_v1 = CodingHarness::setup_validator(
            context.with_label("validator_1"),
            &mut oracle,
            validators[1].clone(),
            ConstantProvider::new(signers[1].clone()),
        )
        .await;
        setup_network_links(&mut oracle, &validators[..2], LINK).await;
        let mut mailbox_v0 = setup_v0.mailbox;
        let mailbox_v1 = setup_v1.mailbox;

        let genesis = make_coding_block(
            CodingCtx {
                round: Round::zero(),
                leader: default_leader(),
                parent: (View::zero(), genesis_commitment()),
            },
            Sha256::hash(b""),
            Height::zero(),
            0,
        );

        // One block at height 1, coded twice.
        let first_round = Round::new(Epoch::zero(), View::new(1));
        let block = make_coding_block(
            CodingCtx {
                round: first_round,
                leader: validators[0].clone(),
                parent: (View::zero(), genesis_commitment()),
            },
            genesis.digest(),
            Height::new(1),
            100,
        );
        let coded_a: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
            CodedBlock::new(block.clone(), config_a, &Sequential);
        let commitment_a = coded_a.commitment();
        let coded_b: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
            CodedBlock::new(block.clone(), config_b, &Sequential);
        let commitment_b = coded_b.commitment();

        // Same block contents, different commitments.
        assert_eq!(coded_a.digest(), coded_b.digest());
        assert_ne!(commitment_a, commitment_b);

        // Validator 1 only ever sees the B-coded variant.
        mailbox_v1.proposed(first_round, coded_b.clone()).await;
        context.sleep(Duration::from_millis(100)).await;

        // Validator 0 learns of a finalization that commits to variant A.
        let finalized_proposal: Proposal<Commitment> = Proposal {
            round: first_round,
            parent: View::zero(),
            payload: commitment_a,
        };
        let finalization =
            CodingHarness::make_finalization(finalized_proposal.clone(), &signers, QUORUM);
        CodingHarness::report_finalization(&mut mailbox_v0, finalization).await;

        // Wait before inspecting validator 0 (presumably long enough for any backfill attempt).
        context.sleep(Duration::from_secs(5)).await;

        let block_at_one = mailbox_v0.get_block(Height::new(1)).await;
        assert!(
            block_at_one.is_none(),
            "v0 should reject backfilled block with mismatched commitment"
        );
        let finalization_at_one = mailbox_v0.get_finalization(Height::new(1)).await;
        assert!(
            finalization_at_one.is_none(),
            "finalization should not be archived until matching block is available"
        );
    })
}
/// With `EmptyProvider` as the scheme provider (presumably supplying no scheme
/// for any epoch; confirm against the harness), both `propose` and `verify`
/// on the `Marshaled` wrapper must hand back receivers that resolve to an
/// error rather than a result.
#[test_traced("WARN")]
fn test_marshaled_missing_scheme_skips_propose_and_verify() {
    let runner = deterministic::Runner::timed(Duration::from_secs(30));
    runner.start(|mut context| async move {
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
        let mut oracle =
            setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
                .await;
        let me = participants[0].clone();
        // The validator itself is set up with a real scheme; only the
        // `Marshaled` wrapper below gets the empty provider.
        let setup = CodingHarness::setup_validator(
            context.with_label("validator_0"),
            &mut oracle,
            me.clone(),
            ConstantProvider::new(schemes[0].clone()),
        )
        .await;
        let genesis_ctx = CodingCtx {
            round: Round::zero(),
            leader: default_leader(),
            parent: (View::zero(), genesis_commitment()),
        };
        let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
        let mock_app: MockVerifyingApp<CodingB, S> = MockVerifyingApp::new(genesis);
        let cfg = MarshaledConfig {
            application: mock_app,
            marshal: setup.mailbox,
            shards: setup.extra,
            scheme_provider: EmptyProvider,
            epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
            strategy: Sequential,
        };
        let mut marshaled = Marshaled::new(context.clone(), cfg);
        let ctx = CodingCtx {
            round: Round::new(Epoch::zero(), View::new(1)),
            // Last use of `me`, so it is moved rather than cloned.
            leader: me,
            parent: (View::zero(), genesis_commitment()),
        };
        // Each assertion carries its own message so a failure identifies
        // which of the two calls unexpectedly produced a result.
        let rx = marshaled.propose(ctx.clone()).await;
        assert!(
            rx.await.is_err(),
            "propose should not yield a result when no scheme is available"
        );
        let rx = marshaled.verify(ctx, genesis_commitment()).await;
        assert!(
            rx.await.is_err(),
            "verify should not yield a result when no scheme is available"
        );
    });
}
}