use super::Observation;
use crate::{
block::{Block, BlockGroup},
error::Result,
gossip::{Cause, Event, EventIndex, Request, Response},
mock::{PeerId, Transaction},
observation::{
is_more_than_two_thirds, ConsensusMode, Malice, Observation as ParsecObservation,
},
parsec::{Parsec, TestParsec},
peer_list::PeerIndex,
};
use itertools::Itertools;
use rand::{seq::SliceRandom, Rng, RngCore};
use std::{
collections::{BTreeMap, BTreeSet},
fmt::{self, Debug, Formatter},
iter,
ops::{Deref, DerefMut},
};
/// Lifecycle state of a test peer as tracked by the test harness.
///
/// `Eq` is derived alongside `PartialEq` because equality on these unit variants is total.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum PeerStatus {
    /// Initial status of a peer whose parsec instance cannot vote yet; it turns `Active` once the
    /// peer polls an `Add` block naming itself.
    Pending,
    /// The peer is a voting member. Counts as running.
    Active,
    /// Set by `Peer::mark_as_removed`. No longer counts as running.
    Removed,
    /// Set by `Peer::mark_as_failed`. No longer counts as running.
    Failed,
}
/// How the rest of the test network currently regards a peer.
///
/// `Eq` is derived alongside `PartialEq` because equality on these unit variants is total.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum NetworkView {
    /// Initial view of a peer whose parsec instance cannot vote yet.
    Joining,
    /// The peer is regarded as a member; only peers in this view are counted by
    /// `Peer::update_network_views`.
    Joined,
    /// Set by `Peer::mark_network_view_as_leaving` for a peer that was `Joined`.
    Leaving,
    /// Set by `Peer::update_network_views` once enough `Joined` peers have polled a `Remove`
    /// block for the peer.
    Left,
}
/// A forked event held by a malicious peer, together with the graph event it competes with.
struct ForkedEvent {
/// Index of our last event at the time the fork was created (i.e. after gossiping to the dummy
/// recipient). When gossiping to the fork's recipient, events are sent only up to (excluding)
/// this one.
partner_event_index: EventIndex,
/// The forked requesting event itself; it is not looked up in the graph but appended manually
/// when gossiping to its recipient.
event: Event<PeerId>,
}
/// State of a malicious test peer: its parsec instance plus any fork it has created.
struct MaliciousComponents {
/// Parsec instance of the malicious peer, wrapped in the test-only `TestParsec` type.
test_parsec: TestParsec<Transaction, PeerId>,
/// `None` until `create_gossip_with_fork` has been called; at most one fork is ever created.
forked_event: Option<ForkedEvent>,
}
impl MaliciousComponents {
/// Creates a fork and returns a gossip request for `recipient_id` that carries the forked event.
///
/// Gossip is first created for a randomly picked "dummy" peer (neither us nor the recipient);
/// a second requesting event aimed at `recipient_id` is then built on the self-parent that was
/// our last event *before* that gossip. NOTE(review): presumably `create_gossip` appends a
/// requesting event with that same self-parent, which is what makes the two events a fork -
/// confirm against `Parsec::create_gossip`.
///
/// # Errors
///
/// Returns an error if `recipient_id` is unknown or if we are not allowed to gossip to it.
///
/// # Panics
///
/// Panics if a fork has already been created, if no suitable dummy recipient exists, or if
/// creating the dummy gossip, building the forked event or packing an event fails.
fn create_gossip_with_fork<R: Rng>(
    &mut self,
    recipient_id: &PeerId,
    rng: &mut R,
) -> Result<Request<Transaction, PeerId>> {
    assert!(self.forked_event.is_none());

    let recipient_index = self.test_parsec.get_peer_index(recipient_id)?;
    self.test_parsec
        .confirm_allowed_to_gossip_to(recipient_index)?;

    let our_id = self.test_parsec.our_pub_id().clone();
    // Remember our current tip: both sides of the fork hang off this event.
    let fork_parent_index = self.test_parsec.our_last_event_index();

    let dummy_recipient_id = {
        let mut candidates = self
            .test_parsec
            .peer_list()
            .gossip_recipients()
            .collect_vec();
        candidates.shuffle(rng);
        let chosen = candidates
            .iter()
            .find(|(_, peer)| peer.id() != &our_id && peer.id() != recipient_id)
            .map(|(_, peer)| peer.id().clone());
        unwrap!(chosen)
    };

    // The request itself is discarded; only the effect on our own graph matters here.
    let _ = unwrap!(self.test_parsec.create_gossip(&dummy_recipient_id));

    let event = unwrap!(Event::new_from_requesting(
        fork_parent_index,
        recipient_id,
        self.test_parsec.event_context()
    ));
    let partner_event_index = self.test_parsec.our_last_event_index();
    self.forked_event = Some(ForkedEvent {
        partner_event_index,
        event,
    });

    let packed_events = self
        .events_to_gossip_to_peer(recipient_index)
        .into_iter()
        .map(|event| unwrap!(event.pack(self.test_parsec.event_context())))
        .collect_vec();
    Ok(Request::new(packed_events))
}
/// Returns the events to send to the peer at `recipient_index`.
///
/// If we hold a last event for the recipient, the selection is delegated to the wrapped parsec;
/// otherwise every event of our graph is sent. When a fork exists whose requesting event targets
/// this recipient, the list is cut just before the fork's partner event and the forked event is
/// appended instead.
///
/// # Panics
///
/// Panics if the wrapped parsec fails to select events, or if an event of the selection cannot
/// be found in the graph.
fn events_to_gossip_to_peer(&self, recipient_index: PeerIndex) -> Vec<&Event<PeerId>> {
    let have_recipient_event = self
        .test_parsec
        .peer_list()
        .last_event(recipient_index)
        .is_some();
    let events = if have_recipient_event {
        unwrap!(self.test_parsec.events_to_gossip_to_peer(recipient_index))
    } else {
        self.test_parsec.graph().iter().map(|e| e.inner()).collect()
    };

    // Only a fork that was created for this very recipient is relevant.
    let fork_for_recipient =
        self.forked_event
            .as_ref()
            .filter(|forked| match forked.event.cause() {
                Cause::Requesting { recipient, .. } => recipient == &recipient_index,
                _ => false,
            });

    match fork_for_recipient {
        Some(forked) => events
            .into_iter()
            .take_while(|event| {
                unwrap!(self.test_parsec.graph().get_index(event.hash()))
                    != forked.partner_event_index
            })
            .chain(iter::once(&forked.event))
            .collect(),
        None => events,
    }
}
}
/// Either an honest parsec instance or the components of a malicious one. Dereferences to the
/// underlying `Parsec` in both cases.
// NOTE(review): the lint is silenced without explanation - presumably boxing is not worth it
// for a test-only type with few instances; confirm.
#[allow(clippy::large_enum_variant)]
enum WrappedParsec {
/// An honest peer's parsec instance.
Good(Parsec<Transaction, PeerId>),
/// A malicious peer's parsec instance plus its fork bookkeeping.
Malicious(MaliciousComponents),
}
/// Gives shared access to the underlying `Parsec`, whichever variant wraps it.
impl Deref for WrappedParsec {
    type Target = Parsec<Transaction, PeerId>;

    fn deref(&self) -> &Self::Target {
        match *self {
            WrappedParsec::Good(ref parsec) => parsec,
            // `&TestParsec` is deref-coerced to `&Parsec` here.
            WrappedParsec::Malicious(ref components) => &components.test_parsec,
        }
    }
}
/// Gives exclusive access to the underlying `Parsec`, whichever variant wraps it.
impl DerefMut for WrappedParsec {
    fn deref_mut(&mut self) -> &mut Self::Target {
        match *self {
            WrappedParsec::Good(ref mut parsec) => parsec,
            // `&mut TestParsec` is deref-coerced to `&mut Parsec` here.
            WrappedParsec::Malicious(ref mut components) => &mut components.test_parsec,
        }
    }
}
/// A peer of the test network: a (good or malicious) parsec instance plus harness bookkeeping.
pub struct Peer {
/// The wrapped parsec instance.
parsec: WrappedParsec,
/// Every block group polled so far, in polling order.
grouped_blocks: Vec<BlockGroup<Transaction, PeerId>>,
/// Lifecycle status of this peer.
status: PeerStatus,
/// How the rest of the network regards this peer.
network_view: NetworkView,
/// Observations queued by `vote_for` and not yet successfully voted on by `make_votes`.
votes_to_make: Vec<Observation>,
/// Ids of the peers for which this peer has polled an `Add` block.
added_peers_ids: BTreeSet<PeerId>,
/// Ids of the peers for which this peer has polled a `Remove` block.
removed_peers_ids: BTreeSet<PeerId>,
}
impl Peer {
/// Creates an honest peer that is a member of the genesis group.
///
/// The parsec instance is built with `Parsec::from_genesis`, passing an empty vector as its
/// third argument.
pub fn from_genesis(
    id: PeerId,
    genesis_group: &BTreeSet<PeerId>,
    consensus_mode: ConsensusMode,
    secure_rng: Box<dyn RngCore>,
) -> Self {
    let parsec = Parsec::from_genesis(id, genesis_group, vec![], consensus_mode, secure_rng);
    Self::new(WrappedParsec::Good(parsec))
}
/// Creates a malicious peer that is a member of the genesis group. It starts without a fork.
pub fn malicious_from_genesis(
    id: PeerId,
    genesis_group: &BTreeSet<PeerId>,
    consensus_mode: ConsensusMode,
    secure_rng: Box<dyn RngCore>,
) -> Self {
    let test_parsec = TestParsec::from_genesis(id, genesis_group, consensus_mode, secure_rng);
    let components = MaliciousComponents {
        test_parsec,
        forked_event: None,
    };
    Self::new(WrappedParsec::Malicious(components))
}
/// Creates an honest peer via `Parsec::from_existing`, given the genesis group and the current
/// group.
pub fn from_existing(
    id: PeerId,
    genesis_group: &BTreeSet<PeerId>,
    current_group: &BTreeSet<PeerId>,
    consensus_mode: ConsensusMode,
    secure_rng: Box<dyn RngCore>,
) -> Self {
    let parsec = Parsec::from_existing(
        id,
        genesis_group,
        current_group,
        consensus_mode,
        secure_rng,
    );
    Self::new(WrappedParsec::Good(parsec))
}
/// Creates a malicious peer via `TestParsec::from_existing`, given the genesis group and the
/// current group. It starts without a fork.
pub fn malicious_from_existing(
    id: PeerId,
    genesis_group: &BTreeSet<PeerId>,
    current_group: &BTreeSet<PeerId>,
    consensus_mode: ConsensusMode,
    secure_rng: Box<dyn RngCore>,
) -> Self {
    let test_parsec = TestParsec::from_existing(
        id,
        genesis_group,
        current_group,
        consensus_mode,
        secure_rng,
    );
    let components = MaliciousComponents {
        test_parsec,
        forked_event: None,
    };
    Self::new(WrappedParsec::Malicious(components))
}
/// Wraps `parsec` into a `Peer` with empty bookkeeping.
///
/// A peer whose parsec can already vote starts as `Active`/`Joined`; otherwise it starts as
/// `Pending`/`Joining`.
fn new(parsec: WrappedParsec) -> Self {
    let already_member = parsec.can_vote();
    let status = if already_member {
        PeerStatus::Active
    } else {
        PeerStatus::Pending
    };
    let network_view = if already_member {
        NetworkView::Joined
    } else {
        NetworkView::Joining
    };

    Self {
        parsec,
        grouped_blocks: Vec::new(),
        status,
        network_view,
        votes_to_make: Vec::new(),
        added_peers_ids: BTreeSet::new(),
        removed_peers_ids: BTreeSet::new(),
    }
}
/// Queues a clone of `observation` to be voted for; the vote itself is only cast by
/// `make_votes`.
pub fn vote_for(&mut self, observation: &Observation) {
self.votes_to_make.push(observation.clone());
}
/// Tries to cast every queued vote.
///
/// An observation stays queued (to be retried on the next call) only if we have not voted for
/// it yet and the attempt to vote for it returned an error.
pub fn make_votes(&mut self) {
    let parsec = &mut self.parsec;
    self.votes_to_make.retain(|observation| {
        if parsec.have_voted_for(observation) {
            // Already voted: nothing left to do for this one.
            return false;
        }
        parsec.vote_for(observation.clone()).is_err()
    });
}
/// Returns the ids of the peers the wrapped parsec reports as gossip recipients.
pub fn gossip_recipients(&self) -> impl Iterator<Item = &PeerId> {
self.parsec.gossip_recipients()
}
/// Creates a gossip request for `peer_id` by delegating to the wrapped parsec.
pub fn create_gossip(&mut self, peer_id: &PeerId) -> Result<Request<Transaction, PeerId>> {
self.parsec.create_gossip(peer_id)
}
/// Passes the request `req` received from `src` to the wrapped parsec and returns its response.
pub fn handle_request(
&mut self,
src: &PeerId,
req: Request<Transaction, PeerId>,
) -> Result<Response<Transaction, PeerId>> {
self.parsec.handle_request(src, req)
}
/// Passes the response `resp` received from `src` to the wrapped parsec.
pub fn handle_response(
&mut self,
src: &PeerId,
resp: Response<Transaction, PeerId>,
) -> Result<()> {
self.parsec.handle_response(src, resp)
}
/// Promotes this peer from `Pending` to `Active` if `block` is an `Add` block naming this peer.
/// Does nothing for a peer in any other status.
fn make_active_if_added(&mut self, block: &Block<Transaction, PeerId>) {
    if self.status != PeerStatus::Pending {
        return;
    }
    if let ParsecObservation::Add { peer_id, .. } = block.payload() {
        if self.id() == peer_id {
            self.status = PeerStatus::Active;
        }
    }
}
/// Polls the wrapped parsec until it yields no further block group, storing every group.
///
/// For each polled block the peer's own status is updated (see `make_active_if_added`) and the
/// subject of any `Add`/`Remove` block is recorded.
///
/// # Panics
///
/// Panics if an `Add` (or `Remove`) block is polled for a peer id that was already recorded as
/// added (or removed).
pub fn poll_all(&mut self) {
    while let Some(block_group) = self.parsec.batch_poll() {
        for block in &block_group {
            self.make_active_if_added(block);

            let payload = block.payload();
            if let ParsecObservation::Add { peer_id, .. } = payload {
                assert!(self.added_peers_ids.insert(peer_id.clone()));
            } else if let ParsecObservation::Remove { peer_id, .. } = payload {
                assert!(self.removed_peers_ids.insert(peer_id.clone()));
            }
        }
        self.grouped_blocks.push(block_group);
    }
}
/// Returns this peer's id, i.e. the public id of the wrapped parsec.
pub fn id(&self) -> &PeerId {
self.parsec.our_pub_id()
}
/// Returns the block groups polled so far, in polling order.
pub(crate) fn grouped_blocks(&self) -> &[BlockGroup<Transaction, PeerId>] {
&self.grouped_blocks
}
/// Iterates over every polled block, flattening the stored block groups in order.
pub fn blocks(&self) -> impl Iterator<Item = &Block<Transaction, PeerId>> {
self.grouped_blocks.iter().flatten()
}
/// Returns the current lifecycle status of this peer.
pub fn status(&self) -> PeerStatus {
self.status
}
/// Returns the wrapped parsec's `ignore_process_events` flag.
pub fn ignore_process_events(&self) -> bool {
self.parsec.ignore_process_events()
}
/// Calls `set_ignore_process_events` on the wrapped parsec.
pub fn set_ignore_process_events(&mut self) {
self.parsec.set_ignore_process_events();
}
/// Returns how the rest of the network currently regards this peer.
pub fn network_view(&self) -> NetworkView {
self.network_view
}
/// Returns `true` while the peer is `Pending` or `Active`, and `false` once it has been
/// `Removed` or has `Failed`.
pub fn is_running(&self) -> bool {
    // Kept exhaustive so that adding a status forces a decision here.
    match self.status {
        PeerStatus::Removed | PeerStatus::Failed => false,
        PeerStatus::Pending | PeerStatus::Active => true,
    }
}
/// Marks this peer as `Removed` and its network view as leaving.
///
/// # Panics
///
/// Panics if the peer already has status `Failed`, or if the network still views it as
/// `Joining` (see `mark_network_view_as_leaving`).
pub fn mark_as_removed(&mut self) {
    assert!(
        self.status != PeerStatus::Failed,
        "{:?} already has status Failed.",
        self.id()
    );
    self.mark_network_view_as_leaving();
    self.status = PeerStatus::Removed;
}
/// Marks this peer as `Failed` and its network view as leaving.
///
/// # Panics
///
/// Panics if the peer already has status `Removed`, or if the network still views it as
/// `Joining` (see `mark_network_view_as_leaving`).
pub fn mark_as_failed(&mut self) {
    assert!(
        self.status != PeerStatus::Removed,
        "{:?} already has status Removed.",
        self.id()
    );
    self.mark_network_view_as_leaving();
    self.status = PeerStatus::Failed;
}
/// Moves the network view from `Joined` to `Leaving`; a view that is already `Leaving` or
/// `Left` is left untouched.
///
/// # Panics
///
/// Panics if the network still views this peer as `Joining`.
pub fn mark_network_view_as_leaving(&mut self) {
    match self.network_view {
        NetworkView::Joining => {
            panic!("Network views {:?} as not yet having joined.", self.id())
        }
        NetworkView::Joined => self.network_view = NetworkView::Leaving,
        NetworkView::Leaving | NetworkView::Left => (),
    }
}
/// Recomputes the network view of every peer in `all_peers`.
///
/// Only peers currently viewed as `Joined` are counted. A `Joining` peer becomes `Joined` once
/// `is_more_than_two_thirds` of the counted peers have polled an `Add` block for it; any peer
/// becomes `Left` once `is_more_than_two_thirds` of them have polled a `Remove` block for it.
///
/// # Panics
///
/// Panics if a peer id recorded as added or removed is not a key of `all_peers`.
pub fn update_network_views(all_peers: &mut BTreeMap<PeerId, Peer>) {
    /// Adds one to the tally of every id in `ids`.
    fn tally(ids: &BTreeSet<PeerId>, tallies: &mut BTreeMap<PeerId, usize>) {
        for id in ids {
            *tallies.entry(id.clone()).or_insert(0) += 1;
        }
    }

    let mut add_tallies = BTreeMap::new();
    let mut remove_tallies = BTreeMap::new();
    let mut counted_peers = 0;

    for peer in all_peers.values() {
        if peer.network_view != NetworkView::Joined {
            continue;
        }
        tally(&peer.added_peers_ids, &mut add_tallies);
        tally(&peer.removed_peers_ids, &mut remove_tallies);
        counted_peers += 1;
    }

    for (peer_id, tally_for_peer) in &add_tallies {
        // Looked up unconditionally: an unknown id is a harness bug and must panic.
        let added_peer = unwrap!(all_peers.get_mut(peer_id));
        if added_peer.network_view == NetworkView::Joining
            && is_more_than_two_thirds(*tally_for_peer, counted_peers)
        {
            added_peer.network_view = NetworkView::Joined;
        }
    }

    for (peer_id, tally_for_peer) in &remove_tallies {
        if is_more_than_two_thirds(*tally_for_peer, counted_peers) {
            let removed_peer = unwrap!(all_peers.get_mut(peer_id));
            removed_peer.network_view = NetworkView::Left;
        }
    }
}
/// Collects the payload of every polled block, in polling order.
pub fn blocks_payloads(&self) -> Vec<&Observation> {
self.blocks().map(Block::payload).collect()
}
/// Iterates over the `(offender, malice)` pairs of all accusations among the wrapped parsec's
/// unpolled observations; observations of any other kind are skipped.
pub fn unpolled_accusations(
    &self,
) -> impl Iterator<Item = (&PeerId, &Malice<Transaction, PeerId>)> {
    self.parsec
        .our_unpolled_observations()
        .filter_map(|observation| {
            if let ParsecObservation::Accusation { offender, malice } = observation {
                Some((offender, malice))
            } else {
                None
            }
        })
}
/// Returns `true` if this peer was created as a malicious test peer, whether or not it has
/// misbehaved yet.
pub fn is_malicious(&self) -> bool {
    match self.parsec {
        WrappedParsec::Malicious(_) => true,
        WrappedParsec::Good(_) => false,
    }
}
/// Returns `true` if this is a malicious peer that has already created a fork. Always `false`
/// for a good peer.
pub fn has_misbehaved(&self) -> bool {
    if let WrappedParsec::Malicious(ref components) = self.parsec {
        components.forked_event.is_some()
    } else {
        false
    }
}
/// Creates a fork and returns a gossip request for `recipient_id` carrying it, by delegating to
/// the malicious components; `rng` is used to pick the other peer involved in the fork.
///
/// # Panics
///
/// Panics if this peer is not a malicious test peer.
pub fn create_gossip_with_fork<R: Rng>(
&mut self,
recipient_id: &PeerId,
rng: &mut R,
) -> Result<Request<Transaction, PeerId>> {
self.malicious_components_mut()
.create_gossip_with_fork(recipient_id, rng)
}
/// Returns the malicious components of this peer.
///
/// # Panics
///
/// Panics if this peer wraps a good parsec instance.
fn malicious_components_mut(&mut self) -> &mut MaliciousComponents {
// Matching on the place `self.parsec` (rather than on `&mut self.parsec`) keeps `self` free to
// be borrowed by `self.id()` in the panicking arm.
match self.parsec {
WrappedParsec::Good(..) => panic!("{:?} is not a malicious test peer", self.id()),
WrappedParsec::Malicious(ref mut malicious_components) => malicious_components,
}
}
}
impl Debug for Peer {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
let malicious = if self.is_malicious() {
"(MALICIOUS) "
} else {
""
};
write!(
formatter,
"{:?} {}[{:?}/{:?}]: Blocks: {:?}",
self.id(),
malicious,
self.status,
self.network_view,
self.blocks_payloads()
)
}
}