pub(crate) mod config;
mod participation;
mod round_success_meter;
#[cfg(test)]
mod tests;
use std::{
any::Any,
collections::{BTreeMap, HashMap, HashSet},
fmt::Debug,
iter,
path::PathBuf,
};
use datasize::DataSize;
use itertools::Itertools;
use num_traits::AsPrimitive;
use rand::RngCore;
use serde::{Deserialize, Serialize};
use tracing::{debug, error, info, trace, warn};
use casper_types::{system::auction::BLOCK_REWARD, U512};
use crate::{
components::consensus::{
config::{Config, ProtocolConfig},
consensus_protocol::{
BlockContext, ConsensusProtocol, ProposedBlock, ProtocolOutcome, ProtocolOutcomes,
},
highway_core::{
active_validator::Effect as AvEffect,
finality_detector::{FinalityDetector, FttExceeded},
highway::{
Dependency, GetDepOutcome, Highway, Params, PreValidatedVertex, ValidVertex,
Vertex, VertexError,
},
state::{self, IndexObservation, IndexPanorama, Observation, Panorama},
synchronizer::Synchronizer,
validators::{ValidatorIndex, Validators},
},
traits::{ConsensusValueT, Context, NodeIdT},
ActionId, TimerId,
},
types::{TimeDiff, Timestamp},
NodeRng,
};
use self::round_success_meter::RoundSuccessMeter;
const MAX_ENDORSEMENT_EVIDENCE_LIMIT: u64 = 10_000;
const TIMER_ID_ACTIVE_VALIDATOR: TimerId = TimerId(0);
const TIMER_ID_VERTEX_WITH_FUTURE_TIMESTAMP: TimerId = TimerId(1);
const TIMER_ID_PURGE_VERTICES: TimerId = TimerId(2);
const TIMER_ID_LOG_PARTICIPATION: TimerId = TimerId(3);
const TIMER_ID_STANDSTILL_ALERT: TimerId = TimerId(4);
const TIMER_ID_SYNCHRONIZER_LOG: TimerId = TimerId(5);
const TIMER_ID_REQUEST_STATE: TimerId = TimerId(6);
pub(crate) const ACTION_ID_VERTEX: ActionId = ActionId(0);
#[derive(DataSize, Debug)]
pub(crate) struct HighwayProtocol<I, C>
where
I: DataSize,
C: Context,
{
pending_values: HashMap<ProposedBlock<C>, HashSet<(ValidVertex<C>, I)>>,
finality_detector: FinalityDetector<C>,
highway: Highway<C>,
round_success_meter: RoundSuccessMeter<C>,
synchronizer: Synchronizer<I, C>,
pvv_cache: HashMap<Dependency<C>, PreValidatedVertex<C>>,
evidence_only: bool,
last_panorama: Panorama<C>,
config: config::Config,
}
impl<I: NodeIdT, C: Context + 'static> HighwayProtocol<I, C> {
#[allow(clippy::too_many_arguments, clippy::type_complexity)]
pub(crate) fn new_boxed(
instance_id: C::InstanceId,
validator_stakes: BTreeMap<C::ValidatorId, U512>,
faulty: &HashSet<C::ValidatorId>,
inactive: &HashSet<C::ValidatorId>,
protocol_config: &ProtocolConfig,
config: &Config,
prev_cp: Option<&dyn ConsensusProtocol<I, C>>,
era_start_time: Timestamp,
seed: u64,
now: Timestamp,
) -> (Box<dyn ConsensusProtocol<I, C>>, ProtocolOutcomes<I, C>) {
let validators_count = validator_stakes.len();
let sum_stakes: U512 = validator_stakes.iter().map(|(_, stake)| *stake).sum();
assert!(
!sum_stakes.is_zero(),
"cannot start era with total weight 0"
);
let scaling_factor = (sum_stakes + U512::from(u64::MAX) - 1) / U512::from(u64::MAX);
let scale_stake = |(key, stake): (C::ValidatorId, U512)| {
(key, AsPrimitive::<u64>::as_(stake / scaling_factor))
};
let mut validators: Validators<C::ValidatorId> =
validator_stakes.into_iter().map(scale_stake).collect();
for vid in faulty {
validators.ban(vid);
}
for vid in inactive {
validators.set_cannot_propose(vid);
}
assert!(
validators.ensure_nonzero_proposing_stake(),
"cannot start era with total weight 0"
);
let total_weight = u128::from(validators.total_weight());
let ftt_fraction = protocol_config.highway.finality_threshold_fraction;
assert!(
ftt_fraction < 1.into(),
"finality threshold must be less than 100%"
);
#[allow(clippy::integer_arithmetic)] let ftt = total_weight * *ftt_fraction.numer() as u128 / *ftt_fraction.denom() as u128;
let ftt = (ftt as u64).into();
let round_success_meter = prev_cp
.and_then(|cp| cp.as_any().downcast_ref::<HighwayProtocol<I, C>>())
.map(|highway_proto| highway_proto.next_era_round_succ_meter(era_start_time.max(now)))
.unwrap_or_else(|| {
RoundSuccessMeter::new(
protocol_config.highway.minimum_round_exponent,
protocol_config.highway.minimum_round_exponent,
protocol_config.highway.maximum_round_exponent,
era_start_time.max(now),
config.into(),
)
});
let init_round_exp = round_success_meter.new_exponent();
info!(
%init_round_exp,
"initializing Highway instance",
);
let min_round_len = state::round_len(protocol_config.highway.minimum_round_exponent);
let min_rounds_per_era = protocol_config
.minimum_era_height
.max((TimeDiff::from(1) + protocol_config.era_duration) / min_round_len);
let endorsement_evidence_limit = min_rounds_per_era
.saturating_mul(2)
.min(MAX_ENDORSEMENT_EVIDENCE_LIMIT);
let params = Params::new(
seed,
BLOCK_REWARD,
(protocol_config.highway.reduced_reward_multiplier * BLOCK_REWARD).to_integer(),
protocol_config.highway.minimum_round_exponent,
protocol_config.highway.maximum_round_exponent,
init_round_exp,
protocol_config.minimum_era_height,
era_start_time,
era_start_time + protocol_config.era_duration,
endorsement_evidence_limit,
);
let outcomes = Self::initialize_timers(now, era_start_time, &config.highway);
let highway = Highway::new(instance_id, validators, params);
let last_panorama = highway.state().panorama().clone();
let hw_proto = Box::new(HighwayProtocol {
pending_values: HashMap::new(),
finality_detector: FinalityDetector::new(ftt),
highway,
round_success_meter,
synchronizer: Synchronizer::new(validators_count, instance_id),
pvv_cache: Default::default(),
evidence_only: false,
last_panorama,
config: config.highway.clone(),
});
(hw_proto, outcomes)
}
fn initialize_timers(
now: Timestamp,
era_start_time: Timestamp,
config: &config::Config,
) -> ProtocolOutcomes<I, C> {
let mut outcomes = vec![ProtocolOutcome::ScheduleTimer(
now + config.pending_vertex_timeout,
TIMER_ID_PURGE_VERTICES,
)];
if let Some(interval) = config.log_participation_interval {
outcomes.push(ProtocolOutcome::ScheduleTimer(
now.max(era_start_time) + interval,
TIMER_ID_LOG_PARTICIPATION,
));
}
if let Some(interval) = config.log_synchronizer_interval {
outcomes.push(ProtocolOutcome::ScheduleTimer(
now + interval,
TIMER_ID_SYNCHRONIZER_LOG,
));
}
if let Some(timeout) = config.standstill_timeout {
outcomes.push(ProtocolOutcome::ScheduleTimer(
now.max(era_start_time) + timeout,
TIMER_ID_STANDSTILL_ALERT,
));
}
outcomes
}
fn process_av_effects<E>(&mut self, av_effects: E, now: Timestamp) -> ProtocolOutcomes<I, C>
where
E: IntoIterator<Item = AvEffect<C>>,
{
av_effects
.into_iter()
.flat_map(|effect| self.process_av_effect(effect, now))
.collect()
}
fn process_av_effect(&mut self, effect: AvEffect<C>, now: Timestamp) -> ProtocolOutcomes<I, C> {
match effect {
AvEffect::NewVertex(vv) => {
self.log_unit_size(vv.inner(), "sending new unit");
self.calculate_round_exponent(&vv, now);
self.process_new_vertex(vv)
}
AvEffect::ScheduleTimer(timestamp) => {
vec![ProtocolOutcome::ScheduleTimer(
timestamp,
TIMER_ID_ACTIVE_VALIDATOR,
)]
}
AvEffect::RequestNewBlock(block_context) => {
vec![ProtocolOutcome::CreateNewBlock(block_context)]
}
AvEffect::WeAreFaulty(fault) => {
error!("this validator is faulty: {:?}", fault);
vec![ProtocolOutcome::WeAreFaulty]
}
}
}
fn process_new_vertex(&mut self, vv: ValidVertex<C>) -> ProtocolOutcomes<I, C> {
let mut outcomes = Vec::new();
if let Vertex::Evidence(ev) = vv.inner() {
let v_id = self
.highway
.validators()
.id(ev.perpetrator())
.expect("validator not found") .clone();
outcomes.push(ProtocolOutcome::NewEvidence(v_id));
}
let msg = HighwayMessage::NewVertex(vv.into());
outcomes.push(ProtocolOutcome::CreatedGossipMessage(msg.serialize()));
outcomes.extend(self.detect_finality());
outcomes
}
fn detect_finality(&mut self) -> ProtocolOutcomes<I, C> {
let faulty_weight = match self.finality_detector.run(&self.highway) {
Ok(iter) => return iter.map(ProtocolOutcome::FinalizedBlock).collect(),
Err(FttExceeded(weight)) => weight.0,
};
error!(
%faulty_weight,
total_weight = %self.highway.state().total_weight().0,
"too many faulty validators"
);
self.log_participation();
vec![ProtocolOutcome::FttExceeded]
}
fn add_vertex(&mut self, now: Timestamp) -> ProtocolOutcomes<I, C> {
let (maybe_pending_vertex, mut outcomes) = self.synchronizer.pop_vertex_to_add(
&self.highway,
&self.pending_values,
self.config.max_requests_for_vertex,
);
let pending_vertex = match maybe_pending_vertex {
None => return outcomes,
Some(pending_vertex) => pending_vertex,
};
if self.highway.is_doppelganger_vertex(pending_vertex.vertex()) {
error!(
vertex = ?pending_vertex.vertex(),
"received vertex from a doppelganger. \
Are you running multiple nodes with the same validator key?",
);
self.deactivate_validator();
outcomes.push(ProtocolOutcome::DoppelgangerDetected);
}
let sender = pending_vertex.sender().clone();
let vv = match self.highway.validate_vertex(pending_vertex.into()) {
Ok(vv) => vv,
Err((pvv, err)) => {
info!(?pvv, ?err, "invalid vertex");
let vertices = vec![pvv.inner().id()];
let faulty_senders = self.synchronizer.invalid_vertices(vertices);
outcomes.extend(faulty_senders.into_iter().map(ProtocolOutcome::Disconnect));
return outcomes;
}
};
let vertex = vv.inner();
if let (Some(value), Some(timestamp), Some(swunit)) =
(vertex.value(), vertex.timestamp(), vertex.unit())
{
let panorama = &swunit.wire_unit().panorama;
let fork_choice = self.highway.state().fork_choice(panorama);
if value.needs_validation() {
self.log_proposal(vertex, "requesting proposal validation");
let ancestor_values = self.ancestors(fork_choice).cloned().collect();
let block_context = BlockContext::new(timestamp, ancestor_values);
let proposed_block = ProposedBlock::new(value.clone(), block_context);
if self
.pending_values
.entry(proposed_block.clone())
.or_default()
.insert((vv, sender.clone()))
{
outcomes.push(ProtocolOutcome::ValidateConsensusValue {
sender,
proposed_block,
});
}
return outcomes;
} else {
self.log_proposal(vertex, "proposal does not need validation");
}
}
outcomes.extend(self.add_valid_vertex(vv, now));
outcomes.extend(self.synchronizer.remove_satisfied_deps(&self.highway));
outcomes.extend(self.detect_finality());
outcomes
}
fn calculate_round_exponent(&mut self, vv: &ValidVertex<C>, now: Timestamp) {
let new_round_exp = self
.round_success_meter
.calculate_new_exponent(self.highway.state());
if vv.is_proposal() {
let vertex = vv.inner();
if let (Some(hash), Some(timestamp)) = (vertex.unit_hash(), vertex.timestamp()) {
trace!(%now, timestamp = timestamp.millis(), "adding proposal to protocol state");
self.round_success_meter.new_proposal(hash, timestamp);
} else {
error!(?vertex, "proposal without unit hash and timestamp");
}
}
self.highway.set_round_exp(new_round_exp);
}
fn add_valid_vertex(&mut self, vv: ValidVertex<C>, now: Timestamp) -> ProtocolOutcomes<I, C> {
if self.evidence_only && !vv.inner().is_evidence() {
error!(vertex = ?vv.inner(), "unexpected vertex in evidence-only mode");
return vec![];
}
if self.highway.has_vertex(vv.inner()) {
return vec![];
}
if !vv.inner().is_proposal() {
if let Some(hash) = vv.inner().unit_hash() {
trace!(?hash, "adding unit to the protocol state");
} else {
trace!(vertex=?vv.inner(), "adding vertex to the protocol state");
}
}
self.log_unit_size(vv.inner(), "adding new unit to the protocol state");
self.log_proposal(vv.inner(), "adding valid proposal to the protocol state");
let vertex_id = vv.inner().id();
self.calculate_round_exponent(&vv, now);
let av_effects = self.highway.add_valid_vertex(vv, now);
self.pvv_cache.remove(&vertex_id);
self.process_av_effects(av_effects, now)
}
fn next_era_round_succ_meter(&self, timestamp: Timestamp) -> RoundSuccessMeter<C> {
self.round_success_meter.next_era(timestamp)
}
fn ancestors<'a>(
&'a self,
mut maybe_hash: Option<&'a C::Hash>,
) -> impl Iterator<Item = &'a C::ConsensusValue> {
iter::from_fn(move || {
let hash = maybe_hash.take()?;
let block = self.highway.state().block(hash);
let value = Some(&block.value);
maybe_hash = block.parent();
value
})
}
fn log_participation(&self) {
let instance_id = self.highway.instance_id();
let participation = participation::Participation::new(&self.highway);
info!(?participation, %instance_id, "validator participation");
}
fn log_unit_size(&self, vertex: &Vertex<C>, log_msg: &str) {
if self.config.log_unit_sizes {
if let Some(hash) = vertex.unit_hash() {
let size = HighwayMessage::NewVertex(vertex.clone()).serialize().len();
info!(size, %hash, "{}", log_msg);
}
}
}
fn finalized_switch_block(&self) -> bool {
let is_switch = |block_hash: &C::Hash| self.highway.state().is_terminal_block(block_hash);
self.finality_detector
.last_finalized()
.map_or(false, is_switch)
}
fn handle_request_state_timer(&mut self, now: Timestamp) -> ProtocolOutcomes<I, C> {
if self.evidence_only || self.finalized_switch_block() {
return vec![]; }
debug!(
instance_id = ?self.highway.instance_id(),
"requesting latest state from random peer",
);
let mut outcomes = self.latest_state_request();
if let Some(interval) = self.config.request_state_interval {
outcomes.push(ProtocolOutcome::ScheduleTimer(
now + interval,
TIMER_ID_REQUEST_STATE,
));
}
outcomes
}
fn handle_standstill_alert_timer(&mut self, now: Timestamp) -> ProtocolOutcomes<I, C> {
if self.evidence_only || self.finalized_switch_block() {
return vec![];
}
let timeout = match self.config.standstill_timeout {
None => return vec![],
Some(timeout) => timeout,
};
if self.last_panorama == *self.highway.state().panorama() {
info!(
instance_id = ?self.highway.instance_id(),
"no progress in the last {}, raising standstill alert",
timeout,
);
return vec![ProtocolOutcome::StandstillAlert]; }
debug!(
instance_id = ?self.highway.instance_id(),
"progress detected; scheduling next standstill check in {}",
timeout,
);
self.last_panorama = self.highway.state().panorama().clone();
vec![ProtocolOutcome::ScheduleTimer(
now + timeout,
TIMER_ID_STANDSTILL_ALERT,
)]
}
fn log_proposal(&self, vertex: &Vertex<C>, msg: &str) -> bool {
let (wire_unit, hash) = match vertex.unit() {
Some(swu) if swu.wire_unit().value.is_some() => (swu.wire_unit(), swu.hash()),
_ => return false, };
let creator = if let Some(creator) = self.highway.validators().id(wire_unit.creator) {
creator
} else {
error!(?wire_unit, "{}: invalid creator", msg);
return true;
};
info!(
?hash,
?creator,
creator_index = wire_unit.creator.0,
timestamp = %wire_unit.timestamp,
round_exp = wire_unit.round_exp,
seq_number = wire_unit.seq_number,
"{}", msg
);
true
}
fn log_received_vertex(&self, vertex: &Vertex<C>) {
match vertex {
Vertex::Unit(swu) => {
let creator = if let Some(creator) = vertex
.creator()
.and_then(|vid| self.highway.validators().id(vid))
{
creator
} else {
error!(?vertex, "invalid creator");
return;
};
let wire_unit = swu.wire_unit();
let hash = swu.hash();
if vertex.is_proposal() {
info!(
?hash,
?creator,
creator_index = wire_unit.creator.0,
timestamp = %wire_unit.timestamp,
round_exp = wire_unit.round_exp,
seq_number = wire_unit.seq_number,
"received a proposal"
);
} else {
trace!(
?hash,
?creator,
creator_index = wire_unit.creator.0,
timestamp = %wire_unit.timestamp,
round_exp = wire_unit.round_exp,
seq_number = wire_unit.seq_number,
"received a non-proposal unit"
);
};
}
Vertex::Evidence(evidence) => trace!(?evidence, "received an evidence"),
Vertex::Endorsements(endorsement) => trace!(?endorsement, "received an endorsement"),
Vertex::Ping(ping) => trace!(?ping, "received ping"),
}
}
fn pre_validate_vertex(
&mut self,
v: Vertex<C>,
) -> Result<PreValidatedVertex<C>, (Vertex<C>, VertexError)> {
let id = v.id();
if let Some(prev_pvv) = self.pvv_cache.get(&id) {
return Ok(prev_pvv.clone());
}
let pvv = self.highway.pre_validate_vertex(v)?;
self.pvv_cache.insert(id, pvv.clone());
Ok(pvv)
}
fn latest_state_request(&self) -> ProtocolOutcomes<I, C> {
let request: HighwayMessage<C> = HighwayMessage::LatestStateRequest(
IndexPanorama::from_panorama(self.highway.state().panorama(), self.highway.state()),
);
let payload = (&request).serialize();
vec![ProtocolOutcome::CreatedMessageToRandomPeer(payload)]
}
fn batch_request(
&self,
rng: &mut NodeRng,
vid: ValidatorIndex,
our_next_seq: u64,
their_next_seq: u64,
) -> Vec<HighwayMessage<C>> {
let state = self.highway.state();
if our_next_seq == their_next_seq {
return vec![];
}
if our_next_seq < their_next_seq {
(our_next_seq..their_next_seq)
.take(self.config.max_request_batch_size)
.map(|unit_seq_number| {
let uuid = rng.next_u64();
debug!(?uuid, ?vid, ?unit_seq_number, "requesting dependency");
HighwayMessage::RequestDependencyByHeight {
uuid,
vid,
unit_seq_number,
}
})
.into_iter()
.collect()
} else {
match state.panorama().get(vid) {
None => {
warn!(?vid, "received a request for non-existing validator");
vec![]
}
Some(observation) => match observation {
Observation::None => {
warn!(
?vid,
our_next_seq,
?observation,
"expected unit for validator but found none"
);
vec![]
}
Observation::Faulty => {
let ev = match state.maybe_evidence(vid) {
Some(ev) => ev.clone(),
None => {
warn!(
?vid, instance_id=?self.highway.instance_id(),
"panorama marked validator as faulty but no evidence was found"
);
return vec![];
}
};
vec![HighwayMessage::NewVertex(Vertex::Evidence(ev))]
}
Observation::Correct(hash) => (their_next_seq..our_next_seq)
.take(self.config.max_request_batch_size)
.filter_map(|seq_num| {
let unit = state.find_in_swimlane(hash, seq_num).unwrap();
state
.wire_unit(unit, *self.highway.instance_id())
.map(|swu| HighwayMessage::NewVertex(Vertex::Unit(swu)))
})
.into_iter()
.collect(),
},
}
}
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(bound(
serialize = "C::Hash: Serialize",
deserialize = "C::Hash: Deserialize<'de>",
))]
pub(crate) enum HighwayMessage<C: Context> {
NewVertex(Vertex<C>),
RequestDependency(u64, Dependency<C>),
RequestDependencyByHeight {
uuid: u64,
vid: ValidatorIndex,
unit_seq_number: u64,
},
LatestStateRequest(IndexPanorama),
}
impl<C: Context> HighwayMessage<C> {
pub(crate) fn serialize(&self) -> Vec<u8> {
bincode::serialize(self).expect("should serialize message")
}
}
impl<I, C> ConsensusProtocol<I, C> for HighwayProtocol<I, C>
where
I: NodeIdT,
C: Context + 'static,
{
fn handle_message(
&mut self,
rng: &mut NodeRng,
sender: I,
msg: Vec<u8>,
now: Timestamp,
) -> ProtocolOutcomes<I, C> {
match bincode::deserialize(msg.as_slice()) {
Err(err) => vec![ProtocolOutcome::InvalidIncomingMessage(
msg,
sender,
err.into(),
)],
Ok(HighwayMessage::NewVertex(v))
if self.highway.has_vertex(&v) || (self.evidence_only && !v.is_evidence()) =>
{
trace!(
has_vertex = self.highway.has_vertex(&v),
is_evidence = v.is_evidence(),
evidence_only = %self.evidence_only,
"received an irrelevant vertex"
);
vec![]
}
Ok(HighwayMessage::NewVertex(v)) => {
let v_id = v.id();
if self.highway.has_dependency(&v_id) {
return vec![];
}
let pvv = match self.pre_validate_vertex(v) {
Ok(pvv) => pvv,
Err((_, err)) => {
trace!("received an invalid vertex");
let faulty_senders = self.synchronizer.invalid_vertices(vec![v_id]);
return iter::once(ProtocolOutcome::InvalidIncomingMessage(
msg,
sender,
err.into(),
))
.chain(faulty_senders.into_iter().map(ProtocolOutcome::Disconnect))
.collect();
}
};
let is_faulty = match pvv.inner().creator() {
Some(creator) => self.highway.state().is_faulty(creator),
None => false,
};
if is_faulty && !self.synchronizer.is_dependency(&pvv.inner().id()) {
trace!("received a vertex from a faulty validator; dropping");
return vec![];
}
match pvv.timestamp() {
Some(timestamp) if timestamp > now + self.config.pending_vertex_timeout => {
trace!("received a vertex with a timestamp far in the future; dropping");
vec![]
}
Some(timestamp) if timestamp > now => {
trace!("received a vertex from the future; storing for later");
self.synchronizer
.store_vertex_for_addition_later(timestamp, now, sender, pvv);
let timer_id = TIMER_ID_VERTEX_WITH_FUTURE_TIMESTAMP;
vec![ProtocolOutcome::ScheduleTimer(timestamp, timer_id)]
}
_ => {
self.log_received_vertex(pvv.inner());
self.synchronizer.schedule_add_vertex(sender, pvv, now)
}
}
}
Ok(HighwayMessage::RequestDependency(uuid, dep)) => {
trace!(?uuid, dependency=?dep, "received a request for a dependency");
match self.highway.get_dependency(&dep) {
GetDepOutcome::None => {
info!(?dep, peer_id=?sender, "requested dependency doesn't exist");
vec![]
}
GetDepOutcome::Evidence(vid) => {
vec![ProtocolOutcome::SendEvidence(sender, vid)]
}
GetDepOutcome::Vertex(vv) => vec![ProtocolOutcome::CreatedTargetedMessage(
HighwayMessage::NewVertex(vv.into()).serialize(),
sender,
)],
}
}
Ok(HighwayMessage::RequestDependencyByHeight {
uuid,
vid,
unit_seq_number,
}) => {
debug!(
?uuid,
?vid,
?unit_seq_number,
"received a request for a dependency"
);
match self.highway.get_dependency_by_index(vid, unit_seq_number) {
GetDepOutcome::None => {
info!(
?vid,
?unit_seq_number,
?sender,
"requested dependency doesn't exist"
);
vec![]
}
GetDepOutcome::Evidence(vid) => {
vec![ProtocolOutcome::SendEvidence(sender, vid)]
}
GetDepOutcome::Vertex(vv) => {
vec![ProtocolOutcome::CreatedTargetedMessage(
HighwayMessage::NewVertex(vv.into()).serialize(),
sender,
)]
}
}
}
Ok(HighwayMessage::LatestStateRequest(their_index_panorama)) => {
trace!("received a request for the latest state");
let state = self.highway.state();
let create_message = |((vid, our_obs), their_obs): (
(ValidatorIndex, &IndexObservation),
&IndexObservation,
)| {
match (*our_obs, *their_obs) {
(our_obs, their_obs) if our_obs == their_obs => vec![],
(IndexObservation::Faulty, _) => state
.maybe_evidence(vid)
.map(|evidence| {
HighwayMessage::NewVertex(Vertex::Evidence(evidence.clone()))
})
.into_iter()
.collect(),
(_, IndexObservation::Faulty) => {
let dependency = Dependency::Evidence(vid);
let uuid = rng.next_u64();
debug!(?uuid, "requesting evidence");
vec![HighwayMessage::RequestDependency(uuid, dependency)]
}
(
IndexObservation::NextSeq(our_next_seq),
IndexObservation::NextSeq(their_next_seq),
) => self.batch_request(rng, vid, our_next_seq, their_next_seq),
}
};
IndexPanorama::from_panorama(state.panorama(), state)
.enumerate()
.zip(&their_index_panorama)
.map(create_message)
.flat_map(|msgs| {
msgs.into_iter().map(|msg| {
ProtocolOutcome::CreatedTargetedMessage(msg.serialize(), sender.clone())
})
})
.collect()
}
}
}
fn handle_timer(&mut self, now: Timestamp, timer_id: TimerId) -> ProtocolOutcomes<I, C> {
match timer_id {
TIMER_ID_ACTIVE_VALIDATOR => {
let effects = self.highway.handle_timer(now);
self.process_av_effects(effects, now)
}
TIMER_ID_VERTEX_WITH_FUTURE_TIMESTAMP => {
self.synchronizer.add_past_due_stored_vertices(now)
}
TIMER_ID_PURGE_VERTICES => {
let oldest = now.saturating_sub(self.config.pending_vertex_timeout);
self.synchronizer.purge_vertices(oldest);
self.pvv_cache.clear();
let next_time = now + self.config.pending_vertex_timeout;
vec![ProtocolOutcome::ScheduleTimer(next_time, timer_id)]
}
TIMER_ID_LOG_PARTICIPATION => {
self.log_participation();
match self.config.log_participation_interval {
Some(interval) if !self.evidence_only && !self.finalized_switch_block() => {
vec![ProtocolOutcome::ScheduleTimer(now + interval, timer_id)]
}
_ => vec![],
}
}
TIMER_ID_REQUEST_STATE => self.handle_request_state_timer(now),
TIMER_ID_STANDSTILL_ALERT => self.handle_standstill_alert_timer(now),
TIMER_ID_SYNCHRONIZER_LOG => {
self.synchronizer.log_len();
match self.config.log_synchronizer_interval {
Some(interval) if !self.finalized_switch_block() => {
vec![ProtocolOutcome::ScheduleTimer(now + interval, timer_id)]
}
_ => vec![],
}
}
_ => unreachable!("unexpected timer ID"),
}
}
fn handle_is_current(&self, now: Timestamp) -> ProtocolOutcomes<I, C> {
let mut outcomes = self.latest_state_request();
if let Some(interval) = self.config.request_state_interval {
outcomes.push(ProtocolOutcome::ScheduleTimer(
now + interval,
TIMER_ID_REQUEST_STATE,
));
}
outcomes
}
fn handle_action(&mut self, action_id: ActionId, now: Timestamp) -> ProtocolOutcomes<I, C> {
match action_id {
ACTION_ID_VERTEX => self.add_vertex(now),
_ => unreachable!("unexpected action ID"),
}
}
fn propose(
&mut self,
proposed_block: ProposedBlock<C>,
now: Timestamp,
) -> ProtocolOutcomes<I, C> {
let (value, block_context) = proposed_block.destructure();
let effects = self.highway.propose(value, block_context);
self.process_av_effects(effects, now)
}
fn resolve_validity(
&mut self,
proposed_block: ProposedBlock<C>,
valid: bool,
now: Timestamp,
) -> ProtocolOutcomes<I, C> {
if valid {
let mut outcomes = self
.pending_values
.remove(&proposed_block)
.into_iter()
.flatten()
.flat_map(|(vv, _)| self.add_valid_vertex(vv, now))
.collect_vec();
outcomes.extend(self.synchronizer.remove_satisfied_deps(&self.highway));
outcomes.extend(self.detect_finality());
outcomes
} else {
let dropped_vertices = self.pending_values.remove(&proposed_block);
warn!(?proposed_block, ?dropped_vertices, "proposal is invalid");
let dropped_vertex_ids = dropped_vertices
.into_iter()
.flatten()
.map(|(vv, _)| {
self.log_proposal(vv.inner(), "dropping invalid proposal");
vv.inner().id()
})
.collect();
let _faulty_senders = self.synchronizer.invalid_vertices(dropped_vertex_ids);
vec![]
}
}
fn activate_validator(
&mut self,
our_id: C::ValidatorId,
secret: C::ValidatorSecret,
now: Timestamp,
unit_hash_file: Option<PathBuf>,
) -> ProtocolOutcomes<I, C> {
let ftt = self.finality_detector.fault_tolerance_threshold();
let av_effects = self
.highway
.activate_validator(our_id, secret, now, unit_hash_file, ftt);
self.process_av_effects(av_effects, now)
}
fn deactivate_validator(&mut self) {
self.highway.deactivate_validator()
}
fn set_evidence_only(&mut self) {
self.pending_values.clear();
self.synchronizer.retain_evidence_only();
self.highway.retain_evidence_only();
self.evidence_only = true;
}
fn has_evidence(&self, vid: &C::ValidatorId) -> bool {
self.highway.has_evidence(vid)
}
fn mark_faulty(&mut self, vid: &C::ValidatorId) {
self.highway.mark_faulty(vid);
}
fn request_evidence(&self, sender: I, vid: &C::ValidatorId) -> ProtocolOutcomes<I, C> {
self.highway
.validators()
.get_index(vid)
.and_then(
move |vidx| match self.highway.get_dependency(&Dependency::Evidence(vidx)) {
GetDepOutcome::None | GetDepOutcome::Evidence(_) => None,
GetDepOutcome::Vertex(vv) => {
let msg = HighwayMessage::NewVertex(vv.into());
Some(ProtocolOutcome::CreatedTargetedMessage(
msg.serialize(),
sender,
))
}
},
)
.into_iter()
.collect()
}
fn set_paused(&mut self, paused: bool) {
self.highway.set_paused(paused);
}
fn validators_with_evidence(&self) -> Vec<&C::ValidatorId> {
self.highway.validators_with_evidence().collect()
}
fn has_received_messages(&self) -> bool {
!self.highway.state().is_empty()
|| !self.synchronizer.is_empty()
|| !self.pending_values.is_empty()
}
fn as_any(&self) -> &dyn Any {
self
}
fn is_active(&self) -> bool {
self.highway.is_active()
}
fn instance_id(&self) -> &C::InstanceId {
self.highway.instance_id()
}
fn next_round_length(&self) -> Option<TimeDiff> {
self.highway.next_round_length()
}
}