use std::collections::{BTreeMap, HashMap, HashSet};
use creep::Context;
use hummer::coding::hex_encode;
use crate::types::{
Address, AggregatedChoke, AggregatedVote, Hash, SignedChoke, SignedProposal, SignedVote,
VoteType,
};
use crate::{error::ConsensusError, Codec, ConsensusResult};
#[derive(Clone, Debug)]
pub struct ProposalCollector<T: Codec>(BTreeMap<u64, ProposalRoundCollector<T>>);
impl<T> ProposalCollector<T>
where
T: Codec,
{
pub fn new() -> Self {
ProposalCollector(BTreeMap::new())
}
pub fn insert(
&mut self,
ctx: Context,
height: u64,
round: u64,
proposal: SignedProposal<T>,
) -> ConsensusResult<()> {
self.0
.entry(height)
.or_insert_with(ProposalRoundCollector::new)
.insert(ctx, round, proposal)
.map_err(|_| ConsensusError::MultiProposal(height, round))
}
pub fn get(&self, height: u64, round: u64) -> ConsensusResult<(SignedProposal<T>, Context)> {
if let Some(round_collector) = self.0.get(&height) {
return Ok(round_collector
.get(round)
.map_err(|_| {
ConsensusError::StorageErr(format!(
"No proposal height {}, round {}",
height, round
))
})?
.to_owned());
}
Err(ConsensusError::StorageErr(format!(
"No proposal height {}, round {}",
height, round
)))
}
pub fn get_height_proposals(
&mut self,
height: u64,
) -> Option<Vec<(SignedProposal<T>, Context)>> {
self.0.remove(&height).map_or_else(
|| None,
|map| Some(map.0.values().cloned().collect::<Vec<_>>()),
)
}
pub fn flush(&mut self, till: u64) {
self.0 = self.0.split_off(&till);
}
}
#[derive(Clone, Debug)]
struct ProposalRoundCollector<T: Codec>(HashMap<u64, (SignedProposal<T>, Context)>);
impl<T> ProposalRoundCollector<T>
where
T: Codec,
{
fn new() -> Self {
ProposalRoundCollector(HashMap::new())
}
fn insert(
&mut self,
ctx: Context,
round: u64,
proposal: SignedProposal<T>,
) -> ConsensusResult<()> {
if let Some((sp, _)) = self.0.get(&round) {
if sp == &proposal {
return Ok(());
}
return Err(ConsensusError::Other("_".to_string()));
}
self.0.insert(round, (proposal, ctx));
Ok(())
}
fn get(&self, round: u64) -> ConsensusResult<&(SignedProposal<T>, Context)> {
self.0
.get(&round)
.ok_or_else(|| ConsensusError::StorageErr("_".to_string()))
}
}
#[derive(Clone, Debug)]
pub struct VoteCollector(BTreeMap<u64, VoteRoundCollector>);
impl VoteCollector {
pub fn new() -> Self {
VoteCollector(BTreeMap::new())
}
pub fn insert_vote(&mut self, ctx: Context, hash: Hash, vote: SignedVote, addr: Address) {
self.0
.entry(vote.get_height())
.or_insert_with(VoteRoundCollector::new)
.insert_vote(ctx, hash, vote, addr);
}
pub fn set_qc(&mut self, qc: AggregatedVote) {
self.0
.entry(qc.get_height())
.or_insert_with(VoteRoundCollector::new)
.set_qc(qc);
}
pub fn get_vote_map(
&mut self,
height: u64,
round: u64,
vote_type: VoteType,
) -> ConsensusResult<&HashMap<Hash, HashSet<Address>>> {
self.0
.get_mut(&height)
.and_then(|vrc| vrc.get_vote_map(round, vote_type.clone()))
.ok_or_else(|| {
ConsensusError::StorageErr(format!(
"Can not get {:?} vote map height {}, round {}",
vote_type, height, round
))
})
}
pub fn get_votes(
&mut self,
height: u64,
round: u64,
vote_type: VoteType,
hash: &Hash,
) -> ConsensusResult<Vec<(SignedVote, Context)>> {
self.0
.get_mut(&height)
.and_then(|vrc| vrc.get_votes(round, vote_type.clone(), hash))
.ok_or_else(|| {
ConsensusError::StorageErr(format!(
"Can not get {:?} votes height {}, round {}",
vote_type, height, round
))
})
}
pub fn get_qc_by_id(
&mut self,
height: u64,
round: u64,
qc_type: VoteType,
) -> ConsensusResult<AggregatedVote> {
self.0
.get_mut(&height)
.and_then(|vrc| vrc.get_qc_by_id(round, qc_type.clone()))
.ok_or_else(|| {
ConsensusError::StorageErr(format!(
"Can not get {:?} qc height {}, round {}",
qc_type, height, round
))
})
}
pub fn get_qc_by_hash(
&mut self,
height: u64,
hash: Hash,
qc_type: VoteType,
) -> Option<AggregatedVote> {
self.0
.get_mut(&height)
.and_then(|vrc| vrc.get_qc_by_hash(hash, qc_type))
}
#[allow(clippy::type_complexity)]
pub fn get_height_votes(
&mut self,
height: u64,
) -> Option<(Vec<(SignedVote, Context)>, Vec<AggregatedVote>)> {
self.0.remove(&height).map_or_else(
|| None,
|mut vrc| {
let mut votes = Vec::new();
let mut qcs = Vec::new();
for (_, rc) in vrc.general.iter_mut() {
votes.append(&mut rc.prevote.get_all_votes());
votes.append(&mut rc.precommit.get_all_votes());
qcs.append(&mut rc.qc.get_all_qcs());
}
Some((votes, qcs))
},
)
}
pub fn vote_count(&self, height: u64, round: u64, vote_type: VoteType) -> usize {
if let Some(vrc) = self.0.get(&height) {
return vrc.vote_count(round, vote_type);
}
0
}
pub fn flush(&mut self, till: u64) {
self.0 = self.0.split_off(&till);
}
}
#[derive(Clone, Debug)]
struct VoteRoundCollector {
general: HashMap<u64, RoundCollector>,
qc_by_hash: HashMap<Hash, QuorumCertificate>,
}
impl VoteRoundCollector {
fn new() -> Self {
VoteRoundCollector {
general: HashMap::new(),
qc_by_hash: HashMap::new(),
}
}
fn insert_vote(&mut self, ctx: Context, hash: Hash, vote: SignedVote, addr: Address) {
self.general
.entry(vote.get_round())
.or_insert_with(RoundCollector::new)
.insert_vote(ctx, hash, vote, addr);
}
fn set_qc(&mut self, qc: AggregatedVote) {
self.qc_by_hash
.entry(qc.block_hash.clone())
.or_insert_with(QuorumCertificate::new)
.set_quorum_certificate(qc.clone());
self.general
.entry(qc.get_round())
.or_insert_with(RoundCollector::new)
.set_qc(qc);
}
fn get_vote_map(
&mut self,
round: u64,
vote_type: VoteType,
) -> Option<&HashMap<Hash, HashSet<Address>>> {
self.general.get_mut(&round).and_then(|rc| {
let res = rc.get_vote_map(vote_type);
if res.is_empty() {
return None;
}
Some(res)
})
}
fn get_votes(
&mut self,
round: u64,
vote_type: VoteType,
hash: &Hash,
) -> Option<Vec<(SignedVote, Context)>> {
self.general
.get_mut(&round)
.and_then(|rc| rc.get_votes(vote_type, hash))
}
fn get_qc_by_id(&mut self, round: u64, qc_type: VoteType) -> Option<AggregatedVote> {
self.general
.get_mut(&round)
.and_then(|rc| rc.get_qc(qc_type))
}
fn get_qc_by_hash(&self, hash: Hash, qc_type: VoteType) -> Option<AggregatedVote> {
if let Some(qcs) = self.qc_by_hash.get(&hash) {
return qcs.get_quorum_certificate(qc_type);
}
None
}
fn vote_count(&self, round: u64, vote_type: VoteType) -> usize {
if let Some(rc) = self.general.get(&round) {
return rc.vote_count(vote_type);
}
0
}
}
#[derive(Clone, Debug)]
struct RoundCollector {
qc: QuorumCertificate,
prevote: Votes,
precommit: Votes,
}
impl RoundCollector {
fn new() -> Self {
RoundCollector {
qc: QuorumCertificate::new(),
prevote: Votes::new(),
precommit: Votes::new(),
}
}
fn insert_vote(&mut self, ctx: Context, hash: Hash, vote: SignedVote, addr: Address) {
if vote.is_prevote() {
self.prevote.insert(ctx, hash, addr, vote);
} else {
self.precommit.insert(ctx, hash, addr, vote);
}
}
fn set_qc(&mut self, qc: AggregatedVote) {
self.qc.set_quorum_certificate(qc);
}
fn get_vote_map(&self, vote_type: VoteType) -> &HashMap<Hash, HashSet<Address>> {
match vote_type {
VoteType::Prevote => self.prevote.get_vote_map(),
VoteType::Precommit => self.precommit.get_vote_map(),
}
}
fn get_votes(
&mut self,
vote_type: VoteType,
hash: &Hash,
) -> Option<Vec<(SignedVote, Context)>> {
match vote_type {
VoteType::Prevote => self.prevote.get_votes(hash),
VoteType::Precommit => self.precommit.get_votes(hash),
}
}
fn get_qc(&mut self, qc_type: VoteType) -> Option<AggregatedVote> {
self.qc.get_quorum_certificate(qc_type)
}
fn vote_count(&self, vote_type: VoteType) -> usize {
if vote_type == VoteType::Prevote {
return self.prevote.vote_count();
}
self.precommit.vote_count()
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
struct QuorumCertificate {
prevote: Option<AggregatedVote>,
precommit: Option<AggregatedVote>,
}
impl QuorumCertificate {
fn new() -> Self {
QuorumCertificate {
prevote: None,
precommit: None,
}
}
fn set_quorum_certificate(&mut self, qc: AggregatedVote) {
if qc.is_prevote_qc() {
self.prevote = Some(qc);
} else {
self.precommit = Some(qc);
}
}
fn get_quorum_certificate(&self, qc_type: VoteType) -> Option<AggregatedVote> {
match qc_type {
VoteType::Prevote => self.prevote.clone(),
VoteType::Precommit => self.precommit.clone(),
}
}
fn get_all_qcs(&mut self) -> Vec<AggregatedVote> {
let mut res = Vec::new();
if let Some(tmp) = self.prevote.clone() {
res.push(tmp);
}
if let Some(tmp) = self.precommit.clone() {
res.push(tmp);
}
res
}
}
#[derive(Clone, Debug)]
struct Votes {
by_hash: HashMap<Hash, HashSet<Address>>,
by_address: HashMap<Address, (SignedVote, Context)>,
}
impl Votes {
fn new() -> Self {
Votes {
by_hash: HashMap::new(),
by_address: HashMap::new(),
}
}
fn insert(&mut self, ctx: Context, hash: Hash, addr: Address, vote: SignedVote) {
if self.by_address.contains_key(&addr) {
let exist = self.by_address.get(&addr).unwrap().clone();
if vote.vote.block_hash != exist.0.vote.block_hash {
log::error!("Overlord: VoteCollector detects byzantine behaviour: existing: {:?}, signed vote inserting: {:?}",
exist,vote);
}
return;
}
self.by_hash
.entry(hash)
.or_insert_with(HashSet::new)
.insert(addr.clone());
self.by_address.entry(addr).or_insert((vote, ctx));
}
fn get_vote_map(&self) -> &HashMap<Hash, HashSet<Address>> {
&self.by_hash
}
fn get_votes(&mut self, hash: &Hash) -> Option<Vec<(SignedVote, Context)>> {
self.by_hash.get(hash).and_then(|addresses| {
addresses
.iter()
.map(|addr| self.by_address.get(addr).cloned())
.collect::<Option<Vec<_>>>()
})
}
fn get_all_votes(&mut self) -> Vec<(SignedVote, Context)> {
self.by_address.values().cloned().collect::<Vec<_>>()
}
fn vote_count(&self) -> usize {
self.by_address.len()
}
}
#[derive(Clone, Debug)]
pub struct ChokeCollector {
chokes: BTreeMap<u64, HashMap<Address, SignedChoke>>,
qcs: HashMap<u64, AggregatedChoke>,
}
impl ChokeCollector {
pub fn new() -> Self {
ChokeCollector {
chokes: BTreeMap::new(),
qcs: HashMap::new(),
}
}
pub fn insert(&mut self, round: u64, signed_choke: SignedChoke) {
self.chokes
.entry(round)
.or_insert_with(HashMap::new)
.insert(signed_choke.address.clone(), signed_choke);
}
pub fn set_qc(&mut self, round: u64, qc: AggregatedChoke) {
self.qcs.insert(round, qc);
}
pub fn get_chokes(&self, round: u64) -> Option<Vec<SignedChoke>> {
self.chokes
.get(&round)
.map(|map| map.values().cloned().collect::<Vec<_>>())
}
pub fn get_qc(&self, round: u64) -> Option<AggregatedChoke> {
self.qcs.get(&round).cloned()
}
pub fn max_round_above_threshold(&self, nodes_num: usize) -> Option<u64> {
for (round, set) in self.chokes.iter().rev() {
if set.len() * 3 > nodes_num * 2 {
return Some(*round);
}
}
None
}
pub fn print_round_choke_log(&self, round: u64) {
if let Some(map) = self.chokes.get(&round) {
let voters = map.keys().map(hex_encode).collect::<Vec<_>>();
log::info!(
"Overlord: {} chokes in round {}, voters {:?}",
map.len(),
round,
voters
);
} else {
log::info!("Overlord: no choke in round {}", round);
}
}
pub fn clear(&mut self) {
self.chokes.clear();
self.qcs.clear();
}
}
#[cfg(test)]
mod test {
use std::collections::{HashMap, HashSet};
use std::error::Error;
use bincode::{deserialize, serialize};
use bytes::Bytes;
use creep::Context;
use rand::random;
use serde::{Deserialize, Serialize};
use crate::state::collection::{ProposalCollector, VoteCollector};
use crate::types::{
Address, AggregatedSignature, AggregatedVote, Hash, Proposal, Signature, SignedProposal,
SignedVote, Vote, VoteType,
};
use crate::Codec;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
struct Pill {
height: u64,
epoch: Vec<u64>,
}
impl Codec for Pill {
fn encode(&self) -> Result<Bytes, Box<dyn Error + Send>> {
let encode: Vec<u8> = serialize(&self).expect("Serialize Pill error");
Ok(Bytes::from(encode))
}
fn decode(data: Bytes) -> Result<Self, Box<dyn Error + Send>> {
let decode: Pill = deserialize(data.as_ref()).expect("Deserialize Pill error.");
Ok(decode)
}
}
impl Pill {
fn new() -> Self {
let height = random::<u64>();
let epoch = (0..128).map(|_| random::<u64>()).collect::<Vec<_>>();
Pill { height, epoch }
}
}
fn gen_hash() -> Hash {
Hash::from((0..16).map(|_| random::<u8>()).collect::<Vec<_>>())
}
fn gen_address() -> Address {
Address::from((0..32).map(|_| random::<u8>()).collect::<Vec<_>>())
}
fn gen_signature() -> Signature {
Signature::from((0..64).map(|_| random::<u8>()).collect::<Vec<_>>())
}
fn _gen_aggr_signature() -> AggregatedSignature {
AggregatedSignature {
signature: gen_signature(),
address_bitmap: Bytes::from((0..8).map(|_| random::<u8>()).collect::<Vec<_>>()),
}
}
fn gen_signed_proposal(height: u64, round: u64) -> SignedProposal<Pill> {
let signature = gen_signature();
let proposal = Proposal {
height,
round,
content: Pill::new(),
block_hash: gen_hash(),
lock: None,
proposer: gen_address(),
};
SignedProposal {
signature,
proposal,
}
}
fn gen_signed_vote(
height: u64,
round: u64,
vote_type: VoteType,
hash: Hash,
addr: Address,
) -> SignedVote {
let vote = Vote {
height,
round,
vote_type,
block_hash: hash,
};
SignedVote {
signature: gen_signature(),
voter: addr,
vote,
}
}
fn _gen_aggregated_vote(height: u64, round: u64, vote_type: VoteType) -> AggregatedVote {
let signature = _gen_aggr_signature();
AggregatedVote {
signature,
height,
round,
vote_type,
block_hash: gen_hash(),
leader: gen_address(),
}
}
#[test]
fn test_proposal_collector() {
let mut proposals = ProposalCollector::<Pill>::new();
let proposal_01 = gen_signed_proposal(1, 0);
let proposal_02 = gen_signed_proposal(1, 0);
assert!(proposals
.insert(Context::new(), 1, 0, proposal_01.clone())
.is_ok());
assert!(proposals.insert(Context::new(), 1, 0, proposal_02).is_err());
assert_eq!(proposals.get(1, 0).unwrap().0, proposal_01);
let proposal_03 = gen_signed_proposal(2, 0);
let proposal_04 = gen_signed_proposal(3, 0);
assert!(proposals
.insert(Context::new(), 2, 0, proposal_03.clone())
.is_ok());
assert!(proposals
.insert(Context::new(), 3, 0, proposal_04.clone())
.is_ok());
proposals.flush(2);
assert!(proposals.get(1, 0).is_err());
assert_eq!(proposals.get(2, 0).unwrap().0, proposal_03);
assert_eq!(proposals.get(3, 0).unwrap().0, proposal_04);
assert!(proposals.get_height_proposals(1).is_none());
assert_eq!(
proposals
.get_height_proposals(2)
.unwrap()
.into_iter()
.map(|item| item.0)
.collect::<Vec<_>>(),
vec![proposal_03]
);
assert!(proposals.get(2, 0).is_err());
}
#[test]
fn test_vote_collector() {
let mut votes = VoteCollector::new();
let mut map = HashMap::new();
let mut vec = Vec::new();
let mut set = HashSet::new();
let hash_01 = gen_hash();
let hash_02 = gen_hash();
let addr_01 = gen_address();
let addr_02 = gen_address();
let signed_vote_01 =
gen_signed_vote(1, 0, VoteType::Prevote, hash_01.clone(), addr_01.clone());
let signed_vote_02 =
gen_signed_vote(1, 0, VoteType::Prevote, hash_01.clone(), addr_02.clone());
votes.insert_vote(
Context::new(),
hash_01.clone(),
signed_vote_01.clone(),
addr_01.clone(),
);
set.insert(addr_01);
map.insert(hash_01.clone(), set);
vec.push(signed_vote_01);
assert_eq!(votes.get_vote_map(1, 0, VoteType::Prevote), Ok(&map));
assert_eq!(
votes
.get_votes(1, 0, VoteType::Prevote, &hash_01)
.unwrap()
.iter()
.map(|item| item.0.clone())
.clone()
.collect::<Vec<_>>(),
vec
);
assert!(votes.get_vote_map(1, 0, VoteType::Precommit).is_err());
assert!(votes
.get_votes(1, 0, VoteType::Precommit, &hash_01)
.is_err());
assert!(votes.get_vote_map(1, 1, VoteType::Prevote).is_err());
assert!(votes.get_votes(1, 1, VoteType::Prevote, &hash_01).is_err());
assert!(votes.get_votes(1, 0, VoteType::Prevote, &hash_02).is_err());
votes.insert_vote(
Context::new(),
hash_01.clone(),
signed_vote_02.clone(),
addr_02.clone(),
);
map.get_mut(&hash_01).unwrap().insert(addr_02);
vec.push(signed_vote_02);
assert_eq!(votes.get_vote_map(1, 0, VoteType::Prevote), Ok(&map));
let res = votes
.get_votes(1, 0, VoteType::Prevote, &hash_01)
.unwrap()
.iter()
.map(|item| item.0.clone())
.collect::<HashSet<_>>();
assert_eq!(res, vec.iter().cloned().collect::<HashSet<_>>());
}
}