use crate::{
marshal::{
ancestry::AncestorStream,
application::validation::LastBuilt,
core::Mailbox,
standard::{
validation::{
fetch_parent, precheck_epoch_and_reproposal, verify_with_parent, Decision,
},
Standard,
},
Update,
},
simplex::{types::Context, Plan},
types::{Epoch, Epocher, Round},
Application, Automaton, Block, CertifiableAutomaton, Epochable, Relay, Reporter,
VerifyingApplication,
};
use commonware_cryptography::certificate::Scheme;
use commonware_macros::select;
use commonware_runtime::{
telemetry::metrics::histogram::{Buckets, Timed},
Clock, Metrics, Spawner,
};
use commonware_utils::{
channel::{fallible::OneshotExt, oneshot},
sync::Mutex,
};
use prometheus_client::metrics::histogram::Histogram;
use rand::Rng;
use std::{collections::BTreeSet, sync::Arc};
use tracing::{debug, warn};
/// Shared, lock-protected, ordered set of `(round, digest)` pairs.
///
/// Within this file, `verify` inserts an entry once the block for that digest has been
/// received from marshal, `certify` consults the set so it can answer without subscribing
/// again, and `report` drops entries whose round is not above a reported tip.
type AvailableBlocks<D> = Arc<Mutex<BTreeSet<(Round, D)>>>;
/// Waits for `block_rx` to yield a block, giving up early if consensus stops listening.
///
/// Returns `Some(block)` when the subscription delivers a value. Returns `None` when either
/// the receiving half paired with `tx` is closed first, or the subscription resolves with an
/// error. Both `None` outcomes are logged at debug level, tagged with `stage`; the failure
/// case additionally logs `digest`.
async fn await_block_subscription<T, D>(
    tx: &mut oneshot::Sender<bool>,
    block_rx: oneshot::Receiver<T>,
    digest: &D,
    stage: &'static str,
) -> Option<T>
where
    D: std::fmt::Debug + ?Sized,
{
    select! {
        // Nobody is waiting for the answer anymore, so stop waiting for the block.
        _ = tx.closed() => {
            debug!(
                stage,
                reason = "consensus dropped receiver",
                "skipping block wait"
            );
            None
        },
        outcome = block_rx => match outcome {
            Ok(block) => Some(block),
            Err(_) => {
                debug!(
                    stage,
                    ?digest,
                    reason = "failed to fetch block",
                    "skipping block wait"
                );
                None
            }
        },
    }
}
/// Wrapper that pairs an application `A` with a standard marshal [`Mailbox`] and, in the
/// `impl` blocks of this file, exposes them through the `Automaton`, `CertifiableAutomaton`,
/// `Relay` and `Reporter` traits.
///
/// The type is `Clone`. `last_built` and `available_blocks` are `Arc`-backed (see `new`), so
/// clones share that state rather than copying it.
#[derive(Clone)]
pub struct Inline<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E>,
B: Block + Clone,
ES: Epocher,
{
// Runtime context; used to register metrics and to spawn the propose/verify/certify tasks.
context: E,
// The wrapped application; asked for genesis, proposals and (via helpers) verification.
application: A,
// Handle to marshal; used to look up, subscribe to, and hand over blocks.
marshal: Mailbox<S, Standard<B>>,
// Queried with `last(epoch)` to find the final height of an epoch.
epocher: ES,
// Most recent `(round, block)` produced by `propose`; consumed by `broadcast`.
last_built: LastBuilt<B>,
// `(round, digest)` pairs whose block `verify` has already received from marshal.
available_blocks: AvailableBlocks<B::Digest>,
// Timer over the `build_duration` histogram registered in `new`.
build_duration: Timed<E>,
}
impl<E, S, A, B, ES> Inline<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = S,
        Context = Context<B::Digest, S::PublicKey>,
    >,
    B: Block + Clone,
    ES: Epocher,
{
    /// Creates an [`Inline`] wrapper around `application` and `marshal`.
    ///
    /// Registers a `build_duration` histogram on `context` and keeps a timer over it. The
    /// returned value starts with no cached proposal and an empty set of available blocks.
    pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
        let histogram = Histogram::new(Buckets::LOCAL);
        context.register(
            "build_duration",
            "Histogram of time taken for the application to build a new block, in seconds",
            histogram.clone(),
        );

        Self {
            // Listed first: `context` must be cloned here before it is moved into its field.
            build_duration: Timed::new(histogram, Arc::new(context.clone())),
            last_built: Arc::new(Mutex::new(None)),
            available_blocks: Arc::new(Mutex::new(BTreeSet::new())),
            context,
            application,
            marshal,
            epocher,
        }
    }
}
/// [`Automaton`] implementation: answers genesis queries, builds proposals through the
/// wrapped application, and verifies proposed blocks once marshal has delivered them.
///
/// `propose` and `verify` each spawn a task on the runtime context and return the receiving
/// half of a oneshot channel immediately; the task sends the result later, or drops the
/// sender without sending when it gives up.
impl<E, S, A, B, ES> Automaton for Inline<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: VerifyingApplication<
E,
Block = B,
SigningScheme = S,
Context = Context<B::Digest, S::PublicKey>,
>,
B: Block + Clone,
ES: Epocher,
{
type Digest = B::Digest;
type Context = Context<Self::Digest, S::PublicKey>;
/// Returns the digest that serves as genesis for `epoch`.
///
/// For epoch zero this is the digest of the application's genesis block. For any later
/// epoch it is the digest of the block marshal holds at the last height of the previous
/// epoch, as reported by the epocher.
///
/// # Panics
///
/// Panics if the epocher has no last height for the previous epoch, or if marshal does
/// not return a block at that height.
async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
if epoch.is_zero() {
return self.application.genesis().await.digest();
}
let prev = epoch.previous().expect("checked to be non-zero above");
let last_height = self
.epocher
.last(prev)
.expect("previous epoch should exist");
let Some(block) = self.marshal.get_block(last_height).await else {
unreachable!("missing starting epoch block at height {}", last_height);
};
block.digest()
}
/// Starts building a proposal for `consensus_context` and returns a receiver for its digest.
///
/// The spawned task fetches the parent named in the context, then either re-proposes the
/// parent (when the parent already sits at the last height of the current epoch) or asks
/// the application to build a new block on top of it. In both cases the block is stored
/// in `last_built` together with the round before its digest is sent.
///
/// Nothing is sent (the sender is dropped) if the receiver is dropped first, the parent
/// cannot be fetched, or the application returns `None`.
async fn propose(
&mut self,
consensus_context: Context<Self::Digest, S::PublicKey>,
) -> oneshot::Receiver<Self::Digest> {
// Clone everything the spawned task needs so it does not borrow `self`.
let mut marshal = self.marshal.clone();
let mut application = self.application.clone();
let last_built = self.last_built.clone();
let epocher = self.epocher.clone();
let build_duration = self.build_duration.clone();
let (mut tx, rx) = oneshot::channel();
self.context
.with_label("propose")
.with_attribute("round", consensus_context.round)
.spawn(move |runtime_context| async move {
// Request the parent block; the parent's round is the current epoch plus the parent view.
let (parent_view, parent_digest) = consensus_context.parent;
let parent_request = fetch_parent(
parent_digest,
Some(Round::new(consensus_context.epoch(), parent_view)),
&mut application,
&mut marshal,
)
.await;
// Wait for the parent, but stop as soon as consensus no longer wants the proposal.
let parent = select! {
_ = tx.closed() => {
debug!(reason = "consensus dropped receiver", "skipping proposal");
return;
},
result = parent_request => match result {
Ok(parent) => parent,
Err(_) => {
debug!(
?parent_digest,
reason = "failed to fetch parent block",
"skipping proposal"
);
return;
}
},
};
// If the parent is already the final block of this epoch, re-propose it instead of
// building a new block on top of it.
let last_in_epoch = epocher
.last(consensus_context.epoch())
.expect("current epoch should exist");
if parent.height() == last_in_epoch {
let digest = parent.digest();
// Scope the lock so the guard is released before sending and logging.
{
let mut lock = last_built.lock();
*lock = Some((consensus_context.round, parent));
}
let success = tx.send_lossy(digest);
debug!(
round = ?consensus_context.round,
?digest,
success,
"re-proposed parent block at epoch boundary"
);
return;
}
// Ask the application to build a block, giving it an ancestor stream seeded with the parent.
let ancestor_stream = AncestorStream::new(marshal.clone(), [parent]);
let build_request = application.propose(
(
runtime_context.with_label("app_propose"),
consensus_context.clone(),
),
ancestor_stream,
);
// The timer is only observed explicitly on the success path below; both early
// returns in the select happen before `observe` is called.
let mut build_timer = build_duration.timer();
let built_block = select! {
_ = tx.closed() => {
debug!(reason = "consensus dropped receiver", "skipping proposal");
return;
},
result = build_request => match result {
Some(block) => block,
None => {
debug!(
?parent_digest,
reason = "block building failed",
"skipping proposal"
);
return;
}
},
};
build_timer.observe();
let digest = built_block.digest();
// Cache the block first so `broadcast` can find it once consensus has the digest.
{
let mut lock = last_built.lock();
*lock = Some((consensus_context.round, built_block));
}
let success = tx.send_lossy(digest);
debug!(
round = ?consensus_context.round,
?digest,
success,
"proposed new block"
);
});
rx
}
/// Starts verifying the block identified by `digest` for `context` and returns a receiver
/// for the verdict.
///
/// The spawned task subscribes to the block through marshal, records `(round, digest)` in
/// `available_blocks` once the block arrives, runs `precheck_epoch_and_reproposal`, and,
/// unless that already produced a verdict, runs `verify_with_parent`.
///
/// Nothing is sent (the sender is dropped) if the block never arrives, the receiver is
/// dropped while waiting, or `verify_with_parent` returns `None`.
async fn verify(
&mut self,
context: Context<Self::Digest, S::PublicKey>,
digest: Self::Digest,
) -> oneshot::Receiver<bool> {
let mut marshal = self.marshal.clone();
let mut application = self.application.clone();
let epocher = self.epocher.clone();
let available_blocks = self.available_blocks.clone();
let (mut tx, rx) = oneshot::channel();
self.context
.with_label("inline_verify")
.with_attribute("round", context.round)
.spawn(move |runtime_context| async move {
let block_request = marshal
.subscribe_by_digest(Some(context.round), digest)
.await;
let Some(block) =
await_block_subscription(&mut tx, block_request, &digest, "verification").await
else {
return;
};
// Recorded as soon as the block is in hand, before any validity check: the set
// tracks availability, which is all `certify` looks at.
available_blocks.lock().insert((context.round, digest));
// NOTE(review): judging by its name, the precheck covers epoch membership and
// re-proposals; its exact rules live in the helper, not in this file.
let block = match precheck_epoch_and_reproposal(
&epocher,
&mut marshal,
&context,
digest,
block,
)
.await
{
// The precheck reached a final verdict; forward it and stop.
Decision::Complete(valid) => {
tx.send_lossy(valid);
return;
}
Decision::Continue(block) => block,
};
let application_valid = match verify_with_parent(
runtime_context,
context,
block,
&mut application,
&mut marshal,
&mut tx,
)
.await
{
Some(valid) => valid,
// No verdict was produced; drop the sender without answering.
None => return,
};
tx.send_lossy(application_valid);
});
rx
}
}
impl<E, S, A, B, ES> CertifiableAutomaton for Inline<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = S,
        Context = Context<B::Digest, S::PublicKey>,
    >,
    B: Block + Clone,
    ES: Epocher,
{
    /// Returns a receiver that yields `true` once the block for `(round, digest)` is available.
    ///
    /// If `verify` already recorded the pair, the answer is sent before returning. Otherwise
    /// a marshal subscription is opened and a spawned task waits on it. This method never
    /// sends `false`: when the wait ends without a block, the sender is dropped unanswered.
    async fn certify(&mut self, round: Round, digest: Self::Digest) -> oneshot::Receiver<bool> {
        // Only the lookup result is kept; the lock guard is gone before any await below.
        let already_available = self.available_blocks.lock().contains(&(round, digest));
        if already_available {
            let (ready_tx, ready_rx) = oneshot::channel();
            ready_tx.send_lossy(true);
            return ready_rx;
        }

        let subscription = self.marshal.subscribe_by_digest(Some(round), digest).await;
        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("inline_certify")
            .with_attribute("round", round)
            .spawn(move |_| async move {
                let fetched =
                    await_block_subscription(&mut tx, subscription, &digest, "certification")
                        .await
                        .is_some();
                if fetched {
                    tx.send_lossy(true);
                }
            });
        rx
    }
}
impl<E, S, A, B, ES> Relay for Inline<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
    B: Block + Clone,
    ES: Epocher,
{
    type Digest = B::Digest;
    type PublicKey = S::PublicKey;
    type Plan = Plan<S::PublicKey>;

    /// Carries out `plan` for `digest`.
    ///
    /// * `Plan::Forward`: asks marshal to forward `digest` for the given round to `peers`.
    /// * `Plan::Propose`: takes the block cached by `propose` and hands it to marshal as
    ///   proposed, provided its digest equals `digest`. The cached entry is consumed either
    ///   way; a missing entry or a digest mismatch is logged as a warning and nothing is
    ///   handed over.
    async fn broadcast(&mut self, digest: Self::Digest, plan: Plan<S::PublicKey>) {
        let (round, block) = match plan {
            Plan::Forward { round, peers } => {
                self.marshal.forward(round, digest, peers).await;
                return;
            }
            Plan::Propose => {
                // Take the cached proposal in its own statement so the lock is released
                // before anything is awaited.
                let cached = self.last_built.lock().take();
                match cached {
                    Some(built) => built,
                    None => {
                        warn!("missing block to broadcast");
                        return;
                    }
                }
            }
        };

        let built_digest = block.digest();
        if built_digest != digest {
            warn!(
                round = %round,
                digest = %built_digest,
                height = %block.height(),
                "skipping requested broadcast of block with mismatched digest"
            );
            return;
        }
        self.marshal.proposed(round, block).await;
    }
}
impl<E, S, A, B, ES> Reporter for Inline<E, S, A, B, ES>
where
    E: Rng + Spawner + Metrics + Clock,
    S: Scheme,
    A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
        + Reporter<Activity = Update<B>>,
    B: Block + Clone,
    ES: Epocher,
{
    type Activity = A::Activity;

    /// Passes `update` on to the wrapped application.
    ///
    /// For an `Update::Tip`, the set of available blocks is pruned first: only entries whose
    /// round is strictly greater than the tip round are kept.
    async fn report(&mut self, update: Self::Activity) {
        if let Update::Tip(tip_round, ..) = &update {
            // The guard lives only inside this block, so it is released before the await.
            let mut available = self.available_blocks.lock();
            available.retain(|(entry_round, _)| entry_round > tip_round);
        }
        self.application.report(update).await
    }
}
// Unit tests for `Inline`: one compile-time trait check and two runtime tests covering
// `certify` with and without a preceding `verify` call.
#[cfg(test)]
mod tests {
use super::Inline;
use crate::{
marshal::mocks::{
harness::{
default_leader, make_raw_block, setup_network_with_participants, Ctx,
StandardHarness, TestHarness, B, BLOCKS_PER_EPOCH, NAMESPACE, NUM_VALIDATORS, S, V,
},
verifying::MockVerifyingApp,
},
simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Context},
types::{Epoch, FixedEpocher, Height, Round, View},
Automaton, Block, CertifiableAutomaton, Relay, VerifyingApplication,
};
use commonware_cryptography::{
certificate::{mocks::Fixture, ConstantProvider, Scheme},
sha256::Sha256,
Digestible, Hasher as _,
};
use commonware_macros::{select, test_traced};
use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner};
use commonware_utils::NZUsize;
use rand::Rng;
use std::time::Duration;
// Compile-time check only: this function is not called anywhere in this module, hence the
// `dead_code` allowance. It type-checks that `Inline` implements `Automaton`,
// `CertifiableAutomaton` and `Relay` when the block type is bounded only by `Block + Clone`.
#[allow(dead_code)]
fn assert_non_certifiable_block_supported<E, S, A, B, ES>()
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: VerifyingApplication<
E,
Block = B,
SigningScheme = S,
Context = Context<B::Digest, S::PublicKey>,
>,
B: Block + Clone,
ES: crate::types::Epocher,
{
fn assert_automaton<T: Automaton>() {}
fn assert_certifiable<T: CertifiableAutomaton>() {}
fn assert_relay<T: Relay>() {}
assert_automaton::<Inline<E, S, A, B, ES>>();
assert_certifiable::<Inline<E, S, A, B, ES>>();
assert_relay::<Inline<E, S, A, B, ES>>();
}
/// After `verify` has completed successfully for a block, `certify` for the same round and
/// digest must resolve to `true` rather than hang.
#[test_traced("INFO")]
fn test_certify_returns_immediately_after_verify_fetches_block() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
// Set up the network and a single validator (participant 0) with its marshal mailbox.
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.with_label("validator_0"),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
// Wrap a mock verifying application, seeded with the genesis block, in `Inline`.
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
let mut inline = Inline::new(
context.clone(),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
// Parent block: view 1, height 1, built on genesis; handed to marshal as proposed.
let parent_round = Round::new(Epoch::zero(), View::new(1));
let parent_ctx = Ctx {
round: parent_round,
leader: default_leader(),
parent: (View::zero(), genesis.digest()),
};
let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
let parent_digest = parent.digest();
marshal.clone().proposed(parent_round, parent).await;
// Block under test: view 2, height 2, built on the parent; also handed to marshal.
let round = Round::new(Epoch::zero(), View::new(2));
let verify_context = Ctx {
round,
leader: me,
parent: (View::new(1), parent_digest),
};
let block =
B::new::<Sha256>(verify_context.clone(), parent_digest, Height::new(2), 200);
let digest = block.digest();
marshal.clone().proposed(round, block).await;
// Verify first, so the `(round, digest)` pair is recorded before `certify` runs.
let verify_rx = inline.verify(verify_context, digest).await;
assert!(
verify_rx.await.unwrap(),
"verify should complete successfully before certify"
);
// `certify` must resolve to `true` before the five-second sleep elapses.
let certify_rx = inline.certify(round, digest).await;
select! {
result = certify_rx => {
assert!(
result.unwrap(),
"certify should return immediately once verify has fetched the block"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("certify should not hang after local verify completed");
},
}
});
}
/// Without any prior `verify` call, `certify` must still resolve to `true` when the block
/// has already been handed to marshal.
#[test_traced("INFO")]
fn test_certify_succeeds_without_verify_task() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
// Set up the network and a single validator (participant 0) with its marshal mailbox.
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.with_label("validator_0"),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
// Wrap a mock verifying application, seeded with the genesis block, in `Inline`.
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new(genesis.clone());
let mut inline = Inline::new(
context.clone(),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
// Parent block: view 1, height 1, built on genesis; handed to marshal as proposed.
let parent_round = Round::new(Epoch::zero(), View::new(1));
let parent_ctx = Ctx {
round: parent_round,
leader: default_leader(),
parent: (View::zero(), genesis.digest()),
};
let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
let parent_digest = parent.digest();
marshal.clone().proposed(parent_round, parent).await;
// Block under test: view 2, height 2, built on the parent; also handed to marshal.
let round = Round::new(Epoch::zero(), View::new(2));
let verify_context = Ctx {
round,
leader: me,
parent: (View::new(1), parent_digest),
};
let block =
B::new::<Sha256>(verify_context.clone(), parent_digest, Height::new(2), 200);
let digest = block.digest();
marshal.clone().proposed(round, block).await;
// Go straight to `certify`; this exercises the marshal-subscription path.
let certify_rx = inline.certify(round, digest).await;
select! {
result = certify_rx => {
assert!(
result.unwrap(),
"certify should resolve once block availability is known"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("certify should not hang when block is already available in marshal");
},
}
});
}
}