use super::round::Round;
use crate::{
simplex::{
elector::{Config as ElectorConfig, Elector},
interesting,
metrics::{Leader, Timeout, TimeoutReason},
min_active,
scheme::Scheme,
types::{
Artifact, Certificate, Context, Finalization, Finalize, Notarization, Notarize,
Nullification, Nullify, Proposal,
},
},
types::{Epoch, Participant, Round as Rnd, View, ViewDelta},
Viewable,
};
use commonware_cryptography::{certificate, Digest};
use commonware_runtime::{telemetry::metrics::status::GaugeExt, Clock, Metrics};
use commonware_utils::futures::Aborter;
use prometheus_client::metrics::{counter::Counter, family::Family, gauge::Gauge};
use rand_core::CryptoRngCore;
use std::{
collections::{BTreeMap, BTreeSet},
mem::{replace, take},
sync::atomic::AtomicI64,
time::{Duration, SystemTime},
};
use tracing::{debug, warn};
const GENESIS_VIEW: View = View::zero();
#[derive(Clone, Copy, Debug, PartialEq, Eq, thiserror::Error)]
enum ParentPayloadError {
#[error("proposal view {proposal_view} is not after parent view {parent_view}")]
ParentNotBeforeProposal {
proposal_view: View,
parent_view: View,
},
#[error(
"proposal view {proposal_view} references parent view {parent_view} below last finalized view {last_finalized}"
)]
ParentBeforeFinalized {
proposal_view: View,
parent_view: View,
last_finalized: View,
},
#[error(
"proposal view {proposal_view} references parent view {parent_view} but view {missing_view} is not nullified"
)]
MissingNullification {
proposal_view: View,
parent_view: View,
missing_view: View,
},
#[error(
"proposal view {proposal_view} references parent view {parent_view} but the parent is not certified"
)]
ParentNotCertified {
proposal_view: View,
parent_view: View,
},
}
impl ParentPayloadError {
const fn invalid_proposal(self) -> bool {
match self {
Self::ParentNotBeforeProposal { .. } | Self::ParentBeforeFinalized { .. } => true,
Self::MissingNullification { .. } | Self::ParentNotCertified { .. } => false,
}
}
}
pub struct Config<S: certificate::Scheme, L: ElectorConfig<S>> {
pub scheme: S,
pub elector: L,
pub epoch: Epoch,
pub activity_timeout: ViewDelta,
pub leader_timeout: Duration,
pub certification_timeout: Duration,
pub timeout_retry: Duration,
}
pub struct State<E: Clock + CryptoRngCore + Metrics, S: Scheme<D>, L: ElectorConfig<S>, D: Digest> {
context: E,
scheme: S,
elector: L::Elector,
epoch: Epoch,
activity_timeout: ViewDelta,
leader_timeout: Duration,
certification_timeout: Duration,
timeout_retry: Duration,
view: View,
last_finalized: View,
genesis: Option<D>,
views: BTreeMap<View, Round<S, D>>,
certification_candidates: BTreeSet<View>,
outstanding_certifications: BTreeSet<View>,
current_view: Gauge,
tracked_views: Gauge,
timeouts: Family<Timeout, Counter>,
nullifications: Family<Leader, Counter>,
}
impl<E: Clock + CryptoRngCore + Metrics, S: Scheme<D>, L: ElectorConfig<S>, D: Digest>
State<E, S, L, D>
{
pub fn new(context: E, cfg: Config<S, L>) -> Self {
let current_view = Gauge::<i64, AtomicI64>::default();
let tracked_views = Gauge::<i64, AtomicI64>::default();
let timeouts = Family::<Timeout, Counter>::default();
let nullifications = Family::<Leader, Counter>::default();
context.register("current_view", "current view", current_view.clone());
context.register("tracked_views", "tracked views", tracked_views.clone());
context.register("timeouts", "timed out views", timeouts.clone());
context.register("nullifications", "nullifications", nullifications.clone());
let elector = cfg.elector.build(cfg.scheme.participants());
Self {
context,
scheme: cfg.scheme,
elector,
epoch: cfg.epoch,
activity_timeout: cfg.activity_timeout,
leader_timeout: cfg.leader_timeout,
certification_timeout: cfg.certification_timeout,
timeout_retry: cfg.timeout_retry,
view: GENESIS_VIEW,
last_finalized: GENESIS_VIEW,
genesis: None,
views: BTreeMap::new(),
certification_candidates: BTreeSet::new(),
outstanding_certifications: BTreeSet::new(),
current_view,
tracked_views,
timeouts,
nullifications,
}
}
pub fn set_genesis(&mut self, genesis: D) {
self.genesis = Some(genesis);
self.enter_view(GENESIS_VIEW.next());
self.set_leader(GENESIS_VIEW.next(), None);
}
pub const fn epoch(&self) -> Epoch {
self.epoch
}
pub const fn current_view(&self) -> View {
self.view
}
pub const fn last_finalized(&self) -> View {
self.last_finalized
}
pub const fn min_active(&self) -> View {
min_active(self.activity_timeout, self.last_finalized)
}
pub fn is_interesting(&self, pending: View, allow_future: bool) -> bool {
interesting(
self.activity_timeout,
self.last_finalized,
self.view,
pending,
allow_future,
)
}
pub fn is_me(&self, idx: Participant) -> bool {
self.scheme.me().is_some_and(|me| me == idx)
}
fn enter_view(&mut self, view: View) -> bool {
if view <= self.view {
return false;
}
let now = self.context.current();
let leader_deadline = now + self.leader_timeout;
let certification_deadline = now + self.certification_timeout;
let round = self.create_round(view);
round.set_deadlines(leader_deadline, certification_deadline);
self.view = view;
let _ = self.current_view.try_set(view.get());
true
}
fn set_leader(&mut self, view: View, certificate: Option<&S::Certificate>) {
let leader = self.elector.elect(Rnd::new(self.epoch, view), certificate);
let round = self.create_round(view);
if round.leader().is_some() {
return;
}
round.set_leader(leader);
}
fn create_round(&mut self, view: View) -> &mut Round<S, D> {
self.views.entry(view).or_insert_with(|| {
Round::new(
self.scheme.clone(),
Rnd::new(self.epoch, view),
self.context.current(),
)
})
}
pub fn next_timeout_deadline(&mut self) -> SystemTime {
let now = self.context.current();
let timeout_retry = self.timeout_retry;
let round = self.create_round(self.view);
round.next_timeout_deadline(now, timeout_retry)
}
pub fn construct_nullify(&mut self, view: View) -> Option<(bool, Nullify<S>)> {
if view != self.view {
return None;
}
let is_retry = self.create_round(view).construct_nullify()?;
let nullify = Nullify::sign::<D>(&self.scheme, Rnd::new(self.epoch, view))?;
if !is_retry {
let round = self.create_round(view);
let reason = if round.proposal().is_some() {
TimeoutReason::CertificationTimeout
} else {
TimeoutReason::LeaderTimeout
};
let (reason, _) = round.set_timeout_reason(reason);
if let Some(leader) = round.leader() {
self.timeouts
.get_or_create(&Timeout::new(&leader.key, reason))
.inc();
}
}
Some((is_retry, nullify))
}
pub fn get_best_certificate(&self, view: View) -> Option<Certificate<S, D>> {
if view == GENESIS_VIEW {
return None;
}
#[allow(clippy::option_if_let_else)]
if let Some(finalization) = self.finalization(view).cloned() {
Some(Certificate::Finalization(finalization))
} else if let Some(nullification) = self.nullification(view).cloned() {
Some(Certificate::Nullification(nullification))
} else if let Some(notarization) = self.notarization(view).cloned() {
Some(Certificate::Notarization(notarization))
} else {
warn!(%view, "entry certificate not found");
None
}
}
pub fn add_notarization(
&mut self,
notarization: Notarization<S, D>,
) -> (bool, Option<S::PublicKey>) {
let view = notarization.view();
self.set_leader(view.next(), Some(¬arization.certificate));
let result = self.create_round(view).add_notarization(notarization);
if result.0 && view > self.last_finalized {
self.certification_candidates.insert(view);
}
result
}
pub fn add_nullification(&mut self, nullification: Nullification<S>) -> bool {
let view = nullification.view();
self.enter_view(view.next());
self.set_leader(view.next(), Some(&nullification.certificate));
let round = self.create_round(view);
let added = round.add_nullification(nullification);
let leader = added.then(|| round.leader()).flatten();
if let Some(leader) = leader {
self.nullifications
.get_or_create(&Leader::new(&leader.key))
.inc();
}
added
}
pub fn add_finalization(
&mut self,
finalization: Finalization<S, D>,
) -> (bool, Option<S::PublicKey>) {
let view = finalization.view();
if view > self.last_finalized {
self.last_finalized = view;
self.certification_candidates.retain(|v| *v > view);
let keep = self.outstanding_certifications.split_off(&view.next());
for v in replace(&mut self.outstanding_certifications, keep) {
if let Some(round) = self.views.get_mut(&v) {
round.abort_certify();
}
}
}
self.enter_view(view.next());
self.set_leader(view.next(), Some(&finalization.certificate));
self.create_round(view).add_finalization(finalization)
}
pub fn construct_notarize(&mut self, view: View) -> Option<Notarize<S, D>> {
let candidate = self
.views
.get_mut(&view)
.and_then(|round| round.construct_notarize().cloned())?;
Notarize::sign(&self.scheme, candidate)
}
pub fn construct_finalize(&mut self, view: View) -> Option<Finalize<S, D>> {
let candidate = self
.views
.get_mut(&view)
.and_then(|round| round.construct_finalize().cloned())?;
Finalize::sign(&self.scheme, candidate)
}
pub fn broadcast_notarization(&mut self, view: View) -> Option<Notarization<S, D>> {
self.views
.get_mut(&view)
.and_then(|round| round.broadcast_notarization())
}
pub fn notarization(&self, view: View) -> Option<&Notarization<S, D>> {
self.views.get(&view).and_then(|round| round.notarization())
}
pub fn nullification(&self, view: View) -> Option<&Nullification<S>> {
self.views
.get(&view)
.and_then(|round| round.nullification())
}
pub fn finalization(&self, view: View) -> Option<&Finalization<S, D>> {
self.views.get(&view).and_then(|round| round.finalization())
}
pub fn forwardable_proposal(&self, view: View) -> Option<Proposal<D>> {
let round = self.views.get(&view)?;
if round.finalization().is_some() || round.is_certified() {
return round.proposal().cloned();
}
None
}
pub fn broadcast_nullification(&mut self, view: View) -> Option<Nullification<S>> {
self.views
.get_mut(&view)
.and_then(|round| round.broadcast_nullification())
}
pub fn broadcast_finalization(&mut self, view: View) -> Option<Finalization<S, D>> {
self.views
.get_mut(&view)
.and_then(|round| round.broadcast_finalization())
}
pub fn replay(&mut self, artifact: &Artifact<S, D>) {
self.create_round(artifact.view()).replay(artifact);
}
pub fn leader_index(&self, view: View) -> Option<Participant> {
self.views
.get(&view)
.and_then(|round| round.leader().map(|leader| leader.idx))
}
pub fn elapsed_since_start(&self, view: View) -> Option<Duration> {
let now = self.context.current();
self.views
.get(&view)
.map(|round| round.elapsed_since_start(now))
}
pub fn trigger_timeout(&mut self, view: View, reason: TimeoutReason) {
if view != self.view {
return;
}
let now = self.context.current();
let round = self.create_round(view);
let (_, is_first_timeout) = round.set_timeout_reason(reason);
if is_first_timeout {
round.set_deadlines(now, now);
}
}
pub fn try_propose(&mut self) -> Option<Context<D, S::PublicKey>> {
let view = self.view;
if view == GENESIS_VIEW {
return None;
}
if !self
.views
.get_mut(&view)
.expect("view must exist")
.should_propose()
{
return None;
}
let parent = self.find_parent(view);
let (parent_view, parent_payload) = match parent {
Ok(parent) => parent,
Err(missing) => {
debug!(%view, %missing, "missing parent during proposal");
return None;
}
};
let leader = self
.views
.get_mut(&view)
.expect("view must exist")
.try_propose()?;
Some(Context {
round: Rnd::new(self.epoch, view),
leader: leader.key,
parent: (parent_view, parent_payload),
})
}
pub fn proposed(&mut self, proposal: Proposal<D>) -> bool {
self.views
.get_mut(&proposal.view())
.map(|round| round.proposed(proposal))
.unwrap_or(false)
}
pub fn set_proposal(&mut self, view: View, proposal: Proposal<D>) -> bool {
self.create_round(view).set_proposal(proposal)
}
#[allow(clippy::type_complexity)]
pub fn try_verify(&mut self) -> Option<(Context<D, S::PublicKey>, Proposal<D>)> {
let view = self.view;
let (leader, proposal) = self.views.get(&view)?.should_verify()?;
let parent_payload = match self.parent_payload(&proposal) {
Ok(parent_payload) => parent_payload,
Err(err) => {
if err.invalid_proposal() {
warn!(round = ?proposal.round, ?err, "proposal failed verification");
self.trigger_timeout(view, TimeoutReason::InvalidProposal);
} else {
debug!(
%view,
?proposal,
?err,
"proposal exists but ancestry is not yet certified"
);
}
return None;
}
};
if !self.views.get_mut(&view)?.try_verify() {
return None;
}
let context = Context {
round: proposal.round,
leader: leader.key,
parent: (proposal.parent, parent_payload),
};
Some((context, proposal))
}
pub fn verified(&mut self, view: View) -> bool {
self.views
.get_mut(&view)
.map(|round| round.verified())
.unwrap_or(false)
}
pub fn set_certify_handle(&mut self, view: View, handle: Aborter) {
let Some(round) = self.views.get_mut(&view) else {
return;
};
round.set_certify_handle(handle);
self.outstanding_certifications.insert(view);
}
pub fn certify_candidates(&mut self) -> Vec<Proposal<D>> {
let candidates = take(&mut self.certification_candidates);
candidates
.into_iter()
.filter_map(|view| {
if view <= self.last_finalized {
return None;
}
self.views.get_mut(&view)?.try_certify()
})
.collect()
}
pub fn certified(&mut self, view: View, is_success: bool) -> Option<Notarization<S, D>> {
let round = self.views.get_mut(&view)?;
round.certified(is_success);
if is_success {
round.clear_deadlines();
}
self.outstanding_certifications.remove(&view);
let notarization = round
.notarization()
.cloned()
.expect("notarization must exist for certified view");
if is_success {
self.enter_view(view.next());
} else {
self.trigger_timeout(view, TimeoutReason::FailedCertification);
}
Some(notarization)
}
pub fn prune(&mut self) -> Vec<View> {
let min = self.min_active();
let kept = self.views.split_off(&min);
let removed = replace(&mut self.views, kept).into_keys().collect();
let _ = self.tracked_views.try_set(self.views.len());
removed
}
fn is_certified(&self, view: View) -> Option<&D> {
if view == GENESIS_VIEW {
return Some(self.genesis.as_ref().expect("genesis must be present"));
}
let round = self.views.get(&view)?;
if round.finalization().is_some() || round.is_certified() {
return Some(&round.proposal().expect("proposal must exist").payload);
}
None
}
fn is_nullified(&self, view: View) -> bool {
if view == GENESIS_VIEW {
return false;
}
let round = match self.views.get(&view) {
Some(round) => round,
None => return false,
};
round.nullification().is_some()
}
#[cfg(test)]
pub fn is_certify_aborted(&self, view: View) -> bool {
self.views
.get(&view)
.is_some_and(|round| round.is_certify_aborted())
}
fn find_parent(&self, view: View) -> Result<(View, D), View> {
let mut cursor = view.previous().unwrap_or(GENESIS_VIEW);
loop {
if let Some(parent) = self.is_certified(cursor) {
return Ok((cursor, *parent));
}
if !self.is_nullified(cursor) {
return Err(cursor);
}
cursor = cursor.previous().expect("cursor must not wrap");
}
}
fn parent_payload(&self, proposal: &Proposal<D>) -> Result<D, ParentPayloadError> {
let (view, parent) = (proposal.view(), proposal.parent);
if view <= parent {
return Err(ParentPayloadError::ParentNotBeforeProposal {
proposal_view: view,
parent_view: parent,
});
}
if parent < self.last_finalized {
return Err(ParentPayloadError::ParentBeforeFinalized {
proposal_view: view,
parent_view: parent,
last_finalized: self.last_finalized,
});
}
if let Some(missing_view) =
View::range(parent.next(), view).find(|v| !self.is_nullified(*v))
{
return Err(ParentPayloadError::MissingNullification {
proposal_view: view,
parent_view: parent,
missing_view,
});
}
self.is_certified(parent)
.copied()
.ok_or(ParentPayloadError::ParentNotCertified {
proposal_view: view,
parent_view: parent,
})
}
pub fn parent_certificate(&mut self, view: View) -> Option<Certificate<S, D>> {
let parent = {
let view = self.views.get(&view)?.proposal()?.parent;
self.views.get(&view)?
};
if let Some(f) = parent.finalization().cloned() {
return Some(Certificate::Finalization(f));
}
if let Some(n) = parent.notarization().cloned() {
return Some(Certificate::Notarization(n));
}
None
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::simplex::{
elector::RoundRobin,
scheme::ed25519,
types::{Finalization, Finalize, Notarization, Notarize, Nullification, Nullify, Proposal},
};
use commonware_cryptography::{certificate::mocks::Fixture, sha256::Digest as Sha256Digest};
use commonware_parallel::Sequential;
use commonware_runtime::{deterministic, Runner};
use commonware_utils::futures::AbortablePool;
use std::time::Duration;
fn test_genesis() -> Sha256Digest {
Sha256Digest::from([0u8; 32])
}
#[test]
fn certificate_candidates_respect_force_flag() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let mut state = State::new(
context,
Config {
scheme: verifier.clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(11),
activity_timeout: ViewDelta::new(6),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
},
);
state.set_genesis(test_genesis());
let notarize_view = View::new(3);
let notarize_round = Rnd::new(Epoch::new(11), notarize_view);
let notarize_proposal =
Proposal::new(notarize_round, GENESIS_VIEW, Sha256Digest::from([50u8; 32]));
let notarize_votes: Vec<_> = schemes
.iter()
.map(|scheme| Notarize::sign(scheme, notarize_proposal.clone()).unwrap())
.collect();
let notarization =
Notarization::from_notarizes(&verifier, notarize_votes.iter(), &Sequential)
.expect("notarization");
state.add_notarization(notarization);
assert!(state.broadcast_notarization(notarize_view).is_some());
assert!(state.broadcast_notarization(notarize_view).is_none());
assert!(state.notarization(notarize_view).is_some());
let nullify_view = View::new(4);
let nullify_round = Rnd::new(Epoch::new(11), nullify_view);
let nullify_votes: Vec<_> = schemes
.iter()
.map(|scheme| {
Nullify::sign::<Sha256Digest>(scheme, nullify_round).expect("nullify")
})
.collect();
let nullification =
Nullification::from_nullifies(&verifier, &nullify_votes, &Sequential)
.expect("nullification");
state.add_nullification(nullification);
assert!(state.broadcast_nullification(nullify_view).is_some());
assert!(state.broadcast_nullification(nullify_view).is_none());
assert!(state.nullification(nullify_view).is_some());
let finalize_view = View::new(5);
let finalize_round = Rnd::new(Epoch::new(11), finalize_view);
let finalize_proposal =
Proposal::new(finalize_round, GENESIS_VIEW, Sha256Digest::from([51u8; 32]));
let finalize_votes: Vec<_> = schemes
.iter()
.map(|scheme| Finalize::sign(scheme, finalize_proposal.clone()).unwrap())
.collect();
let finalization =
Finalization::from_finalizes(&verifier, finalize_votes.iter(), &Sequential)
.expect("finalization");
state.add_finalization(finalization);
assert!(state.broadcast_finalization(finalize_view).is_some());
assert!(state.broadcast_finalization(finalize_view).is_none());
assert!(state.finalization(finalize_view).is_some());
});
}
#[test]
fn timeout_helpers_reuse_and_reset_deadlines() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture { schemes, .. } = ed25519::fixture(&mut context, &namespace, 4);
let local_scheme = schemes[0].clone(); let retry = Duration::from_secs(3);
let cfg = Config {
scheme: local_scheme.clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(4),
activity_timeout: ViewDelta::new(2),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: retry,
};
let mut state = State::new(context.clone(), cfg);
state.set_genesis(test_genesis());
let first = state.next_timeout_deadline();
let second = state.next_timeout_deadline();
assert_eq!(first, second, "cached deadline should be reused");
let (was_retry, _) = state
.construct_nullify(state.current_view())
.expect("first timeout nullify should exist");
assert!(!was_retry, "first timeout is not a retry");
context.sleep(Duration::from_secs(2)).await;
let later = context.current();
let third = state.next_timeout_deadline();
assert_eq!(third, later + retry, "new retry scheduled after timeout");
let fourth = state.next_timeout_deadline();
assert_eq!(fourth, third, "retry deadline should be set");
context.sleep(Duration::from_secs(10)).await;
let fifth = state.next_timeout_deadline();
assert_eq!(fifth, later + retry, "retry deadline should be set");
let (was_retry, _) = state
.construct_nullify(state.current_view())
.expect("retry timeout nullify should exist");
assert!(was_retry, "subsequent timeout should be treated as retry");
let sixth = state.next_timeout_deadline();
let later = context.current();
assert_eq!(sixth, later + retry, "retry deadline should be set");
});
}
#[test]
fn nullify_preserves_retry_backoff_after_first_timeout_vote() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes,
participants,
..
} = ed25519::fixture(&mut context, &namespace, 4);
let retry = Duration::from_secs(3);
let cfg = Config {
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(30),
activity_timeout: ViewDelta::new(2),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: retry,
};
let mut state = State::new(context.clone(), cfg);
state.set_genesis(test_genesis());
let view = state.current_view();
let (was_retry, _) = state
.construct_nullify(view)
.expect("first timeout nullify should exist");
assert!(!was_retry, "first timeout should not be marked as retry");
let leader = state.leader_index(view).expect("leader must be set");
let leader_key = &participants[leader.get() as usize];
let label = Timeout::new(leader_key, TimeoutReason::LeaderTimeout);
assert_eq!(
state.timeouts.get_or_create(&label).get(),
1,
"first timeout nullify should record a leader-timeout metric"
);
context.sleep(Duration::from_secs(2)).await;
let now = context.current();
let retry_deadline = state.next_timeout_deadline();
assert_eq!(
retry_deadline,
now + retry,
"first retry should honor configured nullify backoff"
);
state.trigger_timeout(view, TimeoutReason::LeaderNullify);
assert_eq!(
state.next_timeout_deadline(),
retry_deadline,
"retry backoff should be preserved after repeated timeout hints"
);
});
}
#[test]
fn nullify_without_reason_reuses_first_recorded_reason() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes,
participants,
..
} = ed25519::fixture(&mut context, &namespace, 4);
let cfg = Config {
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(31),
activity_timeout: ViewDelta::new(2),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
};
let mut state = State::new(context.clone(), cfg);
state.set_genesis(test_genesis());
let view = state.current_view();
state.trigger_timeout(view, TimeoutReason::MissingProposal);
let (was_retry, _) = state
.construct_nullify(view)
.expect("first timeout nullify should exist");
assert!(!was_retry);
let leader = state.leader_index(view).expect("leader must be set");
let leader_key = &participants[leader.get() as usize];
let missing = Timeout::new(leader_key, TimeoutReason::MissingProposal);
let leader_timeout = Timeout::new(leader_key, TimeoutReason::LeaderTimeout);
assert_eq!(state.timeouts.get_or_create(&missing).get(), 1);
assert_eq!(state.timeouts.get_or_create(&leader_timeout).get(), 0);
let (was_retry, _) = state
.construct_nullify(view)
.expect("retry timeout nullify should exist");
assert!(was_retry);
assert_eq!(state.timeouts.get_or_create(&missing).get(), 1);
});
}
#[test]
fn notarization_keeps_certification_timeout_pending_certification() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let cfg = Config {
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(32),
activity_timeout: ViewDelta::new(2),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
};
let mut state = State::new(context.clone(), cfg);
state.set_genesis(test_genesis());
let view = state.current_view();
let proposal = Proposal::new(
Rnd::new(state.epoch(), view),
GENESIS_VIEW,
Sha256Digest::from([52u8; 32]),
);
assert!(state.set_proposal(view, proposal.clone()));
let certification_deadline = state.next_timeout_deadline();
assert_eq!(
certification_deadline,
context.current() + Duration::from_secs(2)
);
let votes: Vec<_> = schemes
.iter()
.map(|scheme| Notarize::sign(scheme, proposal.clone()).expect("notarize"))
.collect();
let notarization = Notarization::from_notarizes(&verifier, votes.iter(), &Sequential)
.expect("notarization");
let (added, equivocator) = state.add_notarization(notarization);
assert!(added);
assert!(equivocator.is_none());
assert_eq!(
state.next_timeout_deadline(),
certification_deadline,
"certification timeout must continue to bound certification latency"
);
context.sleep(Duration::from_secs(3)).await;
assert!(
state.next_timeout_deadline() <= context.current(),
"stalled certification should leave the view timed out"
);
});
}
#[test]
fn expire_old_round_is_noop() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let cfg = Config {
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(12),
activity_timeout: ViewDelta::new(3),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
};
let mut state = State::new(context.clone(), cfg);
state.set_genesis(test_genesis());
let deadline_v1 = state.next_timeout_deadline();
state.trigger_timeout(View::zero(), TimeoutReason::Inactivity);
assert_eq!(state.current_view(), View::new(1));
assert_eq!(state.next_timeout_deadline(), deadline_v1);
assert!(
!state.views.contains_key(&View::zero()),
"old round should not be created when expire is ignored"
);
let view_1 = View::new(1);
let votes: Vec<_> = schemes
.iter()
.map(|scheme| {
Nullify::sign::<Sha256Digest>(scheme, Rnd::new(state.epoch(), view_1))
.expect("nullify")
})
.collect();
let nullification =
Nullification::from_nullifies(&verifier, &votes, &Sequential).expect("nullify");
assert!(state.add_nullification(nullification));
assert_eq!(state.current_view(), View::new(2));
let deadline_v2 = state.next_timeout_deadline();
state.trigger_timeout(view_1, TimeoutReason::Inactivity);
assert_eq!(state.current_view(), View::new(2));
assert_eq!(state.next_timeout_deadline(), deadline_v2);
});
}
#[test]
fn entering_next_view_resets_expired_timeout_state() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let leader_timeout = Duration::from_secs(1);
let retry = Duration::from_secs(3);
let cfg = Config {
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(13),
activity_timeout: ViewDelta::new(3),
leader_timeout,
certification_timeout: Duration::from_secs(2),
timeout_retry: retry,
};
let mut state = State::new(context.clone(), cfg);
state.set_genesis(test_genesis());
let view_1 = state.current_view();
assert_eq!(view_1, View::new(1));
state.trigger_timeout(view_1, TimeoutReason::LeaderTimeout);
assert!(
state.next_timeout_deadline() <= context.current(),
"current view should be expired after timeout is triggered"
);
let (was_retry, _) = state
.construct_nullify(view_1)
.expect("first timeout nullify should exist");
assert!(!was_retry);
let retry_deadline = state.next_timeout_deadline();
assert_eq!(
retry_deadline,
context.current() + retry,
"timed-out view should schedule a retry"
);
let votes: Vec<_> = schemes
.iter()
.map(|scheme| {
Nullify::sign::<Sha256Digest>(scheme, Rnd::new(state.epoch(), view_1))
.expect("nullify")
})
.collect();
let nullification =
Nullification::from_nullifies(&verifier, &votes, &Sequential).expect("nullify");
assert!(state.add_nullification(nullification));
let view_2 = state.current_view();
assert_eq!(view_2, View::new(2));
let next_deadline = state.next_timeout_deadline();
assert_eq!(
next_deadline,
context.current() + leader_timeout,
"next view should start with a fresh leader timeout"
);
assert_ne!(
next_deadline, retry_deadline,
"next view must not inherit the previous view retry deadline"
);
});
}
#[test]
fn nullify_only_records_metric_once() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes,
participants,
..
} = ed25519::fixture(&mut context, &namespace, 4);
let cfg = Config {
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(12),
activity_timeout: ViewDelta::new(3),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
};
let mut state = State::new(context.clone(), cfg);
state.set_genesis(test_genesis());
let view = state.current_view();
let leader = state.leader_index(view).unwrap();
let leader_key = &participants[leader.get() as usize];
let label = Timeout::new(leader_key, TimeoutReason::LeaderNullify);
state.trigger_timeout(view, TimeoutReason::LeaderNullify);
let expired_at = state.next_timeout_deadline();
context.sleep(Duration::from_secs(1)).await;
state.trigger_timeout(view, TimeoutReason::LeaderTimeout);
assert_eq!(
state.next_timeout_deadline(),
expired_at,
"repeated timeout hints should not reset the expired deadline"
);
assert_eq!(state.timeouts.get_or_create(&label).get(), 0);
let (was_retry, _) = state
.construct_nullify(view)
.expect("first timeout nullify should exist");
assert!(!was_retry);
assert_eq!(state.timeouts.get_or_create(&label).get(), 1);
state.trigger_timeout(view, TimeoutReason::LeaderTimeout);
let (was_retry, _) = state
.construct_nullify(view)
.expect("retry timeout nullify should exist");
assert!(was_retry);
assert_eq!(state.timeouts.get_or_create(&label).get(), 1);
let other_label = Timeout::new(leader_key, TimeoutReason::LeaderTimeout);
assert_eq!(state.timeouts.get_or_create(&other_label).get(), 0);
});
}
#[test]
fn construct_nullify_current_view_only() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let local_scheme = schemes[0].clone();
let cfg = Config {
scheme: local_scheme,
elector: <RoundRobin>::default(),
epoch: Epoch::new(4),
activity_timeout: ViewDelta::new(2),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
};
let mut state = State::new(context, cfg);
state.set_genesis(test_genesis());
let current = state.current_view();
let next = current.next();
assert!(state.construct_nullify(next).is_none());
let current_round = Rnd::new(Epoch::new(4), current);
let current_votes: Vec<_> = schemes
.iter()
.map(|scheme| {
Nullify::sign::<Sha256Digest>(scheme, current_round).expect("nullify")
})
.collect();
let current_nullification =
Nullification::from_nullifies(&verifier, ¤t_votes, &Sequential)
.expect("nullification");
assert!(state.add_nullification(current_nullification));
assert_eq!(state.current_view(), next);
assert!(state.construct_nullify(current).is_none());
let (was_retry, _) = state
.construct_nullify(next)
.expect("first timeout nullify for current view should be emitted");
assert!(!was_retry);
let (was_retry, _) = state
.construct_nullify(next)
.expect("retry timeout nullify for current view should be emitted");
assert!(was_retry);
});
}
#[test]
fn round_prunes_with_min_active() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let cfg = Config {
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(7),
activity_timeout: ViewDelta::new(10),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
};
let mut state = State::new(context, cfg);
state.set_genesis(test_genesis());
for view in 0..5 {
state.create_round(View::new(view));
}
let proposal_a = Proposal::new(
Rnd::new(Epoch::new(1), View::new(20)),
GENESIS_VIEW,
Sha256Digest::from([1u8; 32]),
);
let finalization_votes: Vec<_> = schemes
.iter()
.map(|scheme| Finalize::sign(scheme, proposal_a.clone()).unwrap())
.collect();
let finalization =
Finalization::from_finalizes(&verifier, finalization_votes.iter(), &Sequential)
.expect("finalization");
state.add_finalization(finalization);
let removed = state.prune();
assert_eq!(
removed,
vec![
View::new(0),
View::new(1),
View::new(2),
View::new(3),
View::new(4)
]
);
assert_eq!(state.views.len(), 2); });
}
#[test]
fn parent_payload_returns_parent_digest() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let local_scheme = schemes[2].clone(); let cfg = Config {
scheme: local_scheme,
elector: <RoundRobin>::default(),
epoch: Epoch::new(4),
activity_timeout: ViewDelta::new(2),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
};
let mut state = State::new(context, cfg);
state.set_genesis(test_genesis());
let parent_view = View::new(1);
let parent_payload = Sha256Digest::from([1u8; 32]);
let parent_proposal = Proposal::new(
Rnd::new(Epoch::new(1), parent_view),
GENESIS_VIEW,
parent_payload,
);
let proposal = Proposal::new(
Rnd::new(Epoch::new(1), View::new(2)),
parent_view,
Sha256Digest::from([9u8; 32]),
);
assert_eq!(
state.parent_payload(&proposal),
Err(ParentPayloadError::ParentNotCertified {
proposal_view: View::new(2),
parent_view,
})
);
let notarization_votes: Vec<_> = schemes
.iter()
.map(|scheme| Notarize::sign(scheme, parent_proposal.clone()).unwrap())
.collect();
let notarization =
Notarization::from_notarizes(&verifier, notarization_votes.iter(), &Sequential)
.unwrap();
state.add_notarization(notarization);
assert_eq!(
state.parent_payload(&proposal),
Err(ParentPayloadError::ParentNotCertified {
proposal_view: View::new(2),
parent_view,
})
);
let mut pool = AbortablePool::<()>::default();
let handle = pool.push(futures::future::pending());
state.set_certify_handle(parent_view, handle);
state.certified(parent_view, true);
assert_eq!(state.parent_payload(&proposal), Ok(parent_payload));
});
}
#[test]
fn parent_certificate_prefers_finalization() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let local_scheme = schemes[1].clone(); let cfg = Config {
scheme: local_scheme,
elector: <RoundRobin>::default(),
epoch: Epoch::new(7),
activity_timeout: ViewDelta::new(3),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
};
let mut state = State::new(context, cfg);
state.set_genesis(test_genesis());
let parent_round = Rnd::new(state.epoch(), View::new(1));
let parent_proposal =
Proposal::new(parent_round, GENESIS_VIEW, Sha256Digest::from([11u8; 32]));
let notarize_votes: Vec<_> = schemes
.iter()
.map(|scheme| Notarize::sign(scheme, parent_proposal.clone()).unwrap())
.collect();
let notarization =
Notarization::from_notarizes(&verifier, notarize_votes.iter(), &Sequential)
.expect("notarization");
state.add_notarization(notarization.clone());
let proposal = Proposal::new(
Rnd::new(state.epoch(), View::new(2)),
View::new(1),
Sha256Digest::from([22u8; 32]),
);
state.proposed(proposal);
let cert = state.parent_certificate(View::new(2)).unwrap();
assert!(matches!(cert, Certificate::Notarization(n) if n == notarization));
let finalize_votes: Vec<_> = schemes
.iter()
.map(|scheme| Finalize::sign(scheme, parent_proposal.clone()).unwrap())
.collect();
let finalization =
Finalization::from_finalizes(&verifier, finalize_votes.iter(), &Sequential)
.expect("finalization");
state.add_finalization(finalization.clone());
let cert = state.parent_certificate(View::new(2)).unwrap();
assert!(matches!(cert, Certificate::Finalization(f) if f == finalization));
});
}
#[test]
fn parent_payload_errors_without_nullification() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let cfg = Config {
scheme: verifier.clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(1),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
activity_timeout: ViewDelta::new(5),
};
let mut state = State::new(context, cfg);
state.set_genesis(test_genesis());
let parent_view = View::new(1);
let parent_proposal = Proposal::new(
Rnd::new(Epoch::new(1), parent_view),
GENESIS_VIEW,
Sha256Digest::from([2u8; 32]),
);
let notarization_votes: Vec<_> = schemes
.iter()
.map(|scheme| Notarize::sign(scheme, parent_proposal.clone()).unwrap())
.collect();
let notarization =
Notarization::from_notarizes(&verifier, notarization_votes.iter(), &Sequential)
.unwrap();
state.add_notarization(notarization);
state.create_round(View::new(2));
let proposal = Proposal::new(
Rnd::new(Epoch::new(1), View::new(3)),
parent_view,
Sha256Digest::from([3u8; 32]),
);
assert_eq!(
state.parent_payload(&proposal),
Err(ParentPayloadError::MissingNullification {
proposal_view: View::new(3),
parent_view,
missing_view: View::new(2),
})
);
});
}
#[test]
fn parent_payload_returns_genesis_payload() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let cfg = Config {
scheme: verifier.clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(1),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
activity_timeout: ViewDelta::new(5),
};
let mut state = State::new(context, cfg);
state.set_genesis(test_genesis());
let nullify_votes: Vec<_> = schemes
.iter()
.map(|scheme| {
Nullify::sign::<Sha256Digest>(scheme, Rnd::new(Epoch::new(1), View::new(1)))
.unwrap()
})
.collect();
let nullification =
Nullification::from_nullifies(&verifier, &nullify_votes, &Sequential).unwrap();
state.add_nullification(nullification);
let proposal = Proposal::new(
Rnd::new(Epoch::new(1), View::new(2)),
GENESIS_VIEW,
Sha256Digest::from([8u8; 32]),
);
let genesis = Sha256Digest::from([0u8; 32]);
assert_eq!(state.parent_payload(&proposal), Ok(genesis));
});
}
#[test]
fn parent_payload_rejects_parent_before_finalized() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let cfg = Config {
scheme: verifier.clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(1),
activity_timeout: ViewDelta::new(5),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
};
let mut state = State::new(context, cfg);
state.set_genesis(test_genesis());
let proposal_a = Proposal::new(
Rnd::new(Epoch::new(1), View::new(3)),
GENESIS_VIEW,
Sha256Digest::from([1u8; 32]),
);
let finalization_votes: Vec<_> = schemes
.iter()
.map(|scheme| Finalize::sign(scheme, proposal_a.clone()).unwrap())
.collect();
let finalization =
Finalization::from_finalizes(&verifier, finalization_votes.iter(), &Sequential)
.expect("finalization");
state.add_finalization(finalization);
let proposal = Proposal::new(
Rnd::new(Epoch::new(1), View::new(4)),
View::new(2),
Sha256Digest::from([6u8; 32]),
);
assert_eq!(
state.parent_payload(&proposal),
Err(ParentPayloadError::ParentBeforeFinalized {
proposal_view: View::new(4),
parent_view: View::new(2),
last_finalized: View::new(3),
})
);
});
}
#[test]
fn try_verify_fast_paths_parent_before_finalized() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let epoch = Epoch::new(1);
let mut state = State::new(
context.clone(),
Config {
scheme: verifier.clone(),
elector: <RoundRobin>::default(),
epoch,
activity_timeout: ViewDelta::new(5),
leader_timeout: Duration::from_secs(10),
certification_timeout: Duration::from_secs(10),
timeout_retry: Duration::from_secs(30),
},
);
state.set_genesis(test_genesis());
let finalized_view = View::new(3);
let finalized_proposal = Proposal::new(
Rnd::new(epoch, finalized_view),
GENESIS_VIEW,
Sha256Digest::from([1u8; 32]),
);
let finalization_votes: Vec<_> = schemes
.iter()
.map(|scheme| Finalize::sign(scheme, finalized_proposal.clone()).unwrap())
.collect();
let finalization =
Finalization::from_finalizes(&verifier, finalization_votes.iter(), &Sequential)
.expect("finalization");
state.add_finalization(finalization);
let view = state.current_view();
assert_eq!(view, View::new(4));
let proposal = Proposal::new(
Rnd::new(epoch, view),
View::new(2),
Sha256Digest::from([6u8; 32]),
);
assert!(state.set_proposal(view, proposal));
let initial_deadline = state.next_timeout_deadline();
assert!(initial_deadline > context.current());
assert!(state.try_verify().is_none());
assert!(state.next_timeout_deadline() <= context.current());
});
}
#[test]
fn try_verify_waits_for_missing_parent_certification() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture { verifier, .. } = ed25519::fixture(&mut context, &namespace, 4);
let epoch = Epoch::new(1);
let mut state = State::new(
context.clone(),
Config {
scheme: verifier,
elector: <RoundRobin>::default(),
epoch,
activity_timeout: ViewDelta::new(5),
leader_timeout: Duration::from_secs(10),
certification_timeout: Duration::from_secs(10),
timeout_retry: Duration::from_secs(30),
},
);
state.set_genesis(test_genesis());
assert!(state.enter_view(View::new(2)));
state.set_leader(View::new(2), None);
let proposal = Proposal::new(
Rnd::new(epoch, View::new(2)),
View::new(1),
Sha256Digest::from([7u8; 32]),
);
assert!(state.set_proposal(View::new(2), proposal));
let initial_deadline = state.next_timeout_deadline();
assert!(initial_deadline > context.current());
assert!(state.try_verify().is_none());
assert_eq!(state.next_timeout_deadline(), initial_deadline);
});
}
#[test]
fn replayed_local_notarize_restores_verified_leader_proposal() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture { schemes, .. } = ed25519::fixture(&mut context, &namespace, 4);
let epoch = Epoch::new(2);
let view = View::new(2);
let proposal = Proposal::new(
Rnd::new(epoch, view),
View::new(1),
Sha256Digest::from([42u8; 32]),
);
let local_vote = Notarize::sign(&schemes[0], proposal.clone()).expect("notarize");
let mut state = State::new(
context,
Config {
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
epoch,
activity_timeout: ViewDelta::new(5),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
},
);
state.set_genesis(test_genesis());
assert!(state.enter_view(view));
state.set_leader(view, None);
assert_eq!(state.leader_index(view), Some(Participant::new(0)));
state.replay(&Artifact::Notarize(local_vote));
let round = state.views.get(&view).expect("replayed round must exist");
assert_eq!(round.proposal(), Some(&proposal));
assert!(
state.construct_notarize(view).is_none(),
"replay should restore that we already emitted the local notarize vote"
);
assert!(state.try_verify().is_none());
});
}
#[test]
fn replay_restores_conflict_state() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let mut scheme_iter = schemes.into_iter();
let local_scheme = scheme_iter.next().unwrap();
let other_schemes: Vec<_> = scheme_iter.collect();
let epoch: Epoch = Epoch::new(3);
let mut state = State::new(
context.with_label("state"),
Config {
scheme: local_scheme.clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(1),
activity_timeout: ViewDelta::new(5),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
},
);
state.set_genesis(test_genesis());
let view = View::new(4);
let round = Rnd::new(epoch, view);
let proposal_a = Proposal::new(round, GENESIS_VIEW, Sha256Digest::from([21u8; 32]));
let proposal_b = Proposal::new(round, GENESIS_VIEW, Sha256Digest::from([22u8; 32]));
let local_vote = Notarize::sign(&local_scheme, proposal_a).unwrap();
state.replay(&Artifact::Notarize(local_vote.clone()));
let votes_b: Vec<_> = other_schemes
.iter()
.take(3)
.map(|scheme| Notarize::sign(scheme, proposal_b.clone()).unwrap())
.collect();
let conflicting = Notarization::from_notarizes(&verifier, votes_b.iter(), &Sequential)
.expect("certificate");
state.add_notarization(conflicting.clone());
state.replay(&Artifact::Notarization(conflicting.clone()));
assert!(state.construct_finalize(view).is_none());
let mut restarted = State::new(
context.with_label("state_restarted"),
Config {
scheme: local_scheme,
elector: <RoundRobin>::default(),
epoch: Epoch::new(1),
activity_timeout: ViewDelta::new(5),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
},
);
restarted.set_genesis(test_genesis());
restarted.replay(&Artifact::Notarize(local_vote));
restarted.add_notarization(conflicting.clone());
restarted.replay(&Artifact::Notarization(conflicting));
assert!(restarted.construct_finalize(view).is_none());
});
}
#[test]
fn certification_lifecycle() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let cfg = Config {
scheme: verifier.clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(1),
activity_timeout: ViewDelta::new(10),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
};
let mut state = State::new(context, cfg);
state.set_genesis(test_genesis());
let make_notarization = |view: View| {
let proposal = Proposal::new(
Rnd::new(Epoch::new(1), view),
GENESIS_VIEW,
Sha256Digest::from([view.get() as u8; 32]),
);
let votes: Vec<_> = schemes
.iter()
.map(|s| Notarize::sign(s, proposal.clone()).unwrap())
.collect();
Notarization::from_notarizes(&verifier, votes.iter(), &Sequential).unwrap()
};
let make_finalization = |view: View| {
let proposal = Proposal::new(
Rnd::new(Epoch::new(1), view),
GENESIS_VIEW,
Sha256Digest::from([view.get() as u8; 32]),
);
let votes: Vec<_> = schemes
.iter()
.map(|s| Finalize::sign(s, proposal.clone()).unwrap())
.collect();
Finalization::from_finalizes(&verifier, votes.iter(), &Sequential).unwrap()
};
let mut pool = AbortablePool::<()>::default();
for i in 3..=8u64 {
state.add_notarization(make_notarization(View::new(i)));
}
let candidates = state.certify_candidates();
assert_eq!(candidates.len(), 6);
for i in [3u64, 4, 5, 7] {
let handle = pool.push(futures::future::pending());
state.set_certify_handle(View::new(i), handle);
}
assert!(state.certify_candidates().is_empty());
let notarization = state.certified(View::new(7), true);
assert!(notarization.is_some());
assert!(!state.is_certify_aborted(View::new(7)));
state.add_finalization(make_finalization(View::new(5)));
assert!(state.is_certify_aborted(View::new(3)));
assert!(state.is_certify_aborted(View::new(4)));
assert!(state.is_certify_aborted(View::new(5)));
assert!(!state.is_certify_aborted(View::new(7)));
assert!(!state.is_certify_aborted(View::new(6)));
assert!(!state.is_certify_aborted(View::new(8)));
assert!(state.certify_candidates().is_empty());
state.add_notarization(make_notarization(View::new(9)));
let candidates = state.certify_candidates();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].round.view(), View::new(9));
let handle9 = pool.push(futures::future::pending());
state.set_certify_handle(View::new(9), handle9);
state.add_notarization(make_notarization(View::new(10)));
let candidates = state.certify_candidates();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].round.view(), View::new(10));
state.add_finalization(make_finalization(View::new(9)));
assert!(state.is_certify_aborted(View::new(9)));
state.add_notarization(make_notarization(View::new(11)));
let candidates = state.certify_candidates();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].round.view(), View::new(11));
});
}
#[test]
fn nullification_keeps_notarization_as_certification_candidate() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let cfg = Config {
scheme: verifier.clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(1),
activity_timeout: ViewDelta::new(10),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
};
let mut state = State::new(context, cfg);
state.set_genesis(test_genesis());
let view = View::new(2);
let proposal = Proposal::new(
Rnd::new(Epoch::new(1), view),
GENESIS_VIEW,
Sha256Digest::from([42u8; 32]),
);
let notarize_votes: Vec<_> = schemes
.iter()
.map(|scheme| Notarize::sign(scheme, proposal.clone()).unwrap())
.collect();
let notarization =
Notarization::from_notarizes(&verifier, notarize_votes.iter(), &Sequential)
.expect("notarization");
let (added, _) = state.add_notarization(notarization);
assert!(added);
let nullify_votes: Vec<_> = schemes
.iter()
.map(|scheme| {
Nullify::sign::<Sha256Digest>(scheme, Rnd::new(Epoch::new(1), view)).unwrap()
})
.collect();
let nullification =
Nullification::from_nullifies(&verifier, &nullify_votes, &Sequential)
.expect("nullification");
assert!(state.add_nullification(nullification));
let candidates = state.certify_candidates();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].round.view(), view);
});
}
#[test]
fn nullification_does_not_abort_inflight_certification() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let cfg = Config {
scheme: verifier.clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(1),
activity_timeout: ViewDelta::new(10),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
};
let mut state = State::new(context, cfg);
state.set_genesis(test_genesis());
let view = View::new(2);
let proposal = Proposal::new(
Rnd::new(Epoch::new(1), view),
GENESIS_VIEW,
Sha256Digest::from([24u8; 32]),
);
let notarize_votes: Vec<_> = schemes
.iter()
.map(|scheme| Notarize::sign(scheme, proposal.clone()).unwrap())
.collect();
let notarization =
Notarization::from_notarizes(&verifier, notarize_votes.iter(), &Sequential)
.expect("notarization");
let (added, _) = state.add_notarization(notarization);
assert!(added);
let candidates = state.certify_candidates();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].round.view(), view);
let mut pool = AbortablePool::<()>::default();
let handle = pool.push(futures::future::pending());
state.set_certify_handle(view, handle);
let nullify_votes: Vec<_> = schemes
.iter()
.map(|scheme| {
Nullify::sign::<Sha256Digest>(scheme, Rnd::new(Epoch::new(1), view)).unwrap()
})
.collect();
let nullification =
Nullification::from_nullifies(&verifier, &nullify_votes, &Sequential)
.expect("nullification");
assert!(state.add_nullification(nullification));
assert!(!state.is_certify_aborted(view));
assert!(state.certified(view, true).is_some());
assert!(state.is_certified(view).is_some());
});
}
#[test]
fn nullification_then_late_certification_allows_child_to_build_on_parent() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let local_scheme = schemes[0].clone();
let cfg = Config {
scheme: local_scheme,
elector: <RoundRobin>::default(),
epoch: Epoch::new(1),
activity_timeout: ViewDelta::new(10),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
};
let mut state = State::new(context, cfg);
state.set_genesis(test_genesis());
let parent_view = View::new(2);
let child_view = parent_view.next();
let payload = Sha256Digest::from([91u8; 32]);
let proposal =
Proposal::new(Rnd::new(Epoch::new(1), parent_view), GENESIS_VIEW, payload);
let notarize_votes: Vec<_> = schemes
.iter()
.map(|scheme| Notarize::sign(scheme, proposal.clone()).unwrap())
.collect();
let notarization =
Notarization::from_notarizes(&verifier, notarize_votes.iter(), &Sequential)
.expect("notarization");
let (added, _) = state.add_notarization(notarization);
assert!(added);
let nullify_votes: Vec<_> = schemes
.iter()
.map(|scheme| {
Nullify::sign::<Sha256Digest>(scheme, Rnd::new(Epoch::new(1), parent_view))
.unwrap()
})
.collect();
let nullification =
Nullification::from_nullifies(&verifier, &nullify_votes, &Sequential)
.expect("nullification");
assert!(state.add_nullification(nullification));
assert_eq!(state.leader_index(child_view), Some(Participant::new(0)));
assert!(state.try_propose().is_none());
assert!(state.certified(parent_view, true).is_some());
let propose_context = state
.try_propose()
.expect("child view should be able to build on certified parent");
assert_eq!(propose_context.round.view(), child_view);
assert_eq!(propose_context.parent, (parent_view, payload));
});
}
#[test]
fn nullification_then_late_certification_unblocks_follower_verify() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let local_scheme = schemes[1].clone();
let cfg = Config {
scheme: local_scheme,
elector: <RoundRobin>::default(),
epoch: Epoch::new(1),
activity_timeout: ViewDelta::new(10),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
};
let mut state = State::new(context, cfg);
state.set_genesis(test_genesis());
let parent_view = View::new(2);
let child_view = parent_view.next();
let parent_payload = Sha256Digest::from([77u8; 32]);
let parent_proposal = Proposal::new(
Rnd::new(Epoch::new(1), parent_view),
GENESIS_VIEW,
parent_payload,
);
let notarize_votes: Vec<_> = schemes
.iter()
.map(|scheme| Notarize::sign(scheme, parent_proposal.clone()).unwrap())
.collect();
let notarization =
Notarization::from_notarizes(&verifier, notarize_votes.iter(), &Sequential)
.expect("notarization");
let (added, _) = state.add_notarization(notarization);
assert!(added);
let nullify_votes: Vec<_> = schemes
.iter()
.map(|scheme| {
Nullify::sign::<Sha256Digest>(scheme, Rnd::new(Epoch::new(1), parent_view))
.unwrap()
})
.collect();
let nullification =
Nullification::from_nullifies(&verifier, &nullify_votes, &Sequential)
.expect("nullification");
assert!(state.add_nullification(nullification));
assert_eq!(state.current_view(), child_view);
assert_eq!(state.leader_index(child_view), Some(Participant::new(0)));
let child_proposal = Proposal::new(
Rnd::new(Epoch::new(1), child_view),
parent_view,
Sha256Digest::from([78u8; 32]),
);
assert!(state.set_proposal(child_view, child_proposal.clone()));
assert!(state.try_verify().is_none());
assert!(state.certified(parent_view, true).is_some());
let verified = state.try_verify();
assert!(verified.is_some());
let (ctx, proposal) = verified.expect("verify context should exist");
assert_eq!(ctx.round.view(), child_view);
assert_eq!(ctx.parent, (parent_view, parent_payload));
assert_eq!(proposal, child_proposal);
});
}
#[test]
fn late_nullification_unblocks_follower_verify() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture {
schemes, verifier, ..
} = ed25519::fixture(&mut context, &namespace, 4);
let local_scheme = schemes[1].clone();
let cfg = Config {
scheme: local_scheme,
elector: <RoundRobin>::default(),
epoch: Epoch::new(1),
activity_timeout: ViewDelta::new(10),
leader_timeout: Duration::from_secs(10),
certification_timeout: Duration::from_secs(10),
timeout_retry: Duration::from_secs(30),
};
let mut state = State::new(context.clone(), cfg);
state.set_genesis(test_genesis());
let parent_view = View::new(1);
let blocked_view = parent_view.next();
let child_view = blocked_view.next();
let parent_payload = Sha256Digest::from([88u8; 32]);
let parent_proposal = Proposal::new(
Rnd::new(Epoch::new(1), parent_view),
GENESIS_VIEW,
parent_payload,
);
let notarize_votes: Vec<_> = schemes
.iter()
.map(|scheme| Notarize::sign(scheme, parent_proposal.clone()).unwrap())
.collect();
let notarization =
Notarization::from_notarizes(&verifier, notarize_votes.iter(), &Sequential)
.expect("notarization");
let (added, _) = state.add_notarization(notarization);
assert!(added);
assert!(state.certified(parent_view, true).is_some());
assert!(state.enter_view(child_view));
state.set_leader(child_view, None);
assert_eq!(state.current_view(), child_view);
assert_eq!(state.leader_index(child_view), Some(Participant::new(0)));
let child_proposal = Proposal::new(
Rnd::new(Epoch::new(1), child_view),
parent_view,
Sha256Digest::from([89u8; 32]),
);
assert!(state.set_proposal(child_view, child_proposal.clone()));
let initial_deadline = state.next_timeout_deadline();
assert!(initial_deadline > context.current());
assert!(state.try_verify().is_none());
assert_eq!(state.next_timeout_deadline(), initial_deadline);
let nullify_votes: Vec<_> = schemes
.iter()
.map(|scheme| {
Nullify::sign::<Sha256Digest>(scheme, Rnd::new(Epoch::new(1), blocked_view))
.unwrap()
})
.collect();
let nullification =
Nullification::from_nullifies(&verifier, &nullify_votes, &Sequential)
.expect("nullification");
assert!(state.add_nullification(nullification));
let verified = state.try_verify().expect("verify context should exist");
let (ctx, proposal) = verified;
assert_eq!(ctx.round.view(), child_view);
assert_eq!(ctx.parent, (parent_view, parent_payload));
assert_eq!(proposal, child_proposal);
});
}
#[test]
fn only_notarize_before_nullify() {
let runtime = deterministic::Runner::default();
runtime.start(|mut context| async move {
let namespace = b"ns".to_vec();
let Fixture { schemes, .. } = ed25519::fixture(&mut context, &namespace, 4);
let cfg = Config {
scheme: schemes[0].clone(),
elector: <RoundRobin>::default(),
epoch: Epoch::new(1),
activity_timeout: ViewDelta::new(5),
leader_timeout: Duration::from_secs(1),
certification_timeout: Duration::from_secs(2),
timeout_retry: Duration::from_secs(3),
};
let mut state = State::new(context, cfg);
state.set_genesis(test_genesis());
let view = state.current_view();
let proposal = Proposal::new(
Rnd::new(Epoch::new(1), view),
GENESIS_VIEW,
Sha256Digest::from([1u8; 32]),
);
state.set_proposal(view, proposal);
assert!(state.try_verify().is_some());
assert!(state.verified(view));
let (retry, _) = state
.construct_nullify(view)
.expect("timeout nullify should exist");
assert!(!retry);
assert!(state.construct_notarize(view).is_none());
});
}
}