use std::{
collections::{BTreeMap, HashMap, HashSet},
convert::TryInto,
fmt::{self, Debug, Formatter},
path::PathBuf,
rc::Rc,
time::Duration,
};
use anyhow::Error;
use blake2::{
digest::{Update, VariableOutput},
VarBlake2b,
};
use datasize::DataSize;
use itertools::Itertools;
use prometheus::Registry;
use rand::Rng;
use tracing::{info, trace, warn};
use casper_types::{AsymmetricType, ProtocolVersion, PublicKey, SecretKey, U512};
use crate::{
components::consensus::{
candidate_block::CandidateBlock,
cl_context::{ClContext, Keypair},
consensus_protocol::{
BlockContext, ConsensusProtocol, EraEnd, FinalizedBlock as CpFinalizedBlock,
ProtocolOutcome,
},
metrics::ConsensusMetrics,
traits::NodeIdT,
Config, ConsensusMessage, Event, ReactorEventT,
},
crypto::hash::Digest,
effect::{EffectBuilder, EffectExt, Effects, Responder},
fatal,
types::{
BlockHash, BlockHeader, BlockLike, FinalitySignature, FinalizedBlock, ProtoBlock, Timestamp,
},
utils::WithDir,
NodeRng,
};
pub use self::era::{Era, EraId};
use crate::components::{
consensus::config::ProtocolConfig, contract_runtime::ValidatorWeightsByEraIdRequest,
};
mod era;
/// Signature of the closure used to construct the consensus protocol instance of a new era.
///
/// `EraSupervisor::new_era` calls it with, in this order: the instance ID, the validator
/// weights, the set of slashed validators, the protocol configuration, the previous era's
/// consensus instance (if that era is still active), the era's start time and the era's seed.
type ConsensusConstructor<I> = dyn Fn(
Digest, BTreeMap<PublicKey, U512>, &HashSet<PublicKey>, &ProtocolConfig, Option<&dyn ConsensusProtocol<I, ClContext>>, Timestamp, u64, ) -> Box<dyn ConsensusProtocol<I, ClContext>>;
/// Keeps track of the consensus instances ("eras") this node knows about, creates new eras and
/// removes obsolete ones.
#[derive(DataSize)]
pub struct EraSupervisor<I> {
/// The eras that are currently kept, by era ID. `new_era` inserts the new era and removes the
/// one that is `2 * bonded_eras + 1` eras older.
active_eras: HashMap<EraId, Era<I>>,
/// This node's secret signing key, loaded from the configured secret key path.
pub(super) secret_signing_key: Rc<SecretKey>,
/// The public key derived from `secret_signing_key`.
pub(super) public_signing_key: PublicKey,
/// The ID of the most recently created era; updated by `new_era`.
current_era: EraId,
/// The protocol configuration passed to the constructor.
protocol_config: ProtocolConfig,
/// The closure used to create the consensus protocol instance for each new era.
#[data_size(skip)] new_consensus: Box<ConsensusConstructor<I>>,
/// The time at which this supervisor was created (`Timestamp::now()` in `new`).
node_start_time: Timestamp,
/// The protocol config's `unbonding_delay - auction_delay`; `is_bonded` accepts the current
/// era and this many eras before it.
bonded_eras: u64,
/// The height following the most recently finalized block; passed along when requesting a new
/// proto block.
next_block_height: u64,
/// Consensus metrics registered with the registry passed to `new`.
#[data_size(skip)]
metrics: ConsensusMetrics,
/// Set by `finished_joining`. While `false`, `new_era` does not activate this node as a
/// validator.
finished_joining: bool,
/// The folder in which the per-instance unit hash files are placed.
unit_hashes_folder: PathBuf,
}
impl<I> Debug for EraSupervisor<I> {
    /// Prints only the IDs of the active eras; all other fields are elided.
    fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
        let era_ids = self.active_eras.keys().collect::<Vec<_>>();
        formatter.write_fmt(format_args!(
            "EraSupervisor {{ active_eras: {:?}, .. }}",
            era_ids
        ))
    }
}
impl<I> EraSupervisor<I>
where
I: NodeIdT,
{
#[allow(clippy::too_many_arguments)]
pub(crate) fn new<REv: ReactorEventT<I>>(
timestamp: Timestamp,
config: WithDir<Config>,
effect_builder: EffectBuilder<REv>,
validators: BTreeMap<PublicKey, U512>,
protocol_config: ProtocolConfig,
genesis_state_root_hash: Digest,
registry: &Registry,
new_consensus: Box<ConsensusConstructor<I>>,
mut rng: &mut NodeRng,
) -> Result<(Self, Effects<Event<I>>), Error> {
let unit_hashes_folder = config.with_dir(config.value().unit_hashes_folder.clone());
let (root, config) = config.into_parts();
let secret_signing_key = Rc::new(config.secret_key_path.load(root)?);
let public_signing_key = PublicKey::from(secret_signing_key.as_ref());
let bonded_eras: u64 = protocol_config.unbonding_delay - protocol_config.auction_delay;
let metrics = ConsensusMetrics::new(registry)
.expect("failure to setup and register ConsensusMetrics");
let genesis_start_time = protocol_config.timestamp;
let mut era_supervisor = Self {
active_eras: Default::default(),
secret_signing_key,
public_signing_key,
current_era: EraId(0),
protocol_config,
new_consensus,
node_start_time: Timestamp::now(),
bonded_eras,
next_block_height: 0,
metrics,
finished_joining: false,
unit_hashes_folder,
};
let results = era_supervisor.new_era(
EraId(0),
timestamp,
validators,
vec![], 0, genesis_start_time,
0, genesis_state_root_hash,
);
let effects = era_supervisor
.handling_wrapper(effect_builder, &mut rng)
.handle_consensus_results(EraId(0), results);
Ok((era_supervisor, effects))
}
/// Bundles this supervisor with an effect builder and a random number generator, so that
/// consensus outcomes can be turned into effects.
pub(super) fn handling_wrapper<'a, REv: ReactorEventT<I>>(
    &'a mut self,
    effect_builder: EffectBuilder<REv>,
    rng: &'a mut NodeRng,
) -> EraSupervisorHandlingWrapper<'a, I, REv> {
    let era_supervisor = self;
    EraSupervisorHandlingWrapper {
        era_supervisor,
        effect_builder,
        rng,
    }
}
/// Returns the height of the booking block for `era_id`: one less than the start height of the
/// era that lies `auction_delay` eras before `era_id` (both subtractions saturate at 0).
///
/// # Panics
///
/// Panics if that earlier era is not among the active eras.
fn booking_block_height(&self, era_id: EraId) -> u64 {
    let auction_delay = self.protocol_config.auction_delay;
    let after_booking_era_id = EraId(era_id.0.saturating_sub(auction_delay));
    let after_booking_era = self
        .active_eras
        .get(&after_booking_era_id)
        .expect("should have era after booking block");
    after_booking_era.start_height.saturating_sub(1)
}
/// Returns the height of the key block for an era starting at `start_height`: the block just
/// before the era's first block, or 0 if the era starts at height 0.
fn key_block_height(&self, _era_id: EraId, start_height: u64) -> u64 {
    match start_height {
        0 => 0,
        height => height - 1,
    }
}
/// Computes an era's seed: hashes the booking block hash followed by the key block seed with
/// Blake2b and interprets the first eight bytes of the digest as a little-endian `u64`.
fn era_seed(booking_block_hash: BlockHash, key_block_seed: Digest) -> u64 {
    const SEED_LEN: usize = std::mem::size_of::<u64>();

    let mut hasher = VarBlake2b::new(Digest::LENGTH).expect("should create hasher");
    hasher.update(booking_block_hash);
    hasher.update(key_block_seed);

    let mut digest_bytes = [0; Digest::LENGTH];
    hasher.finalize_variable(|output| digest_bytes.copy_from_slice(output));

    let seed_bytes: [u8; SEED_LEN] = digest_bytes[..SEED_LEN].try_into().unwrap();
    u64::from_le_bytes(seed_bytes)
}
/// Creates the consensus instance for era `era_id`, makes it the current era and registers it
/// among the active eras.
///
/// If this node is one of `validators` and has finished joining, it is activated as a validator
/// in the new era. Returns the outcomes produced by that activation (empty otherwise).
///
/// # Panics
///
/// Panics if an era with ID `era_id` already exists.
#[allow(clippy::too_many_arguments)] fn new_era(
&mut self,
era_id: EraId,
timestamp: Timestamp,
validators: BTreeMap<PublicKey, U512>,
newly_slashed: Vec<PublicKey>,
seed: u64,
start_time: Timestamp,
start_height: u64,
state_root_hash: Digest,
) -> Vec<ProtocolOutcome<I, ClContext>> {
if self.active_eras.contains_key(&era_id) {
panic!("{} already exists", era_id);
}
// The new era becomes the current one; the metric mirrors that.
self.current_era = era_id;
self.metrics.current_era.set(self.current_era.0 as i64);
let instance_id = instance_id(&self.protocol_config, state_root_hash, start_height);
info!(
?validators,
%start_time,
%timestamp,
%start_height,
%instance_id,
era = era_id.0,
"starting era",
);
// All validators slashed in the eras yielded by `iter_other(bonded_eras)`, plus the ones
// newly slashed for this era.
let slashed = era_id
.iter_other(self.bonded_eras)
.flat_map(|e_id| &self.active_eras[&e_id].newly_slashed)
.chain(&newly_slashed)
.cloned()
.collect();
let our_id = self.public_signing_key;
// Only vote if we are a validator in this era and have finished joining.
let should_activate = if !validators.contains_key(&our_id) {
info!(era = era_id.0, %our_id, "not voting; not a validator");
false
} else if !self.finished_joining {
info!(era = era_id.0, "not voting; still joining");
false
} else {
info!(era = era_id.0, "start voting");
true
};
// The previous era's consensus instance, if there is one and it is still active, is handed
// to the constructor.
let prev_era = era_id
.checked_sub(1)
.and_then(|last_era_id| self.active_eras.get(&last_era_id));
let mut consensus = (self.new_consensus)(
instance_id,
validators.clone(),
&slashed,
&self.protocol_config,
prev_era.map(|era| &*era.consensus),
start_time,
seed,
);
let results = if should_activate {
let secret = Keypair::new(Rc::clone(&self.secret_signing_key), our_id);
// The unit hash file name is derived from the instance ID and our public key.
let unit_hash_file = self.unit_hashes_folder.join(format!(
"unit_hash_{:?}_{}.dat",
instance_id,
self.public_signing_key.to_hex()
));
consensus.activate_validator(our_id, secret, timestamp, Some(unit_hash_file))
} else {
Vec::new()
};
let era = Era::new(
consensus,
start_time,
start_height,
newly_slashed,
slashed,
validators,
);
let _ = self.active_eras.insert(era_id, era);
// Drop the era that is `2 * bonded_eras + 1` eras older than the new one, if any.
if let Some(obsolete_era_id) = era_id.checked_sub(2 * self.bonded_eras + 1) {
trace!(era = obsolete_era_id.0, "removing obsolete era");
self.active_eras.remove(&obsolete_era_id);
}
results
}
/// Returns `true` if `era_id` is the current era or one of the `bonded_eras` eras directly
/// before it.
fn is_bonded(&self, era_id: EraId) -> bool {
    // `era_id` can come straight from a network message, so the addition must not overflow:
    // a saturated sum still compares correctly, and eras after the current one are rejected
    // by the first condition.
    era_id <= self.current_era
        && era_id.0.saturating_add(self.bonded_eras) >= self.current_era.0
}
/// Returns whether `pub_key` is among the validators of era `era_id`; `false` if that era is
/// not active.
fn is_validator_in(&self, pub_key: &PublicKey, era_id: EraId) -> bool {
    match self.active_eras.get(&era_id) {
        Some(era) => era.validators().contains_key(pub_key),
        None => false,
    }
}
/// Returns the map of active eras. Only available in test builds.
#[cfg(test)]
pub(crate) fn active_eras(&self) -> &HashMap<EraId, Era<I>> {
&self.active_eras
}
/// Marks this node as having finished joining and, if it is a validator in the current era,
/// activates it there.
///
/// Returns the outcomes of the activation, or an empty vector if the current era is not active
/// or this node is not one of its validators.
pub(crate) fn finished_joining(
    &mut self,
    now: Timestamp,
) -> Vec<ProtocolOutcome<I, ClContext>> {
    self.finished_joining = true;
    let our_id = self.public_signing_key;
    let secret = Keypair::new(Rc::clone(&self.secret_signing_key), our_id);

    let era = match self.active_eras.get_mut(&self.current_era) {
        Some(era) => era,
        None => return Vec::new(),
    };
    if !era.validators().contains_key(&our_id) {
        return Vec::new();
    }

    // The unit hash file name is derived from the instance ID and our public key.
    let instance_id = *era.consensus.instance_id();
    let file_name = format!("unit_hash_{:?}_{}.dat", instance_id, our_id.to_hex());
    let unit_hash_file = self.unit_hashes_folder.join(file_name);
    era.consensus
        .activate_validator(our_id, secret, now, Some(unit_hash_file))
}
}
/// A mutable borrow of an `EraSupervisor` bundled with an effect builder and a random number
/// generator. Its methods handle consensus events and translate the resulting protocol outcomes
/// into effects.
pub(super) struct EraSupervisorHandlingWrapper<'a, I, REv: 'static> {
/// The supervisor whose eras are being operated on.
pub(super) era_supervisor: &'a mut EraSupervisor<I>,
/// Used to create the effects returned by the handlers.
pub(super) effect_builder: EffectBuilder<REv>,
/// Random number generator handed to the consensus instances.
pub(super) rng: &'a mut NodeRng,
}
impl<'a, I, REv> EraSupervisorHandlingWrapper<'a, I, REv>
where
I: NodeIdT,
REv: ReactorEventT<I>,
{
/// Applies `f` to the consensus instance of era `era_id` and handles the outcomes it returns.
///
/// If the era is not active, this only logs whether it lies in the future or is obsolete, and
/// returns no effects.
fn delegate_to_era<F>(&mut self, era_id: EraId, f: F) -> Effects<Event<I>>
where
    F: FnOnce(
        &mut dyn ConsensusProtocol<I, ClContext>,
        &mut NodeRng,
    ) -> Vec<ProtocolOutcome<I, ClContext>>,
{
    if let Some(era) = self.era_supervisor.active_eras.get_mut(&era_id) {
        let outcomes = f(&mut *era.consensus, self.rng);
        return self.handle_consensus_results(era_id, outcomes);
    }

    if era_id > self.era_supervisor.current_era {
        info!(era = era_id.0, "received message for future era");
    } else {
        info!(era = era_id.0, "received message for obsolete era");
    }
    Effects::new()
}
/// Forwards a timer event with the given `timestamp` to the consensus instance of `era_id`.
pub(super) fn handle_timer(
    &mut self,
    era_id: EraId,
    timestamp: Timestamp,
) -> Effects<Event<I>> {
    self.delegate_to_era(era_id, move |protocol, node_rng| {
        protocol.handle_timer(timestamp, node_rng)
    })
}
/// Handles a consensus message received from `sender`.
///
/// Protocol messages are passed to the era they belong to; if that era is not bonded, the
/// instance is told to handle the message in evidence-only mode. Evidence requests for a bonded
/// era are passed to every era yielded by `iter_bonded`; requests for other eras are ignored.
pub(super) fn handle_message(&mut self, sender: I, msg: ConsensusMessage) -> Effects<Event<I>> {
    match msg {
        ConsensusMessage::Protocol { era_id, payload } => {
            let evidence_only = !self.era_supervisor.is_bonded(era_id);
            trace!(era = era_id.0, "received a consensus message");
            self.delegate_to_era(era_id, move |protocol, node_rng| {
                protocol.handle_message(sender, payload, evidence_only, node_rng)
            })
        }
        ConsensusMessage::EvidenceRequest { era_id, pub_key } => {
            if !self.era_supervisor.is_bonded(era_id) {
                trace!(era = era_id.0, "not handling message; era too old");
                return Effects::new();
            }
            let bonded_eras = self.era_supervisor.bonded_eras;
            let mut effects = Effects::new();
            for bonded_era_id in era_id.iter_bonded(bonded_eras) {
                let era_effects = self.delegate_to_era(bonded_era_id, |protocol, _| {
                    protocol.request_evidence(sender.clone(), &pub_key)
                });
                effects.extend(era_effects);
            }
            effects
        }
    }
}
/// Notifies the current era's consensus instance about a newly connected peer.
pub(super) fn handle_new_peer(&mut self, peer_id: I) -> Effects<Event<I>> {
    let current_era = self.era_supervisor.current_era;
    self.delegate_to_era(current_era, move |protocol, _| {
        protocol.handle_new_peer(peer_id)
    })
}
/// Proposes a newly created proto block in era `era_id`.
///
/// The proto block is combined with accusations against every validator for which one of the
/// eras yielded by `iter_bonded` has evidence and who is not already in this era's `slashed`
/// set. Proto blocks for eras that are not bonded are dropped with a warning.
pub(super) fn handle_new_proto_block(
    &mut self,
    era_id: EraId,
    proto_block: ProtoBlock,
    block_context: BlockContext,
) -> Effects<Event<I>> {
    let era_is_bonded = self.era_supervisor.is_bonded(era_id);
    if !era_is_bonded {
        warn!(era = era_id.0, "new proto block in outdated era");
        return Effects::new();
    }

    let bonded_eras = self.era_supervisor.bonded_eras;
    let validators_with_evidence = era_id
        .iter_bonded(bonded_eras)
        .flat_map(|bonded_era_id| self.era(bonded_era_id).consensus.validators_with_evidence());
    let accusations = validators_with_evidence
        .unique()
        .filter(|pub_key| !self.era(era_id).slashed.contains(pub_key))
        .cloned()
        .collect();

    let candidate_block = CandidateBlock::new(proto_block, accusations);
    self.delegate_to_era(era_id, move |protocol, node_rng| {
        protocol.propose(candidate_block, block_context, node_rng)
    })
}
/// Handles a block that was added to the linear chain.
///
/// Responds with a finality signature for the block if this node is a validator in the block's
/// era, and with `None` otherwise. For blocks of the current era (or later), a switch block
/// additionally triggers the request for the data needed to create the next era, while any
/// other block is announced as handled. Blocks of older eras produce no further effects.
pub(super) fn handle_linear_chain_block(
&mut self,
block_header: BlockHeader,
responder: Responder<Option<FinalitySignature>>,
) -> Effects<Event<I>> {
let our_pk = self.era_supervisor.public_signing_key;
let our_sk = self.era_supervisor.secret_signing_key.clone();
let era_id = block_header.era_id();
// Only sign the block if we are one of the validators of its era.
let maybe_fin_sig = if self.era_supervisor.is_validator_in(&our_pk, era_id) {
let block_hash = block_header.hash();
Some(FinalitySignature::new(
block_hash,
era_id,
&our_sk,
our_pk,
&mut self.rng,
))
} else {
None
};
// The response is sent in every case, including for blocks of old eras.
let mut effects = responder.respond(maybe_fin_sig).ignore();
if era_id < self.era_supervisor.current_era {
trace!(era = era_id.0, "executed block in old era");
return effects;
}
if block_header.switch_block() {
// The next era's validator weights are requested at this block's state root hash.
let new_era_id = era_id.successor();
let request = ValidatorWeightsByEraIdRequest::new(
(*block_header.state_root_hash()).into(),
new_era_id,
ProtocolVersion::V1_0_0,
);
// The new era starts at the height right after this block.
let key_block_height = self
.era_supervisor
.key_block_height(new_era_id, block_header.height() + 1);
let booking_block_height = self.era_supervisor.booking_block_height(new_era_id);
let effect = self
.effect_builder
.create_new_era(request, booking_block_height, key_block_height)
.event(
// A missing booking or key block is reported as an `Err` carrying the height that
// was requested.
move |(validators, booking_block, key_block)| Event::CreateNewEra {
block_header: Box::new(block_header),
booking_block_hash: booking_block
.map_or_else(|| Err(booking_block_height), |block| Ok(*block.hash())),
key_block_seed: key_block.map_or_else(
|| Err(key_block_height),
|block| Ok(block.header().accumulated_seed()),
),
get_validators_result: validators,
},
);
effects.extend(effect);
} else {
// For switch blocks, the corresponding announcement is made in
// `handle_create_new_era` instead.
effects.extend(
self.effect_builder
.announce_block_handled(block_header)
.ignore(),
);
}
effects
}
/// Stops voting in era `era_id` if no new faulty validators were detected since the last check.
///
/// If the number of validators with evidence still equals `old_faulty_num`, the validator is
/// deactivated in that era. Otherwise another `DeactivateEra` event carrying the new count is
/// scheduled after `delay`. Eras that are no longer active are ignored with a warning.
pub(super) fn handle_deactivate_era(
    &mut self,
    era_id: EraId,
    old_faulty_num: usize,
    delay: Duration,
) -> Effects<Event<I>> {
    let era = match self.era_supervisor.active_eras.get_mut(&era_id) {
        Some(era) => era,
        None => {
            warn!(era = era_id.0, "trying to deactivate obsolete era");
            return Effects::new();
        }
    };

    let faulty_num = era.consensus.validators_with_evidence().len();
    if faulty_num != old_faulty_num {
        // New evidence arrived in the meantime: check again later.
        return self
            .effect_builder
            .set_timeout(delay)
            .event(move |_| Event::DeactivateEra {
                era_id,
                faulty_num,
                delay,
            });
    }

    info!(era = era_id.0, "stop voting in era");
    era.consensus.deactivate_validator();
    Effects::new()
}
/// Creates the era following the one that the switch block `block_header` belongs to.
///
/// The new era's seed is derived from the booking block hash and the key block seed. It starts
/// at the switch block's timestamp, at the height right after it, and takes the switch block's
/// equivocators as its newly slashed validators. Afterwards the switch block is announced as
/// handled.
///
/// # Panics
///
/// Panics if `block_header` has no era end, i.e. is not a switch block.
pub(super) fn handle_create_new_era(
    &mut self,
    block_header: BlockHeader,
    booking_block_hash: BlockHash,
    key_block_seed: Digest,
    validators: BTreeMap<PublicKey, U512>,
) -> Effects<Event<I>> {
    let era_end = block_header
        .era_end()
        .expect("switch block must have era_end");
    let newly_slashed = era_end.equivocators.clone();

    let era_id = block_header.era_id().successor();
    info!(era = era_id.0, "era created");
    let seed = EraSupervisor::<I>::era_seed(booking_block_hash, key_block_seed);
    trace!(%seed, "the seed for {}: {}", era_id, seed);

    let start_time = block_header.timestamp();
    let start_height = block_header.height() + 1;
    let state_root_hash = *block_header.state_root_hash();
    let outcomes = self.era_supervisor.new_era(
        era_id,
        Timestamp::now(),
        validators,
        newly_slashed,
        seed,
        start_time,
        start_height,
        state_root_hash,
    );

    let mut effects = self.handle_consensus_results(era_id, outcomes);
    let announcement = self.effect_builder.announce_block_handled(block_header);
    effects.extend(announcement.ignore());
    effects
}
/// Handles the result of validating `proto_block`, which was received from `sender`.
///
/// An invalid proto block causes a disconnect from the sender. If the era is still active,
/// every candidate block the era reports as resolved by this result is passed on to the
/// consensus instance together with the verdict.
pub(super) fn resolve_validity(
    &mut self,
    era_id: EraId,
    sender: I,
    proto_block: ProtoBlock,
    valid: bool,
) -> Effects<Event<I>> {
    self.era_supervisor.metrics.proposed_block();

    let mut effects = if valid {
        Effects::new()
    } else {
        warn!(
            %sender,
            era = %era_id.0,
            "invalid consensus value; disconnecting from the sender"
        );
        self.disconnect(sender)
    };

    let resolved_candidates = match self.era_supervisor.active_eras.get_mut(&era_id) {
        Some(era) => era.resolve_validity(&proto_block, valid),
        None => return effects,
    };
    for candidate in resolved_candidates {
        let candidate_effects = self.delegate_to_era(era_id, |protocol, node_rng| {
            protocol.resolve_validity(&candidate, valid, node_rng)
        });
        effects.extend(candidate_effects);
    }
    effects
}
fn handle_consensus_results<T>(&mut self, era_id: EraId, results: T) -> Effects<Event<I>>
where
T: IntoIterator<Item = ProtocolOutcome<I, ClContext>>,
{
results
.into_iter()
.flat_map(|result| self.handle_consensus_result(era_id, result))
.collect()
}
/// Returns whether any of the eras yielded by `era_id.iter_bonded(bonded_eras)` has evidence
/// against `pub_key`.
fn has_evidence(&self, era_id: EraId, pub_key: PublicKey) -> bool {
    let bonded_eras = self.era_supervisor.bonded_eras;
    for bonded_era_id in era_id.iter_bonded(bonded_eras) {
        if self.era(bonded_era_id).consensus.has_evidence(&pub_key) {
            return true;
        }
    }
    false
}
/// Returns the era with the given ID.
///
/// # Panics
///
/// Panics if that era is not among the active eras.
fn era(&self, era_id: EraId) -> &Era<I> {
&self.era_supervisor.active_eras[&era_id]
}
/// Returns a mutable reference to the era with the given ID.
///
/// # Panics
///
/// Panics if that era is not among the active eras.
fn era_mut(&mut self, era_id: EraId) -> &mut Era<I> {
self.era_supervisor.active_eras.get_mut(&era_id).unwrap()
}
/// Translates a single protocol outcome produced by the consensus instance of era `era_id`
/// into the effects that carry it out.
///
/// # Panics
///
/// The `FinalizedBlock` arm, and the `ValidateConsensusValue` arm for a bonded era, panic if
/// era `era_id` is not among the active eras.
fn handle_consensus_result(
&mut self,
era_id: EraId,
consensus_result: ProtocolOutcome<I, ClContext>,
) -> Effects<Event<I>> {
match consensus_result {
// Invalid messages and explicit disconnect requests both end in a disconnect announcement.
ProtocolOutcome::InvalidIncomingMessage(_, sender, error) => {
warn!(
%sender,
%error,
"invalid incoming message to consensus instance; disconnecting from the sender"
);
self.disconnect(sender)
}
ProtocolOutcome::Disconnect(sender) => {
warn!(
%sender,
"disconnecting from the sender of invalid data"
);
self.disconnect(sender)
}
// Outgoing messages are wrapped via `era_id.message` before being broadcast or sent.
ProtocolOutcome::CreatedGossipMessage(out_msg) => {
self.effect_builder
.broadcast_message(era_id.message(out_msg).into())
.ignore()
}
ProtocolOutcome::CreatedTargetedMessage(out_msg, to) => self
.effect_builder
.send_message(to, era_id.message(out_msg).into())
.ignore(),
ProtocolOutcome::ScheduleTimer(timestamp) => {
// The timeout is the time remaining until `timestamp`, computed with a saturating
// subtraction.
let timediff = timestamp.saturating_sub(Timestamp::now());
self.effect_builder
.set_timeout(timediff.into())
.event(move |_| Event::Timer { era_id, timestamp })
}
ProtocolOutcome::CreateNewBlock {
block_context,
past_values,
} => {
// The deploys of the given past values are passed along with the proto block request.
let past_deploys = past_values
.iter()
.flat_map(|candidate| BlockLike::deploys(candidate.proto_block()))
.cloned()
.collect();
self.effect_builder
.request_proto_block(
block_context,
past_deploys,
self.era_supervisor.next_block_height,
self.rng.gen(),
)
.event(move |(proto_block, block_context)| Event::NewProtoBlock {
era_id,
proto_block,
block_context,
})
}
ProtocolOutcome::FinalizedBlock(CpFinalizedBlock {
value,
timestamp,
height,
rewards,
equivocators,
proposer,
}) => {
let era = self.era_supervisor.active_eras.get_mut(&era_id).unwrap();
// Record the accusations from the finalized block and from the reported equivocators.
era.add_accusations(&equivocators);
era.add_accusations(value.accusations());
// An era end is only created if the outcome carries rewards.
let era_end = rewards.map(|rewards| EraEnd {
rewards,
equivocators: era.accusations(),
});
// `height` is added to the era's start height to obtain the block's height.
let finalized_block = FinalizedBlock::new(
value.proto_block().clone(),
timestamp,
era_end,
era_id,
era.start_height + height,
proposer,
);
self.era_supervisor
.metrics
.finalized_block(&finalized_block);
let mut effects = self
.effect_builder
.announce_finalized_block(finalized_block.clone())
.ignore();
self.era_supervisor.next_block_height = finalized_block.height() + 1;
if finalized_block.era_end().is_some() {
// Schedule a `DeactivateEra` event after a delay equal to the time that has passed
// since the block's timestamp; see `handle_deactivate_era`.
let delay = Timestamp::now().saturating_sub(timestamp).into();
let faulty_num = era.consensus.validators_with_evidence().len();
let deactivate_era = move |_| Event::DeactivateEra {
era_id,
faulty_num,
delay,
};
effects.extend(self.effect_builder.set_timeout(delay).event(deactivate_era));
}
effects.extend(self.effect_builder.execute_block(finalized_block).ignore());
effects
}
ProtocolOutcome::ValidateConsensusValue(sender, candidate_block, timestamp) => {
// Values from eras that are not bonded are not validated.
if !self.era_supervisor.is_bonded(era_id) {
return Effects::new();
}
let proto_block = candidate_block.proto_block().clone();
// Accused validators for which none of the bonded eras has evidence yet.
let missing_evidence: Vec<PublicKey> = candidate_block
.accusations()
.iter()
.filter(|pub_key| !self.has_evidence(era_id, **pub_key))
.cloned()
.collect();
let mut effects = Effects::new();
// Request the missing evidence from the sender of the candidate block.
for pub_key in missing_evidence.iter().cloned() {
let msg = ConsensusMessage::EvidenceRequest { era_id, pub_key };
effects.extend(
self.effect_builder
.send_message(sender.clone(), msg.into())
.ignore(),
);
}
// The candidate is stored in the era together with the list of missing evidence.
self.era_mut(era_id)
.add_candidate(candidate_block, missing_evidence);
effects.extend(
self.effect_builder
.validate_block(sender.clone(), proto_block, timestamp)
.event(move |(valid, proto_block)| Event::ResolveValidity {
era_id,
sender,
proto_block,
valid,
}),
);
effects
}
ProtocolOutcome::NewEvidence(pub_key) => {
info!(%pub_key, era = era_id.0, "validator equivocated");
let mut effects = self
.effect_builder
.announce_fault_event(era_id, pub_key, Timestamp::now())
.ignore();
// The new evidence may resolve candidate blocks in this era and in the `bonded_eras`
// eras after it; eras that are not active are skipped.
// NOTE(review): `era_id.0 + bonded_eras` is an unchecked addition — confirm it cannot
// overflow for the era IDs reaching this arm.
for e_id in (era_id.0..=(era_id.0 + self.era_supervisor.bonded_eras)).map(EraId) {
let candidate_blocks =
if let Some(era) = self.era_supervisor.active_eras.get_mut(&e_id) {
era.resolve_evidence(&pub_key)
} else {
continue;
};
for candidate_block in candidate_blocks {
effects.extend(self.delegate_to_era(e_id, |consensus, rng| {
consensus.resolve_validity(&candidate_block, true, rng)
}));
}
}
effects
}
// The evidence request is passed to each era yielded by `iter_other(bonded_eras)`.
ProtocolOutcome::SendEvidence(sender, pub_key) => era_id
.iter_other(self.era_supervisor.bonded_eras)
.flat_map(|e_id| {
self.delegate_to_era(e_id, |consensus, _| {
consensus.request_evidence(sender.clone(), &pub_key)
})
})
.collect(),
// These two outcomes currently produce no effects.
ProtocolOutcome::WeAreFaulty => Default::default(),
ProtocolOutcome::DoppelgangerDetected => Default::default(),
}
}
/// Emits a fatal error if none of the active eras has received any messages; otherwise returns
/// no effects.
pub(super) fn shutdown_if_necessary(&self) -> Effects<Event<I>> {
    let any_era_received_messages = self
        .era_supervisor
        .active_eras
        .values()
        .any(|era| era.consensus.has_received_messages());
    if any_era_received_messages {
        return Default::default();
    }
    fatal!(
        self.effect_builder,
        "Consensus shutting down due to inability to participate in the network; inactive era = {}",
        self.era_supervisor.current_era
    )
}
/// Tells the supervisor that joining has finished and handles the outcomes this produces in
/// the current era.
pub(crate) fn finished_joining(&mut self, now: Timestamp) -> Effects<Event<I>> {
    // `EraSupervisor::finished_joining` does not change the current era, so it can be read
    // up front.
    let current_era = self.era_supervisor.current_era;
    let outcomes = self.era_supervisor.finished_joining(now);
    self.handle_consensus_results(current_era, outcomes)
}
/// Responds with whether era `era_id` reports `vid` as a bonded validator; responds with
/// `false` if that era is not active.
pub(super) fn is_bonded_validator(
    &self,
    era_id: EraId,
    vid: PublicKey,
    responder: Responder<bool>,
) -> Effects<Event<I>> {
    let is_bonded = match self.era_supervisor.active_eras.get(&era_id) {
        Some(era) => era.is_bonded_validator(&vid),
        None => false,
    };
    responder.respond(is_bonded).ignore()
}
/// Announces that this node should disconnect from the peer `sender`.
fn disconnect(&self, sender: I) -> Effects<Event<I>> {
    let announcement = self.effect_builder.announce_disconnect_from_peer(sender);
    announcement.ignore()
}
}
/// Computes the instance ID of the era starting at `block_height` with the given state root
/// hash.
///
/// The ID is the Blake2b hash over the protocol name, the protocol config's timestamp in
/// milliseconds, the state root hash and, for each leading upgrade point whose activation
/// height is at most `block_height`, its activation height plus installer bytes and arguments
/// where present.
fn instance_id(
    protocol_config: &ProtocolConfig,
    state_root_hash: Digest,
    block_height: u64,
) -> Digest {
    let mut hasher = VarBlake2b::new(Digest::LENGTH).expect("should create hasher");

    hasher.update(&protocol_config.name);
    hasher.update(protocol_config.timestamp.millis().to_le_bytes());
    hasher.update(state_root_hash);

    // `take_while` stops at the first upgrade point that is not yet active at `block_height`.
    let active_upgrades = protocol_config
        .upgrades
        .iter()
        .take_while(|upgrade| upgrade.activation_point.height <= block_height);
    for upgrade in active_upgrades {
        hasher.update(upgrade.activation_point.height.to_le_bytes());
        if let Some(installer_bytes) = upgrade.upgrade_installer_bytes.as_ref() {
            hasher.update(installer_bytes);
        }
        if let Some(installer_args) = upgrade.upgrade_installer_args.as_ref() {
            hasher.update(installer_args);
        }
    }

    let mut digest_bytes = [0; Digest::LENGTH];
    hasher.finalize_variable(|output| digest_bytes.copy_from_slice(output));
    digest_bytes.into()
}