use std::collections::{BTreeMap, HashMap, VecDeque};
use std::fmt::{Debug, Display};
use std::ops::{Add, Sub};
use std::sync::Arc;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
pub use crate::config::{Config, TimeoutValues};
pub use crate::error::ConsensusError;
use crate::internal::{InternalConsensus, InternalParams};
use crate::wal::{delete_wal_file, FileWalSink, NoopWal, WalSink};
mod config;
mod error;
mod internal;
mod wal;
pub type Signature = malachite_signing_ed25519::Signature;
pub type SigningKey = ed25519_consensus::SigningKey;
pub trait ValidatorAddress:
Sync + Send + Ord + Display + Debug + Default + Clone + Into<Vec<u8>> + Serialize + DeserializeOwned
{
}
impl<T> ValidatorAddress for T where
T: Sync
+ Send
+ Ord
+ Display
+ Debug
+ Default
+ Clone
+ Into<Vec<u8>>
+ Serialize
+ DeserializeOwned
{
}
pub trait ValuePayload:
Sync + Send + Ord + Display + Debug + Default + Clone + Serialize + DeserializeOwned
{
}
impl<T> ValuePayload for T where
T: Sync + Send + Ord + Display + Debug + Default + Clone + Serialize + DeserializeOwned
{
}
pub struct Consensus<
V: ValuePayload + 'static,
A: ValidatorAddress + 'static,
P: ProposerSelector<A> + Send + Sync + 'static,
> {
internal: HashMap<u64, InternalConsensus<V, A, P>>,
event_queue: VecDeque<ConsensusEvent<V, A>>,
config: Config<A>,
proposer_selector: P,
min_kept_height: Option<u64>,
last_decided_height: Option<u64>,
}
impl<
V: ValuePayload + 'static,
A: ValidatorAddress + 'static,
P: ProposerSelector<A> + Send + Sync + 'static,
> Consensus<V, A, P>
{
pub fn new(config: Config<A>) -> DefaultConsensus<V, A> {
Consensus {
internal: HashMap::new(),
event_queue: VecDeque::new(),
config,
proposer_selector: RoundRobinProposerSelector,
min_kept_height: None,
last_decided_height: None,
}
}
pub fn with_proposer_selector<PS: ProposerSelector<A> + Send + Sync + 'static>(
config: Config<A>,
proposer_selector: PS,
) -> Consensus<V, A, PS> {
Consensus {
internal: HashMap::new(),
event_queue: VecDeque::new(),
config,
proposer_selector,
min_kept_height: None,
last_decided_height: None,
}
}
pub fn recover<VS: ValidatorSetProvider<A> + 'static>(
config: Config<A>,
validator_sets: Arc<VS>,
highest_committed: Option<u64>,
) -> anyhow::Result<DefaultConsensus<V, A>> {
Self::recover_inner(
Self::new(config.clone()),
config,
validator_sets,
highest_committed,
)
}
pub fn recover_with_proposal_selector<
VS: ValidatorSetProvider<A> + 'static,
PS: ProposerSelector<A> + Send + Sync + 'static,
>(
config: Config<A>,
validator_sets: Arc<VS>,
proposer_selector: PS,
highest_committed: Option<u64>,
) -> anyhow::Result<Consensus<V, A, PS>> {
Self::recover_inner(
Self::with_proposer_selector(config.clone(), proposer_selector),
config,
validator_sets,
highest_committed,
)
}
fn recover_inner<
VS: ValidatorSetProvider<A> + 'static,
PS: ProposerSelector<A> + Send + Sync + 'static,
>(
mut consensus: Consensus<V, A, PS>,
config: Config<A>,
validator_sets: Arc<VS>,
highest_committed: Option<u64>,
) -> anyhow::Result<Consensus<V, A, PS>> {
use crate::wal::recovery;
tracing::info!(
validator = ?config.address,
wal_dir = %config.wal_dir.display(),
"Starting consensus recovery from WAL"
);
let (incomplete_heights, finalized_heights, highest_decision) =
match recovery::recover_incomplete_heights(&config.wal_dir, highest_committed) {
Ok((incomplete, finalized, decision_height)) => {
tracing::info!(
validator = ?config.address,
incomplete_heights = incomplete.len(),
finalized_heights = finalized.len(),
highest_decision = ?decision_height,
"Found incomplete and finalized heights in WAL"
);
(incomplete, finalized, decision_height)
}
Err(e) => {
tracing::error!(
validator = ?config.address,
wal_dir = %config.wal_dir.display(),
error = %e,
"Failed to recover incomplete heights from WAL"
);
(Vec::new(), Vec::new(), None)
}
};
consensus.last_decided_height = highest_decision;
if let Some(h) = highest_decision {
tracing::info!(
validator = ?consensus.config.address,
last_decided_height = %h,
"Set last_decided_height from WAL recovery"
);
}
let max_height = incomplete_heights
.iter()
.chain(finalized_heights.iter())
.map(|(height, _)| *height)
.max()
.or(highest_decision);
let min_height_to_restore =
max_height.and_then(|max| max.checked_sub(config.history_depth));
for (height, entries) in finalized_heights {
let should_restore = min_height_to_restore
.map(|min| height >= min)
.unwrap_or(true);
if should_restore {
tracing::info!(
validator = ?consensus.config.address,
height = %height,
"Restoring finalized height within history_depth"
);
let validator_set = validator_sets.get_validator_set(height)?;
let mut internal_consensus = consensus.create_consensus(height, &validator_set);
internal_consensus.recover_from_wal(entries);
if !internal_consensus.is_finalized() {
internal_consensus
.handle_command(ConsensusCommand::StartHeight(height, validator_set));
}
consensus.internal.insert(height, internal_consensus);
} else {
tracing::debug!(
validator = ?consensus.config.address,
height = %height,
min_height = ?min_height_to_restore,
"Skipping finalized height outside history_depth"
);
}
}
for (height, entries) in incomplete_heights {
tracing::info!(
validator = ?consensus.config.address,
height = %height,
entry_count = entries.len(),
"Recovering incomplete height from WAL"
);
let validator_set = validator_sets.get_validator_set(height)?;
let mut internal_consensus = consensus.create_consensus(height, &validator_set);
internal_consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
internal_consensus.recover_from_wal(entries);
consensus.internal.insert(height, internal_consensus);
}
consensus.min_kept_height = min_height_to_restore;
tracing::info!(
validator = ?consensus.config.address,
recovered_heights = consensus.internal.len(),
last_decided_height = ?consensus.last_decided_height,
min_kept_height = ?consensus.min_kept_height,
"Completed consensus recovery"
);
Ok(consensus)
}
fn create_consensus(
&mut self,
height: u64,
validator_set: &ValidatorSet<A>,
) -> InternalConsensus<V, A, P> {
let params = InternalParams {
height,
validator_set: validator_set.clone(),
address: self.config.address.clone(),
threshold_params: self.config.threshold_params,
value_payload: malachite_types::ValuePayload::ProposalOnly,
};
let wal = match FileWalSink::new(&self.config.address, height, &self.config.wal_dir) {
Ok(wal) => Box::new(wal) as Box<dyn WalSink<V, A>>,
Err(e) => {
tracing::error!(
validator = ?self.config.address,
height = %height,
error = %e,
"Failed to create wal for height"
);
Box::new(NoopWal)
}
};
InternalConsensus::new(
params,
self.config.timeout_values.clone(),
wal,
self.proposer_selector.clone(),
)
}
pub fn handle_command(&mut self, cmd: ConsensusCommand<V, A>) {
match cmd {
ConsensusCommand::StartHeight(height, validator_set) => {
let mut consensus = self.create_consensus(height, &validator_set);
consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
self.internal.insert(height, consensus);
tracing::debug!(
validator = ?self.config.address,
height = %height,
"Started new consensus"
);
}
other => {
let height = other.height();
if let Some(engine) = self.internal.get_mut(&height) {
engine.handle_command(other);
} else {
tracing::warn!(
validator = ?self.config.address,
height = %height,
command = ?other,
"Received command for unknown height"
);
}
}
}
}
pub async fn next_event(&mut self) -> Option<ConsensusEvent<V, A>> {
let mut finished_heights = Vec::new();
for (height, engine) in self.internal.iter_mut() {
if let Some(event) = engine.poll_internal().await {
tracing::trace!(
validator = ?self.config.address,
height = %height,
event = ?event,
"Engine returned event"
);
if let ConsensusEvent::Decision { height, .. } = &event {
finished_heights.push(*height);
self.last_decided_height = Some(
self.last_decided_height
.map(|h| h.max(*height))
.unwrap_or(*height),
);
}
self.event_queue.push_back(event);
}
}
if !finished_heights.is_empty() {
self.prune_old_engines();
}
self.event_queue.pop_front()
}
fn prune_old_engines(&mut self) {
let max_height = self.internal.keys().max().copied();
if let Some(max_height) = max_height {
let new_min_height = max_height.checked_sub(self.config.history_depth);
if let Some(new_min) = new_min_height {
let pruned_heights: Vec<u64> = self
.internal
.keys()
.filter(|height| **height < new_min)
.copied()
.collect();
self.min_kept_height = Some(new_min);
self.internal.retain(|height, _| *height >= new_min);
for height in &pruned_heights {
if let Err(e) =
delete_wal_file(&self.config.address, *height, &self.config.wal_dir)
{
tracing::warn!(
validator = ?self.config.address,
height = %height,
error = %e,
"Failed to delete WAL file for pruned height"
);
}
}
tracing::debug!(
validator = ?self.config.address,
min_height = %new_min,
max_height = %max_height,
pruned_count = pruned_heights.len(),
"Pruned old consensus engines and deleted WAL files"
);
}
}
}
pub fn is_height_finalized(&self, height: u64) -> bool {
if let Some(engine) = self.internal.get(&height) {
engine.is_finalized()
} else {
if let Some(min_height) = self.min_kept_height {
if height < min_height {
return true;
}
}
false
}
}
pub fn is_height_active(&self, height: u64) -> bool {
self.internal.contains_key(&height)
}
pub fn max_active_height(&self) -> Option<u64> {
self.internal.keys().max().copied()
}
pub fn last_decided_height(&self) -> Option<u64> {
self.last_decided_height
}
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Round(pub Option<u32>);
impl Round {
pub fn new(round: u32) -> Self {
Self(Some(round))
}
pub fn nil() -> Self {
Self(None)
}
pub fn as_u32(&self) -> Option<u32> {
self.0
}
}
impl From<u32> for Round {
fn from(round: u32) -> Self {
Self::new(round)
}
}
impl Add<u32> for Round {
type Output = Self;
fn add(self, rhs: u32) -> Self::Output {
Self(self.0.map(|round| round + rhs))
}
}
impl Sub<u32> for Round {
type Output = Self;
fn sub(self, rhs: u32) -> Self::Output {
Self(self.0.map(|round| round - rhs))
}
}
impl Display for Round {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.0 {
Some(round) => write!(f, "{round}"),
None => write!(f, "Nil"),
}
}
}
impl Debug for Round {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{self}")
}
}
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Proposal<V, A> {
pub height: u64,
pub round: Round,
pub value: V,
pub pol_round: Round,
pub proposer: A,
}
impl<V: Debug, A: Debug> std::fmt::Debug for Proposal<V, A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"H:{} R:{} From:{:?} Val:{:?}",
self.height, self.round, self.proposer, self.value
)
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum VoteType {
Prevote,
Precommit,
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Vote<V, A> {
pub r#type: VoteType,
pub height: u64,
pub round: Round,
pub value: Option<V>,
pub validator_address: A,
}
impl<V: Debug, A: Debug> std::fmt::Debug for Vote<V, A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let val = match &self.value {
Some(val) => format!("{val:?}"),
None => "Nil".to_string(),
};
write!(
f,
"H:{} R:{} {:?} From:{:?} Val:{val}",
self.height, self.round, self.r#type, self.validator_address
)
}
}
impl<V, A> Vote<V, A> {
pub fn is_nil(&self) -> bool {
self.value.is_none()
}
}
#[derive(Clone, Serialize, Deserialize)]
pub struct SignedProposal<V, A> {
pub proposal: Proposal<V, A>,
pub signature: Signature,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct SignedVote<V, A> {
pub vote: Vote<V, A>,
pub signature: Signature,
}
impl<V: Debug, A: Debug> std::fmt::Debug for SignedProposal<V, A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.proposal)
}
}
impl<V: Debug, A: Debug> std::fmt::Debug for SignedVote<V, A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.vote)
}
}
pub type PublicKey = malachite_signing_ed25519::PublicKey;
pub type VotingPower = u64;
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Validator<A> {
pub address: A,
pub public_key: PublicKey,
pub voting_power: VotingPower,
}
impl<A: Debug> std::fmt::Debug for Validator<A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?} ({})", self.address, self.voting_power)
}
}
impl<A> Validator<A> {
pub fn new(address: A, public_key: PublicKey) -> Self {
Self {
address,
public_key,
voting_power: 1,
}
}
pub fn with_voting_power(mut self, voting_power: VotingPower) -> Self {
self.voting_power = voting_power;
self
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ValidatorSet<A> {
pub validators: Vec<Validator<A>>,
}
impl<A: Clone + Ord> ValidatorSet<A> {
pub fn new(validators: impl IntoIterator<Item = Validator<A>>) -> Self {
let validators: BTreeMap<A, Validator<A>> = validators
.into_iter()
.map(|v| (v.address.clone(), v))
.collect();
assert!(!validators.is_empty());
let validators = validators.into_values().collect::<Vec<Validator<A>>>();
Self { validators }
}
pub fn count(&self) -> usize {
self.validators.len()
}
}
pub enum ConsensusCommand<V, A> {
StartHeight(u64, ValidatorSet<A>),
Propose(Proposal<V, A>),
Proposal(SignedProposal<V, A>),
Vote(SignedVote<V, A>),
}
impl<V, A> ConsensusCommand<V, A> {
pub fn height(&self) -> u64 {
match self {
ConsensusCommand::StartHeight(height, _) => *height,
ConsensusCommand::Propose(proposal) => proposal.height,
ConsensusCommand::Proposal(proposal) => proposal.proposal.height,
ConsensusCommand::Vote(vote) => vote.vote.height,
}
}
}
impl<V: Debug, A: Debug> std::fmt::Debug for ConsensusCommand<V, A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ConsensusCommand::StartHeight(height, validator_set) => write!(
f,
"StartHeight({}, [{}])",
height,
validator_set
.validators
.iter()
.map(|v| format!("{:?}", v.address))
.collect::<Vec<_>>()
.join(", ")
),
ConsensusCommand::Propose(proposal) => write!(f, "Propose({proposal:?})"),
ConsensusCommand::Proposal(proposal) => write!(f, "Proposal({proposal:?})"),
ConsensusCommand::Vote(vote) => write!(f, "Vote({vote:?})"),
}
}
}
#[derive(Clone, Debug)]
pub enum NetworkMessage<V, A> {
Proposal(SignedProposal<V, A>),
Vote(SignedVote<V, A>),
}
pub enum ConsensusEvent<V, A> {
Gossip(NetworkMessage<V, A>),
RequestProposal { height: u64, round: u32 },
Decision { height: u64, round: u32, value: V },
Error(ConsensusError),
}
impl<V: Debug, A: Debug> std::fmt::Debug for ConsensusEvent<V, A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ConsensusEvent::Gossip(msg) => match msg {
NetworkMessage::Proposal(proposal) => write!(f, "Gossip(Proposal({proposal:?}))"),
NetworkMessage::Vote(vote) => write!(f, "Gossip(Vote({vote:?}))"),
},
ConsensusEvent::RequestProposal { height, round, .. } => {
write!(f, "RequestProposal(H:{height} R:{round})")
}
ConsensusEvent::Decision {
height,
round,
value,
} => {
write!(f, "Decision(H:{height} R: {round} Val:{value:?})")
}
ConsensusEvent::Error(error) => write!(f, "Error({error:?})"),
}
}
}
pub trait ValidatorSetProvider<A> {
fn get_validator_set(&self, height: u64) -> Result<ValidatorSet<A>, anyhow::Error>;
}
pub trait ProposerSelector<A: ValidatorAddress>: Clone + Send + Sync {
fn select_proposer<'a>(
&self,
validator_set: &'a ValidatorSet<A>,
height: u64,
round: u32,
) -> &'a Validator<A>;
}
#[derive(Clone, Default)]
pub struct RoundRobinProposerSelector;
impl<A: ValidatorAddress> ProposerSelector<A> for RoundRobinProposerSelector {
fn select_proposer<'a>(
&self,
validator_set: &'a ValidatorSet<A>,
_height: u64,
round: u32,
) -> &'a Validator<A> {
let index = round as usize % validator_set.count();
&validator_set.validators[index]
}
}
pub type DefaultConsensus<V, A> = Consensus<V, A, RoundRobinProposerSelector>;
pub struct StaticValidatorSetProvider<A> {
validator_set: ValidatorSet<A>,
}
impl<A> StaticValidatorSetProvider<A> {
pub fn new(validator_set: ValidatorSet<A>) -> Self {
Self { validator_set }
}
}
impl<A: Clone + Send + Sync> ValidatorSetProvider<A> for StaticValidatorSetProvider<A> {
fn get_validator_set(&self, _height: u64) -> Result<ValidatorSet<A>, anyhow::Error> {
Ok(self.validator_set.clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn regression_validator_set_is_unique_by_address() {
let with_duplicates = [1, 1, 2, 2, 2, 3, 3, 3, 3, 2, 1, 1, 2, 3, 2, 2, 1, 1, 3, 3]
.into_iter()
.map(|i| Validator::new(i, crate::PublicKey::from_bytes([0; 32])));
let set = ValidatorSet::new(with_duplicates);
assert_eq!(
set.validators.iter().map(|v| v.address).collect::<Vec<_>>(),
vec![1, 2, 3]
);
}
}