use crate::{
marshal::{
ancestry::AncestorStream,
application::{
validation::{
is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, LastBuilt,
},
verification_tasks::VerificationTasks,
},
coding::{
shards,
types::{coding_config_for_participants, hash_context, CodedBlock},
validation::{validate_block, validate_proposal, ProposalError},
Coding,
},
core, Update,
},
simplex::{scheme::Scheme, types::Context, Plan},
types::{coding::Commitment, Epoch, Epocher, Round},
Application, Automaton, Block, CertifiableAutomaton, CertifiableBlock, Epochable, Heightable,
Relay, Reporter, VerifyingApplication,
};
use commonware_coding::{Config as CodingConfig, Scheme as CodingScheme};
use commonware_cryptography::{
certificate::{Provider, Scheme as CertificateScheme},
Committable, Digestible, Hasher,
};
use commonware_macros::select;
use commonware_parallel::Strategy;
use commonware_runtime::{
telemetry::metrics::histogram::{Buckets, Timed},
Clock, Metrics, Spawner, Storage,
};
use commonware_utils::{
channel::{
fallible::OneshotExt,
oneshot::{self, error::RecvError},
},
sync::Mutex,
NZU16,
};
use futures::future::{ready, try_join, Either, Ready};
use prometheus_client::metrics::histogram::Histogram;
use rand::Rng;
use std::sync::{Arc, OnceLock};
use tracing::{debug, warn};
/// Fixed coding configuration embedded in the genesis block commitment
/// (see `genesis_coding_commitment`); the genesis block is constructed via
/// `CodedBlock::new_trusted` rather than being erasure encoded.
const GENESIS_CODING_CONFIG: CodingConfig = CodingConfig {
    minimum_shards: NZU16!(1),
    extra_shards: NZU16!(1),
};
/// Configuration for constructing a [`Marshaled`] automaton.
#[allow(clippy::type_complexity)]
pub struct MarshaledConfig<A, B, C, H, Z, S, ES>
where
    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// Application that builds (`propose`) and verifies blocks.
    pub application: A,
    /// Mailbox for the core marshal actor (block subscription, verified
    /// notifications, tip queries).
    pub marshal:
        core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
    /// Mailbox for the shards actor (proposal broadcast, shard discovery).
    pub shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
    /// Yields the certificate scheme scoped to a given epoch.
    pub scheme_provider: Z,
    /// Parallelism strategy used when erasure encoding blocks.
    pub strategy: S,
    /// Maps an epoch to its last block height (used for epoch-boundary
    /// detection).
    pub epocher: ES,
}
/// An [`Automaton`] implementation that bridges consensus and the marshal:
/// it builds, erasure codes, verifies, and certifies blocks identified by a
/// coding [`Commitment`].
#[derive(Clone)]
#[allow(clippy::type_complexity)]
pub struct Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: Application<E>,
    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    // Runtime handle used to spawn labeled background tasks and register metrics.
    context: E,
    application: A,
    marshal: core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
    shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
    scheme_provider: Z,
    epocher: ES,
    strategy: S,
    // Most recently built (or re-proposed) block, consumed by `broadcast`.
    last_built: LastBuilt<CodedBlock<B, C, H>>,
    // In-flight verification results keyed by (round, commitment), consumed
    // by `certify` and pruned on new tips in `report`.
    verification_tasks: VerificationTasks<Commitment>,
    // Lazily-initialized genesis block and its commitment, shared across
    // spawned tasks so `application.genesis()` is invoked at most a few times.
    cached_genesis: Arc<OnceLock<(Commitment, CodedBlock<B, C, H>)>>,
    // Latency histograms registered in `new`.
    build_duration: Timed<E>,
    verify_duration: Timed<E>,
    proposal_parent_fetch_duration: Timed<E>,
    erasure_encode_duration: Timed<E>,
}
impl<E, A, B, C, H, Z, S, ES> Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = Z::Scheme,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// Create a new [`Marshaled`] from `cfg`, registering four latency
    /// histograms (build, verify, proposal parent fetch, erasure encode)
    /// against the runtime's metrics registry.
    pub fn new(context: E, cfg: MarshaledConfig<A, B, C, H, Z, S, ES>) -> Self {
        let MarshaledConfig {
            application,
            marshal,
            shards,
            scheme_provider,
            strategy,
            epocher,
        } = cfg;
        // One shared clock drives all histogram timers.
        let clock = Arc::new(context.clone());
        let build_histogram = Histogram::new(Buckets::LOCAL);
        context.register(
            "build_duration",
            "Histogram of time taken for the application to build a new block, in seconds",
            build_histogram.clone(),
        );
        let build_duration = Timed::new(build_histogram, clock.clone());
        let verify_histogram = Histogram::new(Buckets::LOCAL);
        context.register(
            "verify_duration",
            "Histogram of time taken for the application to verify a block, in seconds",
            verify_histogram.clone(),
        );
        let verify_duration = Timed::new(verify_histogram, clock.clone());
        let parent_fetch_histogram = Histogram::new(Buckets::LOCAL);
        context.register(
            "parent_fetch_duration",
            "Histogram of time taken to fetch a parent block in proposal, in seconds",
            parent_fetch_histogram.clone(),
        );
        let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram, clock.clone());
        let erasure_histogram = Histogram::new(Buckets::LOCAL);
        context.register(
            "erasure_encode_duration",
            "Histogram of time taken to erasure encode a block, in seconds",
            erasure_histogram.clone(),
        );
        let erasure_encode_duration = Timed::new(erasure_histogram, clock);
        Self {
            context,
            application,
            marshal,
            shards,
            scheme_provider,
            strategy,
            epocher,
            last_built: Arc::new(Mutex::new(None)),
            verification_tasks: VerificationTasks::new(),
            cached_genesis: Arc::new(OnceLock::new()),
            build_duration,
            verify_duration,
            proposal_parent_fetch_duration,
            erasure_encode_duration,
        }
    }

    /// Spawn a background task that fully verifies the block identified by
    /// `commitment` against `consensus_context` and reports the result on the
    /// returned receiver.
    ///
    /// The task fetches the parent (and, unless `prefetched_block` is
    /// provided, the block itself) from the marshal, checks the coded block
    /// invariants via `validate_block`, then streams the block's ancestry to
    /// `application.verify`. On success the block is reported to the marshal
    /// as verified before the result is sent. Each await point is raced
    /// against `tx.closed()` so the task aborts early if consensus drops the
    /// receiver.
    fn deferred_verify(
        &mut self,
        consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
        commitment: Commitment,
        prefetched_block: Option<CodedBlock<B, C, H>>,
    ) -> oneshot::Receiver<bool> {
        let mut marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let epocher = self.epocher.clone();
        let verify_duration = self.verify_duration.clone();
        let cached_genesis = self.cached_genesis.clone();
        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("deferred_verify")
            .with_attribute("round", consensus_context.round)
            .spawn(move |runtime_context| async move {
                let round = consensus_context.round;
                let (parent_view, parent_commitment) = consensus_context.parent;
                // Resolves immediately for the genesis parent; otherwise a
                // marshal subscription (see `fetch_parent`).
                let parent_request = fetch_parent(
                    parent_commitment,
                    Some(Round::new(consensus_context.epoch(), parent_view)),
                    &mut application,
                    &mut marshal,
                    cached_genesis,
                )
                .await;
                let (parent, block) = if let Some(block) = prefetched_block {
                    // Block already in hand (certify path): only the parent
                    // needs fetching.
                    let parent = select! {
                        _ = tx.closed() => {
                            debug!(
                                reason = "consensus dropped receiver",
                                "skipping verification"
                            );
                            return;
                        },
                        result = parent_request => match result {
                            Ok(parent) => parent,
                            Err(_) => {
                                debug!(reason = "failed to fetch parent", "skipping verification");
                                return;
                            }
                        },
                    };
                    (parent, block)
                } else {
                    // Fetch parent and block concurrently; either failure
                    // aborts verification without sending a result.
                    let block_request = marshal
                        .subscribe_by_commitment(Some(round), commitment)
                        .await;
                    let block_requests = try_join(parent_request, block_request);
                    select! {
                        _ = tx.closed() => {
                            debug!(
                                reason = "consensus dropped receiver",
                                "skipping verification"
                            );
                            return;
                        },
                        result = block_requests => match result {
                            Ok(results) => results,
                            Err(_) => {
                                debug!(
                                    reason = "failed to fetch parent or block",
                                    "skipping verification"
                                );
                                return;
                            }
                        },
                    }
                };
                // Structural checks (commitment, parentage, height, context)
                // before spending time on application-level verification.
                if let Err(err) = validate_block::<H, _, _>(
                    &epocher,
                    &block,
                    &parent,
                    &consensus_context,
                    commitment,
                    parent_commitment,
                ) {
                    debug!(
                        ?err,
                        expected_commitment = %commitment,
                        block_commitment = %block.commitment(),
                        expected_parent_commitment = %parent_commitment,
                        parent_commitment = %parent.commitment(),
                        expected_parent = %parent.digest(),
                        block_parent = %block.parent(),
                        parent_height = %parent.height(),
                        block_height = %block.height(),
                        "block failed coded invariant validation"
                    );
                    tx.send_lossy(false);
                    return;
                }
                // Seed the ancestry stream with the block and its parent so
                // the application can walk further back via the marshal.
                let ancestry_stream = AncestorStream::new(
                    marshal.clone(),
                    [block.clone().into_inner(), parent.into_inner()],
                );
                let validity_request = application.verify(
                    (
                        runtime_context.with_label("app_verify"),
                        consensus_context.clone(),
                    ),
                    ancestry_stream,
                );
                let mut timer = verify_duration.timer();
                let application_valid = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping verification"
                        );
                        return;
                    },
                    is_valid = validity_request => is_valid,
                };
                timer.observe();
                // Tell the marshal before replying so the block is available
                // to downstream consumers as soon as consensus proceeds.
                if application_valid {
                    marshal.verified(round, block).await;
                }
                tx.send_lossy(application_valid);
            });
        rx
    }
}
impl<E, A, B, C, H, Z, S, ES> Automaton for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = Z::Scheme,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    type Digest = Commitment;
    type Context = Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>;

    /// Return the starting commitment for `epoch`: the synthetic genesis
    /// commitment for the first epoch, otherwise the commitment of the last
    /// block of the previous epoch (which must already be in the marshal).
    async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
        let Some(previous_epoch) = epoch.previous() else {
            let genesis_block = self.application.genesis().await;
            return genesis_coding_commitment::<H, _>(&genesis_block);
        };
        let last_height = self
            .epocher
            .last(previous_epoch)
            .expect("previous epoch should exist");
        let Some(block) = self.marshal.get_block(last_height).await else {
            unreachable!("missing starting epoch block at height {last_height}");
        };
        block.commitment()
    }

    /// Spawn a background task that builds a proposal for `consensus_context`
    /// and delivers its commitment on the returned receiver.
    ///
    /// The task fetches the parent, asks the application to build a block on
    /// top of it, erasure codes the result with a configuration sized to the
    /// epoch's participant set, and stashes the coded block in `last_built`
    /// for the subsequent `broadcast` call. If the parent is the last block
    /// of the current epoch, the parent itself is re-proposed instead.
    async fn propose(
        &mut self,
        consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    ) -> oneshot::Receiver<Self::Digest> {
        let mut marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let last_built = self.last_built.clone();
        let epocher = self.epocher.clone();
        let strategy = self.strategy.clone();
        let cached_genesis = self.cached_genesis.clone();
        // Without a scheme for this epoch we cannot size the coding config;
        // dropping tx makes the returned receiver resolve to an error.
        let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
            debug!(
                round = %consensus_context.round,
                "no scheme for epoch, skipping propose"
            );
            let (_, rx) = oneshot::channel();
            return rx;
        };
        let n_participants =
            u16::try_from(scheme.participants().len()).expect("too many participants");
        let coding_config = coding_config_for_participants(n_participants);
        let build_duration = self.build_duration.clone();
        let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone();
        let erasure_encode_duration = self.erasure_encode_duration.clone();
        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("propose")
            .with_attribute("round", consensus_context.round)
            .spawn(move |runtime_context| async move {
                let (parent_view, parent_commitment) = consensus_context.parent;
                let parent_request = fetch_parent(
                    parent_commitment,
                    Some(Round::new(consensus_context.epoch(), parent_view)),
                    &mut application,
                    &mut marshal,
                    cached_genesis,
                )
                .await;
                let mut parent_timer = proposal_parent_fetch_duration.timer();
                // Abort early if consensus stops caring about this proposal.
                let parent = select! {
                    _ = tx.closed() => {
                        debug!(reason = "consensus dropped receiver", "skipping proposal");
                        return;
                    },
                    result = parent_request => match result {
                        Ok(parent) => parent,
                        Err(_) => {
                            debug!(
                                ?parent_commitment,
                                reason = "failed to fetch parent block",
                                "skipping proposal"
                            );
                            return;
                        }
                    },
                };
                parent_timer.observe();
                let last_in_epoch = epocher
                    .last(consensus_context.epoch())
                    .expect("current epoch should exist");
                // Epoch boundary: re-propose the parent rather than building
                // a block past the epoch's last height.
                if parent.height() == last_in_epoch {
                    let commitment = parent.commitment();
                    {
                        // Scope the lock so it is released before sending.
                        let mut lock = last_built.lock();
                        *lock = Some((consensus_context.round, parent));
                    }
                    let success = tx.send_lossy(commitment);
                    debug!(
                        round = ?consensus_context.round,
                        ?commitment,
                        success,
                        "re-proposed parent block at epoch boundary"
                    );
                    return;
                }
                let ancestor_stream = AncestorStream::new(marshal.clone(), [parent.into_inner()]);
                let build_request = application.propose(
                    (
                        runtime_context.with_label("app_propose"),
                        consensus_context.clone(),
                    ),
                    ancestor_stream,
                );
                let mut build_timer = build_duration.timer();
                let built_block = select! {
                    _ = tx.closed() => {
                        debug!(reason = "consensus dropped receiver", "skipping proposal");
                        return;
                    },
                    result = build_request => match result {
                        Some(block) => block,
                        None => {
                            debug!(
                                ?parent_commitment,
                                reason = "block building failed",
                                "skipping proposal"
                            );
                            return;
                        }
                    },
                };
                build_timer.observe();
                let mut erasure_timer = erasure_encode_duration.timer();
                let coded_block = CodedBlock::<B, C, H>::new(built_block, coding_config, &strategy);
                erasure_timer.observe();
                let commitment = coded_block.commitment();
                {
                    // Stash for `broadcast`; lock scoped as above.
                    let mut lock = last_built.lock();
                    *lock = Some((consensus_context.round, coded_block));
                }
                let success = tx.send_lossy(commitment);
                debug!(
                    round = ?consensus_context.round,
                    ?commitment,
                    success,
                    "proposed new block"
                );
            });
        rx
    }

    /// Begin verification of `payload` for `consensus_context`.
    ///
    /// Re-proposals (payload equal to the parent commitment) are validated
    /// against the epoch boundary only. Otherwise the proposal metadata is
    /// validated synchronously, shard recovery is kicked off, and a full
    /// `deferred_verify` task is stored for `certify`; the returned receiver
    /// resolves once this validator's assigned shard is verified (or
    /// immediately with `true` if it is not a participant).
    async fn verify(
        &mut self,
        consensus_context: Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>,
        payload: Self::Digest,
    ) -> oneshot::Receiver<bool> {
        let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
            debug!(
                round = %consensus_context.round,
                "no scheme for epoch, skipping verify"
            );
            let (_, rx) = oneshot::channel();
            return rx;
        };
        let n_participants =
            u16::try_from(scheme.participants().len()).expect("too many participants");
        let coding_config = coding_config_for_participants(n_participants);
        // A re-proposal reuses the parent's commitment, so its embedded
        // context will not match this round's context; skip that check.
        let is_reproposal = payload == consensus_context.parent.1;
        let proposal_context = (!is_reproposal).then_some(&consensus_context);
        if let Err(err) = validate_proposal::<H, _>(payload, coding_config, proposal_context) {
            match err {
                ProposalError::CodingConfig => {
                    warn!(
                        round = %consensus_context.round,
                        got = ?payload.config(),
                        expected = ?coding_config,
                        "rejected proposal with unexpected coding configuration"
                    );
                }
                ProposalError::ContextDigest => {
                    let expected = hash_context::<H, _>(&consensus_context);
                    let got = payload.context::<H::Digest>();
                    warn!(
                        round = %consensus_context.round,
                        expected = ?expected,
                        got = ?got,
                        "rejected proposal with mismatched context digest"
                    );
                }
            }
            let (tx, rx) = oneshot::channel();
            tx.send_lossy(false);
            return rx;
        }
        if is_reproposal {
            let block_rx = self
                .marshal
                .subscribe_by_commitment(Some(consensus_context.round), payload)
                .await;
            let marshal = self.marshal.clone();
            let epocher = self.epocher.clone();
            let round = consensus_context.round;
            let verification_tasks = self.verification_tasks.clone();
            // Record the outcome for `certify` as well as for this caller.
            let (task_tx, task_rx) = oneshot::channel();
            verification_tasks.insert(round, payload, task_rx);
            let (mut tx, rx) = oneshot::channel();
            self.context
                .with_label("verify_reproposal")
                .spawn(move |_| async move {
                    let block = select! {
                        _ = tx.closed() => {
                            debug!(
                                reason = "consensus dropped receiver",
                                "skipping re-proposal verification"
                            );
                            return;
                        },
                        block = block_rx => match block {
                            Ok(block) => block,
                            Err(_) => {
                                debug!(
                                    ?payload,
                                    reason = "failed to fetch block for re-proposal verification",
                                    "skipping re-proposal verification"
                                );
                                return;
                            }
                        },
                    };
                    // A valid re-proposal must sit at the epoch boundary.
                    if !is_valid_reproposal_at_verify(&epocher, block.height(), round.epoch()) {
                        debug!(
                            height = %block.height(),
                            "re-proposal is not at epoch boundary"
                        );
                        task_tx.send_lossy(false);
                        tx.send_lossy(false);
                        return;
                    }
                    marshal.verified(round, block).await;
                    task_tx.send_lossy(true);
                    tx.send_lossy(true);
                });
            return rx;
        }
        // Let the shards actor start reconstructing the block from peers.
        self.shards
            .discovered(
                payload,
                consensus_context.leader.clone(),
                consensus_context.round,
            )
            .await;
        let round = consensus_context.round;
        // Full verification runs in the background; `certify` collects it.
        let task = self.deferred_verify(consensus_context, payload, None);
        self.verification_tasks.insert(round, payload, task);
        match scheme.me() {
            Some(_) => {
                // Participants answer once their assigned shard checks out.
                let validity_rx = self.shards.subscribe_assigned_shard_verified(payload).await;
                let (tx, rx) = oneshot::channel();
                self.context
                    .with_label("shard_validity_wait")
                    .spawn(|_| async move {
                        if validity_rx.await.is_ok() {
                            tx.send_lossy(true);
                        }
                    });
                rx
            }
            None => {
                // Non-participants have no shard to check.
                let (tx, rx) = oneshot::channel();
                tx.send_lossy(true);
                rx
            }
        }
    }
}
impl<E, A, B, C, H, Z, S, ES> CertifiableAutomaton for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: VerifyingApplication<
        E,
        Block = B,
        SigningScheme = Z::Scheme,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// Certify `payload` for `round`.
    ///
    /// If `verify` already started a verification task for this
    /// (round, payload) pair, its receiver is returned directly. Otherwise
    /// (e.g. this node never saw the proposal) the block is fetched from the
    /// marshal and verified using the consensus context embedded in the block
    /// itself; inferred re-proposals at an epoch boundary are accepted
    /// without re-running application verification.
    async fn certify(&mut self, round: Round, payload: Self::Digest) -> oneshot::Receiver<bool> {
        let task = self.verification_tasks.take(round, payload);
        if let Some(task) = task {
            return task;
        }
        debug!(
            ?round,
            ?payload,
            "subscribing to block for certification using embedded context"
        );
        let block_rx = self
            .marshal
            .subscribe_by_commitment(Some(round), payload)
            .await;
        // Clone the whole automaton so the task can call `deferred_verify`.
        let mut marshaled = self.clone();
        let shards = self.shards.clone();
        let (mut tx, rx) = oneshot::channel();
        self.context
            .with_label("certify")
            .with_attribute("round", round)
            .spawn(move |_| async move {
                let block = select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping certification"
                        );
                        return;
                    },
                    result = block_rx => match result {
                        Ok(block) => block,
                        Err(_) => {
                            debug!(
                                ?payload,
                                reason = "failed to fetch block for certification",
                                "skipping certification"
                            );
                            return;
                        }
                    },
                };
                // The block carries the context it was proposed under; use it
                // to detect a re-proposal certified in a later round.
                let embedded_context = block.context();
                let is_reproposal = is_inferred_reproposal_at_certify(
                    &marshaled.epocher,
                    block.height(),
                    embedded_context.round,
                    round,
                );
                if is_reproposal {
                    marshaled.marshal.verified(round, block).await;
                    tx.send_lossy(true);
                    return;
                }
                shards
                    .discovered(
                        payload,
                        embedded_context.leader.clone(),
                        embedded_context.round,
                    )
                    .await;
                // Block is already in hand, so pass it as prefetched.
                let verify_rx = marshaled.deferred_verify(embedded_context, payload, Some(block));
                if let Ok(result) = verify_rx.await {
                    tx.send_lossy(result);
                }
            });
        rx
    }
}
impl<E, A, B, C, H, Z, S, ES> Relay for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: Application<
        E,
        Block = B,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    type Digest = Commitment;
    type PublicKey = <Z::Scheme as CertificateScheme>::PublicKey;
    type Plan = Plan<Self::PublicKey>;

    /// Hand the most recently built block to the shards actor when consensus
    /// asks us to broadcast our own proposal. Forward plans are a no-op here.
    async fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) {
        // Forwarding requires no local action.
        if let Plan::Forward { .. } = plan {
            return;
        }
        // Consume the stashed block; assign first so the lock guard is
        // dropped before any further work.
        let cached = self.last_built.lock().take();
        let Some((round, block)) = cached else {
            warn!("missing block to broadcast");
            return;
        };
        // Only broadcast if the stashed block matches what consensus
        // actually decided to propose.
        if block.commitment() != commitment {
            warn!(
                round = %round,
                commitment = %block.commitment(),
                height = %block.height(),
                "skipping requested broadcast of block with mismatched commitment"
            );
            return;
        }
        debug!(
            round = %round,
            commitment = %block.commitment(),
            height = %block.height(),
            "requested broadcast of built block"
        );
        self.shards.proposed(round, block).await;
    }
}
impl<E, A, B, C, H, Z, S, ES> Reporter for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: Application<
        E,
        Block = B,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    > + Reporter<Activity = Update<B>>,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    type Activity = A::Activity;

    /// Forward `update` to the application, first pruning stale verification
    /// tasks whenever a new tip is reported.
    async fn report(&mut self, update: Self::Activity) {
        match &update {
            Update::Tip(round, _, _) => self.verification_tasks.retain_after(round),
            _ => {}
        }
        self.application.report(update).await
    }
}
/// Resolve the parent block identified by `parent_commitment`.
///
/// Returns a ready future when the parent is the genesis block (served from
/// `cached_genesis`, which is populated from `application.genesis()` on first
/// use); otherwise returns a marshal subscription that resolves once the
/// parent becomes available.
#[allow(clippy::type_complexity)]
async fn fetch_parent<E, S, A, B, C, H>(
    parent_commitment: Commitment,
    parent_round: Option<Round>,
    application: &mut A,
    marshal: &mut core::Mailbox<S, Coding<B, C, H, S::PublicKey>>,
    cached_genesis: Arc<OnceLock<(Commitment, CodedBlock<B, C, H>)>>,
) -> Either<Ready<Result<CodedBlock<B, C, H>, RecvError>>, oneshot::Receiver<CodedBlock<B, C, H>>>
where
    E: Rng + Spawner + Metrics + Clock,
    S: CertificateScheme,
    A: Application<E, Block = B, Context = Context<Commitment, S::PublicKey>>,
    B: CertifiableBlock<Context = Context<Commitment, S::PublicKey>>,
    C: CodingScheme,
    H: Hasher,
{
    // Lazily populate the genesis cache; losing a race to another task that
    // set it first is harmless, hence the ignored result.
    if cached_genesis.get().is_none() {
        let genesis_block = application.genesis().await;
        let commitment = genesis_coding_commitment::<H, _>(&genesis_block);
        let coded = CodedBlock::<B, C, H>::new_trusted(genesis_block, commitment);
        let _ = cached_genesis.set((commitment, coded));
    }
    let (genesis_commitment, coded_genesis) = cached_genesis
        .get()
        .expect("genesis cache should be initialized");
    // Non-genesis parents must come from the marshal.
    if parent_commitment != *genesis_commitment {
        return Either::Right(
            marshal
                .subscribe_by_commitment(parent_round, parent_commitment)
                .await,
        );
    }
    Either::Left(ready(Ok(coded_genesis.clone())))
}
/// Derive the coding [`Commitment`] for the genesis `block`.
///
/// The block's own digest fills both digest slots of the commitment, and a
/// fixed [`GENESIS_CODING_CONFIG`] is used since the genesis block is built
/// via `CodedBlock::new_trusted` rather than erasure encoded.
fn genesis_coding_commitment<H: Hasher, B: CertifiableBlock>(block: &B) -> Commitment {
    let context_digest = hash_context::<H, _>(&block.context());
    Commitment::from((block.digest(), block.digest(), context_digest, GENESIS_CODING_CONFIG))
}