use crate::{
marshal::{
ancestry::AncestorStream,
application::{
validation::{is_inferred_reproposal_at_certify, LastBuilt},
verification_tasks::VerificationTasks,
},
core::Mailbox,
standard::{
validation::{
fetch_parent, precheck_epoch_and_reproposal, verify_with_parent, Decision,
},
Standard,
},
Update,
},
simplex::{types::Context, Plan},
types::{Epoch, Epocher, Round},
Application, Automaton, CertifiableAutomaton, CertifiableBlock, Epochable, Relay, Reporter,
VerifyingApplication,
};
use commonware_cryptography::{certificate::Scheme, Digestible};
use commonware_macros::select;
use commonware_runtime::{
telemetry::metrics::histogram::{Buckets, Timed},
Clock, Metrics, Spawner,
};
use commonware_utils::{
channel::{fallible::OneshotExt, oneshot},
sync::Mutex,
};
use rand::Rng;
use std::sync::Arc;
use tracing::{debug, warn};
#[derive(Clone)]
pub struct Deferred<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E>,
B: CertifiableBlock,
ES: Epocher,
{
context: E,
application: A,
marshal: Mailbox<S, Standard<B>>,
epocher: ES,
last_built: LastBuilt<B>,
verification_tasks: VerificationTasks<<B as Digestible>::Digest>,
build_duration: Timed<E>,
}
impl<E, S, A, B, ES> Deferred<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: VerifyingApplication<
E,
Block = B,
SigningScheme = S,
Context = Context<B::Digest, S::PublicKey>,
>,
B: CertifiableBlock<Context = <A as Application<E>>::Context>,
ES: Epocher,
{
pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
use prometheus_client::metrics::histogram::Histogram;
let build_histogram = Histogram::new(Buckets::LOCAL);
context.register(
"build_duration",
"Histogram of time taken for the application to build a new block, in seconds",
build_histogram.clone(),
);
let build_duration = Timed::new(build_histogram, Arc::new(context.clone()));
Self {
context,
application,
marshal,
epocher,
last_built: Arc::new(Mutex::new(None)),
verification_tasks: VerificationTasks::new(),
build_duration,
}
}
#[inline]
fn deferred_verify(
&mut self,
context: <Self as Automaton>::Context,
block: B,
) -> oneshot::Receiver<bool> {
let mut marshal = self.marshal.clone();
let mut application = self.application.clone();
let (mut tx, rx) = oneshot::channel();
self.context
.with_label("deferred_verify")
.with_attribute("round", context.round)
.spawn(move |runtime_context| async move {
let application_valid = match verify_with_parent(
runtime_context,
context,
block,
&mut application,
&mut marshal,
&mut tx,
)
.await
{
Some(valid) => valid,
None => return,
};
tx.send_lossy(application_valid);
});
rx
}
}
impl<E, S, A, B, ES> Automaton for Deferred<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: VerifyingApplication<
E,
Block = B,
SigningScheme = S,
Context = Context<B::Digest, S::PublicKey>,
>,
B: CertifiableBlock<Context = <A as Application<E>>::Context>,
ES: Epocher,
{
type Digest = B::Digest;
type Context = Context<Self::Digest, S::PublicKey>;
async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
if epoch.is_zero() {
return self.application.genesis().await.digest();
}
let prev = epoch.previous().expect("checked to be non-zero above");
let last_height = self
.epocher
.last(prev)
.expect("previous epoch should exist");
let Some(block) = self.marshal.get_block(last_height).await else {
unreachable!("missing starting epoch block at height {}", last_height);
};
block.digest()
}
async fn propose(
&mut self,
consensus_context: Context<Self::Digest, S::PublicKey>,
) -> oneshot::Receiver<Self::Digest> {
let mut marshal = self.marshal.clone();
let mut application = self.application.clone();
let last_built = self.last_built.clone();
let epocher = self.epocher.clone();
let build_duration = self.build_duration.clone();
let (mut tx, rx) = oneshot::channel();
self.context
.with_label("propose")
.with_attribute("round", consensus_context.round)
.spawn(move |runtime_context| async move {
let (parent_view, parent_digest) = consensus_context.parent;
let parent_request = fetch_parent(
parent_digest,
Some(Round::new(consensus_context.epoch(), parent_view)),
&mut application,
&mut marshal,
)
.await;
let parent = select! {
_ = tx.closed() => {
debug!(reason = "consensus dropped receiver", "skipping proposal");
return;
},
result = parent_request => match result {
Ok(parent) => parent,
Err(_) => {
debug!(
?parent_digest,
reason = "failed to fetch parent block",
"skipping proposal"
);
return;
}
},
};
let last_in_epoch = epocher
.last(consensus_context.epoch())
.expect("current epoch should exist");
if parent.height() == last_in_epoch {
let digest = parent.digest();
{
let mut lock = last_built.lock();
*lock = Some((consensus_context.round, parent));
}
let success = tx.send_lossy(digest);
debug!(
round = ?consensus_context.round,
?digest,
success,
"re-proposed parent block at epoch boundary"
);
return;
}
let ancestor_stream = AncestorStream::new(marshal.clone(), [parent]);
let build_request = application.propose(
(
runtime_context.with_label("app_propose"),
consensus_context.clone(),
),
ancestor_stream,
);
let mut build_timer = build_duration.timer();
let built_block = select! {
_ = tx.closed() => {
debug!(reason = "consensus dropped receiver", "skipping proposal");
return;
},
result = build_request => match result {
Some(block) => block,
None => {
debug!(
?parent_digest,
reason = "block building failed",
"skipping proposal"
);
return;
}
},
};
build_timer.observe();
let digest = built_block.digest();
{
let mut lock = last_built.lock();
*lock = Some((consensus_context.round, built_block));
}
let success = tx.send_lossy(digest);
debug!(
round = ?consensus_context.round,
?digest,
success,
"proposed new block"
);
});
rx
}
async fn verify(
&mut self,
context: Context<Self::Digest, S::PublicKey>,
digest: Self::Digest,
) -> oneshot::Receiver<bool> {
let mut marshal = self.marshal.clone();
let mut marshaled = self.clone();
let (mut tx, rx) = oneshot::channel();
self.context
.with_label("optimistic_verify")
.with_attribute("round", context.round)
.spawn(move |_| async move {
let block_request = marshal.subscribe_by_digest(Some(context.round), digest).await;
let block = select! {
_ = tx.closed() => {
debug!(
reason = "consensus dropped receiver",
"skipping optimistic verification"
);
return;
},
result = block_request => match result {
Ok(block) => block,
Err(_) => {
debug!(
?digest,
reason = "failed to fetch block for optimistic verification",
"skipping optimistic verification"
);
return;
}
},
};
let block = match precheck_epoch_and_reproposal(
&marshaled.epocher,
&mut marshal,
&context,
digest,
block,
)
.await
{
Decision::Complete(valid) => {
if valid {
let round = context.round;
let (task_tx, task_rx) = oneshot::channel();
task_tx.send_lossy(true);
marshaled.verification_tasks.insert(round, digest, task_rx);
}
tx.send_lossy(valid);
return;
}
Decision::Continue(block) => block,
};
if block.context() != context {
debug!(
?context,
block_context = ?block.context(),
"block-embedded context does not match consensus context during optimistic verification"
);
tx.send_lossy(false);
return;
}
let round = context.round;
let task = marshaled.deferred_verify(context, block);
marshaled.verification_tasks.insert(round, digest, task);
tx.send_lossy(true);
});
rx
}
}
impl<E, S, A, B, ES> CertifiableAutomaton for Deferred<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: VerifyingApplication<
E,
Block = B,
SigningScheme = S,
Context = Context<B::Digest, S::PublicKey>,
>,
B: CertifiableBlock<Context = <A as Application<E>>::Context>,
ES: Epocher,
{
async fn certify(&mut self, round: Round, digest: Self::Digest) -> oneshot::Receiver<bool> {
let task = self.verification_tasks.take(round, digest);
if let Some(task) = task {
return task;
}
debug!(
?round,
?digest,
"subscribing to block for certification using embedded context"
);
let block_rx = self.marshal.subscribe_by_digest(Some(round), digest).await;
let mut marshaled = self.clone();
let epocher = self.epocher.clone();
let (mut tx, rx) = oneshot::channel();
self.context
.with_label("certify")
.with_attribute("round", round)
.spawn(move |_| async move {
let block = select! {
_ = tx.closed() => {
debug!(
reason = "consensus dropped receiver",
"skipping certification"
);
return;
},
result = block_rx => match result {
Ok(block) => block,
Err(_) => {
debug!(
?digest,
reason = "failed to fetch block for certification",
"skipping certification"
);
return;
}
},
};
let embedded_context = block.context();
let is_reproposal = is_inferred_reproposal_at_certify(
&epocher,
block.height(),
embedded_context.round,
round,
);
if is_reproposal {
marshaled.marshal.verified(round, block).await;
tx.send_lossy(true);
return;
}
let verify_rx = marshaled.deferred_verify(embedded_context, block);
if let Ok(result) = verify_rx.await {
tx.send_lossy(result);
}
});
rx
}
}
impl<E, S, A, B, ES> Relay for Deferred<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
B: CertifiableBlock<Context = <A as Application<E>>::Context>,
ES: Epocher,
{
type Digest = B::Digest;
type PublicKey = S::PublicKey;
type Plan = Plan<S::PublicKey>;
async fn broadcast(&mut self, digest: Self::Digest, plan: Plan<S::PublicKey>) {
match plan {
Plan::Propose => {
let Some((round, block)) = self.last_built.lock().take() else {
warn!("missing block to broadcast");
return;
};
if block.digest() != digest {
warn!(
round = %round,
digest = %block.digest(),
height = %block.height(),
"skipping requested broadcast of block with mismatched digest"
);
return;
}
debug!(
round = %round,
digest = %block.digest(),
height = %block.height(),
"requested broadcast of built block"
);
self.marshal.proposed(round, block).await;
}
Plan::Forward { round, peers } => {
self.marshal.forward(round, digest, peers).await;
}
}
}
}
impl<E, S, A, B, ES> Reporter for Deferred<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
+ Reporter<Activity = Update<B>>,
B: CertifiableBlock<Context = <A as Application<E>>::Context>,
ES: Epocher,
{
type Activity = A::Activity;
async fn report(&mut self, update: Self::Activity) {
if let Update::Tip(round, _, _) = &update {
self.verification_tasks.retain_after(round);
}
self.application.report(update).await
}
}
#[cfg(test)]
mod tests {
use super::Deferred;
use crate::{
marshal::mocks::{
harness::{
default_leader, make_raw_block, setup_network_with_participants, Ctx,
StandardHarness, TestHarness, B, BLOCKS_PER_EPOCH, NAMESPACE, NUM_VALIDATORS, S, V,
},
verifying::MockVerifyingApp,
},
simplex::scheme::bls12381_threshold::vrf as bls12381_threshold_vrf,
types::{Epoch, Epocher, FixedEpocher, Height, Round, View},
Automaton, CertifiableAutomaton,
};
use commonware_cryptography::{
certificate::{mocks::Fixture, ConstantProvider},
sha256::Sha256,
Digestible, Hasher as _,
};
use commonware_macros::{select, test_traced};
use commonware_runtime::{deterministic, Clock, Metrics, Runner};
use commonware_utils::NZUsize;
use std::time::Duration;
#[test_traced("INFO")]
fn test_certify_lower_view_after_higher_view() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.with_label("validator_0"),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
let mut marshaled = Deferred::new(
context.clone(),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
let parent_digest = parent.digest();
marshal
.clone()
.proposed(Round::new(Epoch::new(0), View::new(1)), parent.clone())
.await;
let round_a = Round::new(Epoch::new(0), View::new(5));
let context_a = Ctx {
round: round_a,
leader: me.clone(),
parent: (View::new(1), parent_digest),
};
let block_a = B::new::<Sha256>(context_a.clone(), parent_digest, Height::new(2), 200);
let commitment_a = block_a.digest();
marshal.clone().proposed(round_a, block_a.clone()).await;
let round_b = Round::new(Epoch::new(0), View::new(10));
let context_b = Ctx {
round: round_b,
leader: me.clone(),
parent: (View::new(1), parent_digest),
};
let block_b = B::new::<Sha256>(context_b.clone(), parent_digest, Height::new(2), 300);
let commitment_b = block_b.digest();
marshal.clone().proposed(round_b, block_b.clone()).await;
context.sleep(Duration::from_millis(10)).await;
let _ = marshaled.verify(context_a, commitment_a).await.await;
let _ = marshaled.verify(context_b, commitment_b).await.await;
let certify_b = marshaled.certify(round_b, commitment_b).await;
assert!(
certify_b.await.unwrap(),
"Block B certification should succeed"
);
let certify_a = marshaled.certify(round_a, commitment_a).await;
select! {
result = certify_a => {
assert!(result.unwrap(), "Block A certification should succeed");
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("Block A certification timed out");
},
}
})
}
#[test_traced("WARN")]
fn test_marshaled_rejects_unsupported_epoch() {
#[derive(Clone)]
struct LimitedEpocher {
inner: FixedEpocher,
max_epoch: u64,
}
impl Epocher for LimitedEpocher {
fn containing(&self, height: Height) -> Option<crate::types::EpochInfo> {
let bounds = self.inner.containing(height)?;
if bounds.epoch().get() > self.max_epoch {
None
} else {
Some(bounds)
}
}
fn first(&self, epoch: Epoch) -> Option<Height> {
if epoch.get() > self.max_epoch {
None
} else {
self.inner.first(epoch)
}
}
fn last(&self, epoch: Epoch) -> Option<Height> {
if epoch.get() > self.max_epoch {
None
} else {
self.inner.last(epoch)
}
}
}
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.with_label("validator_0"),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
let limited_epocher = LimitedEpocher {
inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
max_epoch: 0,
};
let mut marshaled =
Deferred::new(context.clone(), mock_app, marshal.clone(), limited_epocher);
let parent_ctx = Ctx {
round: Round::new(Epoch::zero(), View::new(19)),
leader: default_leader(),
parent: (View::zero(), genesis.digest()),
};
let parent =
B::new::<Sha256>(parent_ctx.clone(), genesis.digest(), Height::new(19), 1000);
let parent_digest = parent.digest();
marshal
.clone()
.proposed(Round::new(Epoch::zero(), View::new(19)), parent.clone())
.await;
let unsupported_round = Round::new(Epoch::new(1), View::new(20));
let unsupported_context = Ctx {
round: unsupported_round,
leader: me.clone(),
parent: (View::new(19), parent_digest),
};
let block = B::new::<Sha256>(
unsupported_context.clone(),
parent_digest,
Height::new(20),
2000,
);
let block_commitment = block.digest();
marshal
.clone()
.proposed(unsupported_round, block.clone())
.await;
context.sleep(Duration::from_millis(10)).await;
let verify_result = marshaled
.verify(unsupported_context, block_commitment)
.await;
let optimistic_result = verify_result.await;
assert!(
!optimistic_result.unwrap(),
"Optimistic verify should reject block in unsupported epoch"
);
})
}
#[test_traced("WARN")]
fn test_marshaled_rejects_mismatched_context() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.with_label("validator_0"),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
let mut marshaled = Deferred::new(
context.clone(),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
let parent_ctx = Ctx {
round: Round::new(Epoch::zero(), View::new(1)),
leader: default_leader(),
parent: (View::zero(), genesis.digest()),
};
let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
let parent_commitment = parent.digest();
marshal
.clone()
.proposed(Round::new(Epoch::zero(), View::new(1)), parent.clone())
.await;
let round_a = Round::new(Epoch::zero(), View::new(2));
let context_a = Ctx {
round: round_a,
leader: me.clone(),
parent: (View::new(1), parent_commitment),
};
let block_a = B::new::<Sha256>(context_a, parent.digest(), Height::new(2), 200);
let commitment_a = block_a.digest();
marshal.clone().proposed(round_a, block_a).await;
context.sleep(Duration::from_millis(10)).await;
let round_b = Round::new(Epoch::zero(), View::new(3));
let context_b = Ctx {
round: round_b,
leader: participants[1].clone(),
parent: (View::new(1), parent_commitment),
};
let verify_rx = marshaled.verify(context_b, commitment_a).await;
select! {
result = verify_rx => {
assert!(
!result.unwrap(),
"mismatched context hash should be rejected"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("verify should reject mismatched context hash promptly");
},
}
})
}
}