pub(crate) mod config;
#[cfg(test)]
mod des_testing;
mod fault;
mod message;
mod params;
mod participation;
mod proposal;
mod round;
#[cfg(test)]
mod tests;
use std::{
any::Any,
cmp::Reverse,
collections::{btree_map, BTreeMap, HashMap, HashSet},
fmt::Debug,
iter,
path::PathBuf,
};
use datasize::DataSize;
use either::Either;
use itertools::Itertools;
use rand::{seq::IteratorRandom, Rng};
use tracing::{debug, error, event, info, trace, warn, Level};
use casper_types::{Chainspec, TimeDiff, Timestamp, U512};
use crate::{
components::consensus::{
config::Config,
consensus_protocol::{
BlockContext, ConsensusProtocol, FinalizedBlock, ProposedBlock, ProtocolOutcome,
ProtocolOutcomes, TerminalBlockData,
},
era_supervisor::SerializedMessage,
protocols,
traits::{ConsensusValueT, Context},
utils::{
wal::{ReadWal, WalEntry, WriteWal},
ValidatorIndex, ValidatorMap, Validators, Weight,
},
ActionId, LeaderSequence, TimerId,
},
types::NodeId,
utils, NodeRng,
};
use fault::Fault;
use message::{Content, SignedMessage, SyncResponse};
use params::Params;
use participation::{Participation, ParticipationStatus};
use proposal::{HashedProposal, Proposal};
use round::Round;
use serde::{Deserialize, Serialize};
pub(crate) use message::{Message, SyncRequest};
/// Timer ID for the periodic state sync with a random peer (see `handle_sync_peer_timer`).
const TIMER_ID_SYNC_PEER: TimerId = TimerId(0);
/// Timer ID presumably used to trigger protocol state updates — its use is outside this view.
const TIMER_ID_UPDATE: TimerId = TimerId(1);
/// Timer ID presumably used to trigger participation logging — its use is outside this view.
const TIMER_ID_LOG_PARTICIPATION: TimerId = TimerId(2);
/// Signed messages whose round ID exceeds the current round by more than this many rounds are
/// dropped (see `handle_signed_message`).
const MAX_FUTURE_ROUNDS: u32 = 7200;
/// The numeric identifier of a round.
pub(crate) type RoundId = u32;
/// Pairs of the round a proposal was received for and the peer that sent it.
type ProposalsAwaitingParent = HashSet<(RoundId, NodeId)>;
/// Triples of round ID, hashed proposal and sending peer.
type ProposalsAwaitingValidation<C> = HashSet<(RoundId, HashedProposal<C>, NodeId)>;
/// An entry in the write-ahead log (WAL) of a Zug instance.
///
/// Entries are appended via `Zug::record_entry` and replayed by `Zug::open_wal`.
#[derive(Deserialize, Serialize, Debug, PartialEq)]
#[serde(bound(
serialize = "C::Hash: Serialize",
deserialize = "C::Hash: Deserialize<'de>",
))]
pub(crate) enum ZugWalEntry<C: Context> {
/// A signed echo or vote, created by this node or received from a peer.
SignedMessage(SignedMessage<C>),
/// A proposal together with the ID of the round it belongs to.
Proposal(Proposal<C>, RoundId),
/// Evidence of double-signing: a signed message plus a second content and signature that
/// `Zug::handle_evidence` requires to contradict the first.
Evidence(SignedMessage<C>, Content<C>, C::Signature),
}
// Marker impl: lets `ZugWalEntry` be used as the entry type of `ReadWal` and `WriteWal`.
impl<C: Context> WalEntry for ZugWalEntry<C> {}
/// The index and signing secret this node uses when it creates its own signed messages.
#[derive(DataSize)]
pub(crate) struct ActiveValidator<C>
where
C: Context,
{
/// This node's index in the validator set.
idx: ValidatorIndex,
/// The secret used to sign this node's messages; never printed by the `Debug` impl.
secret: C::ValidatorSecret,
}
impl<C: Context> Debug for ActiveValidator<C> {
    /// Formats the validator for debugging, printing a placeholder instead of the secret.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut debug_struct = formatter.debug_struct("ActiveValidator");
        debug_struct.field("idx", &self.idx);
        // The secret must never end up in logs.
        debug_struct.field("secret", &"<REDACTED>");
        debug_struct.finish()
    }
}
/// Error value returned by the message handlers when the peer with the contained node ID sent
/// an invalid message (bad signature, invalid index, malformed proposal or evidence).
struct FaultySender(NodeId);
/// The state of one instance of the Zug consensus protocol.
#[derive(Debug, DataSize)]
pub(crate) struct Zug<C>
where
C: Context,
{
/// The protocol parameters (instance ID, start/end timestamps, end height, fault tolerance).
params: Params<C>,
/// The proposal timeout in milliseconds; inherited from the previous era's Zug instance if
/// there is one, otherwise derived from the configuration.
proposal_timeout_millis: f64,
/// The validator set of this instance.
validators: Validators<C::ValidatorId>,
/// Our own index and secret if we sign messages; reset to `None` when we are found faulty
/// or the WAL cannot be written.
active_validator: Option<ActiveValidator<C>>,
/// If `true`, incoming signed messages are ignored and the peer sync timer does nothing.
evidence_only: bool,
/// Proposals that cannot be validated yet because ancestors are missing, keyed by parent
/// round ID, then by proposal, with the `(round ID, sender)` pairs they were received with.
proposals_waiting_for_parent:
HashMap<RoundId, HashMap<HashedProposal<C>, ProposalsAwaitingParent>>,
/// Proposals awaiting validation, keyed by proposed block — populated outside this view.
proposals_waiting_for_validation: HashMap<ProposedBlock<C>, ProposalsAwaitingValidation<C>>,
/// Presumably our own proposal that is still being created — TODO confirm; not used in
/// the visible part of the file.
pending_proposal: Option<(BlockContext<C>, RoundId, Option<RoundId>)>,
/// Determines the leader of rounds for which no `Round` entry exists yet.
leader_sequence: LeaderSequence,
/// The per-round state, ordered by round ID.
rounds: BTreeMap<RoundId, Round<C>>,
/// The validators known to be faulty (banned, directly or indirectly) and the kind of fault.
faults: HashMap<ValidatorIndex, Fault<C>>,
/// The Zug configuration.
config: config::Config,
/// A signed message per validator that has been seen as active; `None` otherwise.
active: ValidatorMap<Option<SignedMessage<C>>>,
/// The ID of the first round that is not finalized yet.
first_non_finalized_round_id: RoundId,
/// Presumably the lowest round that must be re-examined by `update` — TODO confirm.
maybe_dirty_round_id: Option<RoundId>,
/// The ID of the current round.
current_round: RoundId,
/// The start time of the current round; initialized to `Timestamp::MAX`.
current_round_start: Timestamp,
/// Set to `true` when new evidence is handled; other uses are outside this view.
progress_detected: bool,
/// While `true`, no new messages are signed (see `create_message`).
paused: bool,
/// The time of the next scheduled update; initialized to `Timestamp::MAX`.
next_scheduled_update: Timestamp,
/// The WAL writer; `None` until `open_wal` succeeds, and again after a failed write.
write_wal: Option<WriteWal<ZugWalEntry<C>>>,
/// IDs of the sync requests we sent; a sync response with an unknown ID is rejected.
sent_sync_requests: registered_sync::RegisteredSync,
}
impl<C: Context + 'static> Zug<C> {
/// Creates a new `Zug` instance from an existing validator set and parameters.
///
/// The proposal timeout is taken over from `prev_cp` if that is a `Zug` instance; otherwise it
/// is the configured timeout extended by the configured grace period percentage. Banned
/// validators start out as faulty, and validators that cannot propose are excluded from the
/// leader sequence, which is seeded with `seed`. No WAL is attached yet.
fn new_with_params(
    validators: Validators<C::ValidatorId>,
    params: Params<C>,
    config: &config::Config,
    prev_cp: Option<&dyn ConsensusProtocol<C>>,
    seed: u64,
) -> Zug<C> {
    let weights = protocols::common::validator_weights::<C>(&validators);

    let previous_zug = prev_cp.and_then(|cp| cp.as_any().downcast_ref::<Zug<C>>());
    let proposal_timeout_millis = match previous_zug {
        Some(zug) => zug.proposal_timeout_millis,
        None => {
            config.proposal_timeout.millis() as f64
                * (config.proposal_grace_period as f64 / 100.0 + 1.0)
        }
    };

    // Everybody may propose unless the validator set says otherwise.
    let mut can_propose: ValidatorMap<bool> = weights.iter().map(|_| true).collect();
    validators.iter_cannot_propose_idx().for_each(|vidx| {
        can_propose[vidx] = false;
    });
    let leader_sequence = LeaderSequence::new(seed, &weights, can_propose);

    // Banned validators count as faulty from the very beginning.
    let faults: HashMap<_, _> = validators
        .iter_banned_idx()
        .map(|idx| (idx, Fault::Banned))
        .collect();

    // Nobody has been seen as active yet.
    let active: ValidatorMap<_> = weights.iter().map(|_| None).collect();

    info!(
        instance_id = %params.instance_id(),
        era_start_time = %params.start_timestamp(),
        %proposal_timeout_millis,
        "initializing Zug instance",
    );

    Zug {
        params,
        proposal_timeout_millis,
        validators,
        active_validator: None,
        evidence_only: false,
        proposals_waiting_for_parent: HashMap::new(),
        proposals_waiting_for_validation: HashMap::new(),
        pending_proposal: None,
        leader_sequence,
        rounds: BTreeMap::new(),
        faults,
        config: config.clone(),
        active,
        first_non_finalized_round_id: 0,
        maybe_dirty_round_id: None,
        current_round: 0,
        current_round_start: Timestamp::MAX,
        progress_detected: false,
        paused: false,
        next_scheduled_update: Timestamp::MAX,
        write_wal: None,
        sent_sync_requests: Default::default(),
    }
}
/// Creates a new `Zug` instance from the chainspec and the validator stakes.
///
/// Builds the validator set from the stakes and the `faulty` / `inactive` sets, derives the
/// protocol parameters from the chainspec's core configuration and `era_start_time`, and
/// delegates to `new_with_params`.
#[allow(clippy::too_many_arguments)]
fn new(
    instance_id: C::InstanceId,
    validator_stakes: BTreeMap<C::ValidatorId, U512>,
    faulty: &HashSet<C::ValidatorId>,
    inactive: &HashSet<C::ValidatorId>,
    chainspec: &Chainspec,
    config: &Config,
    prev_cp: Option<&dyn ConsensusProtocol<C>>,
    era_start_time: Timestamp,
    seed: u64,
) -> Zug<C> {
    let core_config = &chainspec.core_config;
    let validators = protocols::common::validators::<C>(faulty, inactive, validator_stakes);
    // The era start plus the configured era duration.
    let era_end_time = era_start_time.saturating_add(core_config.era_duration);
    let ftt = protocols::common::ftt::<C>(core_config.finality_threshold_fraction, &validators);
    let params = Params::new(
        instance_id,
        core_config.minimum_block_time,
        era_start_time,
        core_config.minimum_era_height,
        era_end_time,
        ftt,
    );
    Zug::new_with_params(validators, params, &config.zug, prev_cp, seed)
}
/// Creates a new boxed `Zug` instance and replays the WAL stored in `wal_file`.
///
/// Returns the protocol instance together with the outcomes produced while replaying the WAL.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new_boxed(
    instance_id: C::InstanceId,
    validator_stakes: BTreeMap<C::ValidatorId, U512>,
    faulty: &HashSet<C::ValidatorId>,
    inactive: &HashSet<C::ValidatorId>,
    chainspec: &Chainspec,
    config: &Config,
    prev_cp: Option<&dyn ConsensusProtocol<C>>,
    era_start_time: Timestamp,
    seed: u64,
    now: Timestamp,
    wal_file: PathBuf,
) -> (Box<dyn ConsensusProtocol<C>>, ProtocolOutcomes<C>) {
    let mut instance = Self::new(
        instance_id,
        validator_stakes,
        faulty,
        inactive,
        chainspec,
        config,
        prev_cp,
        era_start_time,
        seed,
    );
    // Replay the WAL before handing the instance out.
    let wal_outcomes = instance.open_wal(wal_file, now);
    let boxed: Box<dyn ConsensusProtocol<C>> = Box::new(instance);
    (boxed, wal_outcomes)
}
fn our_idx(&self) -> Option<u32> {
self.active_validator.as_ref().map(|av| av.idx.0)
}
/// Logs which validators are inactive or faulty, together with the percentage of the total
/// weight each of the two groups holds.
fn log_participation(&self) {
    let total_w = self.validators.total_weight().0;
    let mut faulty_w: u64 = 0;
    let mut inactive_w: u64 = 0;
    let mut faulty_validators = Vec::new();
    let mut inactive_validators = Vec::new();
    for (idx, v_id) in self.validators.enumerate_ids() {
        // Validators without a participation status are not reported.
        let Some(status) = ParticipationStatus::for_index(idx, self) else {
            continue;
        };
        // Pick the weight counter and the list this validator belongs to.
        let (weight_sum, bucket) = match status {
            ParticipationStatus::Equivocated | ParticipationStatus::EquivocatedInOtherEra => {
                (&mut faulty_w, &mut faulty_validators)
            }
            ParticipationStatus::Inactive | ParticipationStatus::LastSeenInRound(_) => {
                (&mut inactive_w, &mut inactive_validators)
            }
        };
        *weight_sum = weight_sum.saturating_add(self.validators.weight(idx).0);
        bucket.push((idx, v_id.clone(), status));
    }
    // Sort by descending status, then by ascending validator index.
    inactive_validators.sort_by_key(|(idx, _, status)| (Reverse(*status), *idx));
    faulty_validators.sort_by_key(|(idx, _, status)| (Reverse(*status), *idx));
    let total_w_wide = u128::from(total_w);
    let inactive_w_100 = u128::from(inactive_w).saturating_mul(100);
    let faulty_w_100 = u128::from(faulty_w).saturating_mul(100);
    let inactive_stake_percent = utils::div_round(inactive_w_100, total_w_wide) as u8;
    let faulty_stake_percent = utils::div_round(faulty_w_100, total_w_wide) as u8;
    let participation = Participation::<C> {
        instance_id: *self.instance_id(),
        inactive_stake_percent,
        faulty_stake_percent,
        inactive_validators,
        faulty_validators,
    };
    info!(
        our_idx = self.our_idx(),
        ?participation,
        "validator participation"
    );
}
/// Returns whether the last finalized round (the one just before
/// `first_non_finalized_round_id`) accepted a switch block or a dummy proposal.
///
/// Returns `false` if no round has been finalized yet.
fn finalized_switch_block(&self) -> bool {
    self.first_non_finalized_round_id
        .checked_sub(1)
        .is_some_and(|round_id| {
            self.accepted_switch_block(round_id) || self.accepted_dummy_proposal(round_id)
        })
}
/// Returns whether the given round has an accepted proposal that contains a block, reaches the
/// era's end height and has a timestamp at or after the era's end timestamp.
fn accepted_switch_block(&self, round_id: RoundId) -> bool {
    let Some((height, proposal)) = self.round(round_id).and_then(Round::accepted_proposal)
    else {
        return false;
    };
    // A proposal without a block can never be a switch block.
    if proposal.maybe_block().is_none() {
        return false;
    }
    let reached_end_height = height.saturating_add(1) >= self.params.end_height();
    reached_end_height && proposal.timestamp() >= self.params.end_timestamp()
}
/// Returns whether the given round has an accepted proposal that contains no block.
fn accepted_dummy_proposal(&self, round_id: RoundId) -> bool {
    self.round(round_id)
        .and_then(Round::accepted_proposal)
        .is_some_and(|(_, proposal)| proposal.maybe_block().is_none())
}
/// Returns whether the validator has sent an echo in the given round; `false` if we have no
/// state for that round.
fn has_echoed(&self, round_id: RoundId, validator_idx: ValidatorIndex) -> bool {
    match self.round(round_id) {
        Some(round) => round.has_echoed(validator_idx),
        None => false,
    }
}
/// Returns whether the validator has cast a vote in the given round; `false` if we have no
/// state for that round.
fn has_voted(&self, round_id: RoundId, validator_idx: ValidatorIndex) -> bool {
    match self.round(round_id) {
        Some(round) => round.has_voted(validator_idx),
        None => false,
    }
}
fn handle_sync_peer_timer(&mut self, now: Timestamp, rng: &mut NodeRng) -> ProtocolOutcomes<C> {
if self.evidence_only || self.finalized_switch_block() {
return vec![]; }
trace!(
our_idx = self.our_idx(),
instance_id = ?self.instance_id(),
"syncing with random peer",
);
let first_validator_idx = ValidatorIndex(rng.gen_range(0..self.validators.len() as u32));
let round_id = (self.first_non_finalized_round_id..=self.current_round)
.choose(rng)
.unwrap_or(self.current_round);
let payload = self.create_sync_request(rng, first_validator_idx, round_id);
let mut outcomes = vec![ProtocolOutcome::CreatedRequestToRandomPeer(
SerializedMessage::from_message(&payload),
)];
if let Some(interval) = self.config.sync_state_interval {
outcomes.push(ProtocolOutcome::ScheduleTimer(
now.saturating_add(interval),
TIMER_ID_SYNC_PEER,
));
}
outcomes
}
/// Logs `msg` at info level together with the proposal's hash, timestamp, round ID and the
/// round leader as creator. Logs an error instead if the leader index maps to no validator.
fn log_proposal(&self, proposal: &HashedProposal<C>, round_id: RoundId, msg: &str) {
    let creator_index = self.leader(round_id);
    let Some(creator) = self.validators.id(creator_index) else {
        error!(
            our_idx = self.our_idx(),
            ?creator_index,
            ?round_id,
            "{}: invalid creator",
            msg
        );
        return;
    };
    info!(
        our_idx = self.our_idx(),
        hash = %proposal.hash(),
        %creator,
        creator_index = creator_index.0,
        round_id,
        timestamp = %proposal.timestamp(),
        "{}", msg,
    );
}
/// Creates a `SyncRequest` that describes what we know about the given round, and registers
/// its freshly generated sync ID.
///
/// All validator sets are encoded as bit fields that start at `first_validator_idx`. If we
/// have no state for the round, an empty-round request is returned. Otherwise the request
/// refers to the hash with an echo quorum or, failing that, the hash with the highest echo
/// weight.
fn create_sync_request(
    &mut self,
    rng: &mut NodeRng,
    first_validator_idx: ValidatorIndex,
    round_id: RoundId,
) -> SyncRequest<C> {
    let faulty = self.validator_bit_field(first_validator_idx, self.faults.keys().cloned());
    let active = self.validator_bit_field(first_validator_idx, self.active.keys_some());
    let instance_id = *self.instance_id();
    let round = match self.round(round_id) {
        Some(round) => round,
        None => {
            let sync_id = self.sent_sync_requests.create_and_register_new_id(rng);
            return SyncRequest::new_empty_round(
                round_id,
                first_validator_idx,
                faulty,
                active,
                instance_id,
                sync_id,
            );
        }
    };
    let vote_bits = |vote: bool| {
        self.validator_bit_field(first_validator_idx, round.votes(vote).keys_some())
    };
    let true_votes = vote_bits(true);
    let false_votes = vote_bits(false);
    // Prefer the hash with a quorum; otherwise fall back to the one with the most echo weight.
    let proposal_hash = match round.quorum_echoes() {
        Some(quorum_hash) => Some(quorum_hash),
        None => round
            .echoes()
            .iter()
            .max_by_key(|(_, echo_map)| self.sum_weights(echo_map.keys()))
            .map(|(hash, _)| *hash),
    };
    let has_proposal = round.proposal().map(HashedProposal::hash) == proposal_hash.as_ref();
    let echoes = proposal_hash
        .and_then(|hash| round.echoes().get(&hash))
        .map_or(0, |echo_map| {
            self.validator_bit_field(first_validator_idx, echo_map.keys().cloned())
        });
    let sync_id = self.sent_sync_requests.create_and_register_new_id(rng);
    SyncRequest {
        round_id,
        proposal_hash,
        has_proposal,
        first_validator_idx,
        echoes,
        true_votes,
        false_votes,
        active,
        faulty,
        instance_id,
        sync_id,
    }
}
/// Encodes a set of validator indices as a 128-bit field.
///
/// Bit `i` stands for the validator `first_idx + i`, wrapping around at the number of
/// validators. Indices that do not fit into the 128 bits are left out. Returns `0` if
/// `first_idx` is not a valid validator index.
fn validator_bit_field(
    &self,
    ValidatorIndex(first_idx): ValidatorIndex,
    index_iter: impl Iterator<Item = ValidatorIndex>,
) -> u128 {
    let validator_count = self.validators.len() as u32;
    if first_idx >= validator_count {
        return 0;
    }
    index_iter
        .map(|ValidatorIndex(v_idx)| {
            // Offset from the first index; on underflow wrap around the validator count.
            let (offset, underflowed) = v_idx.overflowing_sub(first_idx);
            if underflowed {
                offset.wrapping_add(validator_count)
            } else {
                offset
            }
        })
        .filter(|offset| *offset < u128::BITS)
        .fold(0_u128, |bit_field, offset| {
            bit_field | 1_u128.wrapping_shl(offset)
        })
}
/// Returns an iterator over the validator indices whose bits are set in `bit_field`.
///
/// This is the counterpart of `validator_bit_field`: the least significant bit stands for the
/// validator at the given first index, and higher bits wrap around at the number of
/// validators. The iterator ends once no set bit is left or the running index is out of range.
fn iter_validator_bit_field(
&self,
ValidatorIndex(mut idx): ValidatorIndex,
mut bit_field: u128,
) -> impl Iterator<Item = ValidatorIndex> {
let validator_count = self.validators.len() as u32;
iter::from_fn(move || {
if bit_field == 0 || idx >= validator_count {
return None; }
// Distance from the current position to the next set bit.
let zeros = bit_field.trailing_zeros();
// Shift that bit into the lowest position and clear it, so that the next call continues
// from there.
bit_field = bit_field.wrapping_shr(zeros);
bit_field &= !1;
// Advance the index by the same distance, modulo the validator count. The overflow arm
// corrects the wrapped sum before the final remainder is taken.
idx = match idx.overflowing_add(zeros) {
(i, false) => i,
(i, true) => i
.checked_rem(validator_count)?
.wrapping_sub(validator_count),
}
.checked_rem(validator_count)?;
Some(ValidatorIndex(idx))
})
}
/// Returns whether the validator `v_idx` has a bit in a 128-bit field that starts at
/// `first_idx` and wraps around at the number of validators.
///
/// Returns `false` if `first_idx` is not a valid validator index.
fn validator_bit_field_includes(
&self,
ValidatorIndex(first_idx): ValidatorIndex,
ValidatorIndex(v_idx): ValidatorIndex,
) -> bool {
let validator_count = self.validators.len() as u32;
if first_idx >= validator_count {
return false;
}
// The index represented by the highest bit, i.e. `first_idx + 127`, with overflow tracked.
let high_bit = u128::BITS.saturating_sub(1);
let (last_idx, last_idx_overflow) = first_idx.overflowing_add(high_bit);
if v_idx >= first_idx {
// No wrap-around needed: included if it is at most the last represented index.
last_idx_overflow || v_idx <= last_idx
} else {
// `v_idx` lies before `first_idx`, so it is only reachable after wrapping around:
// compare `v_idx + validator_count` with the last represented index.
let (v_idx2, v_idx2_overflow) = v_idx.overflowing_add(validator_count);
if v_idx2_overflow == last_idx_overflow {
v_idx2 <= last_idx
} else {
last_idx_overflow
}
}
}
/// Returns the leader of the given round: taken from the round's state if we have it,
/// otherwise computed from the leader sequence.
pub(crate) fn leader(&self, round_id: RoundId) -> ValidatorIndex {
    match self.round(round_id) {
        Some(round) => round.leader(),
        None => self.leader_sequence.leader(u64::from(round_id)),
    }
}
/// Creates, signs, records and applies a new message of our own for the given round.
///
/// Returns `None` if we are not an active validator, are paused, have already signed a message
/// of that kind (echo or vote) in this round, or if the message could not be written to the
/// WAL or added to the protocol state.
fn create_message(
    &mut self,
    round_id: RoundId,
    content: Content<C>,
) -> Option<SignedMessage<C>> {
    let active_validator = self.active_validator.as_ref()?;
    let validator_idx = active_validator.idx;
    if self.paused {
        return None;
    }
    // Never sign a second echo or a second vote in the same round.
    let already_signed = match &content {
        Content::Echo(_) => self.has_echoed(round_id, validator_idx),
        Content::Vote(_) => self.has_voted(round_id, validator_idx),
    };
    if already_signed {
        return None;
    }
    let signed_msg = SignedMessage::sign_new(
        round_id,
        *self.instance_id(),
        content,
        validator_idx,
        &active_validator.secret,
    );
    // The message is only handed out if it is in the WAL and in our own state.
    let stored = self.record_entry(&ZugWalEntry::SignedMessage(signed_msg.clone()))
        && self.add_content(signed_msg.clone());
    if !stored {
        debug!(
            our_idx = self.our_idx(),
            %round_id,
            ?content,
            "couldn't record a signed message in the WAL or add it to the protocol state"
        );
        return None;
    }
    Some(signed_msg)
}
/// Creates a signed message via `create_message` and, if one was created, returns the outcome
/// that gossips it. Returns no outcomes otherwise.
fn create_and_gossip_message(
    &mut self,
    round_id: RoundId,
    content: Content<C>,
) -> ProtocolOutcomes<C> {
    match self.create_message(round_id, content) {
        Some(signed_msg) => {
            let message = Message::Signed(signed_msg);
            let serialized = SerializedMessage::from_message(&message);
            vec![ProtocolOutcome::CreatedGossipMessage(serialized)]
        }
        None => vec![],
    }
}
/// Records the evidence in the WAL and then processes it via `handle_fault_no_wal`.
///
/// The evidence is processed even if it could not be written to the WAL.
fn handle_fault(
    &mut self,
    signed_msg: SignedMessage<C>,
    validator_id: C::ValidatorId,
    content2: Content<C>,
    signature2: C::Signature,
    now: Timestamp,
) -> ProtocolOutcomes<C> {
    let wal_entry = ZugWalEntry::Evidence(signed_msg.clone(), content2, signature2);
    self.record_entry(&wal_entry);
    self.handle_fault_no_wal(signed_msg, validator_id, content2, signature2, now)
}
/// Marks a validator as faulty based on direct evidence, without writing to the WAL.
///
/// Stores the evidence, deactivates ourselves if we are the faulty validator, removes the
/// validator's votes and echoes from all rounds and re-checks the quorums of all rounds from
/// the first non-finalized one on. Returns `NewEvidence` plus the outcomes of `update`; if the
/// faulty weight now exceeds the fault tolerance threshold, it returns `NewEvidence` and
/// `FttExceeded` only.
fn handle_fault_no_wal(
&mut self,
signed_msg: SignedMessage<C>,
validator_id: C::ValidatorId,
content2: Content<C>,
signature2: C::Signature,
now: Timestamp,
) -> ProtocolOutcomes<C> {
let validator_idx = signed_msg.validator_idx;
warn!(
our_idx = self.our_idx(),
?signed_msg,
?content2,
id = %validator_id,
"validator double-signed"
);
// Remember the evidence; this replaces any earlier fault entry for the validator.
let fault = Fault::Direct(signed_msg, content2, signature2);
self.faults.insert(validator_idx, fault);
// If we are the one who double-signed, stop signing messages.
if Some(validator_idx) == self.active_validator.as_ref().map(|av| av.idx) {
error!(our_idx = validator_idx.0, "we are faulty; deactivating");
self.active_validator = None;
}
self.active[validator_idx] = None;
self.progress_detected = true;
let mut outcomes = vec![ProtocolOutcome::NewEvidence(validator_id)];
// Too much faulty weight: report it and stop here.
if self.faulty_weight() > self.params.ftt() {
outcomes.push(ProtocolOutcome::FttExceeded);
return outcomes;
}
// The faulty validator's individual votes and echoes are dropped from every round.
for round in self.rounds.values_mut() {
round.remove_votes_and_echoes(validator_idx);
}
// Re-check the quorums of all rounds that are not finalized yet, and mark every round
// dirty in which a new quorum was found.
for round_id in
self.first_non_finalized_round_id..=self.rounds.keys().last().copied().unwrap_or(0)
{
if !self.rounds.contains_key(&round_id) {
continue;
}
if self.rounds[&round_id].quorum_echoes().is_none() {
// The hashes are collected first because checking a quorum needs `&mut self`.
let hashes = self.rounds[&round_id]
.echoes()
.keys()
.copied()
.collect_vec();
if hashes
.into_iter()
.any(|hash| self.check_new_echo_quorum(round_id, hash))
{
self.mark_dirty(round_id);
}
}
if self.check_new_vote_quorum(round_id, true)
|| self.check_new_vote_quorum(round_id, false)
{
self.mark_dirty(round_id);
}
}
debug!(round_id = ?self.current_round, "Calling update after handle_fault_no_wal");
outcomes.extend(self.update(now));
outcomes
}
/// Handles a sync request from a peer and builds a response with the data the peer is missing.
///
/// Returns the outcomes to process (evidence to send, or a disconnect) and, if there is
/// anything to answer, the serialized `SyncResponse`. The peer is disconnected if its first
/// validator index is out of range or if it does not list a validator that is banned for us
/// as faulty. If we have no state for the requested round, nothing is returned.
fn handle_sync_request(
&self,
sync_request: SyncRequest<C>,
sender: NodeId,
) -> (ProtocolOutcomes<C>, Option<SerializedMessage>) {
let SyncRequest {
round_id,
mut proposal_hash,
mut has_proposal,
first_validator_idx,
mut echoes,
true_votes,
false_votes,
active,
faulty,
instance_id,
sync_id,
} = sync_request;
// All bit fields in the request are relative to this index, so it must be valid.
if first_validator_idx.0 >= self.validators.len() as u32 {
info!(
our_idx = self.our_idx(),
first_validator_idx = first_validator_idx.0,
%sender,
"invalid SyncRequest message"
);
return (vec![ProtocolOutcome::Disconnect(sender)], None);
}
let round = match self.round(round_id) {
Some(round) => round,
None => return (vec![], None),
};
// If we have an echo quorum for a different hash than the peer's, answer for our hash and
// assume the peer has no echoes for it.
// NOTE(review): `has_proposal` is forced to `true` here, so in this case only the hash is
// returned, never the proposal itself, even if we hold it — confirm that this is intended.
if round.quorum_echoes() != proposal_hash && round.quorum_echoes().is_some() {
has_proposal = true;
echoes = 0;
proposal_hash = round.quorum_echoes();
}
// The validators we know to be faulty.
let our_faulty = self.validator_bit_field(first_validator_idx, self.faults.keys().cloned());
let mut proposal_or_hash = None;
let mut echo_sigs = BTreeMap::new();
let mut our_echoes: u128 = 0;
if let Some(hash) = proposal_hash {
if let Some(echo_map) = round.echoes().get(&hash) {
// Echoes we have that the peer lacks, excluding validators either side knows as faulty.
our_echoes =
self.validator_bit_field(first_validator_idx, echo_map.keys().cloned());
let missing_echoes = our_echoes & !(echoes | faulty | our_faulty);
for v_idx in self.iter_validator_bit_field(first_validator_idx, missing_echoes) {
echo_sigs.insert(v_idx, echo_map[&v_idx]);
}
if has_proposal {
// The peer has the proposal already: the hash is enough.
proposal_or_hash = Some(Either::Right(hash));
} else {
// The peer lacks the proposal: add the leader's echo if the bit field could not
// represent the leader, and send the proposal itself if we have it.
let leader_idx = round.leader();
if !self.validator_bit_field_includes(first_validator_idx, leader_idx) {
if let Some(signature) = echo_map.get(&leader_idx) {
echo_sigs.insert(leader_idx, *signature);
}
}
if let Some(proposal) = round.proposal() {
if *proposal.hash() == hash {
proposal_or_hash = Some(Either::Left(proposal.inner().clone()));
}
}
}
}
}
// Votes the peer is missing. Votes that go against an existing quorum are omitted.
let our_true_votes: u128 = if round.quorum_votes() == Some(false) {
0
} else {
self.validator_bit_field(first_validator_idx, round.votes(true).keys_some())
};
let missing_true_votes = our_true_votes & !(true_votes | faulty | our_faulty);
// The bits come from `keys_some`, so every index has a signature to unwrap.
let true_vote_sigs = self
.iter_validator_bit_field(first_validator_idx, missing_true_votes)
.map(|v_idx| (v_idx, round.votes(true)[v_idx].unwrap()))
.collect();
let our_false_votes: u128 = if round.quorum_votes() == Some(true) {
0
} else {
self.validator_bit_field(first_validator_idx, round.votes(false).keys_some())
};
let missing_false_votes = our_false_votes & !(false_votes | faulty | our_faulty);
let false_vote_sigs = self
.iter_validator_bit_field(first_validator_idx, missing_false_votes)
.map(|v_idx| (v_idx, round.votes(false)[v_idx].unwrap()))
.collect();
let mut outcomes = vec![];
// Faulty validators the peer does not know about yet.
let missing_faulty = our_faulty & !faulty;
let mut evidence = vec![];
for v_idx in self.iter_validator_bit_field(first_validator_idx, missing_faulty) {
match &self.faults[&v_idx] {
Fault::Banned => {
info!(
our_idx = self.our_idx(),
validator_index = v_idx.0,
%sender,
"peer disagrees about banned validator; disconnecting"
);
return (vec![ProtocolOutcome::Disconnect(sender)], None);
}
Fault::Direct(signed_msg, content2, signature2) => {
// We hold the evidence ourselves: include it in the response.
evidence.push((signed_msg.clone(), *content2, *signature2));
}
Fault::Indirect => {
// We hold no evidence ourselves: request that it be sent to the peer.
let vid = self.validators.id(v_idx).unwrap().clone();
outcomes.push(ProtocolOutcome::SendEvidence(sender, vid));
}
}
}
// Validators we have seen as active that the peer has not, and for which the echoes and
// votes above do not already count. Their stored signed message is sent.
let our_active = self.validator_bit_field(first_validator_idx, self.active.keys_some());
let missing_active =
our_active & !(active | our_echoes | our_true_votes | our_false_votes | our_faulty);
let signed_messages = self
.iter_validator_bit_field(first_validator_idx, missing_active)
.filter_map(|v_idx| self.active[v_idx].clone())
.collect();
let sync_response = SyncResponse {
round_id,
proposal_or_hash,
echo_sigs,
true_vote_sigs,
false_vote_sigs,
signed_messages,
evidence,
instance_id,
sync_id,
};
(
outcomes,
Some(SerializedMessage::from_message(&Message::SyncResponse(
sync_response,
))),
)
}
/// Handles a sync response: turns its contents back into signed messages, evidence and a
/// proposal and processes all of them.
///
/// The sender is disconnected if the response's sync ID does not belong to a request we
/// registered, if it carries more echo or vote signatures than there are validators, or if
/// any of the contained items turns out to be invalid.
fn handle_sync_response(
&mut self,
sync_response: SyncResponse<C>,
sender: NodeId,
now: Timestamp,
) -> ProtocolOutcomes<C> {
let SyncResponse {
round_id,
proposal_or_hash,
echo_sigs,
true_vote_sigs,
false_vote_sigs,
signed_messages,
evidence,
instance_id,
sync_id,
} = sync_response;
// Only accept responses to requests we sent; the ID is removed from the registry here.
if self.sent_sync_requests.try_remove_id(sync_id).is_none() {
debug!(
?round_id,
?sync_id,
"Disconnecting from peer due to unwanted sync response"
);
return vec![ProtocolOutcome::Disconnect(sender)];
}
// None of the signature collections may be larger than the validator set.
if echo_sigs
.len()
.max(true_vote_sigs.len())
.max(false_vote_sigs.len())
> self.validators.len()
{
debug!(
?round_id,
?sync_id,
"Disconnecting from peer due to mismatching echos number"
);
return vec![ProtocolOutcome::Disconnect(sender)];
}
let local_round_id = self.current_round;
// The response carries a full proposal, only its hash, or neither.
let (proposal_hash, proposal) = match proposal_or_hash {
Some(Either::Left(proposal)) => {
let hashed_prop = HashedProposal::new(proposal);
let hash = hashed_prop.hash();
debug!(?hash, ?round_id, ?local_round_id, "Got proposal from peer");
(Some(*hash), Some(hashed_prop.into_inner()))
}
Some(Either::Right(hash)) => {
debug!(
?hash,
?round_id,
?local_round_id,
"Got proposal hash from peer"
);
(Some(hash), None)
}
None => {
debug!(
?round_id,
?local_round_id,
"Got no proposal or hash from peer"
);
(None, None)
}
};
// Rebuild full signed messages from the bare signatures. Echo signatures can only be
// turned into messages if the proposal hash is known; without it they are discarded.
let signed_messages = {
let echo_sigs = proposal_hash
.map(move |hash| {
echo_sigs
.into_iter()
.map(move |(validator_idx, signature)| {
(validator_idx, Content::Echo(hash), signature)
})
})
.into_iter()
.flatten();
let true_vote_sigs = true_vote_sigs
.into_iter()
.map(|(validator_idx, signature)| (validator_idx, Content::Vote(true), signature));
let false_vote_sigs = false_vote_sigs
.into_iter()
.map(|(validator_idx, signature)| (validator_idx, Content::Vote(false), signature));
let sigs = echo_sigs.chain(true_vote_sigs).chain(false_vote_sigs).map(
|(validator_idx, content, signature)| SignedMessage {
round_id,
instance_id,
content,
validator_idx,
signature,
},
);
signed_messages.into_iter().chain(sigs)
};
// Process the signed messages first, then the evidence, then the proposal: the proposal
// is handled last so that the echoes sent along with it are already known.
// The first invalid item aborts processing with a `FaultySender` error.
let handle_outcomes = move || -> Result<_, FaultySender> {
let mut outcomes = vec![];
for signed_msg in signed_messages {
outcomes.extend(self.handle_signed_message(signed_msg, sender, now)?);
}
for (signed_msg, content2, signature2) in evidence {
outcomes
.extend(self.handle_evidence(signed_msg, content2, signature2, sender, now)?);
}
if let Some(proposal) = proposal {
outcomes.extend(self.handle_proposal(round_id, proposal, sender, now)?);
}
Ok(outcomes)
};
outcomes_or_disconnect(handle_outcomes())
}
/// Handles a signed echo or vote received from a peer.
///
/// Returns `Err(FaultySender)` if the validator index is out of range or the signature is
/// invalid. Messages from faulty validators, from rounds too far in the future, duplicates,
/// and any message in evidence-only mode are ignored. If the message contradicts an earlier
/// one from the same validator, the validator is marked as faulty and the evidence is
/// gossiped. Otherwise the message is recorded in the WAL and added to the state.
fn handle_signed_message(
&mut self,
signed_msg: SignedMessage<C>,
sender: NodeId,
now: Timestamp,
) -> Result<ProtocolOutcomes<C>, FaultySender> {
let our_idx = self.our_idx();
let validator_idx = signed_msg.validator_idx;
let validator_id = if let Some(validator_id) = self.validators.id(validator_idx) {
validator_id.clone()
} else {
warn!(
our_idx,
?signed_msg,
%sender,
"invalid incoming message: validator index out of range",
);
return Err(FaultySender(sender));
};
// The cheap checks come first; the signature is only verified further below.
if self.faults.contains_key(&validator_idx) {
debug!(
our_idx,
?validator_id,
"ignoring message from faulty validator"
);
return Ok(vec![]);
}
if signed_msg.round_id > self.current_round.saturating_add(MAX_FUTURE_ROUNDS) {
debug!(our_idx, ?signed_msg, "dropping message from future round");
return Ok(vec![]);
}
if self.evidence_only {
debug!(our_idx, ?signed_msg, "received an irrelevant message");
return Ok(vec![]);
}
if let Some(round) = self.round(signed_msg.round_id) {
if round.contains(&signed_msg.content, validator_idx) {
debug!(our_idx, ?signed_msg, %sender, "received a duplicated message");
return Ok(vec![]);
}
}
if !signed_msg.verify_signature(&validator_id) {
warn!(our_idx, ?signed_msg, %sender, "invalid signature",);
return Err(FaultySender(sender));
}
// A correctly signed message that conflicts with an earlier one is evidence of a fault:
// handle it and gossip the evidence.
if let Some((content2, signature2)) = self.detect_fault(&signed_msg) {
let evidence_msg = Message::Evidence(signed_msg.clone(), content2, signature2);
let mut outcomes =
self.handle_fault(signed_msg, validator_id, content2, signature2, now);
outcomes.push(ProtocolOutcome::CreatedGossipMessage(
SerializedMessage::from_message(&evidence_msg),
));
return Ok(outcomes);
}
// NOTE(review): this repeats the faulty-validator check from above; nothing in between
// appears to add a fault without returning early — confirm whether it is still needed.
if self.faults.contains_key(&signed_msg.validator_idx) {
debug!(
our_idx,
?signed_msg,
"dropping message from faulty validator"
);
return Ok(vec![]);
}
// The result of recording the entry is ignored: the message is added to the state either way.
self.record_entry(&ZugWalEntry::SignedMessage(signed_msg.clone()));
if self.add_content(signed_msg) {
debug!(round_id = ?self.current_round, "Calling update after add_content");
Ok(self.update(now))
} else {
Ok(vec![])
}
}
/// Handles evidence of double-signing received from a peer.
///
/// Evidence against a validator we already hold direct evidence for is ignored. Returns
/// `Err(FaultySender)` if the validator index is out of range, the two contents do not
/// contradict each other, or one of the two signatures is invalid. Valid evidence is passed on
/// to `handle_fault`.
fn handle_evidence(
    &mut self,
    signed_msg: SignedMessage<C>,
    content2: Content<C>,
    signature2: C::Signature,
    sender: NodeId,
    now: Timestamp,
) -> Result<ProtocolOutcomes<C>, FaultySender> {
    let our_idx = self.our_idx();
    let validator_idx = signed_msg.validator_idx;
    if matches!(self.faults.get(&validator_idx), Some(Fault::Direct(..))) {
        return Ok(vec![]);
    }
    let Some(validator_id) = self.validators.id(validator_idx).cloned() else {
        warn!(
            our_idx,
            ?signed_msg,
            %sender,
            "invalid incoming evidence: validator index out of range",
        );
        return Err(FaultySender(sender));
    };
    if !signed_msg.content.contradicts(&content2) {
        warn!(
            our_idx,
            ?signed_msg,
            ?content2,
            %sender,
            "invalid evidence: contents don't conflict",
        );
        return Err(FaultySender(sender));
    }
    // Both the original message and the conflicting one must carry a valid signature.
    let both_signatures_valid = signed_msg.verify_signature(&validator_id)
        && signed_msg
            .with(content2, signature2)
            .verify_signature(&validator_id);
    if !both_signatures_valid {
        warn!(
            our_idx,
            ?signed_msg,
            ?content2,
            %sender,
            "invalid signature in evidence",
        );
        return Err(FaultySender(sender));
    }
    let outcomes = self.handle_fault(signed_msg, validator_id, content2, signature2, now);
    Ok(outcomes)
}
/// Handles a proposal received from a peer for the given round.
///
/// Returns `Err(FaultySender)` if the proposal is malformed: the parent is not from an earlier
/// round, the presence of `inactive` does not match the kind of proposal, or `inactive`
/// contains the leader or an unknown validator index. The proposal is dropped if its
/// timestamp is too far in the future, if the round has no echoes for it, or if we already
/// have it. If ancestors are missing, it is stored until they arrive. Otherwise it is passed
/// on to validation and `update` is called.
fn handle_proposal(
&mut self,
round_id: RoundId,
proposal: Proposal<C>,
sender: NodeId,
now: Timestamp,
) -> Result<ProtocolOutcomes<C>, FaultySender> {
let leader_idx = self.leader(round_id);
let our_idx = self.our_idx();
// Logs a message about the proposal at the given level, with the same set of fields each time.
macro_rules! log_proposal {
($lvl:expr, $prop:expr, $msg:expr $(,)?) => {
event!(
$lvl,
our_idx,
round_id,
parent = $prop.maybe_parent_round_id,
timestamp = %$prop.timestamp,
leader_idx = leader_idx.0,
?sender,
"{}",
$msg
);
}
}
// The parent must come from a strictly earlier round.
if let Some(parent_round_id) = proposal.maybe_parent_round_id {
if parent_round_id >= round_id {
log_proposal!(
Level::WARN,
proposal,
"invalid proposal: parent is not from an earlier round",
);
return Err(FaultySender(sender));
}
}
// Timestamps beyond the clock tolerance are dropped without blaming the sender.
if proposal.timestamp > now.saturating_add(self.config.clock_tolerance) {
log_proposal!(
Level::TRACE,
proposal,
"received a proposal with a timestamp far in the future; dropping",
);
return Ok(vec![]);
}
if proposal.timestamp > now {
log_proposal!(
Level::TRACE,
proposal,
"received a proposal with a timestamp slightly in the future",
);
}
// `inactive` must be absent exactly if the proposal has no parent or no block.
if (proposal.maybe_parent_round_id.is_none() || proposal.maybe_block.is_none())
!= proposal.inactive.is_none()
{
log_proposal!(
Level::WARN,
proposal,
"invalid proposal: inactive must be present in all except the first and dummy proposals",
);
return Err(FaultySender(sender));
}
// The leader may not list itself as inactive, and all listed indices must exist.
if let Some(inactive) = &proposal.inactive {
if inactive
.iter()
.any(|idx| *idx == leader_idx || self.validators.id(*idx).is_none())
{
log_proposal!(
Level::WARN,
proposal,
"invalid proposal: invalid inactive validator index",
);
return Err(FaultySender(sender));
}
}
let hashed_prop = HashedProposal::new(proposal);
// Only proposals the round already has echoes for are accepted.
if self
.round(round_id)
.is_none_or(|round| !round.has_echoes_for_proposal(hashed_prop.hash()))
{
log_proposal!(
Level::DEBUG,
hashed_prop.inner(),
"dropping proposal: missing echoes"
);
return Ok(vec![]);
}
if self.round(round_id).and_then(Round::proposal) == Some(&hashed_prop) {
log_proposal!(
Level::DEBUG,
hashed_prop.inner(),
"dropping proposal: we already have it"
);
return Ok(vec![]);
}
// Collect the ancestors' values. If they are not available yet, park the proposal under
// its parent's round ID together with the round and sender it was received with.
let ancestor_values = if let Some(parent_round_id) = hashed_prop.maybe_parent_round_id() {
if let Some(ancestor_values) = self.ancestor_values(parent_round_id) {
ancestor_values
} else {
log_proposal!(
Level::DEBUG,
hashed_prop.inner(),
"storing proposal for later; still missing ancestors",
);
self.proposals_waiting_for_parent
.entry(parent_round_id)
.or_default()
.entry(hashed_prop)
.or_default()
.insert((round_id, sender));
return Ok(vec![]);
}
} else {
vec![]
};
let mut outcomes = self.validate_proposal(round_id, hashed_prop, ancestor_values, sender);
debug!(round_id = ?self.current_round, "Calling update after handle_proposal");
outcomes.extend(self.update(now));
Ok(outcomes)
}
/// Checks whether the echoes for `hash` newly form a quorum in the given round and, if so,
/// records `hash` as the round's quorum hash.
///
/// Returns `true` only if a quorum was newly recorded. Returns `false` if the round does not
/// exist, already has an echo quorum, has no echoes for `hash` at all, or the echoes for
/// `hash` do not form a quorum.
fn check_new_echo_quorum(&mut self, round_id: RoundId, hash: C::Hash) -> bool {
    let is_new_quorum = self.rounds.get(&round_id).is_some_and(|round| {
        round.quorum_echoes().is_none()
            // A checked lookup: a hash without any echoes is simply not a quorum, rather than
            // a panic on a missing map key.
            && round
                .echoes()
                .get(&hash)
                .is_some_and(|echo_map| self.is_quorum(echo_map.keys().copied()))
    });
    if is_new_quorum {
        self.round_mut(round_id).set_quorum_echoes(hash);
    }
    is_new_quorum
}
/// Checks whether the votes for `vote` newly form a quorum in the given round and, if so,
/// records that quorum.
///
/// Returns `true` only if a quorum was newly recorded; `false` if the round does not exist,
/// already has a vote quorum, or the votes for `vote` do not form one.
fn check_new_vote_quorum(&mut self, round_id: RoundId, vote: bool) -> bool {
    let Some(round) = self.rounds.get(&round_id) else {
        return false;
    };
    if round.quorum_votes().is_some() || !self.is_quorum(round.votes(vote).keys_some()) {
        return false;
    }
    self.round_mut(round_id).set_quorum_votes(vote);
    let our_idx = self.our_idx();
    if vote {
        // Only worth a log line if the proposal has not been accepted yet.
        if self.rounds[&round_id].accepted_proposal().is_none() {
            info!(our_idx, %round_id, "round committed; no accepted proposal yet");
        }
    } else {
        info!(our_idx, %round_id, "round is now skippable");
    }
    true
}
/// Appends an entry to the WAL.
///
/// Returns `true` if the entry was written. Returns `false` if no WAL is open or if writing
/// failed; in the latter case the WAL is closed and this node stops acting as a validator.
fn record_entry(&mut self, entry: &ZugWalEntry<C>) -> bool {
    match self.write_wal.as_mut().map(|ww| ww.record_entry(entry)) {
        None => false,
        Some(Ok(())) => true,
        Some(Err(err)) => {
            // Read our index before deactivating; afterwards `our_idx()` only yields `None`
            // and the log line would no longer say which validator was deactivated.
            let our_idx = self.our_idx();
            self.active_validator = None;
            self.write_wal = None;
            error!(
                our_idx,
                %err,
                "could not record a signed message to the WAL; deactivating"
            );
            false
        }
    }
}
/// Replays the WAL in `wal_file` into the protocol state and then opens it for writing.
///
/// Returns the outcomes produced during the replay. If the file cannot be opened for reading,
/// an entry cannot be read, or an entry cannot be applied, the function returns early without
/// opening the WAL for writing, so `write_wal` stays `None`.
pub(crate) fn open_wal(&mut self, wal_file: PathBuf, now: Timestamp) -> ProtocolOutcomes<C> {
let our_idx = self.our_idx();
let mut read_wal = match ReadWal::<ZugWalEntry<C>>::new(&wal_file) {
Ok(read_wal) => read_wal,
Err(err) => {
error!(our_idx, %err, "could not create a ReadWal using this file");
return vec![];
}
};
let mut outcomes = vec![];
// Apply the entries one by one until the end of the file.
loop {
match read_wal.read_next_entry() {
Ok(Some(next_entry)) => match next_entry {
ZugWalEntry::SignedMessage(next_message) => {
if !self.add_content(next_message) {
error!(our_idx, "Could not add content from WAL.");
return outcomes;
}
}
ZugWalEntry::Proposal(next_proposal, corresponding_round_id) => {
// Skip the proposal if the round already holds exactly this one.
if self
.round(corresponding_round_id)
.and_then(Round::proposal)
.map(HashedProposal::inner)
== Some(&next_proposal)
{
warn!(our_idx, "Proposal from WAL is duplicated.");
continue;
}
// Walk up the chain of parents and collect the ancestors' blocks. Every ancestor
// must be present and must be the proposal with the echo quorum in its round.
let mut ancestor_values = vec![];
if let Some(mut round_id) = next_proposal.maybe_parent_round_id {
loop {
let proposal = if let Some(proposal) =
self.round(round_id).and_then(Round::proposal)
{
proposal
} else {
error!(our_idx, "Proposal from WAL is missing ancestors.");
return outcomes;
};
if self.round(round_id).and_then(Round::quorum_echoes)
!= Some(*proposal.hash())
{
error!(our_idx, "Proposal from WAL has unaccepted ancestor.");
return outcomes;
}
ancestor_values.extend(proposal.maybe_block().cloned());
match proposal.maybe_parent_round_id() {
None => break,
Some(parent_round_id) => round_id = parent_round_id,
}
}
}
// Insert the proposal; if that succeeds and it carries a block, report the block
// as handled.
if self
.round_mut(corresponding_round_id)
.insert_proposal(HashedProposal::new(next_proposal.clone()))
{
self.mark_dirty(corresponding_round_id);
if let Some(block) = next_proposal.maybe_block {
let block_context =
BlockContext::new(next_proposal.timestamp, ancestor_values);
let proposed_block = ProposedBlock::new(block, block_context);
outcomes
.push(ProtocolOutcome::HandledProposedBlock(proposed_block));
}
}
}
ZugWalEntry::Evidence(
conflicting_message,
conflicting_message_content,
conflicting_signature,
) => {
let validator_id = {
if let Some(validator_id) =
self.validators.id(conflicting_message.validator_idx)
{
validator_id.clone()
} else {
warn!(
our_idx,
index = conflicting_message.validator_idx.0,
"No validator present at this index, despite holding \
conflicting messages for it in the WAL"
);
continue;
}
};
// The evidence is already in the WAL, so it is handled without recording it again.
let new_outcomes = self.handle_fault_no_wal(
conflicting_message,
validator_id,
conflicting_message_content,
conflicting_signature,
now,
);
// Keep only the outcomes that report state; everything that would send messages,
// schedule timers, queue actions or disconnect peers is dropped during replay.
outcomes.extend(new_outcomes.into_iter().filter(|outcome| match outcome {
ProtocolOutcome::FttExceeded
| ProtocolOutcome::WeAreFaulty
| ProtocolOutcome::FinalizedBlock(_)
| ProtocolOutcome::ValidateConsensusValue { .. }
| ProtocolOutcome::HandledProposedBlock(..)
| ProtocolOutcome::NewEvidence(_) => true,
ProtocolOutcome::SendEvidence(_, _)
| ProtocolOutcome::CreatedGossipMessage(_)
| ProtocolOutcome::CreatedTargetedMessage(_, _)
| ProtocolOutcome::CreatedMessageToRandomPeer(_)
| ProtocolOutcome::CreatedRequestToRandomPeer(_)
| ProtocolOutcome::ScheduleTimer(_, _)
| ProtocolOutcome::QueueAction(_)
| ProtocolOutcome::CreateNewBlock(_, _)
| ProtocolOutcome::DoppelgangerDetected
| ProtocolOutcome::Disconnect(_) => false,
}));
}
},
Ok(None) => {
break;
}
Err(err) => {
error!(
our_idx,
?err,
"couldn't read a message from the WAL: was this node recently shut down?"
);
return outcomes; }
}
}
// The replay is complete: open the same file for writing.
match WriteWal::new(&wal_file) {
Ok(write_wal) => self.write_wal = Some(write_wal),
Err(err) => error!(
our_idx,
?err,
?wal_file,
"could not create a WAL using this file"
),
}
outcomes
}
/// Stores the content of `signed_msg` in its round and updates participation tracking.
///
/// The sender's slot in `self.active` is replaced when it is empty or holds a message from an
/// earlier round. Returns `true` if the echo or vote was newly inserted into the round, and
/// `false` if `insert_echo`/`insert_vote` reported that nothing was inserted.
fn add_content(&mut self, signed_msg: SignedMessage<C>) -> bool {
    let sender_idx = signed_msg.validator_idx;
    let is_latest_from_sender = self.active[sender_idx]
        .as_ref()
        .is_none_or(|old_msg| old_msg.round_id < signed_msg.round_id);
    if is_latest_from_sender {
        // First message ever seen from this validator: re-examine all non-finalized rounds.
        if self.active[sender_idx].is_none() {
            self.mark_dirty(self.first_non_finalized_round_id);
        }
        self.active[sender_idx] = Some(signed_msg.clone());
    }

    let SignedMessage {
        round_id,
        content,
        validator_idx,
        signature,
        ..
    } = signed_msg;
    let our_idx = self.our_idx();

    match content {
        Content::Echo(hash) => {
            let inserted = self
                .round_mut(round_id)
                .insert_echo(hash, validator_idx, signature);
            if inserted {
                debug!(our_idx, round_id, %hash, validator = validator_idx.0, "inserted echo");
                self.progress_detected = true;
                if self.check_new_echo_quorum(round_id, hash) {
                    self.mark_dirty(round_id);
                }
            }
            inserted
        }
        Content::Vote(vote) => {
            let inserted = self
                .round_mut(round_id)
                .insert_vote(vote, validator_idx, signature);
            if inserted {
                debug!(
                    our_idx,
                    round_id,
                    vote,
                    validator = validator_idx.0,
                    "inserted vote"
                );
                self.progress_detected = true;
                if self.check_new_vote_quorum(round_id, vote) {
                    self.mark_dirty(round_id);
                }
            }
            inserted
        }
    }
}
/// Looks for a message from the same validator in the same round that conflicts with
/// `signed_msg`.
///
/// For an echo, a conflict is an echo by the same validator for a different hash; for a vote,
/// it is a vote with the opposite value. Returns the conflicting content and its signature, or
/// `None` if the round does not exist or no conflict is stored.
fn detect_fault(&self, signed_msg: &SignedMessage<C>) -> Option<(Content<C>, C::Signature)> {
    let round = self.round(signed_msg.round_id)?;
    let sender_idx = signed_msg.validator_idx;
    match &signed_msg.content {
        Content::Echo(hash) => round.echoes().iter().find_map(|(other_hash, echo_map)| {
            if other_hash == hash {
                // Echoes for the same hash are not in conflict.
                None
            } else {
                echo_map
                    .get(&sender_idx)
                    .map(|sig| (Content::Echo(*other_hash), *sig))
            }
        }),
        Content::Vote(vote) => {
            let opposite_vote = !vote;
            round.votes(opposite_vote)[sender_idx].map(|sig| (Content::Vote(opposite_vote), sig))
        }
    }
}
/// Requests an update timer at `timestamp` unless one is already scheduled at that time or
/// earlier.
///
/// Returns a single `ScheduleTimer` outcome when `timestamp` is earlier than
/// `self.next_scheduled_update` (which is then lowered to `timestamp`), and no outcomes
/// otherwise.
fn schedule_update(&mut self, timestamp: Timestamp) -> ProtocolOutcomes<C> {
    debug!(our_idx = self.our_idx(), %timestamp, "schedule update");
    if timestamp >= self.next_scheduled_update {
        return vec![];
    }
    self.next_scheduled_update = timestamp;
    vec![ProtocolOutcome::ScheduleTimer(timestamp, TIMER_ID_UPDATE)]
}
/// Runs `update_round` for every round from the lowest dirty round up to the current round.
///
/// Does nothing, and leaves the dirty marker untouched, if the switch block is finalized or
/// the faulty weight exceeds the fault tolerance threshold. Otherwise the dirty marker is
/// cleared once all rounds have been processed.
fn update(&mut self, now: Timestamp) -> ProtocolOutcomes<C> {
    let nothing_to_do = self.finalized_switch_block() || self.faulty_weight() > self.params.ftt();
    if nothing_to_do {
        return vec![];
    }

    let mut outcomes = vec![];
    if let Some(first_dirty_round_id) = self.maybe_dirty_round_id {
        for round_id in first_dirty_round_id.. {
            let round_outcomes = self.update_round(round_id, now);
            outcomes.extend(round_outcomes);
            // `update_round` may advance `current_round`, so compare against the fresh value.
            if self.current_round <= round_id {
                break;
            }
        }
    }
    self.maybe_dirty_round_id = None;
    outcomes
}
/// Re-evaluates the round `round_id` at time `now` and returns the resulting outcomes.
///
/// In order, this: echoes the round's proposal if there is one, votes `true` if the proposal
/// has just become accepted, votes `false` for the current round on timeout or when the leader
/// is faulty, advances to the next round or schedules the next update, and finalizes the round
/// if it has an accepted proposal and is committed.
fn update_round(&mut self, round_id: RoundId, now: Timestamp) -> ProtocolOutcomes<C> {
// Make sure the round exists, so that the `self.rounds[&round_id]` index below cannot panic.
self.create_round(round_id);
let mut outcomes = vec![];
// Set whenever this call attempts a vote; used below to decide whether a proposal recheck
// needs to be scheduled.
let mut voted_on_round_outcome = false;
// If the round has a proposal, echo its hash.
if let Some(&hash) = self.rounds[&round_id].proposal().map(HashedProposal::hash) {
outcomes.extend(self.create_and_gossip_message(round_id, Content::Echo(hash)));
}
// `update_accepted_proposal` returns `true` only when the proposal becomes accepted in this
// call.
if self.update_accepted_proposal(round_id) {
if round_id == self.current_round {
self.update_proposal_timeout(now);
}
outcomes.extend(self.create_and_gossip_message(round_id, Content::Vote(true)));
voted_on_round_outcome = true;
// Proposals that were waiting for this round as their parent can now be validated.
if let Some(proposals) = self.proposals_waiting_for_parent.remove(&round_id) {
let ancestor_values = self
.ancestor_values(round_id)
.expect("missing ancestors of accepted proposal");
for (proposal, rounds_and_senders) in proposals {
for (proposal_round_id, sender) in rounds_and_senders {
outcomes.extend(self.validate_proposal(
proposal_round_id,
proposal.clone(),
ancestor_values.clone(),
sender,
));
}
}
}
}
// Everything in this branch applies to the current round only.
if round_id == self.current_round {
let our_idx = self.our_idx();
let current_round_start = self.current_round_start;
let current_timeout = current_round_start.saturating_add(self.proposal_timeout());
if now >= current_timeout {
debug!(?round_id, "Voting false due to timeout");
let msg_outcomes = self.create_and_gossip_message(round_id, Content::Vote(false));
voted_on_round_outcome = true;
// The timeout is adjusted only if the vote actually produced outcomes.
// NOTE(review): presumably `msg_outcomes` is empty when no new message was created;
// `create_and_gossip_message` is defined outside this chunk, so confirm there.
if !msg_outcomes.is_empty() {
self.update_proposal_timeout(now);
}
outcomes.extend(msg_outcomes);
} else if self.faults.contains_key(&self.leader(round_id)) {
debug!(?round_id, "Voting false due to faults");
outcomes.extend(self.create_and_gossip_message(round_id, Content::Vote(false)));
voted_on_round_outcome = true;
}
if self.is_skippable_round(round_id) || self.has_accepted_proposal(round_id) {
// The round is decided: move on. The start time of the new round is reset to
// `Timestamp::MAX` until it is set to `now` in the branch below.
self.current_round_start = Timestamp::MAX;
self.current_round = self.current_round.saturating_add(1);
info!(
our_idx,
round_id = self.current_round,
leader = self.leader(self.current_round).0,
"started a new round"
);
} else if let Some((maybe_parent_round_id, timestamp)) = self.suitable_parent_round(now)
{
if now < timestamp {
// The earliest timestamp for a proposal is still in the future; check again then.
debug!(our_idx, %now, %timestamp, "update_round - schedule update 1");
outcomes.extend(self.schedule_update(timestamp));
} else if self.current_round_start > now {
// The round has not started yet: start it now, propose if we are the leader, and
// schedule an update for the proposal timeout.
self.current_round_start = now;
outcomes.extend(self.propose_if_leader(maybe_parent_round_id, now));
let current_timeout = self
.current_round_start
.saturating_add(self.proposal_timeout());
if current_timeout > now {
debug!(our_idx, %now, %current_timeout, "update_round - schedule update 2");
outcomes.extend(self.schedule_update(current_timeout));
}
} else if !voted_on_round_outcome {
// The round is already running and no vote was attempted in this call: schedule another
// update one proposal timeout from now.
debug!(round_id, "Scheduling proposal recheck");
let updated_timestamp = now.saturating_add(self.proposal_timeout());
outcomes.extend(self.schedule_update(updated_timestamp));
}
} else {
error!(our_idx, "No suitable parent for current round");
}
}
// A round with an accepted proposal and a quorum of `true` votes is finalized.
if self.has_accepted_proposal(round_id) && self.is_committed_round(round_id) {
outcomes.extend(self.finalize_round(round_id));
}
outcomes
}
/// Marks the proposal of round `round_id` as accepted if all acceptance conditions now hold.
///
/// The proposal is accepted when it is not accepted yet, its hash matches the round's
/// `quorum_echoes` value, every validator it does not list as inactive is either active or
/// faulty, its parent (if any) is accepted, and every round between the parent and this round
/// is skippable. Returns `true` only if the proposal became accepted in this call.
fn update_accepted_proposal(&mut self, round_id: RoundId) -> bool {
    if self.has_accepted_proposal(round_id) {
        return false;
    }
    let Some(proposal) = self.round(round_id).and_then(Round::proposal) else {
        return false;
    };
    let echoed_hash = self.round(round_id).and_then(Round::quorum_echoes);
    if echoed_hash != Some(*proposal.hash()) {
        return false;
    }

    if let Some(inactive) = proposal.inactive() {
        // A validator that is neither listed as inactive, nor known to be active or faulty,
        // blocks acceptance.
        let has_unaccounted_validator = self.validators.enumerate_ids().any(|(idx, _)| {
            !inactive.contains(&idx)
                && self.active[idx].is_none()
                && !self.faults.contains_key(&idx)
        });
        if has_unaccounted_validator {
            return false;
        }
    }

    let (first_skipped_round_id, rel_height) = match proposal.maybe_parent_round_id() {
        None => (0, 0),
        Some(parent_round_id) => {
            let accepted_parent = self
                .round(parent_round_id)
                .and_then(Round::accepted_proposal);
            match accepted_parent {
                Some((parent_height, _)) => (
                    parent_round_id.saturating_add(1),
                    parent_height.saturating_add(1),
                ),
                None => return false,
            }
        }
    };

    let all_between_skippable = (first_skipped_round_id..round_id)
        .all(|skipped_round_id| self.is_skippable_round(skipped_round_id));
    if !all_between_skippable {
        return false;
    }

    self.round_mut(round_id)
        .set_accepted_proposal_height(rel_height);
    true
}
/// Checks a proposal received from `sender` for round `round_id` and either requests
/// validation of its block or inserts it into the round directly.
///
/// The proposal is dropped (no outcomes) if its timestamp is before the era start, if it is
/// earlier than the accepted parent's timestamp plus the minimum block time, or if its
/// inactive set is not a subset of the parent's. If the block needs validation, the proposal
/// is queued and a `ValidateConsensusValue` outcome is returned the first time it is queued.
/// Otherwise the proposal is inserted, recorded in the WAL, and a `HandledProposedBlock`
/// outcome is returned if it carries a block.
fn validate_proposal(
    &mut self,
    round_id: RoundId,
    proposal: HashedProposal<C>,
    ancestor_values: Vec<C::ConsensusValue>,
    sender: NodeId,
) -> ProtocolOutcomes<C> {
    let our_idx = self.our_idx();
    if proposal.timestamp() < self.params.start_timestamp() {
        info!(
            our_idx,
            "rejecting proposal with timestamp earlier than era start"
        );
        return vec![];
    }

    let maybe_accepted_parent = proposal
        .maybe_parent_round_id()
        .and_then(|parent_round_id| self.accepted_proposal(parent_round_id));
    if let Some((_, parent_proposal)) = maybe_accepted_parent {
        let min_block_time = self.params.min_block_time();
        let earliest_timestamp = parent_proposal.timestamp().saturating_add(min_block_time);
        if proposal.timestamp() < earliest_timestamp {
            info!(
                our_idx,
                "rejecting proposal with timestamp earlier than the parent"
            );
            return vec![];
        }
        let inactive_not_subset = match (proposal.inactive(), parent_proposal.inactive()) {
            (Some(inactive), Some(parent_inactive)) => !inactive.is_subset(parent_inactive),
            _ => false,
        };
        if inactive_not_subset {
            info!(
                our_idx,
                "rejecting proposal with more inactive validators than parent"
            );
            return vec![];
        }
    }

    let block_context = BlockContext::new(proposal.timestamp(), ancestor_values);
    let block_needing_validation = proposal
        .maybe_block()
        .filter(|value| value.needs_validation())
        .cloned();
    match block_needing_validation {
        Some(block) => {
            self.log_proposal(&proposal, round_id, "requesting proposal validation");
            let proposed_block = ProposedBlock::new(block, block_context);
            let newly_queued = self
                .proposals_waiting_for_validation
                .entry(proposed_block.clone())
                .or_default()
                .insert((round_id, proposal, sender));
            if newly_queued {
                return vec![ProtocolOutcome::ValidateConsensusValue {
                    sender,
                    proposed_block,
                }];
            }
        }
        None => {
            self.log_proposal(&proposal, round_id, "proposal does not need validation");
            if self.round_mut(round_id).insert_proposal(proposal.clone()) {
                self.record_entry(&ZugWalEntry::Proposal(proposal.inner().clone(), round_id));
                self.progress_detected = true;
                self.mark_dirty(round_id);
                if let Some(block) = proposal.maybe_block().cloned() {
                    let proposed_block = ProposedBlock::new(block, block_context);
                    return vec![ProtocolOutcome::HandledProposedBlock(proposed_block)];
                }
            }
        }
    }
    vec![]
}
/// Finalizes the accepted proposal of round `round_id`, after first finalizing its ancestors.
///
/// Rounds below `first_non_finalized_round_id` are ignored. All rounds between the previously
/// finalized one and `round_id` are pruned as skipped. A `FinalizedBlock` outcome is produced
/// only if the proposal carries a block.
fn finalize_round(&mut self, round_id: RoundId) -> ProtocolOutcomes<C> {
    if round_id < self.first_non_finalized_round_id {
        return vec![];
    }
    let maybe_accepted = self
        .round(round_id)
        .and_then(Round::accepted_proposal)
        .map(|(height, proposal)| (height, proposal.clone()));
    let Some((relative_height, proposal)) = maybe_accepted else {
        error!(
            our_idx = self.our_idx(),
            round_id, "missing finalized proposal; this is a bug"
        );
        return vec![];
    };

    // Ancestors come first, so that their outcomes precede this round's.
    let mut outcomes = match proposal.maybe_parent_round_id() {
        Some(parent_round_id) => self.finalize_round(parent_round_id),
        None => vec![],
    };

    let first_skipped_round_id = self.first_non_finalized_round_id;
    for prune_round_id in first_skipped_round_id..round_id {
        info!(
            our_idx = self.our_idx(),
            round_id = prune_round_id,
            "skipped round"
        );
        self.round_mut(prune_round_id).prune_skipped();
    }
    self.first_non_finalized_round_id = round_id.saturating_add(1);

    // A proposal without a block yields no `FinalizedBlock` outcome.
    let Some(block) = proposal.maybe_block() else {
        return outcomes;
    };
    let value = block.clone();
    let proposer = self
        .validators
        .id(self.leader(round_id))
        .expect("validator not found")
        .clone();
    let terminal_block_data = if self.accepted_switch_block(round_id) {
        let inactive_validators = proposal
            .inactive()
            .map(|inactive| {
                inactive
                    .iter()
                    .filter_map(|idx| self.validators.id(*idx))
                    .cloned()
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        Some(TerminalBlockData {
            inactive_validators,
        })
    } else {
        None
    };

    outcomes.push(ProtocolOutcome::FinalizedBlock(FinalizedBlock {
        value,
        timestamp: proposal.timestamp(),
        relative_height,
        equivocators: vec![],
        terminal_block_data,
        proposer,
    }));
    outcomes
}
/// Starts the creation of a proposal for the current round if we are its leader.
///
/// Returns no outcomes if we are not the active leader, a proposal for the current round is
/// already pending, or the round already has a proposal. If the parent is a switch block or a
/// dummy proposal, a dummy proposal is created immediately. Otherwise the block context is
/// stored in `pending_proposal` and a `CreateNewBlock` outcome is returned.
fn propose_if_leader(
    &mut self,
    maybe_parent_round_id: Option<RoundId>,
    now: Timestamp,
) -> ProtocolOutcomes<C> {
    let we_are_leader = self
        .active_validator
        .as_ref()
        .is_some_and(|active_validator| active_validator.idx == self.leader(self.current_round));
    if !we_are_leader {
        return vec![];
    }
    let already_pending = matches!(
        self.pending_proposal,
        Some((_, round_id, _)) if round_id == self.current_round
    );
    if already_pending {
        return vec![];
    }
    if self.round_mut(self.current_round).has_proposal() {
        return vec![];
    }

    let ancestor_values = if let Some(parent_round_id) = maybe_parent_round_id {
        if self.accepted_switch_block(parent_round_id)
            || self.accepted_dummy_proposal(parent_round_id)
        {
            return self.create_echo_and_proposal(Proposal::dummy(now, parent_round_id));
        }
        self.ancestor_values(parent_round_id)
            .expect("missing ancestor value")
    } else {
        vec![]
    };

    let block_context = BlockContext::new(now, ancestor_values);
    let pending = (
        block_context.clone(),
        self.current_round,
        maybe_parent_round_id,
    );
    self.pending_proposal = Some(pending);
    let expiry = now.saturating_add(TimeDiff::from_millis(self.proposal_timeout_millis as u64));
    vec![ProtocolOutcome::CreateNewBlock(block_context, expiry)]
}
/// Signs an echo for `proposal` in the current round, records the proposal in the WAL, inserts
/// it into the round and returns a gossip message carrying both.
///
/// Returns no outcomes if the echo could not be created, the WAL entry could not be recorded,
/// or the round did not accept the proposal.
fn create_echo_and_proposal(&mut self, proposal: Proposal<C>) -> ProtocolOutcomes<C> {
    let round_id = self.current_round;
    let hashed_prop = HashedProposal::new(proposal.clone());
    let echo_content = Content::Echo(*hashed_prop.hash());
    let Some(echo) = self.create_message(round_id, echo_content) else {
        return vec![];
    };
    let prop_msg = Message::Proposal {
        round_id,
        proposal,
        instance_id: *self.instance_id(),
        echo,
    };

    let wal_entry = ZugWalEntry::Proposal(hashed_prop.inner().clone(), round_id);
    if !self.record_entry(&wal_entry) {
        error!(
            our_idx = self.our_idx(),
            "could not record own proposal in WAL"
        );
        return vec![];
    }
    if !self.round_mut(round_id).insert_proposal(hashed_prop) {
        return vec![];
    }
    self.mark_dirty(round_id);
    vec![ProtocolOutcome::CreatedGossipMessage(
        SerializedMessage::from_message(&prop_msg),
    )]
}
/// Finds a parent for a proposal in the current round, together with the earliest timestamp
/// such a proposal may have.
///
/// Rounds are scanned backwards from the current one. The first accepted proposal whose
/// timestamp plus the minimum block time is not after `now` is returned immediately.
/// Otherwise the candidate with the earliest such timestamp is remembered, and the scan stops
/// at the first round that is not skippable. If every earlier round is skippable, the result
/// is "no parent" with the era's start timestamp.
fn suitable_parent_round(&self, now: Timestamp) -> Option<(Option<RoundId>, Timestamp)> {
    let min_block_time = self.params.min_block_time();
    let mut best_candidate: Option<(Option<RoundId>, Timestamp)> = None;
    for round_id in (0..self.current_round).rev() {
        if let Some((_, parent)) = self.accepted_proposal(round_id) {
            let earliest = parent.timestamp().saturating_add(min_block_time);
            if earliest <= now {
                return Some((Some(round_id), earliest));
            }
            let improves = match best_candidate {
                None => true,
                Some((_, best_timestamp)) => best_timestamp > earliest,
            };
            if improves {
                best_candidate = Some((Some(round_id), earliest));
            }
        }
        if !self.is_skippable_round(round_id) {
            return best_candidate;
        }
    }
    Some((None, self.params.start_timestamp()))
}
/// Returns `true` if the round exists and its `quorum_votes` value is `Some(false)`.
fn is_skippable_round(&self, round_id: RoundId) -> bool {
    let quorum_vote = self.rounds.get(&round_id).and_then(Round::quorum_votes);
    matches!(quorum_vote, Some(false))
}
/// Returns `true` if the round exists and its `quorum_votes` value is `Some(true)`.
fn is_committed_round(&self, round_id: RoundId) -> bool {
    let quorum_vote = self.rounds.get(&round_id).and_then(Round::quorum_votes);
    matches!(quorum_vote, Some(true))
}
/// Returns `true` if the round exists and has an accepted proposal.
fn has_accepted_proposal(&self, round_id: RoundId) -> bool {
    let maybe_accepted = self.round(round_id).and_then(Round::accepted_proposal);
    maybe_accepted.is_some()
}
fn accepted_proposal(&self, round_id: RoundId) -> Option<(u64, &HashedProposal<C>)> {
self.round(round_id)?.accepted_proposal()
}
/// Returns the current proposal timeout, converted from the fractional millisecond value to a
/// whole-millisecond `TimeDiff`.
fn proposal_timeout(&self) -> TimeDiff {
    let whole_millis = self.proposal_timeout_millis as u64;
    TimeDiff::from_millis(whole_millis)
}
/// Moves the proposal timeout one step towards a target derived from how long the current
/// round has been running.
///
/// The target is the time since `current_round_start`, scaled by the configured grace period.
/// If the target is above the current timeout, the timeout grows by an exponential factor but
/// not beyond the target. Otherwise it shrinks by an exponential factor, but not below the
/// larger of the configured minimum and the target.
fn update_proposal_timeout(&mut self, now: Timestamp) {
    let elapsed_millis = now.saturating_diff(self.current_round_start).millis() as f64;
    let grace_period_factor = self.config.proposal_grace_period as f64 / 100.0 + 1.0;
    let target_timeout = elapsed_millis * grace_period_factor;
    let inertia = self.config.proposal_timeout_inertia as f64;
    let ftt = self.params.ftt().0 as f64 / self.validators.total_weight().0 as f64;

    let current = self.proposal_timeout_millis;
    let updated = if target_timeout > current {
        let grown = current * (1.0 / (inertia * (1.0 - ftt))).exp2();
        grown.min(target_timeout)
    } else {
        let shrunk = current * (-1.0 / (inertia * (1.0 + ftt))).exp2();
        let min_timeout = (self.config.proposal_timeout.millis() as f64).max(target_timeout);
        shrunk.max(min_timeout)
    };
    self.proposal_timeout_millis = updated;
    debug!(our_idx = self.our_idx(), %self.proposal_timeout_millis, "proposal timeout updated");
}
/// Returns `true` if the given validators, together with all faulty validators, have a total
/// weight strictly above the quorum threshold.
///
/// Faulty validators are always counted, so indices in `vidxs` that are faulty are skipped to
/// avoid counting them twice.
fn is_quorum(&self, vidxs: impl Iterator<Item = ValidatorIndex>) -> bool {
    let mut sum = self.faulty_weight();
    let quorum_threshold = self.quorum_threshold();
    if sum > quorum_threshold {
        return true;
    }
    vidxs
        .filter(|vidx| !self.faults.contains_key(vidx))
        .any(|vidx| {
            sum = sum.saturating_add(self.validators.weight(vidx));
            sum > quorum_threshold
        })
}
/// Collects the blocks of the accepted proposal in `round_id` and of all its ancestors,
/// starting with `round_id` itself and following the parent links.
///
/// Returns `None` if any round on the chain has no accepted proposal.
fn ancestor_values(&self, round_id: RoundId) -> Option<Vec<C::ConsensusValue>> {
    let mut values = vec![];
    let mut next_round_id = Some(round_id);
    while let Some(current_round_id) = next_round_id {
        let (_, proposal) = self.accepted_proposal(current_round_id)?;
        values.extend(proposal.maybe_block().cloned());
        next_round_id = proposal.maybe_parent_round_id();
    }
    Some(values)
}
/// Returns half of the sum of the total validator weight and the fault tolerance threshold,
/// rounded down.
///
/// The sum is computed without losing the carry: if the `u64` addition overflows, the wrapped
/// sum is halved and the top bit is set, which equals halving the true 65-bit sum.
fn quorum_threshold(&self) -> Weight {
    const TOP_BIT: u64 = 1 << 63;
    let total_weight = self.validators.total_weight().0;
    let ftt = self.params.ftt().0;
    match total_weight.checked_add(ftt) {
        Some(sum) => Weight(sum / 2),
        None => Weight((total_weight.wrapping_add(ftt) / 2) | TOP_BIT),
    }
}
/// Returns the summed weight of all validators that have an entry in `self.faults`.
fn faulty_weight(&self) -> Weight {
    let faulty_indices = self.faults.keys();
    self.sum_weights(faulty_indices)
}
/// Returns the sum of the weights of the validators with the given indices.
fn sum_weights<'a>(&self, vidxs: impl Iterator<Item = &'a ValidatorIndex>) -> Weight {
    vidxs.map(|&vidx| self.validators.weight(vidx)).sum()
}
/// Returns the round with the given ID, or `None` if it has not been created.
fn round(&self, round_id: RoundId) -> Option<&Round<C>> {
    let rounds = &self.rounds;
    rounds.get(&round_id)
}
/// Returns a mutable reference to the round with the given ID, creating it first if it does
/// not exist yet.
///
/// A new round is built from the number of validators and the leader that
/// `self.leader_sequence` assigns to `round_id`.
fn round_mut(&mut self, round_id: RoundId) -> &mut Round<C> {
    let validator_count = self.validators.len();
    let leader_sequence = &self.leader_sequence;
    self.rounds.entry(round_id).or_insert_with(|| {
        let leader_idx = leader_sequence.leader(u64::from(round_id));
        Round::new(validator_count, leader_idx)
    })
}
/// Creates the round with the given ID if it does not exist yet.
fn create_round(&mut self, round_id: RoundId) {
    let _ = self.round_mut(round_id);
}
/// Records that `round_id` needs to be re-evaluated by the next `update` call.
///
/// Rounds after the current round are ignored. The dirty marker only ever moves to a lower
/// round ID; it is left unchanged if it already points at `round_id` or an earlier round.
fn mark_dirty(&mut self, round_id: RoundId) {
    if round_id > self.current_round {
        return;
    }
    let already_covered = self
        .maybe_dirty_round_id
        .is_some_and(|r_id| r_id <= round_id);
    if !already_covered {
        self.maybe_dirty_round_id = Some(round_id);
    }
}
}
impl<C> ConsensusProtocol<C> for Zug<C>
where
C: Context + 'static,
{
fn handle_message(
&mut self,
_rng: &mut NodeRng,
sender: NodeId,
msg: SerializedMessage,
now: Timestamp,
) -> ProtocolOutcomes<C> {
let our_idx = self.our_idx();
match msg.deserialize_incoming() {
Err(err) => {
warn!(%sender, %err, "failed to deserialize Zug message");
vec![ProtocolOutcome::Disconnect(sender)]
}
Ok(zug_msg) if zug_msg.instance_id() != self.instance_id() => {
let instance_id = zug_msg.instance_id();
warn!(our_idx, ?instance_id, %sender, "wrong instance ID; disconnecting");
vec![ProtocolOutcome::Disconnect(sender)]
}
Ok(Message::SyncResponse(sync_response)) => {
self.handle_sync_response(sync_response, sender, now)
}
Ok(Message::Proposal {
round_id,
instance_id: _,
proposal,
echo,
}) => {
debug!(our_idx, %sender, %proposal, %round_id, "handling proposal with echo");
let outcomes = || {
let mut outcomes = self.handle_signed_message(echo, sender, now)?;
outcomes.extend(self.handle_proposal(round_id, proposal, sender, now)?);
Ok(outcomes)
};
outcomes_or_disconnect(outcomes())
}
Ok(Message::Signed(signed_msg)) => {
outcomes_or_disconnect(self.handle_signed_message(signed_msg, sender, now))
}
Ok(Message::Evidence(signed_msg, content2, signature2)) => outcomes_or_disconnect(
self.handle_evidence(signed_msg, content2, signature2, sender, now),
),
}
}
/// Deserializes an incoming sync request from `sender` and passes it to `handle_sync_request`.
///
/// The sender is disconnected, and no response is returned, if the request cannot be
/// deserialized or belongs to a different protocol instance.
fn handle_request_message(
    &mut self,
    _rng: &mut NodeRng,
    sender: NodeId,
    msg: SerializedMessage,
    _now: Timestamp,
) -> (ProtocolOutcomes<C>, Option<SerializedMessage>) {
    let our_idx = self.our_idx();
    let sync_request = match msg.deserialize_incoming::<SyncRequest<C>>() {
        Ok(sync_request) => sync_request,
        Err(err) => {
            warn!(
                our_idx,
                %sender,
                %err,
                "could not deserialize Zug message"
            );
            return (vec![ProtocolOutcome::Disconnect(sender)], None);
        }
    };
    if sync_request.instance_id != *self.instance_id() {
        let instance_id = sync_request.instance_id;
        warn!(our_idx, ?instance_id, %sender, "wrong instance ID; disconnecting");
        return (vec![ProtocolOutcome::Disconnect(sender)], None);
    }
    self.handle_sync_request(sync_request, sender)
}
/// Handles a fired timer.
///
/// * `TIMER_ID_SYNC_PEER` is forwarded to `handle_sync_peer_timer`.
/// * `TIMER_ID_UPDATE` marks the current round dirty and runs `update`; the scheduled-update
///   marker is reset if `timestamp` is not before it.
/// * `TIMER_ID_LOG_PARTICIPATION` logs participation and reschedules itself while an interval
///   is configured, the instance is not evidence-only, and the switch block is not finalized.
/// * Any other timer ID is logged as an error.
fn handle_timer(
    &mut self,
    timestamp: Timestamp,
    now: Timestamp,
    timer_id: TimerId,
    rng: &mut NodeRng,
) -> ProtocolOutcomes<C> {
    match timer_id {
        TIMER_ID_SYNC_PEER => self.handle_sync_peer_timer(now, rng),
        TIMER_ID_UPDATE => {
            if self.next_scheduled_update <= timestamp {
                self.next_scheduled_update = Timestamp::MAX;
            }
            let current_round = self.current_round;
            self.mark_dirty(current_round);
            debug!(?current_round, "TIMER_ID_UPDATE");
            self.update(now)
        }
        TIMER_ID_LOG_PARTICIPATION => {
            self.log_participation();
            let Some(interval) = self.config.log_participation_interval else {
                return vec![];
            };
            if self.evidence_only || self.finalized_switch_block() {
                return vec![];
            }
            vec![ProtocolOutcome::ScheduleTimer(
                now.saturating_add(interval),
                timer_id,
            )]
        }
        unknown_timer_id => {
            error!(
                our_idx = self.our_idx(),
                timer_id = unknown_timer_id.0,
                "unexpected timer ID"
            );
            vec![]
        }
    }
}
/// Schedules the periodic sync-peer and participation-logging timers, each only if its
/// interval is configured.
///
/// Both timers are set one interval after `now` or the era's start timestamp, whichever is
/// later.
fn handle_is_current(&self, now: Timestamp) -> ProtocolOutcomes<C> {
    let base_timestamp = now.max(self.params.start_timestamp());
    let sync_timer = self.config.sync_state_interval.map(|interval| {
        ProtocolOutcome::ScheduleTimer(base_timestamp.saturating_add(interval), TIMER_ID_SYNC_PEER)
    });
    let log_timer = self.config.log_participation_interval.map(|interval| {
        ProtocolOutcome::ScheduleTimer(
            base_timestamp.saturating_add(interval),
            TIMER_ID_LOG_PARTICIPATION,
        )
    });
    sync_timer.into_iter().chain(log_timer).collect()
}
fn handle_action(&mut self, action_id: ActionId, now: Timestamp) -> ProtocolOutcomes<C> {
error!(our_idx = self.our_idx(), ?action_id, %now, "unexpected action");
vec![]
}
fn propose(&mut self, proposed_block: ProposedBlock<C>, now: Timestamp) -> ProtocolOutcomes<C> {
let maybe_parent_round_id = if let Some((block_context, round_id, maybe_parent_round_id)) =
self.pending_proposal.take()
{
if block_context != *proposed_block.context() || round_id != self.current_round {
warn!(our_idx = self.our_idx(), %proposed_block, "skipping outdated proposal");
self.pending_proposal = Some((block_context, round_id, maybe_parent_round_id));
return vec![];
}
maybe_parent_round_id
} else {
error!(our_idx = self.our_idx(), "unexpected call to propose");
return vec![];
};
let inactive = self
.validators
.enumerate_ids()
.map(|(idx, _)| idx)
.filter(|idx| self.active[*idx].is_none() && !self.faults.contains_key(idx));
let proposal = Proposal::with_block(&proposed_block, maybe_parent_round_id, inactive);
let mut outcomes = self.create_echo_and_proposal(proposal);
let round_id = self.current_round;
warn!(?round_id, "Calling update after proposal");
outcomes.extend(self.update(now));
outcomes
}
/// Handles the result of validating `proposed_block`.
///
/// All proposals that were waiting for this block are removed from the waiting list. If the
/// block is valid, each is inserted into its round, recorded in the WAL, and reported with a
/// `HandledProposedBlock` outcome, followed by the outcomes of `update`. If it is invalid, the
/// proposals are only logged and dropped.
fn resolve_validity(
    &mut self,
    proposed_block: ProposedBlock<C>,
    valid: bool,
    now: Timestamp,
) -> ProtocolOutcomes<C> {
    let waiting_proposals = self
        .proposals_waiting_for_validation
        .remove(&proposed_block)
        .into_iter()
        .flatten();

    if !valid {
        for (round_id, proposal, sender) in waiting_proposals {
            let validator_index = self.leader(round_id).0;
            info!(
                our_idx = self.our_idx(),
                %validator_index,
                %round_id,
                %sender,
                %proposal,
                "dropping invalid proposal"
            );
        }
        return vec![];
    }

    let mut outcomes = vec![];
    for (round_id, proposal, _sender) in waiting_proposals {
        info!(our_idx = self.our_idx(), %round_id, %proposal, "handling valid proposal");
        let inserted = self.round_mut(round_id).insert_proposal(proposal.clone());
        if inserted {
            self.record_entry(&ZugWalEntry::Proposal(proposal.into_inner(), round_id));
            self.mark_dirty(round_id);
            self.progress_detected = true;
            outcomes.push(ProtocolOutcome::HandledProposedBlock(
                proposed_block.clone(),
            ));
        }
    }
    outcomes.extend(self.update(now));
    outcomes
}
/// Activates this node as a validator with the given ID and secret.
///
/// If no WAL is open yet, `wal_file` is replayed and opened first; without a writable WAL the
/// validator is not activated. The validator is also not activated if it is marked as faulty
/// or is not part of this era's validator set. On success an update is scheduled for `now` or
/// the era's start timestamp, whichever is later.
///
/// The outcomes produced while replaying the WAL are returned on every path, because the
/// replay has already changed the local state by the time any later check fails.
fn activate_validator(
    &mut self,
    our_id: C::ValidatorId,
    secret: C::ValidatorSecret,
    now: Timestamp,
    wal_file: Option<PathBuf>,
) -> ProtocolOutcomes<C> {
    let mut outcomes = vec![];
    if self.write_wal.is_none() {
        if let Some(wal_file) = wal_file {
            outcomes.extend(self.open_wal(wal_file, now));
        }
        if self.write_wal.is_none() {
            error!(?our_id, "missing WAL file; not activating");
            // Keep the replay outcomes instead of discarding them.
            return outcomes;
        }
    }
    if let Some(idx) = self.validators.get_index(&our_id) {
        if self.faults.contains_key(&idx) {
            error!(our_idx = idx.0, "we are faulty; not activating");
            return outcomes;
        }
        info!(our_idx = idx.0, "start voting");
        self.active_validator = Some(ActiveValidator { idx, secret });
        debug!(
            our_idx = idx.0,
            %now,
            start_timestamp=%self.params.start_timestamp(),
            "activate_validator - schedule update"
        );
        outcomes.extend(self.schedule_update(self.params.start_timestamp().max(now)));
    } else {
        error!(
            ?our_id,
            "we are not a validator in this era; not activating"
        );
    }
    outcomes
}
/// Clears the active validator, dropping the stored index and secret.
fn deactivate_validator(&mut self) {
    self.active_validator.take();
}
/// Switches the instance to evidence-only mode and discards all rounds and all proposals that
/// are waiting for a parent or for validation.
fn set_evidence_only(&mut self) {
    self.proposals_waiting_for_validation.clear();
    self.proposals_waiting_for_parent.clear();
    self.rounds.clear();
    self.evidence_only = true;
}
/// Returns `true` if the given validator is known and its recorded fault is a direct one.
fn has_evidence(&self, vid: &C::ValidatorId) -> bool {
    let maybe_fault = self
        .validators
        .get_index(vid)
        .and_then(|idx| self.faults.get(&idx));
    matches!(maybe_fault, Some(fault) if fault.is_direct())
}
/// Records an indirect fault for the given validator, unless a fault is already recorded.
/// Unknown validator IDs are ignored.
fn mark_faulty(&mut self, vid: &C::ValidatorId) {
    let Some(idx) = self.validators.get_index(vid) else {
        return;
    };
    self.faults.entry(idx).or_insert(Fault::Indirect);
}
/// Creates a targeted evidence message for `peer` if a direct fault is recorded for the given
/// validator; returns no outcomes otherwise.
fn send_evidence(&self, peer: NodeId, vid: &C::ValidatorId) -> ProtocolOutcomes<C> {
    let maybe_fault = self
        .validators
        .get_index(vid)
        .and_then(|idx| self.faults.get(&idx))
        .cloned();
    match maybe_fault {
        Some(Fault::Direct(msg, content, sign)) => {
            let evidence = Message::Evidence(msg, content, sign);
            vec![ProtocolOutcome::CreatedTargetedMessage(
                SerializedMessage::from_message(&evidence),
                peer,
            )]
        }
        _ => vec![],
    }
}
/// Pauses or unpauses the instance.
///
/// On the transition from paused to unpaused, the current round's start time is reset to
/// `Timestamp::MAX`, the round is marked dirty and `update` is run. All other calls only
/// store the flag (logging when the instance becomes paused) and return no outcomes.
fn set_paused(&mut self, paused: bool, now: Timestamp) -> ProtocolOutcomes<C> {
    match (self.paused, paused) {
        (true, false) => {
            info!(
                our_idx = self.our_idx(),
                current_round = self.current_round,
                "unpausing consensus"
            );
            self.paused = paused;
            self.current_round_start = Timestamp::MAX;
            let round_id = self.current_round;
            self.mark_dirty(round_id);
            debug!(?round_id, "Calling update after unpausing");
            self.update(now)
        }
        (false, true) => {
            info!(
                our_idx = self.our_idx(),
                current_round = self.current_round,
                "pausing consensus"
            );
            self.paused = paused;
            vec![]
        }
        _ => {
            // The flag already has the requested value.
            self.paused = paused;
            vec![]
        }
    }
}
/// Returns the IDs of all validators for which a direct fault is recorded.
fn validators_with_evidence(&self) -> Vec<&C::ValidatorId> {
    self.faults
        .iter()
        .filter_map(|(vidx, fault)| {
            if fault.is_direct() {
                self.validators.id(*vidx)
            } else {
                None
            }
        })
        .collect()
}
/// Returns this instance as a `&dyn Any`.
fn as_any(&self) -> &dyn Any {
    let this: &dyn Any = self;
    this
}
/// Returns `true` if an active validator is set.
fn is_active(&self) -> bool {
    matches!(self.active_validator, Some(_))
}
/// Returns the instance ID stored in the protocol parameters.
fn instance_id(&self) -> &C::InstanceId {
    let params = &self.params;
    params.instance_id()
}
/// Always returns the configured minimum block time.
fn next_round_length(&self) -> Option<TimeDiff> {
    let min_block_time = self.params.min_block_time();
    Some(min_block_time)
}
}
/// Unwraps a handler result: returns the outcomes on success, or a single `Disconnect`
/// outcome for the faulty sender on failure.
fn outcomes_or_disconnect<C: Context>(
    result: Result<ProtocolOutcomes<C>, FaultySender>,
) -> ProtocolOutcomes<C> {
    match result {
        Ok(outcomes) => outcomes,
        Err(faulty_sender) => vec![ProtocolOutcome::Disconnect(faulty_sender.0)],
    }
}
// `LargestSpecimen` / `LargeUniqueSequence` implementations for the Zug message types.
// NOTE(review): these are presumably used to estimate the maximum serialized size of network
// messages; the traits are defined in `utils::specimen`, outside this chunk — confirm there.
mod specimen_support {
use std::collections::BTreeSet;
use crate::{
components::consensus::{utils::ValidatorIndex, ClContext},
utils::specimen::{
btree_map_distinct_from_prop, btree_set_distinct_from_prop, largest_variant,
vec_prop_specimen, Cache, LargeUniqueSequence, LargestSpecimen, SizeEstimator,
},
};
use super::{
message::{
Content, ContentDiscriminants, Message, MessageDiscriminants, SignedMessage,
SyncResponse,
},
proposal::Proposal,
SyncRequest,
};
// Builds a specimen for each `Message` variant and lets `largest_variant` choose among them.
impl LargestSpecimen for Message<ClContext> {
fn largest_specimen<E: SizeEstimator>(estimator: &E, cache: &mut Cache) -> Self {
largest_variant::<Self, MessageDiscriminants, _, _>(
estimator,
|variant| match variant {
MessageDiscriminants::SyncResponse => {
Message::SyncResponse(LargestSpecimen::largest_specimen(estimator, cache))
}
MessageDiscriminants::Proposal => Message::Proposal {
round_id: LargestSpecimen::largest_specimen(estimator, cache),
instance_id: LargestSpecimen::largest_specimen(estimator, cache),
proposal: LargestSpecimen::largest_specimen(estimator, cache),
echo: LargestSpecimen::largest_specimen(estimator, cache),
},
MessageDiscriminants::Signed => {
Message::Signed(LargestSpecimen::largest_specimen(estimator, cache))
}
MessageDiscriminants::Evidence => Message::Evidence(
LargestSpecimen::largest_specimen(estimator, cache),
LargestSpecimen::largest_specimen(estimator, cache),
LargestSpecimen::largest_specimen(estimator, cache),
),
},
)
}
}
// Every field of the request is filled with that field type's own largest specimen.
impl LargestSpecimen for SyncRequest<ClContext> {
fn largest_specimen<E: SizeEstimator>(estimator: &E, cache: &mut Cache) -> Self {
SyncRequest {
round_id: LargestSpecimen::largest_specimen(estimator, cache),
proposal_hash: LargestSpecimen::largest_specimen(estimator, cache),
has_proposal: LargestSpecimen::largest_specimen(estimator, cache),
first_validator_idx: LargestSpecimen::largest_specimen(estimator, cache),
echoes: LargestSpecimen::largest_specimen(estimator, cache),
true_votes: LargestSpecimen::largest_specimen(estimator, cache),
false_votes: LargestSpecimen::largest_specimen(estimator, cache),
active: LargestSpecimen::largest_specimen(estimator, cache),
faulty: LargestSpecimen::largest_specimen(estimator, cache),
instance_id: LargestSpecimen::largest_specimen(estimator, cache),
sync_id: LargestSpecimen::largest_specimen(estimator, cache),
}
}
}
// Produces up to `count` distinct validator indices, counting down from `u32::MAX - 1`.
impl<E> LargeUniqueSequence<E> for ValidatorIndex
where
E: SizeEstimator,
{
fn large_unique_sequence(
_estimator: &E,
count: usize,
_cache: &mut Cache,
) -> BTreeSet<Self> {
Iterator::map((0..u32::MAX).rev(), ValidatorIndex::from)
.take(count)
.collect()
}
}
// The collection fields are built by the `*_from_prop` helpers using the "validator_count"
// property.
// NOTE(review): presumably that property sizes each collection to the validator count; the
// helpers are defined outside this chunk — confirm there.
impl LargestSpecimen for SyncResponse<ClContext> {
fn largest_specimen<E: SizeEstimator>(estimator: &E, cache: &mut Cache) -> Self {
SyncResponse {
round_id: LargestSpecimen::largest_specimen(estimator, cache),
proposal_or_hash: LargestSpecimen::largest_specimen(estimator, cache),
echo_sigs: btree_map_distinct_from_prop(estimator, "validator_count", cache),
true_vote_sigs: btree_map_distinct_from_prop(estimator, "validator_count", cache),
false_vote_sigs: btree_map_distinct_from_prop(estimator, "validator_count", cache),
signed_messages: vec_prop_specimen(estimator, "validator_count", cache),
evidence: vec_prop_specimen(estimator, "validator_count", cache),
instance_id: LargestSpecimen::largest_specimen(estimator, cache),
sync_id: LargestSpecimen::largest_specimen(estimator, cache),
}
}
}
// The `inactive` field is always `Some`, built from the "validator_count" property.
impl LargestSpecimen for Proposal<ClContext> {
fn largest_specimen<E: SizeEstimator>(estimator: &E, cache: &mut Cache) -> Self {
Proposal {
timestamp: LargestSpecimen::largest_specimen(estimator, cache),
maybe_block: LargestSpecimen::largest_specimen(estimator, cache),
maybe_parent_round_id: LargestSpecimen::largest_specimen(estimator, cache),
inactive: Some(btree_set_distinct_from_prop(
estimator,
"validator_count",
cache,
)),
}
}
}
// A validator index specimen is the `u32` specimen converted into a `ValidatorIndex`.
impl LargestSpecimen for ValidatorIndex {
fn largest_specimen<E: SizeEstimator>(estimator: &E, cache: &mut Cache) -> Self {
u32::largest_specimen(estimator, cache).into()
}
}
// Built through `SignedMessage::sign_new`, with every argument being a largest specimen.
impl LargestSpecimen for SignedMessage<ClContext> {
fn largest_specimen<E: SizeEstimator>(estimator: &E, cache: &mut Cache) -> Self {
SignedMessage::sign_new(
LargestSpecimen::largest_specimen(estimator, cache),
LargestSpecimen::largest_specimen(estimator, cache),
LargestSpecimen::largest_specimen(estimator, cache),
LargestSpecimen::largest_specimen(estimator, cache),
&LargestSpecimen::largest_specimen(estimator, cache),
)
}
}
// The result is stored in `cache`, so the variants are only compared on the first call.
impl LargestSpecimen for Content<ClContext> {
fn largest_specimen<E: SizeEstimator>(estimator: &E, cache: &mut Cache) -> Self {
if let Some(item) = cache.get::<Self>() {
return *item;
}
let item = largest_variant::<Self, ContentDiscriminants, _, _>(estimator, |variant| {
match variant {
ContentDiscriminants::Echo => {
Content::Echo(LargestSpecimen::largest_specimen(estimator, cache))
}
ContentDiscriminants::Vote => {
Content::Vote(LargestSpecimen::largest_specimen(estimator, cache))
}
}
});
*cache.set(item)
}
}
}
/// Bookkeeping for randomly generated IDs, each stored with the time it was registered.
mod registered_sync {
    use crate::{
        types::{DataSize, NodeRng},
        utils::specimen::{Cache, LargestSpecimen, SizeEstimator},
    };
    use casper_types::{TimeDiff, Timestamp};
    use rand::Rng as _;
    use serde::{Deserialize, Serialize};
    use std::collections::BTreeMap;

    /// The registered IDs, each mapped to the timestamp at which it was registered.
    #[derive(Default, DataSize, Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash)]
    pub struct RegisteredSync(BTreeMap<RandomId, Timestamp>);

    /// A randomly generated 64-bit identifier.
    #[derive(
        DataSize, Copy, Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Ord, PartialOrd,
    )]
    pub struct RandomId(u64);

    impl RegisteredSync {
        /// Removes every ID that was registered one minute ago or earlier.
        fn prune_old(&mut self) {
            const ONE_MIN: TimeDiff = TimeDiff::from_seconds(60);
            self.0.retain(|_, timestamp| timestamp.elapsed() < ONE_MIN);
        }

        /// Prunes old IDs, then generates an ID that is not currently registered, registers it
        /// with the current time and returns it.
        pub fn create_and_register_new_id(&mut self, rng: &mut NodeRng) -> RandomId {
            self.prune_old();
            // Draw again on the (unlikely) collision with an ID that is still registered.
            let id = loop {
                let id = RandomId::new(rng);
                if !self.0.contains_key(&id) {
                    break id;
                }
            };
            self.0.insert(id, Timestamp::now());
            id
        }

        /// Removes `id` and returns it, or returns `None` if it was not registered.
        pub fn try_remove_id(&mut self, id: RandomId) -> Option<RandomId> {
            self.0.remove(&id)?;
            Some(id)
        }
    }

    impl RandomId {
        /// Creates a new ID from a random `u64` drawn from `rng`.
        pub fn new(rng: &mut NodeRng) -> Self {
            RandomId(rng.gen())
        }
    }

    impl LargestSpecimen for RandomId {
        fn largest_specimen<E: SizeEstimator>(_estimator: &E, _cache: &mut Cache) -> Self {
            RandomId(u64::MAX)
        }
    }
}