use std::{
collections::{BTreeMap, HashMap, HashSet},
env,
};
use datasize::DataSize;
use itertools::Itertools;
use once_cell::sync::Lazy;
use tracing::{debug, info, warn};
use casper_types::{PublicKey, Timestamp, U512};
use crate::components::consensus::{
cl_context::ClContext,
consensus_protocol::{ConsensusProtocol, ProposedBlock},
protocols::{highway::HighwayProtocol, zug::Zug},
};
/// Name of the environment variable that switches on detailed consensus memory metrics.
const CASPER_ENABLE_DETAILED_CONSENSUS_METRICS_ENV_VAR: &str =
"CASPER_ENABLE_DETAILED_CONSENSUS_METRICS";
/// Whether detailed consensus memory metrics are enabled.
///
/// Evaluated lazily on first access and then cached: `true` if the environment variable named by
/// `CASPER_ENABLE_DETAILED_CONSENSUS_METRICS_ENV_VAR` could be read via `env::var`, regardless of
/// its value. Changes to the environment after the first access are not observed.
static CASPER_ENABLE_DETAILED_CONSENSUS_METRICS: Lazy<bool> =
Lazy::new(|| env::var(CASPER_ENABLE_DETAILED_CONSENSUS_METRICS_ENV_VAR).is_ok());
/// Bookkeeping for a proposed block that is not yet ready to be handed on: tracks whether it has
/// been marked as validated and which accused validators still lack evidence.
#[derive(DataSize)]
pub struct ValidationState {
/// Set to `true` once the block was reported valid while evidence was still missing.
validated: bool,
/// Public keys of validators for which evidence is still missing; entries are removed as
/// evidence is resolved.
missing_evidence: Vec<PublicKey>,
}
impl ValidationState {
fn new(missing_evidence: Vec<PublicKey>) -> Self {
ValidationState {
validated: false,
missing_evidence,
}
}
fn is_complete(&self) -> bool {
self.validated && self.missing_evidence.is_empty()
}
}
/// The state kept for a single era: its consensus protocol instance plus the bookkeeping around
/// pending proposed blocks, faulty validators and accusations.
pub struct Era {
/// The consensus protocol instance of this era.
pub(crate) consensus: Box<dyn ConsensusProtocol<ClContext>>,
/// The era's start time, as passed to `Era::new`.
pub(crate) start_time: Timestamp,
/// The era's start height, as passed to `Era::new`.
// NOTE(review): presumably the height of the era's first block -- confirm against callers.
pub(crate) start_height: u64,
/// Proposed blocks that are still waiting for validation and/or missing evidence.
pub(crate) validation_states: HashMap<ProposedBlock<ClContext>, ValidationState>,
/// Validators already known to be faulty; accusations against them are not recorded again.
pub(crate) faulty: HashSet<PublicKey>,
/// Validators passed in at construction under this name.
// NOTE(review): presumably these are excluded from proposing blocks -- not enforced in this file.
pub(crate) cannot_propose: HashSet<PublicKey>,
/// Accusations collected so far via `add_accusations`.
pub(crate) accusations: HashSet<PublicKey>,
/// The validators of this era, each mapped to a `U512` value.
// NOTE(review): the `U512` is presumably the validator's weight/stake -- confirm.
pub(crate) validators: BTreeMap<PublicKey, U512>,
}
impl Era {
    /// Creates an era around the given consensus protocol instance.
    ///
    /// The new era has no pending proposed blocks and no collected accusations; all other
    /// fields are taken from the arguments unchanged.
    pub(crate) fn new(
        consensus: Box<dyn ConsensusProtocol<ClContext>>,
        start_time: Timestamp,
        start_height: u64,
        faulty: HashSet<PublicKey>,
        cannot_propose: HashSet<PublicKey>,
        validators: BTreeMap<PublicKey, U512>,
    ) -> Self {
        let validation_states = HashMap::new();
        let accusations = HashSet::new();
        Self {
            consensus,
            start_time,
            start_height,
            validation_states,
            faulty,
            cannot_propose,
            accusations,
            validators,
        }
    }

    /// Registers `proposed_block` as pending, waiting for validation and for evidence against
    /// every validator in `missing_evidence`.
    ///
    /// If the same block is already pending, its previous state is replaced.
    pub(crate) fn add_block(
        &mut self,
        proposed_block: ProposedBlock<ClContext>,
        missing_evidence: Vec<PublicKey>,
    ) {
        let pending = ValidationState::new(missing_evidence);
        self.validation_states.insert(proposed_block, pending);
    }

    /// Records that evidence against `pub_key` is no longer missing for any pending block and
    /// marks that validator as faulty in the consensus protocol.
    ///
    /// Returns the blocks that thereby became complete, i.e. that are validated and have no
    /// missing evidence left. Those blocks are removed from the pending set; the order of the
    /// returned blocks is unspecified.
    pub(crate) fn resolve_evidence_and_mark_faulty(
        &mut self,
        pub_key: &PublicKey,
    ) -> Vec<ProposedBlock<ClContext>> {
        self.validation_states
            .values_mut()
            .for_each(|state| state.missing_evidence.retain(|missing| missing != pub_key));

        self.consensus.mark_faulty(pub_key);

        // Split the pending blocks into those that are now ready and those that must keep
        // waiting; only the latter stay in `validation_states`.
        let (ready, still_pending): (HashMap<_, _>, HashMap<_, _>) = self
            .validation_states
            .drain()
            .partition(|(_, state)| state.is_complete());
        self.validation_states = still_pending;

        ready.into_keys().collect()
    }

    /// Records the outcome of validating `proposed_block`.
    ///
    /// * A valid block that still has missing evidence is only flagged as validated and stays
    ///   pending; `false` is returned.
    /// * In every other case the block is removed from the pending set, and the return value
    ///   tells whether it was present.
    pub(crate) fn resolve_validity(
        &mut self,
        proposed_block: &ProposedBlock<ClContext>,
        valid: bool,
    ) -> bool {
        if valid {
            let awaiting_evidence = self
                .validation_states
                .get_mut(proposed_block)
                .filter(|state| !state.missing_evidence.is_empty());
            if let Some(state) = awaiting_evidence {
                info!("Cannot resolve validity of proposed block (timestamp {}) due to missing_evidence still present.", proposed_block.context().timestamp());
                state.validated = true;
                return false;
            }
        }

        let removed = self.validation_states.remove(proposed_block);
        removed.is_some()
    }

    /// Adds the given accusations to the collected set, skipping validators that are already
    /// in `faulty`.
    pub(crate) fn add_accusations(&mut self, accusations: &[PublicKey]) {
        let already_faulty = &self.faulty;
        let new_accusations = accusations
            .iter()
            .filter(|accused| !already_faulty.contains(*accused))
            .cloned();
        self.accusations.extend(new_accusations);
    }

    /// Returns all accusations collected so far, in ascending order.
    pub(crate) fn accusations(&self) -> Vec<PublicKey> {
        self.accusations.iter().sorted().cloned().collect()
    }

    /// Returns the validator map of this era.
    pub(crate) fn validators(&self) -> &BTreeMap<PublicKey, U512> {
        let Self { validators, .. } = self;
        validators
    }
}
impl DataSize for Era {
    const IS_DYNAMIC: bool = true;

    const STATIC_HEAP_SIZE: usize = 0;

    /// Estimates the heap memory used by this era.
    ///
    /// The boxed consensus protocol is downcast to one of the known implementations (Highway
    /// or Zug) to obtain its size; if neither matches, a warning is logged and the protocol
    /// counts as `0`. When detailed consensus metrics are enabled, the protocol's detailed
    /// breakdown is additionally logged as JSON at debug level and its total is used.
    /// The sizes of all remaining fields are added with saturating arithmetic.
    #[inline]
    fn estimate_heap_size(&self) -> usize {
        // Destructure exhaustively so that adding a field to `Era` fails to compile here until
        // the new field is accounted for.
        let Era {
            consensus,
            start_time,
            start_height,
            validation_states,
            faulty,
            cannot_propose,
            accusations,
            validators,
        } = self;

        let protocol = consensus.as_any();
        let consensus_heap_size =
            if let Some(highway) = protocol.downcast_ref::<HighwayProtocol<ClContext>>() {
                if !*CASPER_ENABLE_DETAILED_CONSENSUS_METRICS {
                    (*highway).estimate_heap_size()
                } else {
                    let breakdown = (*highway).estimate_detailed_heap_size();
                    match serde_json::to_string(&breakdown) {
                        Ok(encoded) => debug!(%encoded, "consensus memory metrics"),
                        Err(err) => warn!(%err, "error encoding consensus memory metrics"),
                    }
                    breakdown.total()
                }
            } else if let Some(zug) = protocol.downcast_ref::<Zug<ClContext>>() {
                if !*CASPER_ENABLE_DETAILED_CONSENSUS_METRICS {
                    (*zug).estimate_heap_size()
                } else {
                    let breakdown = (*zug).estimate_detailed_heap_size();
                    match serde_json::to_string(&breakdown) {
                        Ok(encoded) => debug!(%encoded, "consensus memory metrics"),
                        Err(err) => warn!(%err, "error encoding consensus memory metrics"),
                    }
                    breakdown.total()
                }
            } else {
                warn!("could not downcast consensus protocol to determine heap allocation size");
                0
            };

        // Saturating sum of the protocol's size and the size of every other field.
        [
            consensus_heap_size,
            start_time.estimate_heap_size(),
            start_height.estimate_heap_size(),
            validation_states.estimate_heap_size(),
            faulty.estimate_heap_size(),
            cannot_propose.estimate_heap_size(),
            accusations.estimate_heap_size(),
            validators.estimate_heap_size(),
        ]
        .iter()
        .fold(0usize, |total, part| total.saturating_add(*part))
    }
}