use crate::error::{Error, Result};
use crate::node::Node;
use crate::pbft::{Message, Round};
use crate::types::{Block, Role, Stage};
/// Sizing parameters for a consensus round.
///
/// Every value is derived from a single fault-tolerance parameter `f`
/// (presumably the number of faulty nodes the network should survive —
/// confirm against the protocol specification). The per-role expectations
/// are handed to `Node::assign_role` by `ConsensusEngine::run_round`.
pub struct Config {
    /// The fault-tolerance parameter `f` used by the quorum formulas.
    fault_tolerance: u64,
    /// Expected number of proposers per round; [`Config::new`] sets this to 1.
    expected_proposers: u64,
    /// Expected number of validators per round; defaults to `3f + 1`.
    expected_validators: u64,
    /// Expected number of packers per round; defaults to `f + 1`.
    expected_packers: u64,
}

impl Config {
    /// Builds a configuration from the fault-tolerance parameter `f`.
    ///
    /// Defaults: one proposer, `3f + 1` validators and `f + 1` packers.
    ///
    /// All derived values saturate at `u64::MAX` instead of overflowing. With
    /// plain arithmetic an oversized `fault_tolerance` would panic in debug
    /// builds and wrap around to a tiny (or zero) quorum in release builds;
    /// saturating turns it into a requirement that simply cannot be met.
    pub fn new(fault_tolerance: u64) -> Self {
        Self {
            fault_tolerance,
            expected_proposers: 1,
            expected_validators: fault_tolerance.saturating_mul(3).saturating_add(1),
            expected_packers: fault_tolerance.saturating_add(1),
        }
    }

    /// Overrides the expected number of validators with `n`.
    ///
    /// Only `expected_validators` changes; the proposer/packer expectations,
    /// [`Config::threshold`] and [`Config::min_nodes`] still follow `f`.
    pub fn with_expected_committee(mut self, n: u64) -> Self {
        self.expected_validators = n;
        self
    }

    /// Returns the vote quorum `2f + 1`, saturating at `u64::MAX`.
    pub fn threshold(&self) -> u64 {
        self.fault_tolerance.saturating_mul(2).saturating_add(1)
    }

    /// Returns the minimum number of nodes `3f + 1`, saturating at `u64::MAX`.
    pub fn min_nodes(&self) -> u64 {
        self.fault_tolerance.saturating_mul(3).saturating_add(1)
    }
}
pub struct ConsensusEngine {
nodes: Vec<Node>,
config: Config,
round: u64,
committed: Vec<Block>,
messages: Vec<Message>,
}
impl ConsensusEngine {
pub fn new(config: Config) -> Self {
Self {
nodes: Vec::new(),
config,
round: 0,
committed: Vec::new(),
messages: Vec::new(),
}
}
pub fn add_node(&mut self, node: Node) {
self.nodes.push(node);
}
pub fn committed_blocks(&self) -> &[Block] {
&self.committed
}
pub fn round(&self) -> u64 {
self.round
}
pub fn nodes(&self) -> &[Node] {
&self.nodes
}
pub fn messages(&self) -> &[Message] {
&self.messages
}
fn total_weight(&self) -> u64 {
self.nodes.iter().map(|n| n.weight()).sum()
}
pub fn run_round(&mut self) -> Result<Block> {
let min = self.config.min_nodes() as usize;
if self.nodes.len() < min {
return Err(Error::NotEnoughNodes {
needed: min,
have: self.nodes.len(),
});
}
self.round += 1;
self.messages.clear();
let total_weight = self.total_weight();
let role_expected = [
(Role::Proposer, self.config.expected_proposers),
(Role::Validator, self.config.expected_validators),
(Role::Packer, self.config.expected_packers),
];
for node in &mut self.nodes {
node.assign_role(self.round, total_weight, &role_expected)?;
}
let proposer_idx = self
.nodes
.iter()
.position(|n| n.role() == Role::Proposer)
.ok_or(Error::NoProposer(self.round))?;
let validator_indices: Vec<usize> = self
.nodes
.iter()
.enumerate()
.filter(|(_, n)| n.role() == Role::Validator)
.map(|(i, _)| i)
.collect();
let effective_threshold = if validator_indices.is_empty() {
self.config.threshold()
} else {
let adaptive = (validator_indices.len() as u64 * 2 / 3) + 1;
adaptive.min(self.config.threshold())
};
let mut protocol = Round::new(self.round, Role::Proposer, effective_threshold);
let block = self.nodes[proposer_idx].propose_block(self.round)?;
let block_hash = block.hash();
protocol.set_block(block.clone());
self.messages.push(Message::pre_prepare(
self.nodes[proposer_idx].id(),
self.round,
block.clone(),
));
protocol.advance(Stage::Prepare)?;
for &idx in &validator_indices {
let approve = self.nodes[idx].validate_block(&block);
if approve {
protocol.add_prepare_vote(self.nodes[idx].id());
}
self.messages.push(Message::prepare(
self.nodes[idx].id(),
self.round,
block_hash,
approve,
));
}
if !protocol.has_prepare_quorum() {
return Err(Error::InsufficientVotes {
needed: effective_threshold,
got: protocol.prepare_count(),
});
}
protocol.advance(Stage::Commit)?;
for &idx in &validator_indices {
let approve = self.nodes[idx].validate_block(&block);
if approve {
protocol.add_commit_vote(self.nodes[idx].id());
}
self.messages.push(Message::commit(
self.nodes[idx].id(),
self.round,
block_hash,
approve,
));
}
if !protocol.has_commit_quorum() {
return Err(Error::InsufficientVotes {
needed: effective_threshold,
got: protocol.commit_count(),
});
}
protocol.advance(Stage::Reply)?;
self.messages.push(Message::reply(
self.nodes[proposer_idx].id(),
self.round,
block.clone(),
));
for node in &mut self.nodes {
node.commit_block(block.clone());
}
self.committed.push(block.clone());
Ok(block)
}
pub fn run(&mut self, rounds: u64) -> Result<Vec<Block>> {
let mut blocks = Vec::new();
for _ in 0..rounds {
match self.run_round() {
Ok(block) => blocks.push(block),
Err(Error::NoProposer(_)) => continue,
Err(Error::InsufficientVotes { .. }) => continue,
Err(e) => return Err(e),
}
}
Ok(blocks)
}
}