#[cfg(all(test, feature = "mock"))]
use crate::dev_utils::ParsedContents;
#[cfg(all(test, feature = "malice-detection", feature = "mock"))]
use crate::gossip::EventHash;
#[cfg(all(test, any(feature = "testing", feature = "mock")))]
use crate::gossip::GraphSnapshot;
#[cfg(feature = "malice-detection")]
use crate::observation::Malice;
use crate::{
block::{Block, BlockGroup},
dump_graph,
error::{Error, Result},
gossip::{
Event, EventContextRef, EventIndex, Graph, IndexedEventRef, PackedEvent, Request, Response,
},
id::{PublicId, SecretId},
key_gen::{
dkg_threshold, message::DkgMessage, parsec_rng::ParsecRng, Ack, AckOutcome, KeyGen, Part,
PartOutcome,
},
meta_voting::{MetaElection, MetaEvent, MetaEventBuilder, MetaVote, Observer},
network_event::NetworkEvent,
observation::{
is_more_than_two_thirds, ConsensusMode, Observation, ObservationHash, ObservationKey,
ObservationStore,
},
parsec_helpers::find_interesting_content_for_event,
peer_list::{Peer, PeerIndex, PeerIndexMap, PeerIndexSet, PeerList, PeerListChange, PeerState},
};
#[cfg(any(feature = "testing", all(test, feature = "mock")))]
use crate::{
hash::Hash,
mock::{PeerId, Transaction},
};
use itertools::Itertools;
use rand::RngCore;
#[cfg(any(test, feature = "testing"))]
use std::ops::{Deref, DerefMut};
use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
iter,
marker::PhantomData,
mem,
num::NonZeroUsize,
usize,
};
pub(crate) type KeyGenId = usize;
pub struct Parsec<T: NetworkEvent, S: SecretId> {
peer_list: PeerList<S>,
key_gen: BTreeMap<KeyGenId, KeyGen<S>>,
key_gen_next_id: KeyGenId,
graph: Graph<S::PublicId>,
observations: ObservationStore<T, S::PublicId>,
consensused_blocks: VecDeque<BlockGroup<T, S::PublicId>>,
meta_election: MetaElection,
consensus_mode: ConsensusMode,
pending_dkg_msgs: Vec<DkgMessage>,
#[cfg(feature = "malice-detection")]
pending_accusations: Accusations<T, S::PublicId>,
pending_events: Vec<PendingEvent<T, S::PublicId>>,
#[cfg(any(test, feature = "testing"))]
ignore_process_events: bool,
secure_rng: ParsecRng,
}
impl<T: NetworkEvent, S: SecretId> Parsec<T, S> {
pub fn from_genesis(
our_id: S,
genesis_group: &BTreeSet<S::PublicId>,
genesis_related_info: Vec<u8>,
consensus_mode: ConsensusMode,
secure_rng: Box<dyn RngCore>,
) -> Self {
if !genesis_group.contains(our_id.public_id()) {
log_or_panic!("Genesis group must contain us");
}
let mut peer_list = PeerList::new(our_id);
let genesis_indices: PeerIndexSet = genesis_group
.iter()
.map(|peer_id| {
if peer_id == peer_list.our_pub_id() {
let peer_index = PeerIndex::OUR;
peer_list.change_peer_state(peer_index, PeerState::active());
peer_index
} else {
peer_list.add_peer(peer_id.clone(), PeerState::active())
}
})
.collect();
let mut parsec = Self::empty(peer_list, genesis_indices, consensus_mode, secure_rng);
parsec.add_initial_event();
let genesis_observation = Observation::Genesis {
group: genesis_group.clone(),
related_info: genesis_related_info,
};
let event = parsec.our_last_event_index().and_then(|self_parent| {
parsec.new_event_from_observation(self_parent, genesis_observation)
});
if let Err(error) = event.and_then(|event| parsec.add_event(event)) {
log_or_panic!(
"{:?} initialising Parsec failed when adding the genesis observation: {:?}",
parsec.our_pub_id(),
error,
);
}
parsec
}
pub fn from_existing(
our_id: S,
genesis_group: &BTreeSet<S::PublicId>,
section: &BTreeSet<S::PublicId>,
consensus_mode: ConsensusMode,
secure_rng: Box<dyn RngCore>,
) -> Self {
if genesis_group.is_empty() {
log_or_panic!("Genesis group can't be empty");
}
if genesis_group.contains(our_id.public_id()) {
log_or_panic!("Genesis group can't already contain us");
}
if section.is_empty() {
log_or_panic!("Section can't be empty");
}
if section.contains(our_id.public_id()) {
log_or_panic!("Section can't already contain us");
}
let mut peer_list = PeerList::new(our_id);
peer_list.change_peer_state(PeerIndex::OUR, PeerState::RECV);
let genesis_indices: PeerIndexSet = genesis_group
.iter()
.map(|peer_id| peer_list.add_peer(peer_id.clone(), PeerState::VOTE | PeerState::SEND))
.collect();
for peer_id in section {
if peer_list.contains(peer_id) {
continue;
}
let _ = peer_list.add_peer(peer_id.clone(), PeerState::SEND);
}
Self::empty(peer_list, genesis_indices, consensus_mode, secure_rng)
}
fn empty(
peer_list: PeerList<S>,
genesis_group: PeerIndexSet,
consensus_mode: ConsensusMode,
secure_rng: Box<dyn RngCore>,
) -> Self {
dump_graph::init();
Self {
peer_list,
key_gen: BTreeMap::new(),
key_gen_next_id: KeyGenId::default(),
graph: Graph::new(),
consensused_blocks: VecDeque::new(),
observations: BTreeMap::new(),
meta_election: MetaElection::new(genesis_group),
consensus_mode,
pending_dkg_msgs: vec![],
#[cfg(feature = "malice-detection")]
pending_accusations: vec![],
pending_events: vec![],
#[cfg(any(test, feature = "testing"))]
ignore_process_events: false,
secure_rng: ParsecRng::new(secure_rng),
}
}
pub fn our_pub_id(&self) -> &S::PublicId {
self.peer_list.our_pub_id()
}
pub fn vote_for(&mut self, observation: Observation<T, S::PublicId>) -> Result<()> {
debug!("{:?} voting for {:?}", self.our_pub_id(), observation);
self.confirm_self_state(PeerState::VOTE)?;
if self.have_voted_for(&observation) {
return Err(Error::DuplicateVote);
}
self.flush_pending_events()?;
let self_parent = self.our_last_event_index()?;
let event = self.new_event_from_observation(self_parent, observation)?;
let _ = self.add_event(event)?;
self.create_sole_voter_gossip_event()
}
pub fn gossip_recipients(&self) -> impl Iterator<Item = &S::PublicId> {
self.peer_list
.gossip_recipients()
.map(|(_, peer)| peer.id())
}
pub fn create_gossip(&mut self, peer_id: &S::PublicId) -> Result<Request<T, S::PublicId>> {
let peer_index = self.get_peer_index(peer_id)?;
self.confirm_allowed_to_gossip_to(peer_index)?;
self.add_requesting_event(peer_id)?;
let events = if self.peer_list.last_event(peer_index).is_some() {
self.events_to_gossip_to_peer(peer_index)?
} else {
self.graph.iter().map(|e| e.inner()).collect()
};
self.pack_events(events).map(Request::new)
}
fn create_sole_voter_gossip_event(&mut self) -> Result<()> {
if !iter::once(PeerIndex::OUR).eq(self.voters()) {
return Ok(());
}
let self_id = self.our_pub_id().clone();
self.add_requesting_event(&self_id)
}
fn add_requesting_event(&mut self, peer_id: &S::PublicId) -> Result<()> {
debug!(
"{:?} creating gossip request for {:?}",
self.our_pub_id(),
peer_id
);
let self_parent = self.peer_list.last_event(PeerIndex::OUR).ok_or_else(|| {
log_or_panic!("{:?} missing our own last event hash.", self.our_pub_id());
Error::Logic
})?;
let sync_event = Event::new_from_requesting(self_parent, peer_id, self.event_context())?;
let _ = self.add_event(sync_event)?;
Ok(())
}
pub fn handle_request(
&mut self,
src: &S::PublicId,
req: Request<T, S::PublicId>,
) -> Result<Response<T, S::PublicId>> {
debug!(
"{:?} received gossip request from {:?}",
self.our_pub_id(),
src
);
let src_index = self.get_peer_index(src)?;
let other_parent = self.unpack_and_add_events(src_index, req.packed_events)?;
self.create_dkg_events()?;
#[cfg(feature = "malice-detection")]
self.create_accusation_events(other_parent)?;
self.create_sync_event(true, other_parent)?;
self.flush_pending_events()?;
let events = self.events_to_gossip_to_peer(src_index)?;
self.pack_events(events).map(Response::new)
}
pub fn handle_response(
&mut self,
src: &S::PublicId,
resp: Response<T, S::PublicId>,
) -> Result<()> {
debug!(
"{:?} received gossip response from {:?}",
self.our_pub_id(),
src
);
let src_index = self.get_peer_index(src)?;
let other_parent = self.unpack_and_add_events(src_index, resp.packed_events)?;
self.create_dkg_events()?;
#[cfg(feature = "malice-detection")]
self.create_accusation_events(other_parent)?;
self.create_sync_event(false, other_parent)?;
self.flush_pending_events()
}
pub fn poll(&mut self) -> Option<Block<T, S::PublicId>> {
let mut block_group = self.batch_poll()?;
let block = block_group.pop_front()?;
if !block_group.is_empty() {
self.consensused_blocks.push_front(block_group);
}
Some(block)
}
pub(crate) fn batch_poll(&mut self) -> Option<BlockGroup<T, S::PublicId>> {
self.consensused_blocks.pop_front()
}
pub fn can_vote(&self) -> bool {
self.peer_list.our_state().can_vote()
}
pub fn have_voted_for(&self, observation: &Observation<T, S::PublicId>) -> bool {
let hash = ObservationHash::from(observation);
let key = ObservationKey::new(hash, PeerIndex::OUR, self.consensus_mode.of(observation));
self.observations
.get(&key)
.map(|info| info.created_by_us)
.unwrap_or(false)
}
pub fn has_unpolled_observations(&self) -> bool {
self.observations.values().any(|info| !info.consensused)
|| !self.consensused_blocks.is_empty()
|| !self.key_gen.is_empty()
}
pub fn our_unpolled_observations(&self) -> impl Iterator<Item = &Observation<T, S::PublicId>> {
self.our_consensused_observations()
.chain(self.our_unconsensused_observations())
}
fn our_consensused_observations(&self) -> impl Iterator<Item = &Observation<T, S::PublicId>> {
self.observations.values().filter_map(move |info| {
if info.created_by_us
&& info.consensused
&& self.has_our_unpolled_blocks(&info.observation)
{
Some(&info.observation)
} else {
None
}
})
}
fn our_unconsensused_observations(&self) -> impl Iterator<Item = &Observation<T, S::PublicId>> {
self.observations.values().filter_map(|info| {
if info.created_by_us && !info.consensused {
Some(&info.observation)
} else {
None
}
})
}
fn has_our_unpolled_blocks(&self, payload: &Observation<T, S::PublicId>) -> bool {
let mut matching_blocks = self
.consensused_blocks
.iter()
.flatten()
.filter(|block| block.payload() == payload);
match self.consensus_mode.of(payload) {
ConsensusMode::Supermajority => matching_blocks.next().is_some(),
ConsensusMode::Single => {
matching_blocks.any(|block| block.is_signed_by(self.our_pub_id()))
}
}
}
fn get_known_event(&self, event_index: EventIndex) -> Result<IndexedEventRef<S::PublicId>> {
get_known_event(self.our_pub_id(), &self.graph, event_index)
}
fn confirm_allowed_to_gossip_to(&self, peer_index: PeerIndex) -> Result<()> {
self.confirm_self_state(PeerState::SEND)?;
self.confirm_peer_state(peer_index, PeerState::DKG | PeerState::RECV)
}
fn confirm_peer_state(&self, peer_index: PeerIndex, required: PeerState) -> Result<()> {
let actual = self.peer_list.peer_state(peer_index);
if actual.contains(required) {
Ok(())
} else {
trace!(
"{:?} detected invalid state of {:?} (required: {:?}, actual: {:?})",
self.our_pub_id(),
peer_index,
required,
actual,
);
Err(Error::InvalidPeerState { required, actual })
}
}
fn confirm_self_state(&self, required: PeerState) -> Result<()> {
let actual = self.peer_list.our_state();
if actual.contains(required) {
Ok(())
} else {
trace!(
"{:?} has invalid state (required: {:?}, actual: {:?})",
self.our_pub_id(),
required,
actual,
);
Err(Error::InvalidSelfState { required, actual })
}
}
fn confirm_can_add_event(&self, event: &Event<S::PublicId>) -> Result<()> {
let peer = self
.peer_list
.get(event.creator())
.ok_or(Error::UnknownPeer)?;
if event.creator() == PeerIndex::OUR || peer.state().can_send() {
return Ok(());
}
if let Some(removal_event) = peer
.removal_event()
.and_then(|event_index| self.graph.get(event_index))
{
if !event.is_descendant_of(removal_event) {
return Ok(());
}
}
Err(Error::InvalidPeerState {
required: PeerState::SEND,
actual: peer.state(),
})
}
fn get_peer_index(&self, peer_id: &S::PublicId) -> Result<PeerIndex> {
self.peer_list.get_index(peer_id).ok_or(Error::UnknownPeer)
}
fn our_last_event_index(&self) -> Result<EventIndex> {
self.peer_list.last_event(PeerIndex::OUR).ok_or_else(|| {
log_or_panic!(
"{:?} has no last event.\n{:?}\n",
self.our_pub_id(),
self.peer_list
);
Error::Logic
})
}
fn pack_events<'a, I>(&self, events: I) -> Result<Vec<PackedEvent<T, S::PublicId>>>
where
I: IntoIterator<Item = &'a Event<S::PublicId>>,
S::PublicId: 'a,
{
events
.into_iter()
.map(|event| event.pack(self.event_context()))
.collect()
}
fn unpack_and_add_events(
&mut self,
src_index: PeerIndex,
packed_events: Vec<PackedEvent<T, S::PublicId>>,
) -> Result<EventIndex> {
self.confirm_self_state(PeerState::RECV)?;
self.confirm_peer_state(src_index, PeerState::SEND)?;
let hash_of_last_event = packed_events
.last()
.map(PackedEvent::compute_hash)
.ok_or_else(|| Error::InvalidMessage)?;
for packed_event in packed_events {
if let Some(event) = self.unpack(packed_event)? {
let event_creator = event.creator();
let event_index = self.add_event(event)?;
self.peer_list
.change_peer_state(event_creator, PeerState::RECV);
self.peer_list
.record_gossiped_event_by(src_index, event_index);
#[cfg(feature = "malice-detection")]
self.detect_accomplice(event_index)?;
}
}
#[cfg(feature = "malice-detection")]
self.detect_premature_gossip()?;
let last_event_index = self
.graph
.get_index(&hash_of_last_event)
.ok_or_else(|| Error::InvalidMessage)?;
Ok(last_event_index)
}
fn unpack(
&mut self,
packed_event: PackedEvent<T, S::PublicId>,
) -> Result<Option<Event<S::PublicId>>> {
if let Some(unpacked_event) = Event::unpack(packed_event, self.event_context())? {
if let Some((payload_key, observation_info)) = unpacked_event.observation_for_store {
let _ = self
.observations
.entry(payload_key)
.or_insert_with(|| observation_info);
}
Ok(Some(unpacked_event.event))
} else {
Ok(None)
}
}
fn new_event_from_observation(
&mut self,
self_parent: EventIndex,
observation: Observation<T, S::PublicId>,
) -> Result<Event<S::PublicId>> {
let (event, observation_for_store) =
Event::new_from_observation(self_parent, observation, self.event_context())?;
if let Some((payload_key, observation_info)) = observation_for_store {
let _ = self
.observations
.entry(payload_key)
.or_insert_with(|| observation_info);
}
Ok(event)
}
fn add_event(&mut self, event: Event<S::PublicId>) -> Result<EventIndex> {
let our = event.creator() == PeerIndex::OUR;
if !our {
#[cfg(feature = "malice-detection")]
self.detect_malice(&event)?;
}
self.confirm_can_add_event(&event)?;
if our && event.is_initial() {
log_or_panic!(
"{:?} attempted to add initial event with add_event. It must be added with add_initial_event instead.",
self.our_pub_id(),
);
return Err(Error::InvalidEvent);
}
let unconsensused_payload_key = event
.payload_key()
.and_then(|key| self.observations.get_mut(key).map(|info| (key, info)))
.and_then(|(key, info)| {
if our {
info.created_by_us = true;
}
if info.consensused {
None
} else {
Some(*key)
}
});
let event_index = self.insert_event(event);
let _ = unconsensused_payload_key.map(|payload_key| {
self.meta_election
.add_unconsensused_event(event_index, payload_key);
});
#[cfg(any(test, feature = "testing"))]
let ignore_process_events = self.ignore_process_events;
#[cfg(not(any(test, feature = "testing")))]
let ignore_process_events = false;
if !ignore_process_events {
self.process_events(event_index.topological_index())?;
}
Ok(event_index)
}
fn add_initial_event(&mut self) {
let event = Event::new_initial(self.event_context());
let _ = self.insert_event(event);
}
fn insert_event(&mut self, event: Event<S::PublicId>) -> EventIndex {
let event = self.graph.insert(event);
self.peer_list.add_event(event);
event.event_index()
}
fn process_events(&mut self, mut start_index: usize) -> Result<()> {
'outer: loop {
for event_index in self.graph.indices_from(start_index) {
match self.process_event(event_index)? {
PostProcessAction::Restart(new_start_index)
if new_start_index <= event_index.topological_index() =>
{
start_index = new_start_index;
continue 'outer;
}
PostProcessAction::Restart(_) | PostProcessAction::Continue => (),
}
}
break;
}
Ok(())
}
fn process_event(&mut self, event_index: EventIndex) -> Result<PostProcessAction> {
if self.peer_list.our_state() == PeerState::inactive() {
return Ok(PostProcessAction::Continue);
}
self.create_needed_meta_event(event_index)?;
let payload_keys = self.compute_consensus(event_index);
if payload_keys.is_empty() {
return Ok(PostProcessAction::Continue);
}
self.output_consensus_info(&payload_keys);
let blocks = self.create_blocks(&payload_keys)?;
if !blocks.is_empty() {
self.consensused_blocks.push_back(blocks);
}
self.mark_observations_as_consensused(&payload_keys);
let peer_list_changes = payload_keys
.iter()
.filter_map(|payload_key| self.handle_consensus(event_index, payload_key))
.collect();
self.meta_election
.new_election(&self.graph, payload_keys, peer_list_changes);
let start_index = self.meta_election.continue_consensus_start_index();
Ok(PostProcessAction::Restart(start_index))
}
fn output_consensus_info(&self, payload_keys: &[ObservationKey]) {
dump_graph::to_file(dump_graph::ToFileInfo {
owner_id: self.our_pub_id(),
consensus_mode: self.consensus_mode,
gossip_graph: &self.graph,
meta_election: &self.meta_election,
peer_list: &self.peer_list,
observations: &self.observations,
secure_rng: &self.secure_rng,
key_gens_and_next_id: (&self.key_gen, &self.key_gen_next_id),
info: &dump_graph::DumpGraphContext::ConsensusReached,
});
for (index, payload_key) in payload_keys.iter().enumerate() {
let payload = self
.observations
.get(payload_key)
.map(|info| &info.observation);
info!(
"{:?} got consensus on block {} with payload {:?} and payload hash {:?}",
self.our_pub_id(),
self.meta_election.consensus_history().len() + index,
payload,
payload_key.hash()
)
}
}
fn mark_observations_as_consensused(&mut self, payload_keys: &[ObservationKey]) {
for payload_key in payload_keys {
if let Some(info) = self.observations.get_mut(payload_key) {
info.consensused = true;
} else {
log_or_panic!(
"{:?} doesn't know about observation with hash {:?}",
self.peer_list.our_pub_id(),
payload_key.hash()
);
}
}
}
fn handle_consensus(
&mut self,
event_index: EventIndex,
payload_key: &ObservationKey,
) -> Option<PeerListChange> {
match self
.observations
.get(payload_key)
.map(|info| info.observation.clone())
{
Some(Observation::Add { ref peer_id, .. }) => self.handle_add_peer(peer_id).into(),
Some(Observation::Remove { ref peer_id, .. }) => {
self.handle_remove_peer(event_index, peer_id)
}
Some(Observation::Accusation {
ref offender,
ref malice,
}) => {
info!(
"{:?} removing {:?} due to consensus on accusation of malice {:?}",
self.our_pub_id(),
offender,
malice
);
self.handle_remove_peer(event_index, offender)
}
Some(Observation::StartDkg(peers)) => {
if self.handle_dkg_start_consensus(&peers).is_none() {
warn!("Not starting DKG on StartDkg consensus because of error");
}
None
}
Some(Observation::DkgResult { .. }) => {
log_or_panic!("Unexpected DkgResult consensus.");
None
}
Some(Observation::DkgMessage(msg)) => {
if self.handle_dkg_message(payload_key, msg.clone()).is_none() {
warn!(
"Ignoring DkgMessage with Error: key: {:?}, msg: {:?}",
payload_key, msg
);
}
None
}
Some(Observation::Genesis { .. }) | Some(Observation::OpaquePayload(_)) => None,
None => {
log_or_panic!("Failed to get observation from hash.");
None
}
}
}
fn handle_dkg_message(&mut self, payload_key: &ObservationKey, msg: DkgMessage) -> Option<()> {
let creator_id = self.peer_list.get(payload_key.peer_index()?)?.id().clone();
match msg {
DkgMessage::Part { key_gen_id, part } => {
self.handle_dkg_message_part(&creator_id, key_gen_id, part)
}
DkgMessage::Ack { key_gen_id, ack } => {
self.handle_dkg_message_ack(&creator_id, key_gen_id, ack)
}
}
}
fn handle_dkg_message_part(
&mut self,
creator_id: &S::PublicId,
key_gen_id: KeyGenId,
part: Part,
) -> Option<()> {
if let Some(key_gen) = &mut self.key_gen.get_mut(&key_gen_id) {
let part_result = key_gen
.handle_part(self.peer_list.our_id(), creator_id, part)
.map_err(|err| warn!("handle_dkg_message_part error: {:?}", err))
.ok()?;
match part_result {
PartOutcome::Valid(Some(ack)) => {
self.pending_dkg_msgs
.push(DkgMessage::Ack { key_gen_id, ack });
}
PartOutcome::Valid(None) => (),
PartOutcome::Invalid(fault) => {
warn!(
"An invalid Part was detected from {:?} with fault {:?}",
creator_id, fault
);
}
}
}
Some(())
}
fn handle_dkg_message_ack(
&mut self,
creator_id: &S::PublicId,
key_gen_id: KeyGenId,
ack: Ack,
) -> Option<()> {
if let Some(key_gen) = &mut self.key_gen.get_mut(&key_gen_id) {
let ack_result = key_gen
.handle_ack(&self.peer_list.our_id(), creator_id, ack)
.map_err(|err| warn!("handle_dkg_message_ack error: {:?}", err))
.ok()?;
match ack_result {
AckOutcome::Valid => {
if key_gen.is_ready() {
debug!(
"{:?}: key_gen for block number {} is ready.",
self.peer_list.our_pub_id(),
key_gen_id
);
let dkg_result = key_gen.generate().ok()?;
self.consensused_blocks.push_back(BlockGroup(
iter::once(Block::new_dkg_block(dkg_result)).collect(),
));
let _ = self.key_gen.remove(&key_gen_id);
}
}
AckOutcome::Invalid(fault) => {
warn!(
"An invalid Ack was detected from {:?} with fault {:?}",
creator_id, fault
);
}
}
}
Some(())
}
pub fn add_force_gossip_peer(&mut self, peer_id: &S::PublicId) {
debug!(
"{:?}: adding a new gossip_peer {:?} by force.",
self.peer_list.our_pub_id(),
peer_id
);
let state = PeerState::DKG | PeerState::SEND | PeerState::RECV;
let _ = self.add_gossip_peer(peer_id, state);
}
fn handle_dkg_start_consensus(&mut self, peers: &BTreeSet<S::PublicId>) -> Option<()> {
let state = if self.new_peer_can_recv(self.our_pub_id()) {
PeerState::DKG | PeerState::SEND | PeerState::RECV
} else {
PeerState::DKG | PeerState::SEND
};
for peer_id in peers {
if !self
.peer_list
.get_index(peer_id)
.and_then(|index| self.peer_list.get(index))
.map(|peer| peer.state().can_dkg())
.unwrap_or(false)
{
let _ = self.add_gossip_peer(peer_id, state);
}
}
let threshold = dkg_threshold(peers.len());
let (key_gen, part) = KeyGen::new(
self.peer_list.our_id(),
peers.clone(),
threshold,
&mut self.secure_rng,
)
.map_err(|error| {
error!("Vote for new DKG Error: {}", error);
})
.ok()?;
let key_gen_id = self.key_gen_next_id;
self.key_gen_next_id += 1;
if let Some(part) = part {
self.pending_dkg_msgs
.push(DkgMessage::Part { key_gen_id, part });
}
let _ = self.key_gen.insert(key_gen_id, key_gen);
Some(())
}
fn handle_add_peer(&mut self, peer_id: &S::PublicId) -> PeerListChange {
let state = if self.new_peer_can_recv(peer_id) {
PeerState::VOTE | PeerState::SEND | PeerState::RECV
} else {
PeerState::VOTE | PeerState::SEND
};
let peer_index = self.add_gossip_peer(peer_id, state);
PeerListChange::Add(peer_index)
}
fn new_peer_can_recv(&self, peer_id: &S::PublicId) -> bool {
self.peer_list
.iter()
.filter(|(peer_index, peer)| {
peer.state().can_vote() &&
*peer_index != PeerIndex::OUR &&
peer.id() != peer_id
})
.all(|(_, peer)| {
peer.state().can_recv()
})
}
fn add_gossip_peer(&mut self, peer_id: &S::PublicId, state: PeerState) -> PeerIndex {
let peer_index = if let Some(peer_index) = self.peer_list.get_index(peer_id) {
self.peer_list.change_peer_state(peer_index, state);
peer_index
} else {
self.peer_list.add_peer(peer_id.clone(), state)
};
if peer_index == PeerIndex::OUR && self.peer_list.our_events().next().is_none() {
self.add_initial_event();
}
peer_index
}
fn handle_remove_peer(
&mut self,
event_index: EventIndex,
peer_id: &S::PublicId,
) -> Option<PeerListChange> {
self.peer_list.get_index(peer_id).map(|peer_index| {
self.peer_list.remove_peer(peer_index, event_index);
PeerListChange::Remove(peer_index)
})
}
fn create_needed_meta_event(&mut self, event_index: EventIndex) -> Result<()> {
let event = get_known_event(self.our_pub_id(), &self.graph, event_index)?;
if !event.is_sync_event() || !self.voters().contains(event.creator()) {
return Ok(());
}
trace!(
"{:?} creating a meta-event for event {:?}",
self.our_pub_id(),
event
);
let mut builder =
if let Some(meta_event) = self.meta_election.remove_meta_event(event_index) {
meta_event.rebuild(event)
} else {
MetaEvent::build(event)
};
self.set_interesting_content(&mut builder);
self.set_observer(&mut builder);
self.set_meta_votes(&mut builder)?;
self.meta_election.add_meta_event(builder);
Ok(())
}
fn set_interesting_content(&self, builder: &mut MetaEventBuilder<S::PublicId>) {
if !builder.is_new() {
return;
}
let peers_that_can_vote = self.voters();
let consistent_cmp = |lhs_key: &ObservationKey, rhs_key: &ObservationKey| {
lhs_key.consistent_cmp(rhs_key, &self.peer_list)
};
let is_descendant = |x: IndexedEventRef<_>, y| x.is_descendant_of(y);
let is_already_interesting_content = |payload_key: &ObservationKey| {
self.meta_election
.is_already_interesting_content(builder.event().creator(), payload_key)
};
let is_interesting_payload = |payload_key: &ObservationKey| {
self.is_interesting_payload(builder, &peers_that_can_vote, payload_key)
};
let payloads = find_interesting_content_for_event(
builder.event(),
self.unconsensused_events(None),
consistent_cmp,
is_descendant,
is_already_interesting_content,
is_interesting_payload,
);
builder.set_interesting_content(payloads);
}
fn is_interesting_payload(
&self,
builder: &MetaEventBuilder<S::PublicId>,
peers_that_can_vote: &PeerIndexSet,
payload_key: &ObservationKey,
) -> bool {
match payload_key.consensus_mode() {
ConsensusMode::Single => {
let num_ancestor_peers =
self.num_creators_of_ancestors(peers_that_can_vote, &*builder.event());
is_more_than_two_thirds(num_ancestor_peers, peers_that_can_vote.len())
&& self.has_ancestor_carrying_payload(builder.event(), payload_key)
}
ConsensusMode::Supermajority => {
let num_peers_that_did_vote = self.num_creators_of_ancestors_carrying_payload(
peers_that_can_vote,
builder.event(),
payload_key,
);
is_more_than_two_thirds(num_peers_that_did_vote, peers_that_can_vote.len())
}
}
}
fn num_creators_of_ancestors(
&self,
peers_that_can_vote: &PeerIndexSet,
event: &Event<S::PublicId>,
) -> usize {
event
.last_ancestors()
.filter(|(peer_index, _)| peers_that_can_vote.contains(*peer_index))
.count()
}
fn num_creators_of_ancestors_carrying_payload(
&self,
peers_that_can_vote: &PeerIndexSet,
event: IndexedEventRef<S::PublicId>,
payload_key: &ObservationKey,
) -> usize {
let unconsensused_events = self.unconsensused_events(Some(payload_key)).collect_vec();
peers_that_can_vote
.iter()
.filter(|peer_index| {
unconsensused_events.iter().any(|that_event| {
that_event.creator() == *peer_index && event.is_descendant_of(*that_event)
})
})
.count()
}
fn has_ancestor_carrying_payload(
&self,
event: IndexedEventRef<S::PublicId>,
payload_key: &ObservationKey,
) -> bool {
self.unconsensused_events(Some(payload_key))
.any(|that_event| event.is_descendant_of(that_event))
}
fn set_observer(&self, builder: &mut MetaEventBuilder<S::PublicId>) {
if self.is_descendant_of_observer(builder.event()) {
builder.set_observer(Observer::Ancestor);
return;
}
let voter_count = self.voter_count();
let observees: PeerIndexSet = self
.meta_election
.interesting_events()
.filter_map(|(peer_index, event_indices)| {
let event_index = event_indices.first()?;
let event = self.get_known_event(*event_index).ok()?;
if self.strongly_sees(builder.event(), event) {
Some(peer_index)
} else {
None
}
})
.collect();
if is_more_than_two_thirds(observees.len(), voter_count) {
builder.set_observer(Observer::This(observees));
} else {
builder.set_observer(Observer::None);
}
}
fn is_descendant_of_observer(&self, event: IndexedEventRef<S::PublicId>) -> bool {
self.graph
.self_sync_parent(event)
.and_then(|self_parent| self.meta_election.meta_event(self_parent.event_index()))
.map(|meta_parent| meta_parent.is_observer() || meta_parent.has_ancestor_observer())
.unwrap_or(false)
}
fn set_meta_votes(&self, builder: &mut MetaEventBuilder<S::PublicId>) -> Result<()> {
let parent_meta_votes = self
.graph
.self_sync_parent(builder.event())
.and_then(|parent| {
self.meta_election
.populated_meta_votes(parent.event_index())
});
if parent_meta_votes.is_none() && !builder.is_observer() {
return Ok(());
}
let voters = self.voters();
let voters_len = match NonZeroUsize::new(voters.len()) {
Some(num) => num,
None => {
log_or_panic!("{:?} has no voters", self.our_pub_id());
return Ok(());
}
};
let is_voter = voters.contains(builder.event().creator());
if !is_voter {
return Ok(());
}
let ancestors_meta_votes =
self.other_voting_ancestors_meta_votes(&voters, &builder.event());
if let Some(parent_meta_votes) = parent_meta_votes {
let temp_votes: PeerIndexMap<_> = parent_meta_votes
.into_iter()
.map(|(peer_index, parent_votes)| {
let other_votes = Self::peer_meta_votes(&ancestors_meta_votes, peer_index);
let temp_votes = MetaVote::next_temp(parent_votes, &other_votes, voters_len);
(peer_index, temp_votes)
})
.collect();
for (peer_index, temp_votes) in &temp_votes {
let coin_tosses = self.toss_coins(&voters, peer_index, temp_votes)?;
let final_meta_votes = MetaVote::next_final(temp_votes, &coin_tosses, voters_len);
builder.add_meta_votes(peer_index, final_meta_votes);
}
} else {
for peer_index in voters {
let new_meta_votes = {
let other_votes = Self::peer_meta_votes(&ancestors_meta_votes, peer_index);
let initial_estimate = builder.has_observee(peer_index);
MetaVote::new_for_observer(initial_estimate, &other_votes, voters_len)
};
builder.add_meta_votes(peer_index, new_meta_votes);
}
}
trace!(
"{:?} has set the meta votes for {:?}",
self.our_pub_id(),
*builder.event(),
);
Ok(())
}
fn toss_coins(
&self,
voters: &PeerIndexSet,
peer_index: PeerIndex,
temp_votes: &[MetaVote],
) -> Result<BTreeMap<usize, bool>> {
let mut coin_tosses = BTreeMap::new();
for temp_vote in temp_votes {
if let Some(coin) = self.toss_coin(voters, peer_index, temp_vote)? {
let _ = coin_tosses.insert(temp_vote.round, coin);
}
}
Ok(coin_tosses)
}
fn toss_coin(
&self,
_voters: &PeerIndexSet,
_peer_index: PeerIndex,
temp_vote: &MetaVote,
) -> Result<Option<bool>> {
Ok(Some(temp_vote.round % 2 != 0))
}
fn other_voting_ancestors_meta_votes(
&self,
voters: &PeerIndexSet,
event: &Event<S::PublicId>,
) -> Vec<&PeerIndexMap<Vec<MetaVote>>> {
voters
.iter()
.filter(|voter_index| *voter_index != event.creator())
.filter_map(|creator| {
event
.non_fork_last_ancestor_by(creator)
.and_then(|index_by_creator| {
let event_index = self
.peer_list
.events_by_index(creator, index_by_creator)
.next()?;
self.meta_election.populated_meta_votes(event_index)
})
})
.collect()
}
fn peer_meta_votes<'a>(
meta_votes_maps: &'a [&PeerIndexMap<Vec<MetaVote>>],
peer_index: PeerIndex,
) -> Vec<&'a [MetaVote]> {
meta_votes_maps
.iter()
.filter_map(|meta_votes| meta_votes.get(peer_index))
.map(Vec::as_slice)
.collect()
}
fn voters(&self) -> &PeerIndexSet {
self.meta_election.voters()
}
fn voter_count(&self) -> usize {
self.meta_election.voters().len()
}
fn unconsensused_events(
&self,
filter_key: Option<&ObservationKey>,
) -> impl Iterator<Item = IndexedEventRef<S::PublicId>> {
self.meta_election
.unconsensused_events(filter_key)
.filter_map(move |index| self.get_known_event(index).ok())
}
fn compute_consensus(&self, event_index: EventIndex) -> Vec<ObservationKey> {
let event = if let Ok(event) = self.get_known_event(event_index) {
event
} else {
return vec![];
};
if iter::once(event.creator()).eq(self.voters()) {
return self.compute_payloads_for_consensus(iter::once((event.creator(), true)));
}
let last_meta_votes = match self.meta_election.populated_meta_votes(event_index) {
Some(meta_votes) => meta_votes,
None => return Vec::new(),
};
let decided_meta_votes = last_meta_votes
.iter()
.filter_map(|(peer_index, event_votes)| {
event_votes
.last()
.and_then(MetaVote::decision)
.map(|v| (peer_index, v))
});
if decided_meta_votes.clone().count() < self.voter_count() {
return Vec::new();
}
self.compute_payloads_for_consensus(decided_meta_votes)
}
fn compute_payloads_for_consensus<I>(&self, decided_meta_votes: I) -> Vec<ObservationKey>
where
I: IntoIterator<Item = (PeerIndex, bool)>,
{
let payloads = decided_meta_votes
.into_iter()
.flat_map(|(peer_index, decision)| {
if decision {
match self.meta_election.interesting_content_by(peer_index) {
Some(content) => content.iter().enumerate().collect_vec(),
None => Vec::new(),
}
} else {
Vec::new()
}
})
.fold(BTreeMap::new(), |mut map, (idx, payload_key)| {
let (count, min_index) = map.entry(*payload_key).or_insert((0, idx));
*count += 1;
*min_index = std::cmp::min(*min_index, idx);
map
});
payloads
.into_iter()
.sorted_by(
|(lhs_key, (lhs_count, lhs_min_index)), (rhs_key, (rhs_count, rhs_min_index))| {
lhs_min_index
.cmp(rhs_min_index)
.then_with(|| lhs_count.cmp(rhs_count).reverse())
.then_with(|| lhs_key.consistent_cmp(rhs_key, &self.peer_list))
},
)
.map(|(key, _)| key)
.collect()
}
fn create_blocks(&self, payload_keys: &[ObservationKey]) -> Result<BlockGroup<T, S::PublicId>> {
let voters = self.voters();
let blocks: Result<VecDeque<_>> = payload_keys
.iter()
.map(|payload_key| {
let votes = self
.unconsensused_events(Some(payload_key))
.map(|event| event.inner())
.filter(|event| voters.contains(event.creator()))
.filter_map(|event| {
let (vote, key) = event.vote_and_payload_key(&self.observations)?;
let creator_id = self.peer_list.get(event.creator()).map(Peer::id)?;
Some((key, vote, creator_id))
})
.map(|(_, vote, creator_id)| (creator_id.clone(), vote))
.collect();
Block::new(&votes)
})
.filter(|block| match block {
Err(Error::MissingVotes) => false,
Err(_) => true,
Ok(block) => {
!block.payload().is_internal()
}
})
.collect();
Ok(BlockGroup(blocks?))
}
fn num_peers_created_events_seen_by_x_that_can_see_y(
&self,
x: &Event<S::PublicId>,
y: &Event<S::PublicId>,
) -> usize {
x.last_ancestors()
.filter(|(peer_index, event_index)| {
for event_idx in self.peer_list.events_by_index(*peer_index, *event_index) {
if let Ok(event) = self.get_known_event(event_idx) {
if x.sees(event) && event.sees(y) {
return true;
}
}
}
false
})
.count()
}
fn strongly_sees<A, B>(&self, x: A, y: B) -> bool
where
A: AsRef<Event<S::PublicId>>,
B: AsRef<Event<S::PublicId>>,
{
is_more_than_two_thirds(
self.num_peers_created_events_seen_by_x_that_can_see_y(x.as_ref(), y.as_ref()),
self.voter_count(),
)
}
fn create_sync_event(&mut self, is_request: bool, other_parent: EventIndex) -> Result<()> {
self.process_or_queue_pending_event(PendingEvent::Sync {
is_request,
other_parent,
_phantom: PhantomData,
})
}
fn add_sync_event(&mut self, is_request: bool, other_parent: EventIndex) -> Result<()> {
let self_parent = self.our_last_event_index()?;
let event = if is_request {
Event::new_from_request(self_parent, other_parent, self.event_context())?
} else {
Event::new_from_response(self_parent, other_parent, self.event_context())?
};
#[cfg(feature = "malice-detection")]
{
if !self.graph.is_valid_sync_event(&event).unwrap_or(false) {
return Err(Error::InvalidMessage);
}
}
let _ = self.add_event(event)?;
Ok(())
}
fn create_dkg_events(&mut self) -> Result<()> {
for msg in mem::replace(&mut self.pending_dkg_msgs, vec![]) {
self.create_dkg_event(msg)?;
}
Ok(())
}
fn create_dkg_event(&mut self, msg: DkgMessage) -> Result<()> {
self.process_or_queue_pending_event(PendingEvent::DkgMessage { msg })
}
fn add_dkg_event(&mut self, msg: DkgMessage) -> Result<()> {
let event = self.new_event_from_observation(
self.our_last_event_index()?,
Observation::DkgMessage(msg),
)?;
let _ = self.add_event(event)?;
Ok(())
}
fn events_to_gossip_to_peer(&self, peer_index: PeerIndex) -> Result<Vec<&Event<S::PublicId>>> {
let last_event = if let Some(event_index) = self.peer_list.last_event(peer_index) {
self.get_known_event(event_index)?
} else {
log_or_panic!("{:?} doesn't have peer {:?}", self.our_pub_id(), peer_index);
return Err(Error::Logic);
};
let last_ancestors: PeerIndexMap<_> = last_event.last_ancestors().collect();
let mut event_indices: Vec<_> = self
.peer_list
.iter()
.flat_map(|(peer_index, peer)| {
let first = last_ancestors
.get(peer_index)
.map(|index_by_creator| index_by_creator + 1)
.unwrap_or(0);
peer.events_from(first)
})
.collect();
for (peer_index, &last_index_by_creator) in &last_ancestors {
let peer = if let Some(peer) = self
.peer_list
.get(peer_index)
.filter(|peer| peer.has_fork())
{
peer
} else {
continue;
};
for index_by_creator in 0..=last_index_by_creator {
if !peer.has_fork_at(index_by_creator) {
continue;
}
for event_index in peer.events_by_index(index_by_creator) {
event_indices.push(event_index);
}
}
}
event_indices.sort();
Ok(event_indices
.into_iter()
.filter_map(|event_index| self.graph.get(event_index))
.map(|event| event.inner())
.collect())
}
fn flush_pending_events(&mut self) -> Result<()> {
if self.peer_list.our_events().next().is_none() {
return Ok(());
}
for event in mem::replace(&mut self.pending_events, vec![]) {
self.process_pending_event(event)?;
}
Ok(())
}
fn process_or_queue_pending_event(
&mut self,
event: PendingEvent<T, S::PublicId>,
) -> Result<()> {
if self.peer_list.last_event(PeerIndex::OUR).is_none() {
self.pending_events.push(event);
Ok(())
} else {
self.process_pending_event(event)
}
}
fn process_pending_event(&mut self, event: PendingEvent<T, S::PublicId>) -> Result<()> {
match event {
PendingEvent::Sync {
is_request,
other_parent,
..
} => self.add_sync_event(is_request, other_parent),
PendingEvent::DkgMessage { msg } => self.add_dkg_event(msg),
#[cfg(feature = "malice-detection")]
PendingEvent::Accusation {
offender,
malice,
other_parent,
} => self.add_accusation_event(offender, malice, other_parent),
}
}
fn event_context(&self) -> EventContextRef<T, S> {
EventContextRef {
graph: &self.graph,
peer_list: &self.peer_list,
observations: &self.observations,
consensus_mode: self.consensus_mode,
}
}
#[cfg(any(all(test, feature = "mock"), feature = "malice-detection"))]
fn event_payload<'a>(
&'a self,
event: &Event<S::PublicId>,
) -> Option<&'a Observation<T, S::PublicId>> {
event
.payload_key()
.and_then(|key| self.observations.get(key))
.map(|info| &info.observation)
}
#[cfg(any(all(test, feature = "mock"), feature = "malice-detection"))]
fn event_creator_id<'a>(&'a self, event: &Event<S::PublicId>) -> Result<&'a S::PublicId> {
self.peer_list
.get(event.creator())
.map(Peer::id)
.ok_or_else(|| {
log_or_panic!(
"{:?} doesn't know the creator of {:?}",
self.our_pub_id(),
event
);
Error::Logic
})
}
}
#[cfg(feature = "malice-detection")]
impl<T: NetworkEvent, S: SecretId> Parsec<T, S> {
fn create_accusation_events(&mut self, other_parent: EventIndex) -> Result<()> {
let pending_accusations = mem::replace(&mut self.pending_accusations, vec![]);
for (offender, malice) in pending_accusations {
self.create_accusation_event(offender, malice, other_parent)?;
}
Ok(())
}
fn create_accusation_event(
&mut self,
offender: PeerIndex,
malice: Malice<T, S::PublicId>,
other_parent: EventIndex,
) -> Result<()> {
self.process_or_queue_pending_event(PendingEvent::Accusation {
offender,
malice,
other_parent,
})
}
fn add_accusation_event(
&mut self,
offender: PeerIndex,
malice: Malice<T, S::PublicId>,
other_parent: EventIndex,
) -> Result<()> {
if self
.will_be_ancestor_of_our_next_sync(&malice, other_parent)
.unwrap_or(false)
{
let offender = self.peer_list.get_known(offender)?.id().clone();
let event = self.new_event_from_observation(
self.our_last_event_index()?,
Observation::Accusation { offender, malice },
)?;
let _ = self.add_event(event)?;
} else {
self.pending_accusations.push((offender, malice));
}
Ok(())
}
fn will_be_ancestor_of_our_next_sync(
&self,
malice: &Malice<T, S::PublicId>,
other_parent: EventIndex,
) -> Result<bool> {
let self_parent = self
.our_last_event_index()
.and_then(|index| self.get_known_event(index))?;
let other_parent = self.get_known_event(other_parent)?;
Ok(self.accused_events(&malice).iter().all(|accused_event| {
self_parent.is_descendant_of(accused_event)
|| other_parent.is_descendant_of(accused_event)
}))
}
fn detect_malice(&mut self, event: &Event<S::PublicId>) -> Result<()> {
self.detect_incorrect_genesis(event)?;
self.detect_other_parent_by_same_creator(event)?;
self.detect_self_parent_by_different_creator(event)?;
self.detect_invalid_sync_event(event)?;
self.detect_unexpected_genesis(event);
self.detect_missing_genesis(event);
self.detect_duplicate_vote(event);
self.detect_fork(event);
self.detect_invalid_accusations(event);
Ok(())
}
fn detect_incorrect_genesis(&mut self, event: &Event<S::PublicId>) -> Result<()> {
if let Some(Observation::Genesis { ref group, .. }) = self.event_payload(event) {
if self.genesis_group() == group.iter().collect() {
return Ok(());
}
} else {
return Ok(());
}
let packed_event = Box::new(event.pack(self.event_context())?);
self.accuse(event.creator(), Malice::IncorrectGenesis(packed_event));
Err(Error::InvalidEvent)
}
fn detect_other_parent_by_same_creator(&mut self, event: &Event<S::PublicId>) -> Result<()> {
if let Some(other_parent) = self.graph.other_parent(event) {
if other_parent.creator() != event.creator() {
return Ok(());
}
} else {
return Ok(());
}
let packed_event = event.pack(self.event_context())?;
self.accuse(
event.creator(),
Malice::OtherParentBySameCreator(Box::new(packed_event)),
);
Err(Error::InvalidEvent)
}
fn detect_self_parent_by_different_creator(
&mut self,
event: &Event<S::PublicId>,
) -> Result<()> {
if let Some(self_parent) = self.graph.self_parent(event) {
if self_parent.creator() == event.creator() {
return Ok(());
}
} else {
return Ok(());
}
let packed_event = event.pack(self.event_context())?;
self.accuse(
event.creator(),
Malice::SelfParentByDifferentCreator(Box::new(packed_event)),
);
Err(Error::InvalidEvent)
}
fn detect_invalid_sync_event(&mut self, event: &Event<S::PublicId>) -> Result<()> {
if self.graph.is_valid_sync_event(event).unwrap_or(true) {
return Ok(());
}
let packed_event = Box::new(event.pack(self.event_context())?);
let malice = if event.is_request() {
Malice::InvalidRequest(packed_event)
} else {
Malice::InvalidResponse(packed_event)
};
self.accuse(event.creator(), malice);
Err(Error::InvalidEvent)
}
fn detect_unexpected_genesis(&mut self, event: &Event<S::PublicId>) {
let accuse = {
let payload = if let Some(payload) = self.event_payload(event) {
payload
} else {
return;
};
let genesis_group = if let Observation::Genesis { ref group, .. } = *payload {
group
} else {
return;
};
let creator_id = if let Ok(id) = self.event_creator_id(event) {
id
} else {
return;
};
!genesis_group.contains(creator_id)
|| self
.graph
.self_parent(event)
.map_or(true, |self_parent| !self_parent.is_initial())
};
if accuse {
self.accuse(event.creator(), Malice::UnexpectedGenesis(*event.hash()));
}
}
fn detect_missing_genesis(&mut self, event: &Event<S::PublicId>) {
if event.index_by_creator() != 1 {
return;
}
if let Some(&Observation::Genesis { .. }) = self.event_payload(event) {
return;
}
let accuse = {
let creator_id = if let Ok(id) = self.event_creator_id(event) {
id
} else {
return;
};
self.genesis_group().contains(creator_id)
};
if accuse {
self.accuse(event.creator(), Malice::MissingGenesis(*event.hash()));
}
}
fn detect_duplicate_vote(&mut self, event: &Event<S::PublicId>) {
let other_hash = {
let payload_key = if let Some(key) = event.payload_key() {
key
} else {
return;
};
let mut duplicates = self
.peer_list
.peer_events(event.creator())
.rev()
.filter_map(|index| self.get_known_event(index).ok())
.filter(|event| {
event
.payload_key()
.map_or(false, |event_payload_key| event_payload_key == payload_key)
})
.map(|event| *event.hash())
.take(2);
let hash = if let Some(hash) = duplicates.next() {
hash
} else {
return;
};
if duplicates.next().is_some() {
return;
}
hash
};
self.accuse(
event.creator(),
Malice::DuplicateVote(other_hash, *event.hash()),
);
}
fn detect_fork(&mut self, event: &Event<S::PublicId>) {
if self.is_first_fork(event) {
if let Some(self_parent_hash) = self.graph.self_parent(event).map(|event| *event.hash())
{
self.accuse(event.creator(), Malice::Fork(self_parent_hash));
}
}
}
fn is_first_fork(&self, event: &Event<S::PublicId>) -> bool {
let same_index_events = self
.peer_list
.events_by_index(event.creator(), event.index_by_creator());
same_index_events
.filter_map(|other_event| self.graph.get(other_event))
.filter(|other_event| other_event.inner().self_parent() == event.self_parent())
.count()
== 1
}
fn detect_invalid_accusations(&mut self, event: &Event<S::PublicId>) {
if !event.is_sync_event() {
return;
}
let mut invalid_accusations = vec![];
let mut self_parent_index = event.self_parent();
while let Some(self_parent) =
self_parent_index.and_then(|event_index| self.get_known_event(event_index).ok())
{
match self.event_payload(&self_parent) {
Some(&Observation::Accusation {
ref offender,
ref malice,
}) => {
if malice.is_provable() && !self.we_have_accused(offender, malice) {
invalid_accusations.push(*self_parent.hash());
}
self_parent_index = self_parent.self_parent();
}
_ => {
self_parent_index = None;
}
}
}
for event_hash in invalid_accusations {
self.accuse(event.creator(), Malice::InvalidAccusation(event_hash))
}
}
fn we_have_accused(&self, offender: &S::PublicId, malice: &Malice<T, S::PublicId>) -> bool {
let their_accusation = if let Some(offender_index) = self.peer_list.get_index(offender) {
(offender_index, malice)
} else {
return false;
};
if self
.pending_accusations
.iter()
.any(|&(our_offender, ref our_malice)| their_accusation == (our_offender, our_malice))
{
return true;
}
self.peer_list
.our_events()
.rev()
.filter_map(|event_index| self.get_known_event(event_index).ok())
.filter_map(|event| {
if let Some(&Observation::Accusation {
ref offender,
ref malice,
}) = self.event_payload(event.inner())
{
Some((offender, malice))
} else {
None
}
})
.filter_map(|(offender_id, malice)| {
self.peer_list
.get_index(offender_id)
.map(|index| (index, malice))
})
.any(|our_accusation| their_accusation == our_accusation)
}
fn detect_premature_gossip(&self) -> Result<()> {
self.confirm_self_state(PeerState::DKG)
.map_err(|_| Error::PrematureGossip)
}
fn accuse(&mut self, offender: PeerIndex, malice: Malice<T, S::PublicId>) {
self.pending_accusations.push((offender, malice));
}
fn accusations_by_peer_since(
&self,
peer_index: PeerIndex,
oldest_event: Option<EventIndex>,
) -> impl Iterator<Item = (PeerIndex, &Malice<T, S::PublicId>)> {
self.graph
.iter_from(oldest_event.map(EventIndex::topological_index).unwrap_or(0))
.filter(move |event| event.creator() == peer_index)
.filter_map(move |event| match self.event_payload(event.inner()) {
Some(Observation::Accusation { offender, malice }) => Some((offender, malice)),
_ => None,
})
.filter_map(move |(offender, malice)| {
self.peer_list
.get_index(offender)
.map(|offender| (offender, malice))
})
}
fn accused_events<'a>(
&'a self,
malice: &'a Malice<T, S::PublicId>,
) -> Vec<IndexedEventRef<'a, S::PublicId>> {
malice
.accused_events_in_graph()
.iter()
.filter_map(move |hash| self.graph.get_by_hash(hash))
.collect()
}
fn accused_events_are_ancestors_of(
&self,
malice: &Malice<T, S::PublicId>,
event_index: EventIndex,
) -> bool {
let event = if let Some(event) = self.graph.get(event_index) {
event
} else {
return false;
};
self.accused_events(malice)
.iter()
.all(|accused_event| event.is_descendant_of(accused_event))
}
fn events_with_self_parent(
&self,
parent: IndexedEventRef<S::PublicId>,
) -> impl Iterator<Item = IndexedEventRef<S::PublicId>> {
let parent_index = parent.event_index();
self.peer_list
.events_by_index(parent.creator(), parent.index_by_creator() + 1)
.filter_map(move |descendant_index| self.get_known_event(descendant_index).ok())
.filter(move |descendant| descendant.self_parent() == Some(parent_index))
}
fn accused_event_is_fork_but_this_event_is_not_a_fork_descendant(
&self,
malice: &Malice<T, S::PublicId>,
event_index: EventIndex,
) -> bool {
if let Malice::Fork(accused_event_hash) = malice {
self.graph
.get_by_hash(accused_event_hash)
.and_then(|accused_event| {
self.graph
.get(event_index)
.map(|event| (accused_event, event))
})
.map(|(accused_event, event)| {
self.events_with_self_parent(accused_event)
.filter(|forked_event| event.is_descendant_of(forked_event))
.count()
< 2
})
.unwrap_or(true)
} else {
false
}
}
fn detect_accomplice(&mut self, event_index: EventIndex) -> Result<()> {
let (event_hash, creator) = {
let event = self.get_known_event(event_index)?;
if !event.is_request() && !event.is_response() {
return Ok(());
}
(*event.hash(), event.creator())
};
let starting_index = self.peer_list.accomplice_event_checkpoint_by(creator);
let accusations =
self.detect_accomplice_for_our_accusations(event_index, starting_index)?;
for (_, malice) in accusations {
self.accuse(creator, Malice::Accomplice(event_hash, Box::new(malice)));
}
let last_malice_event_accused_by_peer = self
.accusations_by_peer_since(creator, starting_index)
.filter_map(|(_, malice)| malice.single_hash().and_then(|h| self.graph.get_index(&h)))
.max_by_key(|event_index| event_index.topological_index());
if let Some(index) = last_malice_event_accused_by_peer {
self.peer_list
.update_accomplice_event_checkpoint_by(creator, index);
}
Ok(())
}
fn detect_accomplice_for_our_accusations(
&self,
event_index: EventIndex,
starting_event: Option<EventIndex>,
) -> Result<Accusations<T, S::PublicId>> {
let creator = self.get_known_event(event_index)?.creator();
let our_accusations = self.accusations_by_peer_since(PeerIndex::OUR, starting_event);
let accusations_by_peer_since_starter_event = self
.accusations_by_peer_since(creator, starting_event)
.collect_vec();
Ok(self
.pending_accusations
.iter()
.map(|(offender, malice)| (*offender, malice))
.chain(our_accusations)
.filter(|(offender, _)| offender != &creator)
.filter(|(_, malice)| self.accused_events_are_ancestors_of(&malice, event_index))
.filter(|(_, malice)| {
!self.accused_event_is_fork_but_this_event_is_not_a_fork_descendant(
&malice,
event_index,
)
})
.filter(|accusation| !accusations_by_peer_since_starter_event.contains(accusation))
.filter(|(_, malice)| {
!self.pending_accusations.iter().any(|(off, mal)| match mal {
Malice::Accomplice(_, ori_mal) => off == &creator && **ori_mal == **malice,
_ => false,
})
})
.map(|(offender, malice)| (offender, malice.clone()))
.collect())
}
fn genesis_group(&self) -> BTreeSet<&S::PublicId> {
self.graph
.iter()
.filter_map(|event| {
let observation = self.event_payload(&*event)?;
if let Observation::Genesis { ref group, .. } = *observation {
Some(group.iter().collect())
} else {
None
}
})
.next()
.unwrap_or_else(|| self.peer_list.voters().map(|(_, peer)| peer.id()).collect())
}
}
impl<T: NetworkEvent, S: SecretId> Drop for Parsec<T, S> {
fn drop(&mut self) {
dump_graph::to_file(dump_graph::ToFileInfo {
owner_id: self.our_pub_id(),
consensus_mode: self.consensus_mode,
gossip_graph: &self.graph,
meta_election: &self.meta_election,
peer_list: &self.peer_list,
observations: &self.observations,
secure_rng: &self.secure_rng,
key_gens_and_next_id: (&self.key_gen, &self.key_gen_next_id),
info: &dump_graph::DumpGraphContext::DroppingParsec,
});
}
}
fn get_known_event<'a, P: PublicId>(
our_pub_id: &P,
graph: &'a Graph<P>,
event_index: EventIndex,
) -> Result<IndexedEventRef<'a, P>> {
graph.get(event_index).ok_or_else(|| {
log_or_panic!("{:?} doesn't have event {:?}", our_pub_id, event_index);
Error::Logic
})
}
enum PostProcessAction {
Continue,
Restart(usize),
}
#[cfg(feature = "malice-detection")]
type Accusations<T, P> = Vec<(PeerIndex, Malice<T, P>)>;
enum PendingEvent<T: NetworkEvent, P: PublicId> {
Sync {
is_request: bool,
other_parent: EventIndex,
_phantom: PhantomData<(T, P)>,
},
DkgMessage {
msg: DkgMessage,
},
#[cfg(feature = "malice-detection")]
Accusation {
offender: PeerIndex,
malice: Malice<T, P>,
other_parent: EventIndex,
},
}
#[cfg(any(test, feature = "testing"))]
impl<T: NetworkEvent, S: SecretId> Parsec<T, S> {
pub(crate) fn set_ignore_process_events(&mut self) {
self.ignore_process_events = true;
}
pub(crate) fn ignore_process_events(&self) -> bool {
self.ignore_process_events
}
}
#[cfg(any(feature = "testing", all(test, feature = "mock")))]
impl Parsec<Transaction, PeerId> {
#[cfg(all(test, feature = "mock"))]
pub(crate) fn from_parsed_contents(
mut parsed_contents: ParsedContents,
secure_rng: Box<dyn RngCore>,
) -> Self {
let peer_list = PeerList::new(parsed_contents.our_id);
let mut parsec = Parsec::empty(
peer_list,
PeerIndexSet::default(),
parsed_contents.consensus_mode,
secure_rng,
);
for event in &parsed_contents.graph {
if let Some(payload_key) = event.payload_key() {
if let Some(info) = parsed_contents.observations.get_mut(payload_key) {
if event.creator() == PeerIndex::OUR {
info.created_by_us = true;
}
}
}
}
for consensused in parsed_contents.meta_election.consensus_history() {
let _ = parsed_contents
.observations
.get_mut(consensused)
.map(|info| info.consensused = true);
}
if let Some(serialized_key_gens_and_next_id) =
&parsed_contents.serialized_key_gens_and_next_id
{
let (key_gen, key_gen_next_id) =
bincode::deserialize(serialized_key_gens_and_next_id).unwrap();
parsec.key_gen = key_gen;
parsec.key_gen_next_id = key_gen_next_id;
}
parsec.graph = parsed_contents.graph;
parsec.meta_election = parsed_contents.meta_election;
parsec.peer_list = parsed_contents.peer_list;
parsec.observations = parsed_contents.observations;
parsec
}
pub fn meta_election_consensus_history_hash(&self) -> Vec<Hash> {
self.meta_election
.consensus_history
.iter()
.map(|observation| observation.hash().0)
.collect()
}
}
#[cfg(any(test, feature = "testing"))]
pub(crate) struct TestParsec<T: NetworkEvent, S: SecretId>(Parsec<T, S>);
#[cfg(any(test, feature = "testing"))]
impl<T: NetworkEvent, S: SecretId> TestParsec<T, S> {
pub fn from_genesis(
our_id: S,
genesis_group: &BTreeSet<S::PublicId>,
consensus_mode: ConsensusMode,
secure_rng: Box<dyn RngCore>,
) -> Self {
TestParsec(Parsec::from_genesis(
our_id,
genesis_group,
vec![],
consensus_mode,
secure_rng,
))
}
pub fn from_existing(
our_id: S,
genesis_group: &BTreeSet<S::PublicId>,
section: &BTreeSet<S::PublicId>,
consensus_mode: ConsensusMode,
secure_rng: Box<dyn RngCore>,
) -> Self {
TestParsec(Parsec::from_existing(
our_id,
genesis_group,
section,
consensus_mode,
secure_rng,
))
}
pub fn graph(&self) -> &Graph<S::PublicId> {
&self.0.graph
}
pub fn peer_list(&self) -> &PeerList<S> {
&self.0.peer_list
}
pub fn our_last_event_index(&self) -> EventIndex {
unwrap!(self.0.our_last_event_index())
}
pub fn event_context(&self) -> EventContextRef<T, S> {
self.0.event_context()
}
pub fn events_to_gossip_to_peer(
&self,
peer_index: PeerIndex,
) -> Result<Vec<&Event<S::PublicId>>> {
self.0.events_to_gossip_to_peer(peer_index)
}
pub fn get_peer_index(&self, peer_id: &S::PublicId) -> Result<PeerIndex> {
self.0.get_peer_index(peer_id)
}
pub fn confirm_allowed_to_gossip_to(&self, peer_index: PeerIndex) -> Result<()> {
self.0.confirm_allowed_to_gossip_to(peer_index)
}
#[cfg(all(test, feature = "mock"))]
pub fn event_payload(
&self,
event: &Event<S::PublicId>,
) -> Option<&Observation<T, S::PublicId>> {
self.0.event_payload(event)
}
}
#[cfg(all(test, feature = "mock"))]
impl TestParsec<Transaction, PeerId> {
pub fn from_parsed_contents(
parsed_contents: ParsedContents,
secure_rng: Box<dyn RngCore>,
) -> Self {
TestParsec(Parsec::from_parsed_contents(parsed_contents, secure_rng))
}
pub fn meta_election(&self) -> &MetaElection {
&self.meta_election
}
pub fn consensused_blocks(&self) -> impl Iterator<Item = &Block<Transaction, PeerId>> {
self.0.consensused_blocks.iter().flatten()
}
pub fn change_peer_state(&mut self, peer_id: &PeerId, state: PeerState) {
let peer_index = unwrap!(self.0.peer_list.get_index(peer_id));
self.0.peer_list.change_peer_state(peer_index, state)
}
pub fn pack_event(&self, event: &Event<PeerId>) -> PackedEvent<Transaction, PeerId> {
unwrap!(event.pack(self.0.event_context()))
}
pub fn unpack_and_add_event(
&mut self,
packed_event: PackedEvent<Transaction, PeerId>,
) -> Result<EventIndex> {
match self.0.unpack(packed_event)? {
Some(event) => self.0.add_event(event),
None => Err(Error::Logic),
}
}
pub fn add_event(&mut self, event: Event<PeerId>) -> Result<EventIndex> {
self.0.add_event(event)
}
pub fn event_creator_id(&self, event: &Event<PeerId>) -> &PeerId {
unwrap!(self.0.event_creator_id(event))
}
pub fn new_event_from_observation(
&mut self,
self_parent: EventIndex,
observation: Observation<Transaction, PeerId>,
) -> Result<Event<PeerId>> {
self.0.new_event_from_observation(self_parent, observation)
}
}
#[cfg(all(test, feature = "malice-detection", feature = "mock"))]
impl TestParsec<Transaction, PeerId> {
pub fn remove_last_event(&mut self) -> Option<(EventIndex, Event<PeerId>)> {
let (event_index, event) = self.graph.remove_last()?;
assert_eq!(
event_index,
unwrap!(self.peer_list.remove_last_event(event.creator()))
);
if let Some(payload_key) = event.payload_key() {
let _ = self
.0
.meta_election
.unconsensused_events
.ordered_indices
.remove(&event_index);
let _ = self
.0
.meta_election
.unconsensused_events
.indices_by_key
.get_mut(payload_key)
.map(|indices| indices.remove(&event_index));
}
Some((event_index, event))
}
pub fn handle_request_make_false_accusation(
&mut self,
src: &PeerId,
req: Request<Transaction, PeerId>,
) -> EventHash {
let src_index = unwrap!(self.0.get_peer_index(src));
let last_hash = unwrap!(req.packed_events.last()).compute_hash();
let other_parent = unwrap!(self.0.unpack_and_add_events(src_index, req.packed_events));
unwrap!(self.0.create_accusation_events(other_parent));
let invalid_observation = Observation::<Transaction, _>::Accusation {
offender: src.clone(),
malice: Malice::Fork(last_hash),
};
unwrap!(self.0.vote_for(invalid_observation.clone()));
let invalid_accusation_hash = {
let invalid_accusation = unwrap!(self.0.graph.get(self.our_last_event_index()));
assert_eq!(
self.0.event_payload(&invalid_accusation),
Some(&invalid_observation)
);
*invalid_accusation.hash()
};
unwrap!(self.0.create_sync_event(true, other_parent));
invalid_accusation_hash
}
pub fn handle_request_as_accomplice(
&mut self,
src: &PeerId,
req: Request<Transaction, PeerId>,
) {
let src_index = unwrap!(self.0.get_peer_index(src));
let other_parent = unwrap!(self.0.unpack_and_add_events(src_index, req.packed_events));
self.0.pending_accusations.clear();
unwrap!(self.0.create_sync_event(true, other_parent));
}
pub fn pending_accusations(&self) -> &Accusations<Transaction, PeerId> {
&self.0.pending_accusations
}
pub fn add_peer(&mut self, peer_id: PeerId, state: PeerState) {
let _ = self.0.peer_list.add_peer(peer_id, state);
}
pub fn restart_consensus(&mut self) -> Result<()> {
self.0.process_events(0)
}
}
#[cfg(any(test, feature = "testing"))]
impl<T: NetworkEvent, S: SecretId> From<Parsec<T, S>> for TestParsec<T, S> {
fn from(parsec: Parsec<T, S>) -> Self {
TestParsec(parsec)
}
}
#[cfg(any(test, feature = "testing"))]
impl<T: NetworkEvent, S: SecretId> Deref for TestParsec<T, S> {
type Target = Parsec<T, S>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[cfg(any(test, feature = "testing"))]
impl<T: NetworkEvent, S: SecretId> DerefMut for TestParsec<T, S> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
#[cfg(all(test, any(feature = "testing", feature = "mock")))]
pub(crate) fn get_graph_snapshot<T: NetworkEvent, S: SecretId>(
parsec: &Parsec<T, S>,
ignore_last_events: usize,
) -> GraphSnapshot {
GraphSnapshot::new_with_ignore(&parsec.graph, ignore_last_events)
}