use crate::{
marshal::{
application::validation::Stage,
core::{CommitmentFallback, DigestFallback, Mailbox},
standard::{
validation::{precheck_epoch_and_reproposal, verify_with_parent, Decision},
Standard,
},
Update,
},
simplex::{types::Context, Plan},
types::{Epocher, Round},
Application, Automaton, Block, CertifiableAutomaton, Epochable, Relay, Reporter,
};
use commonware_actor::Feedback;
use commonware_cryptography::certificate::Scheme;
use commonware_macros::select;
use commonware_p2p::Recipients;
use commonware_runtime::{
telemetry::metrics::{
histogram::{Buckets, Timed},
MetricsExt as _,
},
Clock, Metrics, Spawner,
};
use commonware_utils::{
channel::{fallible::OneshotExt, oneshot},
sync::{AsyncMutex, Mutex},
};
use rand::Rng;
use std::{collections::BTreeSet, sync::Arc};
use tracing::debug;
type AvailableBlocks<D> = Arc<Mutex<BTreeSet<(Round, D)>>>;
async fn await_block_subscription<T, D>(
tx: &mut oneshot::Sender<bool>,
block_rx: oneshot::Receiver<T>,
digest: &D,
stage: &'static str,
) -> Option<T>
where
D: std::fmt::Debug + ?Sized,
{
select! {
_ = tx.closed() => {
debug!(
stage,
reason = "consensus dropped receiver",
"skipping block wait"
);
None
},
result = block_rx => {
if result.is_err() {
debug!(
stage,
?digest,
reason = "failed to fetch block",
"skipping block wait"
);
}
result.ok()
},
}
}
pub struct Inline<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E>,
B: Block + Clone,
ES: Epocher,
{
context: Arc<AsyncMutex<E>>,
application: A,
marshal: Mailbox<S, Standard<B>>,
epocher: ES,
available_blocks: AvailableBlocks<B::Digest>,
build_duration: Timed,
proposal_parent_fetch_duration: Timed,
ancestor_fetch_duration: Timed,
}
impl<E, S, A, B, ES> Clone for Inline<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E>,
B: Block + Clone,
ES: Epocher,
{
fn clone(&self) -> Self {
Self {
context: self.context.clone(),
application: self.application.clone(),
marshal: self.marshal.clone(),
epocher: self.epocher.clone(),
available_blocks: self.available_blocks.clone(),
build_duration: self.build_duration.clone(),
proposal_parent_fetch_duration: self.proposal_parent_fetch_duration.clone(),
ancestor_fetch_duration: self.ancestor_fetch_duration.clone(),
}
}
}
impl<E, S, A, B, ES> Inline<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
B: Block + Clone,
ES: Epocher,
{
pub fn new(context: E, application: A, marshal: Mailbox<S, Standard<B>>, epocher: ES) -> Self {
let build_histogram = context.histogram(
"build_duration",
"Histogram of time taken for the application to build a new block, in seconds",
Buckets::LOCAL,
);
let build_duration = Timed::new(build_histogram);
let parent_fetch_histogram = context.histogram(
"parent_fetch_duration",
"Histogram of time taken to fetch a parent block in propose, in seconds",
Buckets::LOCAL,
);
let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram);
let ancestor_fetch_histogram = context.histogram(
"ancestor_fetch_duration",
"Histogram of time taken to fetch a block via the ancestry stream, in seconds",
Buckets::LOCAL,
);
let ancestor_fetch_duration = Timed::new(ancestor_fetch_histogram);
Self {
context: Arc::new(AsyncMutex::new(context)),
application,
marshal,
epocher,
available_blocks: Arc::new(Mutex::new(BTreeSet::new())),
build_duration,
proposal_parent_fetch_duration,
ancestor_fetch_duration,
}
}
}
impl<E, S, A, B, ES> Automaton for Inline<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
B: Block + Clone,
ES: Epocher,
{
type Digest = B::Digest;
type Context = Context<Self::Digest, S::PublicKey>;
async fn propose(
&mut self,
consensus_context: Context<Self::Digest, S::PublicKey>,
) -> oneshot::Receiver<Self::Digest> {
let marshal = self.marshal.clone();
let mut application = self.application.clone();
let epocher = self.epocher.clone();
let build_duration = self.build_duration.clone();
let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone();
let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
let (mut tx, rx) = oneshot::channel();
let context = self
.context
.lock()
.await
.child("propose")
.with_attribute("round", consensus_context.round);
context.spawn(move |runtime_context| async move {
if marshal
.get_verified(consensus_context.round)
.await
.is_some()
{
debug!(
round = ?consensus_context.round,
"skipping proposal: verified block already exists for round on restart"
);
return;
}
let (parent_view, parent_commitment) = consensus_context.parent;
let parent_request = marshal.subscribe_by_commitment(
parent_commitment,
CommitmentFallback::FetchByRound {
round: Round::new(consensus_context.epoch(), parent_view),
},
);
let parent_timer = proposal_parent_fetch_duration.timer(&runtime_context);
let parent = select! {
_ = tx.closed() => {
debug!(reason = "consensus dropped receiver", "skipping proposal");
return;
},
result = parent_request => match result {
Ok(parent) => parent,
Err(_) => {
debug!(
?parent_commitment,
reason = "failed to fetch parent block",
"skipping proposal"
);
return;
}
},
};
parent_timer.observe(&runtime_context);
let last_in_epoch = epocher
.last(consensus_context.epoch())
.expect("current epoch should exist");
if parent.height() == last_in_epoch {
let digest = parent.digest();
if !marshal.verified(consensus_context.round, parent).await {
debug!(
round = ?consensus_context.round,
?digest,
"marshal rejected re-proposed boundary block"
);
return;
}
let success = tx.send_lossy(digest);
debug!(
round = ?consensus_context.round,
?digest,
success,
"re-proposed parent block at epoch boundary"
);
return;
}
let ancestor_stream = marshal.ancestor_stream(
Arc::new(runtime_context.child("ancestor_stream")),
[parent],
ancestor_fetch_duration,
);
let build_request = application.propose(
(
runtime_context.child("app_propose"),
consensus_context.clone(),
),
ancestor_stream,
);
let build_timer = build_duration.timer(&runtime_context);
let built_block = select! {
_ = tx.closed() => {
debug!(reason = "consensus dropped receiver", "skipping proposal");
return;
},
result = build_request => match result {
Some(block) => block,
None => {
debug!(
?parent_commitment,
reason = "block building failed",
"skipping proposal"
);
return;
}
},
};
build_timer.observe(&runtime_context);
let digest = built_block.digest();
if !marshal.proposed(consensus_context.round, built_block).await {
debug!(
round = ?consensus_context.round,
?digest,
"marshal rejected proposed block"
);
return;
}
let success = tx.send_lossy(digest);
debug!(
round = ?consensus_context.round,
?digest,
success,
"proposed new block"
);
});
rx
}
async fn verify(
&mut self,
context: Context<Self::Digest, S::PublicKey>,
digest: Self::Digest,
) -> oneshot::Receiver<bool> {
let mut marshal = self.marshal.clone();
let mut application = self.application.clone();
let epocher = self.epocher.clone();
let available_blocks = self.available_blocks.clone();
let ancestor_fetch_duration = self.ancestor_fetch_duration.clone();
let (mut tx, rx) = oneshot::channel();
let runtime_context = self
.context
.lock()
.await
.child("inline_verify")
.with_attribute("round", context.round);
runtime_context.spawn(move |runtime_context| async move {
let block_request = marshal.subscribe_by_digest(digest, DigestFallback::Wait);
let Some(block) =
await_block_subscription(&mut tx, block_request, &digest, "verification").await
else {
return;
};
let Some(decision) =
precheck_epoch_and_reproposal(&epocher, &mut marshal, &context, digest, block)
.await
else {
return;
};
let block = match decision {
Decision::Complete(valid) => {
if valid {
available_blocks.lock().insert((context.round, digest));
}
tx.send_lossy(valid);
return;
}
Decision::Continue(block) => block,
};
let round = context.round;
let application_valid = match verify_with_parent(
runtime_context,
context,
block,
&mut application,
&mut marshal,
&mut tx,
Stage::Verified,
ancestor_fetch_duration,
)
.await
{
Some(valid) => valid,
None => return,
};
if application_valid {
available_blocks.lock().insert((round, digest));
}
tx.send_lossy(application_valid);
});
rx
}
}
impl<E, S, A, B, ES> CertifiableAutomaton for Inline<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
B: Block + Clone,
ES: Epocher,
{
async fn certify(&mut self, round: Round, digest: Self::Digest) -> oneshot::Receiver<bool> {
if self.available_blocks.lock().contains(&(round, digest)) {
let (tx, rx) = oneshot::channel();
tx.send_lossy(true);
return rx;
}
let block_rx = self
.marshal
.subscribe_by_digest(digest, DigestFallback::FetchByRound { round });
let marshal = self.marshal.clone();
let (mut tx, rx) = oneshot::channel();
let context = self
.context
.lock()
.await
.child("inline_certify")
.with_attribute("round", round);
context.spawn(move |_| async move {
let Some(block) =
await_block_subscription(&mut tx, block_rx, &digest, "certification").await
else {
return;
};
if marshal.certified(round, block).await {
tx.send_lossy(true);
}
});
rx
}
}
impl<E, S, A, B, ES> Relay for Inline<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>,
B: Block + Clone,
ES: Epocher,
{
type Digest = B::Digest;
type PublicKey = S::PublicKey;
type Plan = Plan<S::PublicKey>;
fn broadcast(&mut self, commitment: Self::Digest, plan: Plan<S::PublicKey>) -> Feedback {
let (round, recipients) = match plan {
Plan::Propose { round } => (round, Recipients::All),
Plan::Forward { round, recipients } => (round, recipients),
};
self.marshal.forward(round, commitment, recipients)
}
}
impl<E, S, A, B, ES> Reporter for Inline<E, S, A, B, ES>
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E, Block = B, Context = Context<B::Digest, S::PublicKey>>
+ Reporter<Activity = Update<B>>,
B: Block + Clone,
ES: Epocher,
{
type Activity = A::Activity;
fn report(&mut self, update: Self::Activity) -> Feedback {
if let Update::Tip(tip_round, _, _) = &update {
self.available_blocks
.lock()
.retain(|(round, _)| round > tip_round);
}
self.application.report(update)
}
}
#[cfg(test)]
mod tests {
use super::Inline;
use crate::{
marshal::mocks::{
harness::{
default_leader, make_raw_block, setup_network_with_participants, Ctx,
StandardHarness, TestHarness, B, BLOCKS_PER_EPOCH, NAMESPACE, NUM_VALIDATORS, S, V,
},
verifying::{GatedVerifyingApp, MockVerifyingApp},
},
simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Context},
types::{Epoch, FixedEpocher, Height, Round, View},
Application, Automaton, Block, CertifiableAutomaton, Relay,
};
use commonware_broadcast::Broadcaster;
use commonware_cryptography::{
certificate::{mocks::Fixture, ConstantProvider, Scheme},
sha256::Sha256,
Digestible, Hasher as _,
};
use commonware_macros::{select, test_traced};
use commonware_runtime::{deterministic, Clock, Metrics, Runner, Spawner, Supervisor as _};
use commonware_utils::{channel::fallible::OneshotExt, NZUsize};
use rand::Rng;
use std::time::Duration;
#[allow(dead_code)]
fn assert_non_certifiable_block_supported<E, S, A, B, ES>()
where
E: Rng + Spawner + Metrics + Clock,
S: Scheme,
A: Application<E, Block = B, SigningScheme = S, Context = Context<B::Digest, S::PublicKey>>,
B: Block + Clone,
ES: crate::types::Epocher,
{
fn assert_automaton<T: Automaton>() {}
fn assert_certifiable<T: CertifiableAutomaton>() {}
fn assert_relay<T: Relay>() {}
assert_automaton::<Inline<E, S, A, B, ES>>();
assert_certifiable::<Inline<E, S, A, B, ES>>();
assert_relay::<Inline<E, S, A, B, ES>>();
}
#[test_traced("INFO")]
fn test_certify_returns_immediately_after_verify_fetches_block() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
let mut inline = Inline::new(
context.child("inline"),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
let parent_round = Round::new(Epoch::zero(), View::new(1));
let parent_ctx = Ctx {
round: parent_round,
leader: default_leader(),
parent: (View::zero(), genesis.digest()),
};
let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
let parent_digest = parent.digest();
assert!(marshal.verified(parent_round, parent).await);
let round = Round::new(Epoch::zero(), View::new(2));
let verify_context = Ctx {
round,
leader: me,
parent: (View::new(1), parent_digest),
};
let block =
B::new::<Sha256>(verify_context.clone(), parent_digest, Height::new(2), 200);
let digest = block.digest();
assert!(marshal.verified(round, block).await);
let verify_rx = inline.verify(verify_context, digest).await;
assert!(
verify_rx.await.unwrap(),
"verify should complete successfully before certify"
);
let certify_rx = inline.certify(round, digest).await;
select! {
result = certify_rx => {
assert!(
result.unwrap(),
"certify should return immediately once verify has fetched the block"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("certify should not hang after local verify completed");
},
}
});
}
#[test_traced("INFO")]
fn test_certify_succeeds_without_verify_task() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
let mut inline = Inline::new(
context.child("inline"),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
let parent_round = Round::new(Epoch::zero(), View::new(1));
let parent_ctx = Ctx {
round: parent_round,
leader: default_leader(),
parent: (View::zero(), genesis.digest()),
};
let parent = B::new::<Sha256>(parent_ctx, genesis.digest(), Height::new(1), 100);
let parent_digest = parent.digest();
assert!(marshal.verified(parent_round, parent).await);
let round = Round::new(Epoch::zero(), View::new(2));
let verify_context = Ctx {
round,
leader: me,
parent: (View::new(1), parent_digest),
};
let block =
B::new::<Sha256>(verify_context.clone(), parent_digest, Height::new(2), 200);
let digest = block.digest();
assert!(marshal.verified(round, block).await);
let certify_rx = inline.certify(round, digest).await;
select! {
result = certify_rx => {
assert!(
result.unwrap(),
"certify should resolve once block availability is known"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("certify should not hang when block is already available in marshal");
},
}
});
}
#[test_traced("INFO")]
fn test_certify_reproposal_uses_available_blocks_after_verify() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.child("network"), NZUsize!(1), participants.clone())
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let marshal_actor_handle = setup.actor_handle;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
let mut inline = Inline::new(context.child("inline"),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
let boundary_height = Height::new(BLOCKS_PER_EPOCH.get() - 1);
let boundary_round = Round::new(Epoch::zero(), View::new(boundary_height.get()));
let boundary_block = B::new::<Sha256>(
Ctx {
round: boundary_round,
leader: default_leader(),
parent: (View::zero(), genesis.digest()),
},
genesis.digest(),
boundary_height,
1900,
);
let boundary_digest = boundary_block.digest();
assert!(marshal.verified(boundary_round, boundary_block).await);
let reproposal_round = Round::new(Epoch::zero(), View::new(boundary_height.get() + 1));
let reproposal_context = Ctx {
round: reproposal_round,
leader: me,
parent: (View::new(boundary_height.get()), boundary_digest),
};
let verify_rx = inline.verify(reproposal_context, boundary_digest).await;
assert!(
verify_rx.await.unwrap(),
"verify should accept a valid boundary re-proposal"
);
marshal_actor_handle.abort();
drop(marshal);
context.sleep(Duration::from_millis(1)).await;
let certify_rx = inline.certify(reproposal_round, boundary_digest).await;
select! {
result = certify_rx => {
assert!(
result.unwrap(),
"certify should use the available_blocks fast path for verified re-proposals"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("certify should not depend on marshal after verify cached a re-proposal");
},
}
});
}
#[test_traced("WARN")]
fn test_inline_verify_persists_block_before_resolving() {
for seed in 0u64..16 {
inline_verify_persists_block_before_resolving_at(seed);
}
}
fn inline_verify_persists_block_before_resolving_at(seed: u64) {
let runner = deterministic::Runner::new(
deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(60))),
);
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let buffer = setup.extra;
let actor_handle = setup.actor_handle;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
let mut inline = Inline::new(
context.child("inline"),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
let parent_digest = parent.digest();
let child_round = Round::new(Epoch::zero(), View::new(2));
let child_ctx = Ctx {
round: child_round,
leader: me.clone(),
parent: (View::new(1), parent_digest),
};
let child = B::new::<Sha256>(child_ctx.clone(), parent_digest, Height::new(2), 200);
let child_digest = child.digest();
assert!(
buffer
.broadcast(commonware_p2p::Recipients::Some(vec![]), parent.clone())
.accepted(),
"buffer broadcast for parent should be accepted"
);
assert!(
buffer
.broadcast(commonware_p2p::Recipients::Some(vec![]), child.clone())
.accepted(),
"buffer broadcast for child should be accepted"
);
let verify_result = inline
.verify(child_ctx, child_digest)
.await
.await
.expect("verify result missing");
assert!(verify_result, "inline verify should pass");
actor_handle.abort();
drop(inline);
drop(marshal);
drop(buffer);
let setup2 = StandardHarness::setup_validator(
context
.child("validator_restart")
.with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal2 = setup2.mailbox;
let post_restart = marshal2.get_block(&child_digest).await;
assert!(
post_restart.is_some(),
"verify resolved true so block must be durably persisted (seed={seed})"
);
});
}
#[test_traced("WARN")]
fn test_inline_certify_persists_block_before_resolving() {
for seed in 0u64..16 {
inline_certify_persists_block_before_resolving_at(seed);
}
}
fn inline_certify_persists_block_before_resolving_at(seed: u64) {
let runner = deterministic::Runner::new(
deterministic::Config::new()
.with_seed(seed)
.with_timeout(Some(Duration::from_secs(60))),
);
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let buffer = setup.extra;
let actor_handle = setup.actor_handle;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let mock_app: MockVerifyingApp<B, S> = MockVerifyingApp::new();
let mut inline = Inline::new(
context.child("inline"),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
let parent_digest = parent.digest();
let child_round = Round::new(Epoch::zero(), View::new(2));
let child_ctx = Ctx {
round: child_round,
leader: me.clone(),
parent: (View::new(1), parent_digest),
};
let child = B::new::<Sha256>(child_ctx.clone(), parent_digest, Height::new(2), 200);
let child_digest = child.digest();
assert!(
buffer
.broadcast(commonware_p2p::Recipients::Some(vec![]), parent.clone())
.accepted(),
"buffer broadcast for parent should be accepted"
);
assert!(
buffer
.broadcast(commonware_p2p::Recipients::Some(vec![]), child.clone())
.accepted(),
"buffer broadcast for child should be accepted"
);
let verify_rx = inline.verify(child_ctx, child_digest).await;
let certify_result = inline
.certify(child_round, child_digest)
.await
.await
.expect("certify result missing");
assert!(certify_result, "certify should succeed");
actor_handle.abort();
drop(verify_rx);
drop(inline);
drop(marshal);
drop(buffer);
let setup2 = StandardHarness::setup_validator(
context
.child("validator_restart")
.with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal2 = setup2.mailbox;
let post_restart = marshal2.get_block(&child_digest).await;
assert!(
post_restart.is_some(),
"certify resolved true so block must be durably persisted (seed={seed})"
);
});
}
#[test_traced("WARN")]
fn test_inline_certify_does_not_bypass_failed_verify_persistence() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let setup = StandardHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal = setup.mailbox;
let buffer = setup.extra;
let marshal_actor_handle = setup.actor_handle;
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let (mock_app, verify_started, release_verify): (GatedVerifyingApp<B, S>, _, _) =
GatedVerifyingApp::new();
let mut inline = Inline::new(
context.child("inline"),
mock_app,
marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
let parent = make_raw_block(genesis.digest(), Height::new(1), 100);
let parent_digest = parent.digest();
let child_round = Round::new(Epoch::zero(), View::new(2));
let child_ctx = Ctx {
round: child_round,
leader: me.clone(),
parent: (View::new(1), parent_digest),
};
let child = B::new::<Sha256>(child_ctx.clone(), parent_digest, Height::new(2), 200);
let child_digest = child.digest();
assert!(
buffer
.broadcast(commonware_p2p::Recipients::Some(vec![]), parent)
.accepted(),
"buffer broadcast for parent should be accepted"
);
assert!(
buffer
.broadcast(commonware_p2p::Recipients::Some(vec![]), child)
.accepted(),
"buffer broadcast for child should be accepted"
);
let verify_rx = inline.verify(child_ctx, child_digest).await;
verify_started
.await
.expect("verify should reach application before marshal abort");
marshal_actor_handle.abort();
let _ = marshal_actor_handle.await;
release_verify.send_lossy(());
select! {
result = verify_rx => {
assert!(
result.is_err(),
"verify must not resolve after marshal.verified loses its persistence ack"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("verify should terminate after marshal abort");
},
}
let certify_rx = inline.certify(child_round, child_digest).await;
select! {
result = certify_rx => {
assert!(
result.is_err(),
"certify must not bypass failed verify persistence via stale availability"
);
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!("certify should terminate after marshal abort");
},
}
drop(inline);
drop(marshal);
drop(buffer);
let setup2 = StandardHarness::setup_validator(
context
.child("validator_restart")
.with_attribute("index", 0),
&mut oracle,
me,
ConstantProvider::new(schemes[0].clone()),
)
.await;
let marshal2 = setup2.mailbox;
let post_restart = marshal2.get_block(&child_digest).await;
assert!(
post_restart.is_none(),
"failed marshal.verified ack must not leave a durably recoverable block"
);
});
}
#[test_traced("WARN")]
fn test_propose_skips_when_verified_block_exists_on_restart() {
let runner = deterministic::Runner::timed(Duration::from_secs(30));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle = setup_network_with_participants(
context.child("network"),
NZUsize!(1),
participants.clone(),
)
.await;
let me = participants[0].clone();
let round = Round::new(Epoch::zero(), View::new(1));
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
let ctx = Ctx {
round,
leader: me.clone(),
parent: (View::zero(), genesis.digest()),
};
let pre_setup = StandardHarness::setup_validator(
context.child("validator").with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let pre_marshal = pre_setup.mailbox;
let pre_actor = pre_setup.actor_handle;
let pre_extra = pre_setup.extra;
let pre_application = pre_setup.application;
let stale_block = B::new::<Sha256>(ctx.clone(), genesis.digest(), Height::new(1), 100);
assert!(pre_marshal.verified(round, stale_block).await);
pre_actor.abort();
drop(pre_marshal);
drop(pre_extra);
drop(pre_application);
let post_setup = StandardHarness::setup_validator(
context
.child("validator_restart")
.with_attribute("index", 0),
&mut oracle,
me.clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let post_marshal = post_setup.mailbox;
let fresh_block = B::new::<Sha256>(ctx.clone(), genesis.digest(), Height::new(1), 200);
let mock_app: MockVerifyingApp<B, S> =
MockVerifyingApp::new().with_propose_result(fresh_block);
let mut inline = Inline::new(
context.child("inline"),
mock_app,
post_marshal.clone(),
FixedEpocher::new(BLOCKS_PER_EPOCH),
);
let digest_rx = inline.propose(ctx).await;
assert!(
digest_rx.await.is_err(),
"propose must drop the receiver so the voter nullifies the round via timeout"
);
});
}
}