use crate::{
marshal::{
application::{
validation::{is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, Stage},
verification_tasks::VerificationTasks,
},
coding::{
shards,
types::{coding_config_for_participants, hash_context, CodedBlock},
validation::{validate_block, validate_proposal, ProposalError},
Coding,
},
core, Update,
},
simplex::{scheme::Scheme, types::Context, Plan},
types::{coding::Commitment, Epoch, Epocher, Round},
Application, Automaton, Block, CertifiableAutomaton, CertifiableBlock, Epochable, Heightable,
Relay, Reporter,
};
use commonware_actor::Feedback;
use commonware_coding::Scheme as CodingScheme;
use commonware_cryptography::{
certificate::{Provider, Scheme as CertificateScheme},
Committable, Digestible, Hasher,
};
use commonware_macros::select;
use commonware_p2p::Recipients;
use commonware_parallel::Strategy;
use commonware_runtime::{
telemetry::metrics::{
histogram::{Buckets, Timed},
MetricsExt as _,
},
Clock, Metrics, Spawner, Storage,
};
use commonware_utils::{
channel::{fallible::OneshotExt, oneshot},
sync::AsyncMutex,
};
use rand::Rng;
use std::sync::Arc;
use tracing::{debug, warn};
/// Construction parameters for [`Marshaled`].
///
/// Every field is moved into the wrapper by [`Marshaled::new`].
// The mailbox field types spell out the full coding generics, which is what trips
// `clippy::type_complexity`; the allowance is deliberate.
#[allow(clippy::type_complexity)]
pub struct MarshaledConfig<A, B, C, H, Z, S, ES>
where
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
    C: CodingScheme,
    H: Hasher,
    S: Strategy,
    ES: Epocher,
{
    /// Application asked to build and verify blocks.
    pub application: A,
    /// Mailbox of the core marshal actor.
    pub marshal:
        core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
    /// Mailbox of the shards actor.
    pub shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
    /// Resolves the signing scheme for a given epoch.
    pub scheme_provider: Z,
    /// Parallelism strategy handed to erasure encoding.
    pub strategy: S,
    /// Epoch layout (used to find an epoch's last height).
    pub epocher: ES,
}
/// Adapter that exposes an [`Application`] to consensus over erasure-coded blocks.
///
/// Consensus only ever sees [`Commitment`]s, while block retrieval and storage go
/// through the marshal and shards mailboxes held here.
// The mailbox field types spell out the full coding generics, which is what trips
// `clippy::type_complexity`; the allowance is deliberate.
#[allow(clippy::type_complexity)]
pub struct Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
    A: Application<E>,
    C: CodingScheme,
    H: Hasher,
    S: Strategy,
    ES: Epocher,
{
    /// Runtime context shared between clones; it is locked only long enough to derive
    /// child contexts for spawned tasks.
    context: Arc<AsyncMutex<E>>,
    /// Application asked to build and verify blocks.
    application: A,
    /// Mailbox of the core marshal actor (block subscriptions, storage, forwarding).
    marshal: core::Mailbox<Z::Scheme, Coding<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>>,
    /// Mailbox of the shards actor (notarization/discovery hints, shard verification).
    shards: shards::Mailbox<B, C, H, <Z::Scheme as CertificateScheme>::PublicKey>,
    /// Resolves the signing scheme for a given epoch.
    scheme_provider: Z,
    /// Epoch layout (used to find an epoch's last height).
    epocher: ES,
    /// Parallelism strategy handed to erasure encoding.
    strategy: S,
    /// Pending verification results keyed by round and commitment, consumed by `certify`.
    verification_tasks: VerificationTasks<Commitment>,
    /// Time spent by the application building a block.
    build_duration: Timed,
    /// Time spent by the application verifying a block.
    verify_duration: Timed,
    /// Time spent fetching the parent while proposing.
    proposal_parent_fetch_duration: Timed,
    /// Time spent fetching each block of an ancestry stream.
    ancestor_fetch_duration: Timed,
    /// Time spent erasure-encoding a freshly built block.
    erasure_encode_duration: Timed,
}
// Implemented by hand: `#[derive(Clone)]` would add a `Clone` bound on every type
// parameter (e.g. `B`, `C`, `H`), even those that only appear inside mailbox types.
// Here only the fields themselves need to be cloneable.
impl<E, A, B, C, H, Z, S, ES> Clone for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    B: CertifiableBlock<Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>>,
    A: Application<E>,
    C: CodingScheme,
    H: Hasher,
    S: Strategy,
    ES: Epocher,
{
    /// Clones every field; the runtime context is shared through its `Arc`.
    fn clone(&self) -> Self {
        let Self {
            context,
            application,
            marshal,
            shards,
            scheme_provider,
            epocher,
            strategy,
            verification_tasks,
            build_duration,
            verify_duration,
            proposal_parent_fetch_duration,
            ancestor_fetch_duration,
            erasure_encode_duration,
        } = self;
        Self {
            context: context.clone(),
            application: application.clone(),
            marshal: marshal.clone(),
            shards: shards.clone(),
            scheme_provider: scheme_provider.clone(),
            epocher: epocher.clone(),
            strategy: strategy.clone(),
            verification_tasks: verification_tasks.clone(),
            build_duration: build_duration.clone(),
            verify_duration: verify_duration.clone(),
            proposal_parent_fetch_duration: proposal_parent_fetch_duration.clone(),
            ancestor_fetch_duration: ancestor_fetch_duration.clone(),
            erasure_encode_duration: erasure_encode_duration.clone(),
        }
    }
}
impl<E, A, B, C, H, Z, S, ES> Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: Application<
        E,
        Block = B,
        SigningScheme = Z::Scheme,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    /// Creates the wrapper from `context` and `cfg`.
    ///
    /// Registers the `build_duration`, `verify_duration`, `parent_fetch_duration`,
    /// `ancestor_fetch_duration` and `erasure_encode_duration` histograms on `context`.
    /// It then places `context` behind a lock shared by all clones of the returned value.
    pub fn new(context: E, cfg: MarshaledConfig<A, B, C, H, Z, S, ES>) -> Self {
        let MarshaledConfig {
            application,
            marshal,
            shards,
            scheme_provider,
            strategy,
            epocher,
        } = cfg;
        let build_histogram = context.histogram(
            "build_duration",
            "Histogram of time taken for the application to build a new block, in seconds",
            Buckets::LOCAL,
        );
        let build_duration = Timed::new(build_histogram);
        let verify_histogram = context.histogram(
            "verify_duration",
            "Histogram of time taken for the application to verify a block, in seconds",
            Buckets::LOCAL,
        );
        let verify_duration = Timed::new(verify_histogram);
        let parent_fetch_histogram = context.histogram(
            "parent_fetch_duration",
            "Histogram of time taken to fetch a parent block in proposal, in seconds",
            Buckets::LOCAL,
        );
        let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram);
        let ancestor_fetch_histogram = context.histogram(
            "ancestor_fetch_duration",
            "Histogram of time taken to fetch a block via the ancestry stream, in seconds",
            Buckets::LOCAL,
        );
        let ancestor_fetch_duration = Timed::new(ancestor_fetch_histogram);
        let erasure_histogram = context.histogram(
            "erasure_encode_duration",
            "Histogram of time taken to erasure encode a block, in seconds",
            Buckets::LOCAL,
        );
        let erasure_encode_duration = Timed::new(erasure_histogram);
        Self {
            context: Arc::new(AsyncMutex::new(context)),
            application,
            marshal,
            shards,
            scheme_provider,
            strategy,
            epocher,
            verification_tasks: VerificationTasks::new(),
            build_duration,
            verify_duration,
            proposal_parent_fetch_duration,
            ancestor_fetch_duration,
            erasure_encode_duration,
        }
    }

    /// Spawns a task verifying the block committed to by `commitment` against
    /// `consensus_context`, and returns a receiver for its verdict.
    ///
    /// The task proceeds in this order:
    /// 1. It takes the block from `prefetched_block`, or otherwise waits for it from marshal.
    /// 2. It fetches the parent named by `consensus_context.parent`, using a `FetchByRound`
    ///    fallback at the parent's view.
    /// 3. It checks the coded-block invariants with `validate_block`.
    /// 4. It asks the application to verify the block over an ancestry stream.
    ///
    /// An accepted block is recorded in marshal through `stage`. Callers pass
    /// [`Stage::Verified`] from `verify` and [`Stage::Certified`] from certification.
    ///
    /// The receiver resolves to `false` when the invariants or the application reject the
    /// block, and to `true` once an accepted block has been stored. It is closed without a
    /// value in three cases:
    /// - the block or its parent is unavailable;
    /// - marshal refuses the accepted block;
    /// - the receiver is dropped while the block, parent or application verdict is still
    ///   pending. The task stops early in that case.
    async fn deferred_verify(
        &mut self,
        consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
        commitment: Commitment,
        prefetched_block: Option<CodedBlock<B, C, H>>,
        stage: Stage,
    ) -> oneshot::Receiver<bool> {
        let mut marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let epocher = self.epocher.clone();
        let verify_duration = self.verify_duration.clone();
        let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
        let (mut tx, rx) = oneshot::channel();
        // The lock guard is a temporary: it is released as soon as the child exists.
        let context = self
            .context
            .lock()
            .await
            .child("deferred_verify")
            .with_attribute("round", consensus_context.round);
        context.spawn(move |runtime_context| async move {
            let round = consensus_context.round;
            let (parent_view, parent_commitment) = consensus_context.parent;
            let block = if let Some(block) = prefetched_block {
                block
            } else {
                let block_request =
                    marshal.subscribe_by_commitment(commitment, core::CommitmentFallback::Wait);
                select! {
                    _ = tx.closed() => {
                        debug!(
                            reason = "consensus dropped receiver",
                            "skipping verification"
                        );
                        return;
                    },
                    result = block_request => match result {
                        Ok(block) => block,
                        Err(_) => {
                            debug!(
                                reason = "block unavailable",
                                "skipping verification"
                            );
                            return;
                        }
                    },
                }
            };
            let fallback = core::CommitmentFallback::FetchByRound {
                round: Round::new(consensus_context.epoch(), parent_view),
            };
            let parent_request = marshal.subscribe_by_commitment(parent_commitment, fallback);
            let parent = select! {
                _ = tx.closed() => {
                    debug!(
                        reason = "consensus dropped receiver",
                        "skipping verification"
                    );
                    return;
                },
                result = parent_request => match result {
                    Ok(parent) => parent,
                    Err(_) => {
                        debug!(reason = "failed to fetch parent", "skipping verification");
                        return;
                    }
                },
            };
            // Structural checks on the coded block happen before any application work.
            if let Err(err) = validate_block::<H, _, _>(
                &epocher,
                &block,
                &parent,
                &consensus_context,
                commitment,
                parent_commitment,
            ) {
                debug!(
                    ?err,
                    expected_commitment = %commitment,
                    block_commitment = %block.commitment(),
                    expected_parent_commitment = %parent_commitment,
                    parent_commitment = %parent.commitment(),
                    expected_parent = %parent.digest(),
                    block_parent = %block.parent(),
                    parent_height = %parent.height(),
                    block_height = %block.height(),
                    "block failed coded invariant validation"
                );
                tx.send_lossy(false);
                return;
            }
            let ancestry_stream = marshal.ancestor_stream(
                Arc::new(runtime_context.child("ancestor_stream")),
                [block.clone(), parent],
                ancestor_fetch_duration,
            );
            let validity_request = application.verify(
                (
                    runtime_context.child("app_verify"),
                    consensus_context.clone(),
                ),
                ancestry_stream,
            );
            let timer = verify_duration.timer(&runtime_context);
            let application_valid = select! {
                _ = tx.closed() => {
                    debug!(
                        reason = "consensus dropped receiver",
                        "skipping verification"
                    );
                    return;
                },
                is_valid = validity_request => is_valid,
            };
            timer.observe(&runtime_context);
            // A positive verdict is only reported once marshal has accepted the block.
            if application_valid && !stage.store(&mut marshal, round, block).await {
                debug!(?round, "marshal unable to accept block");
                return;
            }
            tx.send_lossy(application_valid);
        });
        rx
    }

    /// Certifies `payload` at `round` using only the context embedded in the block.
    ///
    /// `certify` uses this when no verification task is recorded for the round and payload,
    /// and `certify_from_existing_task` uses it when that task closed without a verdict.
    ///
    /// The block is fetched from marshal with a `FetchByRound` fallback at `round`. An
    /// inferred re-proposal is reported to marshal as certified directly. Any other block is
    /// fully verified via `deferred_verify` under its embedded context and stored at
    /// [`Stage::Certified`].
    ///
    /// Dropping the returned receiver stops the task at every wait, including the nested
    /// verification.
    async fn certify_from_embedded_context(
        &mut self,
        round: Round,
        payload: Commitment,
    ) -> oneshot::Receiver<bool> {
        self.shards.notarized(payload, round);
        debug!(
            ?round,
            ?payload,
            "subscribing to block for certification using embedded context"
        );
        let block_rx = self
            .marshal
            .subscribe_by_commitment(payload, core::CommitmentFallback::FetchByRound { round });
        let mut marshaled = self.clone();
        let shards = self.shards.clone();
        let (mut tx, rx) = oneshot::channel();
        let context = self
            .context
            .lock()
            .await
            .child("certify")
            .with_attribute("round", round);
        context.spawn(move |_| async move {
            let block = select! {
                _ = tx.closed() => {
                    debug!(
                        reason = "consensus dropped receiver",
                        "skipping certification"
                    );
                    return;
                },
                result = block_rx => match result {
                    Ok(block) => block,
                    Err(_) => {
                        debug!(
                            ?payload,
                            reason = "failed to fetch block for certification",
                            "skipping certification"
                        );
                        return;
                    }
                },
            };
            let embedded_context = block.context();
            let is_reproposal = is_inferred_reproposal_at_certify(
                &marshaled.epocher,
                block.height(),
                embedded_context.round,
                round,
            );
            if is_reproposal {
                if !marshaled.marshal.certified(round, block).await {
                    debug!(?round, "marshal unable to accept block");
                    return;
                }
                tx.send_lossy(true);
                return;
            }
            shards.discovered(
                payload,
                embedded_context.leader.clone(),
                embedded_context.round,
            );
            let verify_rx = marshaled
                .deferred_verify(embedded_context, payload, Some(block), Stage::Certified)
                .await;
            // Keep watching for consensus dropping its receiver. Returning here drops
            // `verify_rx`, which lets the deferred verification task observe the closure and
            // stop, instead of fetching and verifying a block nobody is waiting for.
            let result = select! {
                _ = tx.closed() => {
                    debug!(
                        reason = "consensus dropped receiver",
                        "skipping certification"
                    );
                    return;
                },
                result = verify_rx => result,
            };
            if let Ok(result) = result {
                tx.send_lossy(result);
            }
        });
        rx
    }

    /// Certifies `payload` at `round` by relaying the verdict of an earlier verification
    /// `task`.
    ///
    /// Shards and marshal are first told that `payload` is notarized at `round`. If `task`
    /// closes without a verdict, certification falls back to
    /// [`Self::certify_from_embedded_context`].
    async fn certify_from_existing_task(
        &mut self,
        round: Round,
        payload: Commitment,
        task: oneshot::Receiver<bool>,
    ) -> oneshot::Receiver<bool> {
        self.shards.notarized(payload, round);
        self.marshal.hint_notarized(round, payload);
        let mut marshaled = self.clone();
        let (mut tx, rx) = oneshot::channel();
        let context = self
            .context
            .lock()
            .await
            .child("certify_existing")
            .with_attribute("round", round);
        context.spawn(move |_| async move {
            let result = select! {
                _ = tx.closed() => {
                    debug!(
                        reason = "consensus dropped receiver",
                        "skipping certification"
                    );
                    return;
                },
                result = task => result,
            };
            match result {
                Ok(result) => {
                    tx.send_lossy(result);
                }
                Err(_) => {
                    debug!(
                        ?round,
                        ?payload,
                        "verification task closed before certification, falling back to embedded context"
                    );
                    let fallback = marshaled.certify_from_embedded_context(round, payload).await;
                    let result = select! {
                        _ = tx.closed() => {
                            debug!(
                                reason = "consensus dropped receiver",
                                "skipping certification"
                            );
                            return;
                        },
                        result = fallback => result,
                    };
                    if let Ok(result) = result {
                        tx.send_lossy(result);
                    }
                }
            }
        });
        rx
    }
}
impl<E, A, B, C, H, Z, S, ES> Automaton for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    A: Application<
        E,
        Block = B,
        SigningScheme = Z::Scheme,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    S: Strategy,
    ES: Epocher,
{
    type Digest = Commitment;
    type Context = Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>;

    /// Proposes a block for `consensus_context` and resolves the returned receiver with its
    /// commitment.
    ///
    /// In order of precedence, the spawned task:
    /// 1. Reuses a block that marshal already holds as verified for this round, as happens on
    ///    leader recovery. It does so only if the block's embedded context still matches.
    /// 2. Re-proposes the parent when the parent is the last block of the current epoch.
    /// 3. Otherwise asks the application to build a child of the parent, erasure-encodes it
    ///    and hands it to marshal as proposed.
    ///
    /// The receiver is closed without a value in any of these cases:
    /// - no scheme exists for the epoch;
    /// - a cached verified block's context no longer matches;
    /// - the parent cannot be fetched;
    /// - building fails;
    /// - marshal rejects the block;
    /// - consensus drops the receiver first.
    async fn propose(
        &mut self,
        consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    ) -> oneshot::Receiver<Self::Digest> {
        let marshal = self.marshal.clone();
        let mut application = self.application.clone();
        let epocher = self.epocher.clone();
        let strategy = self.strategy.clone();
        let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
            debug!(
                round = %consensus_context.round,
                "no scheme for epoch, skipping propose"
            );
            // The sender is dropped immediately, so the receiver closes without a value.
            let (_, rx) = oneshot::channel();
            return rx;
        };
        // The coding configuration is derived from the epoch's participant count.
        let n_participants =
            u16::try_from(scheme.participants().len()).expect("too many participants");
        let coding_config = coding_config_for_participants(n_participants);
        let build_duration = self.build_duration.clone();
        let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone();
        let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
        let erasure_encode_duration = self.erasure_encode_duration.clone();
        let (mut tx, rx) = oneshot::channel();
        let context = self
            .context
            .lock()
            .await
            .child("propose")
            .with_attribute("round", consensus_context.round);
        context.spawn(move |runtime_context| async move {
            // Case 1: marshal already holds a verified block for this round.
            if let Some(block) = marshal.get_verified(consensus_context.round).await {
                let block_context = block.context();
                if block_context != consensus_context {
                    debug!(
                        round = ?consensus_context.round,
                        ?consensus_context,
                        ?block_context,
                        "skipping proposal: cached verified block context no longer matches"
                    );
                    return;
                }
                let commitment = block.commitment();
                let round = consensus_context.round;
                let success = tx.send_lossy(commitment);
                debug!(
                    ?round,
                    ?commitment,
                    success,
                    "reused verified block from marshal on leader recovery"
                );
                return;
            }
            let (parent_view, parent_commitment) = consensus_context.parent;
            let parent_request = marshal.subscribe_by_commitment(
                parent_commitment,
                core::CommitmentFallback::FetchByRound {
                    round: Round::new(consensus_context.epoch(), parent_view),
                },
            );
            let parent_timer = proposal_parent_fetch_duration.timer(&runtime_context);
            let parent = select! {
                _ = tx.closed() => {
                    debug!(reason = "consensus dropped receiver", "skipping proposal");
                    return;
                },
                result = parent_request => match result {
                    Ok(parent) => parent,
                    Err(_) => {
                        debug!(
                            ?parent_commitment,
                            reason = "failed to fetch parent block",
                            "skipping proposal"
                        );
                        return;
                    }
                },
            };
            parent_timer.observe(&runtime_context);
            // Case 2: the parent closes the epoch, so it is re-proposed instead of extended.
            let last_in_epoch = epocher
                .last(consensus_context.epoch())
                .expect("current epoch should exist");
            if parent.height() == last_in_epoch {
                let commitment = parent.commitment();
                let round = consensus_context.round;
                if !marshal.verified(round, parent).await {
                    debug!(
                        ?round,
                        ?commitment,
                        "marshal rejected re-proposed boundary block"
                    );
                    return;
                }
                let success = tx.send_lossy(commitment);
                debug!(
                    ?round,
                    ?commitment,
                    success,
                    "re-proposed parent block at epoch boundary"
                );
                return;
            }
            // Case 3: build, encode and hand a new block to marshal.
            let ancestor_stream = marshal.ancestor_stream(
                Arc::new(runtime_context.child("ancestor_stream")),
                [parent],
                ancestor_fetch_duration,
            );
            let build_request = application.propose(
                (
                    runtime_context.child("app_propose"),
                    consensus_context.clone(),
                ),
                ancestor_stream,
            );
            let build_timer = build_duration.timer(&runtime_context);
            let built_block = select! {
                _ = tx.closed() => {
                    debug!(reason = "consensus dropped receiver", "skipping proposal");
                    return;
                },
                result = build_request => match result {
                    Some(block) => block,
                    None => {
                        debug!(
                            ?parent_commitment,
                            reason = "block building failed",
                            "skipping proposal"
                        );
                        return;
                    }
                },
            };
            build_timer.observe(&runtime_context);
            let erasure_timer = erasure_encode_duration.timer(&runtime_context);
            let coded_block = CodedBlock::<B, C, H>::new(built_block, coding_config, &strategy);
            erasure_timer.observe(&runtime_context);
            let commitment = coded_block.commitment();
            let round = consensus_context.round;
            if !marshal.proposed(round, coded_block).await {
                debug!(?round, ?commitment, "marshal rejected proposed block");
                return;
            }
            let success = tx.send_lossy(commitment);
            debug!(?round, ?commitment, success, "proposed new block");
        });
        rx
    }

    /// Verifies `payload` proposed under `consensus_context`.
    ///
    /// The commitment's coding configuration is checked synchronously, and so is its context
    /// digest unless this is a re-proposal. Any mismatch resolves the receiver to `false`
    /// immediately.
    ///
    /// - Re-proposals (`payload` equals the parent commitment) are accepted only at an epoch
    ///   boundary. The verdict is also recorded in `verification_tasks` so `certify` can
    ///   reuse it.
    /// - Other proposals start a [`Self::deferred_verify`] task, which is recorded in
    ///   `verification_tasks` for certification. The receiver returned here only waits for
    ///   this node's assigned shard to be verified. It resolves to `true` immediately when
    ///   `scheme.me()` is `None`, presumably meaning this node is not a participant in the
    ///   epoch.
    async fn verify(
        &mut self,
        consensus_context: Context<Self::Digest, <Z::Scheme as CertificateScheme>::PublicKey>,
        payload: Self::Digest,
    ) -> oneshot::Receiver<bool> {
        let Some(scheme) = self.scheme_provider.scoped(consensus_context.epoch()) else {
            debug!(
                round = %consensus_context.round,
                "no scheme for epoch, skipping verify"
            );
            let (_, rx) = oneshot::channel();
            return rx;
        };
        let n_participants =
            u16::try_from(scheme.participants().len()).expect("too many participants");
        let coding_config = coding_config_for_participants(n_participants);
        let is_reproposal = payload == consensus_context.parent.1;
        // A re-proposal carries its original context, so only new proposals have their
        // context digest checked against `consensus_context`.
        let proposal_context = (!is_reproposal).then_some(&consensus_context);
        if let Err(err) = validate_proposal::<H, _>(payload, coding_config, proposal_context) {
            match err {
                ProposalError::CodingConfig => {
                    warn!(
                        round = %consensus_context.round,
                        got = ?payload.config(),
                        expected = ?coding_config,
                        "rejected proposal with unexpected coding configuration"
                    );
                }
                ProposalError::ContextDigest => {
                    let expected = hash_context::<H, _>(&consensus_context);
                    let got = payload.context::<H::Digest>();
                    warn!(
                        round = %consensus_context.round,
                        expected = ?expected,
                        got = ?got,
                        "rejected proposal with mismatched context digest"
                    );
                }
            }
            let (tx, rx) = oneshot::channel();
            tx.send_lossy(false);
            return rx;
        }
        if is_reproposal {
            let block_rx = self
                .marshal
                .subscribe_by_commitment(payload, core::CommitmentFallback::Wait);
            let marshal = self.marshal.clone();
            let epocher = self.epocher.clone();
            let round = consensus_context.round;
            let verification_tasks = self.verification_tasks.clone();
            // The verdict is mirrored into `task_tx` so a later `certify` can reuse it.
            let (task_tx, task_rx) = oneshot::channel();
            verification_tasks.insert(round, payload, task_rx);
            let (mut tx, rx) = oneshot::channel();
            let context = self
                .context
                .lock()
                .await
                .child("verify_reproposal")
                .with_attribute("round", round);
            context.spawn(move |_| {
                async move {
                    let block = select! {
                        _ = tx.closed() => {
                            debug!(
                                reason = "consensus dropped receiver",
                                "skipping re-proposal verification"
                            );
                            return;
                        },
                        block = block_rx => match block {
                            Ok(block) => block,
                            Err(_) => {
                                debug!(
                                    ?payload,
                                    reason = "failed to fetch block for re-proposal verification",
                                    "skipping re-proposal verification"
                                );
                                return;
                            }
                        },
                    };
                    if !is_valid_reproposal_at_verify(&epocher, block.height(), round.epoch()) {
                        debug!(
                            height = %block.height(),
                            "re-proposal is not at epoch boundary"
                        );
                        task_tx.send_lossy(false);
                        tx.send_lossy(false);
                        return;
                    }
                    // If marshal refuses the block, both senders are dropped without a value.
                    // `certify` then falls back to the embedded context.
                    if !marshal.verified(round, block).await {
                        debug!(?round, "marshal unable to accept block");
                        return;
                    }
                    task_tx.send_lossy(true);
                    tx.send_lossy(true);
                }
            });
            return rx;
        }
        self.shards.discovered(
            payload,
            consensus_context.leader.clone(),
            consensus_context.round,
        );
        let round = consensus_context.round;
        let task = self
            .deferred_verify(consensus_context, payload, None, Stage::Verified)
            .await;
        self.verification_tasks.insert(round, payload, task);
        match scheme.me() {
            Some(_) => {
                let validity_rx = self.shards.subscribe_assigned_shard_verified(payload);
                let (mut tx, rx) = oneshot::channel();
                let context = self
                    .context
                    .lock()
                    .await
                    .child("shard_validity_wait")
                    .with_attribute("round", round);
                context.spawn(move |_| async move {
                    // Like every other task here, stop waiting (dropping the shard
                    // subscription) once consensus is no longer interested in the answer.
                    let shard_verified = select! {
                        _ = tx.closed() => {
                            debug!(
                                reason = "consensus dropped receiver",
                                "skipping shard validity wait"
                            );
                            return;
                        },
                        result = validity_rx => result.is_ok(),
                    };
                    if shard_verified {
                        tx.send_lossy(true);
                    }
                });
                rx
            }
            None => {
                let (tx, rx) = oneshot::channel();
                tx.send_lossy(true);
                rx
            }
        }
    }
}
impl<E, A, B, C, H, Z, S, ES> CertifiableAutomaton for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    A: Application<
        E,
        Block = B,
        SigningScheme = Z::Scheme,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    S: Strategy,
    ES: Epocher,
{
    /// Certifies `payload` at `round`.
    ///
    /// If `verify` recorded a verification task for this round and payload, that task's
    /// verdict is reused. Otherwise the block is certified from its embedded context.
    async fn certify(&mut self, round: Round, payload: Self::Digest) -> oneshot::Receiver<bool> {
        let pending = self.verification_tasks.take(round, payload);
        match pending {
            Some(task) => self.certify_from_existing_task(round, payload, task).await,
            None => self.certify_from_embedded_context(round, payload).await,
        }
    }
}
impl<E, A, B, C, H, Z, S, ES> Relay for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    A: Application<
        E,
        Block = B,
        Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
    >,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    S: Strategy,
    ES: Epocher,
{
    type Digest = Commitment;
    type PublicKey = <Z::Scheme as CertificateScheme>::PublicKey;
    type Plan = Plan<Self::PublicKey>;

    /// Broadcasts `commitment` according to `plan`.
    ///
    /// Only `Plan::Propose` triggers work: marshal is asked to forward `commitment` for that
    /// round to all recipients. Every other plan is acknowledged with `Feedback::Ok` and no
    /// further action.
    fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) -> Feedback {
        match plan {
            Plan::Propose { round } => self.marshal.forward(round, commitment, Recipients::All),
            _ => Feedback::Ok,
        }
    }
}
impl<E, A, B, C, H, Z, S, ES> Reporter for Marshaled<E, A, B, C, H, Z, S, ES>
where
    E: Rng + Storage + Spawner + Metrics + Clock,
    Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
    A: Application<
            E,
            Block = B,
            Context = Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
        > + Reporter<Activity = Update<B>>,
    B: CertifiableBlock<Context = <A as Application<E>>::Context>,
    C: CodingScheme,
    H: Hasher,
    S: Strategy,
    ES: Epocher,
{
    type Activity = A::Activity;

    /// Forwards every `update` to the wrapped application unchanged.
    ///
    /// A new tip first prunes the tracked verification tasks. NOTE(review): judging by its
    /// name, `retain_after` keeps only tasks for rounds after the tip; confirm against
    /// `VerificationTasks`.
    fn report(&mut self, update: Self::Activity) -> Feedback {
        if let Update::Tip(tip, ..) = &update {
            self.verification_tasks.retain_after(tip);
        }
        self.application.report(update)
    }
}