use crate::{
marshal::{
application::{
validation::{is_inferred_reproposal_at_certify, Stage},
verification_tasks::VerificationTasks,
},
core::{CommitmentFallback, DigestFallback, Mailbox},
standard::{
validation::{precheck_epoch_and_reproposal, verify_with_parent, Decision},
Standard,
},
Update,
},
simplex::{types::Context, Plan},
types::{Epocher, Round},
Application, Automaton, CertifiableAutomaton, CertifiableBlock, Epochable, Relay, Reporter,
};
use commonware_actor::Feedback;
use commonware_cryptography::{certificate::Scheme, Digestible};
use commonware_macros::select;
use commonware_p2p::Recipients;
use commonware_runtime::{
telemetry::metrics::{
histogram::{Buckets, Timed},
MetricsExt as _,
},
Clock, Metrics, Spawner,
};
use commonware_utils::{
channel::{fallible::OneshotExt, oneshot},
sync::AsyncMutex,
};
use rand::Rng;
use std::sync::Arc;
use tracing::debug;
/// Adapter that sits between consensus and an [`Application`], using a marshal
/// [`Mailbox`] to obtain and record blocks.
///
/// In this file it implements [`Automaton`], [`CertifiableAutomaton`], [`Relay`] and
/// [`Reporter`]. Verification started in `verify` is tracked in `verification_tasks` so
/// that a later `certify` call for the same `(round, digest)` can reuse its result.
pub struct Deferred<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E>,
B: CertifiableBlock,
ES: Epocher,
{
// Runtime context shared by all clones; locked briefly to derive a labelled child
// context for every spawned task.
context: Arc<AsyncMutex<E>>,
// Wrapped application: asked to build blocks in `propose`, handed to
// `verify_with_parent`, and forwarded every reported update.
application: A,
// Handle used to talk to the marshal (block subscriptions, `verified`, `proposed`,
// `certified`, `forward`, ...).
marshal: Mailbox<S, Standard<B>>,
// Source of epoch boundaries (e.g. the last height of an epoch).
epocher: ES,
// Receivers for verification results, keyed by `(round, digest)`: inserted by
// `verify`, taken by `certify`, pruned when a `Update::Tip` is reported.
verification_tasks: VerificationTasks<<B as Digestible>::Digest>,
// Timer backing the "build_duration" histogram.
build_duration: Timed,
// Timer backing the "parent_fetch_duration" histogram (parent fetch during propose).
proposal_parent_fetch_duration: Timed,
// Timer backing the "ancestor_fetch_duration" histogram.
ancestor_fetch_duration: Timed,
}
impl<E, S, A, B, ES> Clone for Deferred<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E>,
B: CertifiableBlock,
ES: Epocher,
{
fn clone(&self) -> Self {
Self {
context: self.context.clone(),
application: self.application.clone(),
marshal: self.marshal.clone(),
epocher: self.epocher.clone(),
verification_tasks: self.verification_tasks.clone(),
build_duration: self.build_duration.clone(),
proposal_parent_fetch_duration: self.proposal_parent_fetch_duration.clone(),
ancestor_fetch_duration: self.ancestor_fetch_duration.clone(),
}
}
}
/// Construction and internal helpers shared by the trait implementations below.
impl<E, S, A, B, ES> Deferred<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
B: CertifiableBlock<Context = <A as Application<E>>::Context>,
ES: Epocher,
{
/// Creates a new [`Deferred`] wrapper.
///
/// Registers three histograms on `context` ("build_duration",
/// "parent_fetch_duration" and "ancestor_fetch_duration", all using
/// `Buckets::LOCAL`), then stores `context` behind an `Arc<AsyncMutex<_>>` together
/// with the given `application`, `marshal` mailbox and `epocher`. The set of tracked
/// verification tasks starts out empty.
pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
let build_histogram = context.histogram(
"build_duration",
"Histogram of time taken for the application to build a new block, in seconds",
Buckets::LOCAL,
);
let build_duration = Timed::new(build_histogram);
let parent_fetch_histogram = context.histogram(
"parent_fetch_duration",
"Histogram of time taken to fetch a parent block in propose, in seconds",
Buckets::LOCAL,
);
let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram);
let ancestor_fetch_histogram = context.histogram(
"ancestor_fetch_duration",
"Histogram of time taken to fetch a block via the ancestry stream, in seconds",
Buckets::LOCAL,
);
let ancestor_fetch_duration = Timed::new(ancestor_fetch_histogram);
Self {
context: Arc::new(AsyncMutex::new(context)),
application,
marshal,
epocher,
verification_tasks: VerificationTasks::new(),
build_duration,
proposal_parent_fetch_duration,
ancestor_fetch_duration,
}
}
/// Spawns a background task that runs `verify_with_parent` for `block` under the
/// given consensus `context` and `stage`.
///
/// Returns the receiving half of a oneshot channel. The task sends the boolean
/// produced by `verify_with_parent`; if that helper yields `None`, the task exits
/// without sending, so the receiver observes a closed channel instead of a value.
#[inline]
async fn deferred_verify(
&mut self,
context: <Self as Automaton>::Context,
block: B,
stage: Stage,
) -> oneshot::Receiver<bool> {
let mut marshal = self.marshal.clone();
let mut application = self.application.clone();
let (mut tx, rx) = oneshot::channel();
let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
// The shared runtime context is only locked long enough to derive a child
// context labelled with the round being verified.
let runtime_context = self
.context
.lock()
.await
.child("deferred_verify")
.with_attribute("round", context.round);
runtime_context.spawn(move |runtime_context| async move {
// NOTE(review): `verify_with_parent` is defined elsewhere; it is handed `tx`,
// presumably so it can stop early when the receiver goes away - confirm.
let application_valid = match verify_with_parent(
runtime_context,
context,
block,
&mut application,
&mut marshal,
&mut tx,
stage,
ancestor_fetch_duration,
)
.await
{
Some(valid) => valid,
// No verdict: drop `tx` without sending.
None => return,
};
tx.send_lossy(application_valid);
});
rx
}
/// Certifies `digest` for `round` using only the context embedded in the block.
///
/// Subscribes to the block by digest (falling back to a fetch by `round`) and
/// spawns a task that:
/// 1. waits for the block, aborting if the caller drops the returned receiver or
///    the subscription fails;
/// 2. if `is_inferred_reproposal_at_certify` reports a re-proposal, marks the block
///    as certified in the marshal and sends `true` (nothing is sent when the marshal
///    does not accept it);
/// 3. otherwise runs [`Self::deferred_verify`] with the block's embedded context and
///    `Stage::Certified`, forwarding its result when one is produced.
async fn certify_from_embedded_context(
&mut self,
round: Round,
digest: B::Digest,
) -> oneshot::Receiver<bool> {
debug!(
?round,
?digest,
"subscribing to block for certification using embedded context"
);
let block_rx = self
.marshal
.subscribe_by_digest(digest, DigestFallback::FetchByRound { round });
let mut marshaled = self.clone();
let epocher = self.epocher.clone();
let (mut tx, rx) = oneshot::channel();
let context = self
.context
.lock()
.await
.child("certify")
.with_attribute("round", round);
context.spawn(move |_| async move {
// Wait for the block unless consensus loses interest first.
let block = select! {
_ = tx.closed() => {
debug!(
reason = "consensus dropped receiver",
"skipping certification"
);
return;
},
result = block_rx => match result {
Ok(block) => block,
Err(_) => {
debug!(
?digest,
reason = "failed to fetch block for certification",
"skipping certification"
);
return;
}
},
};
// The consensus context is taken from the block itself rather than from the caller.
let embedded_context = block.context();
let is_reproposal = is_inferred_reproposal_at_certify(
&epocher,
block.height(),
embedded_context.round,
round,
);
if is_reproposal {
// Re-proposals skip application verification; the block only has to be
// accepted by the marshal as certified for this round.
if !marshaled.marshal.certified(round, block).await {
debug!(?round, "marshal unable to accept block");
return;
}
tx.send_lossy(true);
return;
}
let verify_rx = marshaled
.deferred_verify(embedded_context, block, Stage::Certified)
.await;
// If the verification task ended without a verdict, `tx` is dropped unsent.
if let Ok(result) = verify_rx.await {
tx.send_lossy(result);
}
});
rx
}
/// Certifies `digest` for `round` by reusing the result of a verification `task`
/// that was registered earlier by `verify`.
///
/// The marshal is first hinted that `(round, digest)` was notarized. A spawned task
/// then awaits `task` and forwards its boolean result. If `task` was closed without
/// producing a value, the task falls back to
/// [`Self::certify_from_embedded_context`]. At both wait points the task stops early
/// when the returned receiver is dropped.
async fn certify_from_existing_task(
&mut self,
round: Round,
digest: B::Digest,
task: oneshot::Receiver<bool>,
) -> oneshot::Receiver<bool> {
self.marshal.hint_notarized(round, digest);
let mut marshaled = self.clone();
let (mut tx, rx) = oneshot::channel();
let context = self
.context
.lock()
.await
.child("certify_existing")
.with_attribute("round", round);
context.spawn(move |_| async move {
let result = select! {
_ = tx.closed() => {
debug!(
reason = "consensus dropped receiver",
"skipping certification"
);
return;
},
result = task => result,
};
match result {
Ok(result) => {
tx.send_lossy(result);
}
Err(_) => {
// The sender side of the tracked task went away without a verdict.
debug!(
?round,
?digest,
"verification task closed before certification, falling back to embedded context"
);
let fallback = marshaled.certify_from_embedded_context(round, digest).await;
let result = select! {
_ = tx.closed() => {
debug!(
reason = "consensus dropped receiver",
"skipping certification"
);
return;
},
result = fallback => result,
};
if let Ok(result) = result {
tx.send_lossy(result);
}
}
}
});
rx
}
}
/// [`Automaton`] implementation: block proposal and optimistic verification.
impl<E, S, A, B, ES> Automaton for Deferred<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
B: CertifiableBlock<Context = <A as Application<E>>::Context>,
ES: Epocher,
{
type Digest = B::Digest;
type Context = Context<Self::Digest, S::PublicKey>;
/// Produces the digest to propose for `consensus_context`.
///
/// The work happens in a spawned task; the returned receiver yields the digest, or
/// is closed without a value whenever the proposal is skipped. The task proceeds as
/// follows:
/// 1. If the marshal already holds a verified block for this round, its digest is
///    reused when the block's embedded context equals `consensus_context`;
///    otherwise the proposal is skipped.
/// 2. The parent block is fetched by commitment (falling back to a fetch by the
///    parent's round), timed by `proposal_parent_fetch_duration`.
/// 3. If the parent sits at the last height of the current epoch, the parent itself
///    is re-proposed after being recorded via `marshal.verified`.
/// 4. Otherwise the application builds a block on top of an ancestor stream rooted
///    at the parent (timed by `build_duration`); the block is handed to
///    `marshal.proposed` and its digest is sent.
///
/// # Panics
///
/// The spawned task panics if `epocher.last` returns `None` for the context's epoch.
async fn propose(
&mut self,
consensus_context: Context<Self::Digest, S::PublicKey>,
) -> oneshot::Receiver<Self::Digest> {
let marshal = self.marshal.clone();
let mut application = self.application.clone();
let epocher = self.epocher.clone();
let build_duration = self.build_duration.clone();
let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone();
let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
let (mut tx, rx) = oneshot::channel();
let context = self
.context
.lock()
.await
.child("propose")
.with_attribute("round", consensus_context.round);
context.spawn(move |runtime_context| async move {
// Step 1: reuse a block the marshal already has for this round, if any.
if let Some(block) = marshal.get_verified(consensus_context.round).await {
let block_context = block.context();
if block_context != consensus_context {
debug!(
round = ?consensus_context.round,
?consensus_context,
?block_context,
"skipping proposal: cached verified block context no longer matches"
);
return;
}
let digest = block.digest();
let success = tx.send_lossy(digest);
debug!(
round = ?consensus_context.round,
?digest,
success,
"reused verified block from marshal on leader recovery"
);
return;
}
// Step 2: obtain the parent block named by the consensus context.
let (parent_view, parent_commitment) = consensus_context.parent;
let parent_request = marshal.subscribe_by_commitment(
parent_commitment,
CommitmentFallback::FetchByRound {
round: Round::new(consensus_context.epoch(), parent_view),
},
);
let parent_timer = proposal_parent_fetch_duration.timer(&runtime_context);
let parent = select! {
_ = tx.closed() => {
debug!(reason = "consensus dropped receiver", "skipping proposal");
return;
},
result = parent_request => match result {
Ok(parent) => parent,
Err(_) => {
debug!(
?parent_commitment,
reason = "failed to fetch parent block",
"skipping proposal"
);
return;
}
},
};
// The fetch duration is only recorded when the parent was actually obtained.
parent_timer.observe(&runtime_context);
// Step 3: at the epoch boundary the parent is re-proposed instead of building.
let last_in_epoch = epocher
.last(consensus_context.epoch())
.expect("current epoch should exist");
if parent.height() == last_in_epoch {
let digest = parent.digest();
if !marshal.verified(consensus_context.round, parent).await {
debug!(
round = ?consensus_context.round,
?digest,
"marshal rejected re-proposed boundary block"
);
return;
}
let success = tx.send_lossy(digest);
debug!(
round = ?consensus_context.round,
?digest,
success,
"re-proposed parent block at epoch boundary"
);
return;
}
// Step 4: let the application build a new block on top of the parent.
let ancestor_stream = marshal.ancestor_stream(
Arc::new(runtime_context.child("ancestor_stream")),
[parent],
ancestor_fetch_duration,
);
let build_request = application.propose(
(
runtime_context.child("app_propose"),
consensus_context.clone(),
),
ancestor_stream,
);
let build_timer = build_duration.timer(&runtime_context);
let built_block = select! {
_ = tx.closed() => {
debug!(reason = "consensus dropped receiver", "skipping proposal");
return;
},
result = build_request => match result {
Some(block) => block,
None => {
debug!(
?parent_commitment,
reason = "block building failed",
"skipping proposal"
);
return;
}
},
};
build_timer.observe(&runtime_context);
let digest = built_block.digest();
// The digest is only announced after the marshal accepted the block.
if !marshal.proposed(consensus_context.round, built_block).await {
debug!(
round = ?consensus_context.round,
?digest,
"marshal rejected proposed block"
);
return;
}
let success = tx.send_lossy(digest);
debug!(
round = ?consensus_context.round,
?digest,
success,
"proposed new block"
);
});
rx
}
/// Optimistically verifies the block identified by `digest` against `context`.
///
/// Two channels are involved:
/// - the returned receiver carries the optimistic verdict for consensus;
/// - a second receiver is stored in `verification_tasks` under `(round, digest)`
///   and carries the verdict of the deferred verification, for later use by
///   `certify`.
///
/// The spawned task waits for the block (`DigestFallback::Wait`), runs
/// `precheck_epoch_and_reproposal`, and rejects the block on both channels if its
/// embedded context differs from `context`. Otherwise it starts
/// [`Deferred::deferred_verify`] with `Stage::Verified`, immediately reports `true`
/// on the optimistic channel, and forwards the deferred result to the tracked
/// channel once available. Any early exit drops the senders without sending.
async fn verify(
&mut self,
context: Context<Self::Digest, S::PublicKey>,
digest: Self::Digest,
) -> oneshot::Receiver<bool> {
let mut marshal = self.marshal.clone();
let mut marshaled = self.clone();
let round = context.round;
// The task receiver is registered before the task is spawned, so `certify` can
// find it for this `(round, digest)`.
let (task_tx, task_rx) = oneshot::channel();
self.verification_tasks.insert(round, digest, task_rx);
let (mut tx, rx) = oneshot::channel();
let runtime_context = self
.context
.lock()
.await
.child("optimistic_verify")
.with_attribute("round", round);
runtime_context.spawn(move |_| async move {
let block_request = marshal.subscribe_by_digest(digest, DigestFallback::Wait);
let block = select! {
_ = tx.closed() => {
debug!(
reason = "consensus dropped receiver",
"skipping optimistic verification"
);
return;
},
result = block_request => match result {
Ok(block) => block,
Err(_) => {
debug!(
?digest,
reason = "failed to fetch block for optimistic verification",
"skipping optimistic verification"
);
return;
}
},
};
// NOTE(review): `precheck_epoch_and_reproposal` is defined elsewhere; judging by
// its name it handles epoch and re-proposal checks - confirm against its docs.
let Some(decision) = precheck_epoch_and_reproposal(
&marshaled.epocher,
&mut marshal,
&context,
digest,
block,
)
.await
else {
return;
};
let block = match decision {
// The precheck already settled the outcome: report it on both channels.
Decision::Complete(valid) => {
task_tx.send_lossy(valid);
tx.send_lossy(valid);
return;
}
Decision::Continue(block) => block,
};
if block.context() != context {
debug!(
?context,
block_context = ?block.context(),
"block-embedded context does not match consensus context during optimistic verification"
);
task_tx.send_lossy(false);
tx.send_lossy(false);
return;
}
let deferred_rx = marshaled
.deferred_verify(context, block, Stage::Verified)
.await;
// Optimistic acceptance: consensus is answered before the deferred verification
// has produced its verdict.
tx.send_lossy(true);
if let Ok(result) = deferred_rx.await {
task_tx.send_lossy(result);
}
});
rx
}
}
/// [`CertifiableAutomaton`] implementation: certification of a notarized digest.
impl<E, S, A, B, ES> CertifiableAutomaton for Deferred<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    ES: Epocher,
{
    /// Certifies `digest` for `round`.
    ///
    /// If a verification task was tracked for this `(round, digest)`, it is removed
    /// from the tracker and its result is reused; otherwise certification runs from
    /// the context embedded in the block.
    async fn certify(&mut self, round: Round, digest: Self::Digest) -> oneshot::Receiver<bool> {
        let pending = self.verification_tasks.take(round, digest);
        match pending {
            Some(task) => self.certify_from_existing_task(round, digest, task).await,
            None => self.certify_from_embedded_context(round, digest).await,
        }
    }
}
/// [`Relay`] implementation: block dissemination is delegated to the marshal.
impl<E, S, A, B, ES> Relay for Deferred<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    ES: Epocher,
{
    type Digest = B::Digest;
    type PublicKey = S::PublicKey;
    type Plan = Plan<S::PublicKey>;

    /// Asks the marshal to forward the block identified by `commitment`.
    ///
    /// A `Plan::Propose` targets all recipients; a `Plan::Forward` targets exactly
    /// the recipients it carries. The marshal's [`Feedback`] is returned unchanged.
    fn broadcast(&mut self, commitment: Self::Digest, plan: Plan<S::PublicKey>) -> Feedback {
        match plan {
            Plan::Propose { round } => self.marshal.forward(round, commitment, Recipients::All),
            Plan::Forward { round, recipients } => {
                self.marshal.forward(round, commitment, recipients)
            }
        }
    }
}
/// [`Reporter`] implementation: updates are passed through to the application.
impl<E, S, A, B, ES> Reporter for Deferred<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
        + Reporter<Activity = Update<B>>,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    ES: Epocher,
{
    type Activity = A::Activity;

    /// Forwards `update` to the wrapped application and returns its [`Feedback`].
    ///
    /// Before forwarding, a `Update::Tip` triggers `retain_after` on the tracked
    /// verification tasks with the tip's round.
    fn report(&mut self, update: Self::Activity) -> Feedback {
        if let Update::Tip(tip_round, ..) = &update {
            self.verification_tasks.retain_after(tip_round);
        }
        self.application.report(update)
    }
}
#[cfg(test)]
mod tests {
use super::Deferred;
use crate::{
marshal::mocks::{
harness::{
default_leader, make_raw_block, setup_network_with_participants, Ctx,
StandardHarness, TestHarness, B, BLOCKS_PER_EPOCH, NAMESPACE, NUM_VALIDATORS, S, V,
},
verifying::{GatedVerifyingApp, MockVerifyingApp},
},
simplex::scheme::bls12381_threshold::vrf as bls12381_threshold_vrf,
types::{Epoch, Epocher, FixedEpocher, Height, Round, View},
Automaton, CertifiableAutomaton,
};
use commonware_broadcast::Broadcaster;
use commonware_cryptography::{
certificate::{mocks::Fixture, ConstantProvider},
sha256::Sha256,
Digestible, Hasher as _,
};
use commonware_macros::{select, test_traced};
use commonware_runtime::{deterministic, Clock, Runner, Supervisor as _};
use commonware_utils::{channel::fallible::OneshotExt, NZUsize};
use std::time::Duration;
/// Certifying a block from a lower view must still succeed after a block from a
/// higher view has already been certified.
///
/// Two sibling blocks at height 2 (views 5 and 10) are stored as verified, both are
/// run through `verify`, and then view 10 is certified before view 5.
#[test_traced("INFO")]
fn test_certify_lower_view_after_higher_view() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
// Only the first participant is started as a validator.
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
let mut marshaled = Deferred::new(
context.child("deferred"),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
// Common parent at height 1, recorded as verified for view 1.
let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
let parent_digest = parent.digest();
assert!(
marshal
.verified(Round::new(Epoch::new(0), View::new(1)), parent.clone())
.await
);
// Block A: height 2, proposed in view 5.
let round_a = Round::new(Epoch::new(0), View::new(5));
let context_a = Ctx {
round: round_a,
leader: me.clone(),
parent: (View::new(1), parent_digest),
};
let block_a = B::new::<Sha256>(context_a.clone(), parent_digest, Height::new(2), 200);
let commitment_a = StandardHarness::commitment(&block_a);
assert!(marshal.verified(round_a, block_a.clone()).await);
// Block B: same parent and height, proposed in view 10.
let round_b = Round::new(Epoch::new(0), View::new(10));
let context_b = Ctx {
round: round_b,
leader: me.clone(),
parent: (View::new(1), parent_digest),
};
let block_b = B::new::<Sha256>(context_b.clone(), parent_digest, Height::new(2), 300);
let commitment_b = StandardHarness::commitment(&block_b);
assert!(marshal.verified(round_b, block_b.clone()).await);
context.sleep(Duration::from_millis(10)).await;
// Run optimistic verification for both blocks; the verdicts are not inspected here.
let _ = marshaled.verify(context_a, commitment_a).await.await;
let _ = marshaled.verify(context_b, commitment_b).await.await;
// Certify the higher view first ...
let certify_b = marshaled.certify(round_b, commitment_b).await;
assert!(
certify_b.await.unwrap(),
"Block B certification should succeed"
);
// ... then the lower view, which must still resolve within 5 seconds.
let certify_a = marshaled.certify(round_a, commitment_a).await;
select! {
result = certify_a => {
assert!(result.unwrap(), "Block A certification should succeed");
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("Block A certification timed out");
},
}
})
}
/// Optimistic verification must reject a block whose round lies in an epoch the
/// epocher does not support.
#[test_traced("WARN")]
fn test_marshaled_rejects_unsupported_epoch() {
/// Epocher that delegates to a [`FixedEpocher`] but answers `None` for every epoch
/// greater than `max_epoch`.
#[derive(Clone)]
struct LimitedEpocher {
inner: FixedEpocher,
max_epoch: u64,
}
impl Epocher for LimitedEpocher {
fn containing(&self, height: Height) -> Option<crate::types::EpochInfo> {
let bounds = self.inner.containing(height)?;
if bounds.epoch().get() > self.max_epoch {
None
} else {
Some(bounds)
}
}
fn first(&self, epoch: Epoch) -> Option<Height> {
if epoch.get() > self.max_epoch {
None
} else {
self.inner.first(epoch)
}
}
fn last(&self, epoch: Epoch) -> Option<Height> {
if epoch.get() > self.max_epoch {
None
} else {
self.inner.last(epoch)
}
}
}
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
// Only epoch 0 is supported by this epocher.
let limited_epocher = LimitedEpocher {
inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
max_epoch: 0,
};
let mut marshaled = Deferred::new(
context.child("deferred"),
mock_app,
marshal.clone(),
limited_epocher,
);
// Parent at height 19 in epoch 0.
// NOTE(review): presumably the last height of epoch 0 given BLOCKS_PER_EPOCH - confirm.
let parent_ctx = Ctx {
round: Round::new(Epoch::zero(), View::new(19)),
leader: default_leader(),
parent: (View::zero(), genesis.digest()),
};
let parent =
B::new::<Sha256>(parent_ctx.clone(), genesis.digest(), Height::new(19), 1000);
let parent_digest = parent.digest();
assert!(
marshal
.clone()
.verified(Round::new(Epoch::zero(), View::new(19)), parent.clone())
.await
);
// Child at height 20 whose context claims epoch 1, beyond `max_epoch`.
let unsupported_round = Round::new(Epoch::new(1), View::new(20));
let unsupported_context = Ctx {
round: unsupported_round,
leader: me.clone(),
parent: (View::new(19), parent_digest),
};
let block = B::new::<Sha256>(
unsupported_context.clone(),
parent_digest,
Height::new(20),
2000,
);
let block_commitment = StandardHarness::commitment(&block);
assert!(
marshal
.clone()
.verified(unsupported_round, block.clone())
.await
);
context.sleep(Duration::from_millis(10)).await;
let verify_result = marshaled
.verify(unsupported_context, block_commitment)
.await;
// The optimistic verdict must be an explicit `false`, not a dropped channel.
let optimistic_result = verify_result.await;
assert!(
!optimistic_result.unwrap(),
"Optimistic verify should reject block in unsupported epoch"
);
})
}
/// `verify` must answer `false` when the consensus context passed in differs from the
/// context embedded in the block behind the digest.
#[test_traced("WARN")]
fn test_marshaled_rejects_mismatched_context() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
let mut marshaled = Deferred::new(
context.child("deferred"),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
let parent_ctx = Ctx {
round: Round::new(Epoch::zero(), View::new(1)),
leader: default_leader(),
parent: (View::zero(), genesis.digest()),
};
let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
let parent_commitment = StandardHarness::commitment(&parent);
assert!(
marshal
.clone()
.verified(Round::new(Epoch::zero(), View::new(1)), parent.clone())
.await
);
// Block A embeds context A (view 2, led by `me`).
let round_a = Round::new(Epoch::zero(), View::new(2));
let context_a = Ctx {
round: round_a,
leader: me.clone(),
parent: (View::new(1), parent_commitment),
};
let block_a = B::new::<Sha256>(context_a, parent.digest(), Height::new(2), 200);
let commitment_a = StandardHarness::commitment(&block_a);
assert!(marshal.verified(round_a, block_a).await);
context.sleep(Duration::from_millis(10)).await;
// Context B differs from A in both view and leader.
let round_b = Round::new(Epoch::zero(), View::new(3));
let context_b = Ctx {
round: round_b,
leader: participants[1].clone(),
parent: (View::new(1), parent_commitment),
};
// Verify block A's digest under the non-matching context B.
let verify_rx = marshaled.verify(context_b, commitment_a).await;
select! {
result = verify_rx => {
assert!(
!result.unwrap(),
"mismatched context hash should be rejected"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("verify should reject mismatched context hash promptly");
},
}
})
}
/// `certify` must still produce a positive result when the receiver returned by an
/// earlier `verify` call for the same round and digest was dropped.
#[test_traced("WARN")]
fn test_deferred_certify_recovers_after_verify_receiver_drop() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
let mut marshaled = Deferred::new(
context.child("deferred"),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
let round = Round::new(Epoch::zero(), View::new(1));
let block_context = Ctx {
round,
leader: me,
parent: (View::zero(), genesis.digest()),
};
let block =
B::new::<Sha256>(block_context.clone(), genesis.digest(), Height::new(1), 100);
let digest = block.digest();
// Start verification before the block has been handed to the marshal, then drop the
// receiver as consensus would when it loses interest.
let verify_rx = marshaled.verify(block_context, digest).await;
drop(verify_rx);
context.sleep(Duration::from_millis(10)).await;
// Only now hand the block to the marshal.
assert!(marshal.proposed(round, block).await);
let certify_rx = marshaled.certify(round, digest).await;
select! {
result = certify_rx => {
assert!(
result.expect("certify result missing"),
"certify should recover after verify receiver drop"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("certify should recover promptly after verify drop");
},
}
});
}
/// `certify` must not resolve with a value when the marshal actor is aborted while the
/// application is still verifying the block.
///
/// A gated application holds verification open; the marshal actor is aborted, the gate
/// is released, and the certify receiver is expected to end with an error.
#[test_traced("WARN")]
fn test_deferred_certify_does_not_bypass_failed_verify_persistence() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let buffer = setup.extra;
let marshal_actor_handle = setup.actor_handle;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
// `verify_started` signals that the application was reached; `release_verify` lets
// it continue.
let (mock_app, verify_started, release_verify): (GatedVerifyingApp<B, S>, _, _) =
GatedVerifyingApp::new();
let mut marshaled = Deferred::new(
context.child("deferred"),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
let parent_digest = parent.digest();
let child_round = Round::new(Epoch::zero(), View::new(2));
let child_ctx = Ctx {
round: child_round,
leader: me,
parent: (View::new(1), parent_digest),
};
let child = B::new::<Sha256>(child_ctx.clone(), parent_digest, Height::new(2), 200);
let child_digest = child.digest();
// Both blocks are handed to the broadcast buffer with an empty recipient list.
assert!(
buffer
.broadcast(commonware_p2p::Recipients::Some(vec![]), parent)
.accepted(),
"buffer broadcast for parent should be accepted"
);
assert!(
buffer
.broadcast(commonware_p2p::Recipients::Some(vec![]), child)
.accepted(),
"buffer broadcast for child should be accepted"
);
let optimistic_rx = marshaled.verify(child_ctx, child_digest).await;
let result = optimistic_rx
.await
.expect("optimistic verify should resolve");
assert!(
result,
"optimistic verify should accept the available block"
);
let certify_rx = marshaled.certify(child_round, child_digest).await;
// Wait until the application is inside verification, then abort the marshal actor
// before letting verification finish.
verify_started
.await
.expect("verify should reach application before marshal abort");
marshal_actor_handle.abort();
let _ = marshal_actor_handle.await;
release_verify.send_lossy(());
select! {
result = certify_rx => {
assert!(
result.is_err(),
"certify must not resolve after marshal.verified loses its persistence ack"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("certify should terminate after marshal abort");
},
}
});
}
/// `propose` must return the digest of the block the marshal already holds as verified
/// for the round, rather than the block the application would build.
#[test_traced("WARN")]
fn test_propose_reuses_verified_block_on_restart() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let round = Round::new(Epoch::zero(), View::new(1));
let ctx = Ctx {
round,
leader: me.clone(),
parent: (View::zero(), genesis.digest()),
};
// Block A is already recorded as verified for this round.
let block_a = B::new::<Sha256>(ctx.clone(), genesis.digest(), Height::new(1), 100);
let digest_a = block_a.digest();
assert!(marshal.verified(round, block_a.clone()).await);
// Block B shares the context but has a different digest; it is what the mock
// application is configured to propose.
let block_b = B::new::<Sha256>(ctx.clone(), genesis.digest(), Height::new(1), 200);
let digest_b = block_b.digest();
assert_ne!(digest_a, digest_b, "test requires distinct digests");
let mock_app: MockVerifyingApp<B, S> =
MockVerifyingApp::new().with_propose_result(block_b);
let mut marshaled = Deferred::new(
context.child("deferred"),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
let digest_rx = marshaled.propose(ctx).await;
let digest = digest_rx.await.expect("propose must return a digest");
assert_eq!(
digest, digest_a,
"propose must reuse the block marshal already persisted for this round"
);
});
}
/// `propose` must skip (close the receiver without a digest) when the marshal holds a
/// verified block for the round whose embedded context differs from the context
/// consensus now supplies.
#[test_traced("WARN")]
fn test_propose_skips_when_verified_block_context_changed() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let round = Round::new(Epoch::zero(), View::new(2));
// The stored block was built on genesis (parent view 0).
let stale_ctx = Ctx {
round,
leader: me.clone(),
parent: (View::zero(), genesis.digest()),
};
let stale_block = B::new::<Sha256>(stale_ctx, genesis.digest(), Height::new(1), 100);
assert!(marshal.verified(round, stale_block).await);
// Same round and leader, but a different parent (view 1, different digest).
let new_parent_digest = Sha256::hash(b"late-certified-parent");
let new_ctx = Ctx {
round,
leader: me.clone(),
parent: (View::new(1), new_parent_digest),
};
let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
let mut marshaled = Deferred::new(
context.child("deferred"),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
let digest_rx = marshaled.propose(new_ctx).await;
assert!(
digest_rx.await.is_err(),
"propose must drop the receiver when the cached block's context no longer matches"
);
});
}
}