use super::{
meta_event::{MetaEvent, MetaEventBuilder},
meta_vote::MetaVote,
};
use crate::{
gossip::{EventIndex, Graph},
id::PublicId,
observation::ObservationKey,
peer_list::{PeerIndex, PeerIndexMap, PeerIndexSet, PeerListChange},
};
use fnv::{FnvHashMap, FnvHashSet};
use std::{cmp, collections::BTreeSet, usize};
lazy_static! {
static ref EMPTY_BTREESET_EVENT_INDEX: BTreeSet<EventIndex> = BTreeSet::new();
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub(crate) struct UnconsensusedEvents {
pub(crate) ordered_indices: BTreeSet<EventIndex>,
pub(crate) indices_by_key: FnvHashMap<ObservationKey, BTreeSet<EventIndex>>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct MetaElection {
pub(crate) meta_events: FnvHashMap<EventIndex, MetaEvent>,
pub(crate) voters: PeerIndexSet,
pub(crate) interesting_events: PeerIndexMap<(Vec<EventIndex>, FnvHashSet<ObservationKey>)>,
pub(crate) unconsensused_events: UnconsensusedEvents,
pub(crate) consensus_history: Vec<ObservationKey>,
pub(crate) continue_consensus_start_index: usize,
pub(crate) new_consensus_start_index: usize,
}
impl MetaElection {
pub fn new(voters: PeerIndexSet) -> Self {
MetaElection {
meta_events: FnvHashMap::default(),
voters,
interesting_events: PeerIndexMap::default(),
unconsensused_events: UnconsensusedEvents::default(),
consensus_history: Vec::new(),
continue_consensus_start_index: 0,
new_consensus_start_index: 0,
}
}
pub fn add_meta_event<P: PublicId>(&mut self, builder: MetaEventBuilder<P>) {
let event_index = builder.event().event_index();
let creator = builder.event().creator();
let meta_event = builder.finish();
if !meta_event.interesting_content.is_empty() {
self.add_interesting_event(creator, event_index, &meta_event.interesting_content);
}
let _ = self.meta_events.insert(event_index, meta_event);
}
pub fn remove_meta_event(&mut self, event_index: EventIndex) -> Option<MetaEvent> {
self.meta_events.remove(&event_index)
}
pub fn meta_event(&self, event_index: EventIndex) -> Option<&MetaEvent> {
self.meta_events.get(&event_index)
}
pub fn populated_meta_votes(
&self,
event_index: EventIndex,
) -> Option<&PeerIndexMap<Vec<MetaVote>>> {
self.meta_events
.get(&event_index)
.map(|meta_event| &meta_event.meta_votes)
.filter(|meta_votes| !meta_votes.is_empty())
}
pub fn voters(&self) -> &PeerIndexSet {
&self.voters
}
pub fn consensus_history(&self) -> &[ObservationKey] {
&self.consensus_history
}
pub fn interesting_events(&self) -> impl Iterator<Item = (PeerIndex, &[EventIndex])> {
self.interesting_events
.iter()
.map(|(peer_index, (event_indices, _))| (peer_index, &event_indices[..]))
}
pub fn interesting_content_by(&self, creator: PeerIndex) -> Option<&Vec<ObservationKey>> {
self.interesting_events
.get(creator)
.and_then(|(indices, _)| indices.first())
.and_then(|event_index| self.meta_events.get(event_index))
.map(|meta_event| &meta_event.interesting_content)
}
pub fn is_already_interesting_content(
&self,
creator: PeerIndex,
payload_key: &ObservationKey,
) -> bool {
self.interesting_events
.get(creator)
.map_or(false, |(_, contents)| contents.contains(payload_key))
}
pub fn continue_consensus_start_index(&self) -> usize {
self.continue_consensus_start_index
}
pub fn new_election<P: PublicId>(
&mut self,
graph: &Graph<P>,
decided_keys: Vec<ObservationKey>,
peer_list_changes: Vec<PeerListChange>,
) {
let peer_list_changed = !peer_list_changes.is_empty();
self.update_voters(peer_list_changes);
self.update_unconsensused_events(&decided_keys);
self.update_new_consensus_start_index(graph.len());
self.update_continue_consensus_start_index(peer_list_changed);
self.update_meta_events(&decided_keys, peer_list_changed);
self.update_interesting_content(graph);
self.consensus_history.extend(decided_keys);
}
#[cfg(feature = "dump-graphs")]
pub fn meta_events(&self) -> &FnvHashMap<EventIndex, MetaEvent> {
&self.meta_events
}
pub fn add_unconsensused_event(&mut self, event_index: EventIndex, key: ObservationKey) {
let _ = self
.unconsensused_events
.ordered_indices
.insert(event_index);
let _ = self
.unconsensused_events
.indices_by_key
.entry(key)
.or_insert_with(BTreeSet::new)
.insert(event_index);
}
pub fn unconsensused_events<'a>(
&'a self,
filter_key: Option<&ObservationKey>,
) -> impl Iterator<Item = EventIndex> + 'a {
let indices = if let Some(filter_key) = filter_key {
self.unconsensused_events
.indices_by_key
.get(filter_key)
.unwrap_or(&*EMPTY_BTREESET_EVENT_INDEX)
} else {
&self.unconsensused_events.ordered_indices
};
indices.iter().cloned()
}
fn add_interesting_event(
&mut self,
creator: PeerIndex,
event_index: EventIndex,
interesting_content: &[ObservationKey],
) {
let (indices, contents) = self
.interesting_events
.entry(creator)
.or_insert_with(|| (Vec::new(), FnvHashSet::default()));
indices.push(event_index);
contents.extend(interesting_content);
}
fn update_unconsensused_events(&mut self, decided_keys: &[ObservationKey]) {
for key in decided_keys {
if let Some(remove) = self.unconsensused_events.indices_by_key.remove(key) {
for event_index in remove {
let _ = self
.unconsensused_events
.ordered_indices
.remove(&event_index);
}
}
}
}
fn update_voters(&mut self, peer_list_changes: Vec<PeerListChange>) {
for peer_list_change in peer_list_changes {
match peer_list_change {
PeerListChange::Add(peer_index) => {
if !self.voters.insert(peer_index) {
log_or_panic!("Meta election already contains {:?}", peer_index);
}
}
PeerListChange::Remove(peer_index) => {
if !self.voters.remove(peer_index) {
log_or_panic!("Meta election doesn't contain {:?}", peer_index);
}
}
}
}
}
fn update_new_consensus_start_index(&mut self, graph_len: usize) {
self.new_consensus_start_index = self
.unconsensused_events
.ordered_indices
.iter()
.next()
.map(|event_index| event_index.topological_index())
.unwrap_or(graph_len);
}
fn update_continue_consensus_start_index(&mut self, peer_list_changed: bool) {
self.continue_consensus_start_index = if peer_list_changed {
self.new_consensus_start_index
} else {
cmp::max(
self.meta_events
.iter()
.filter(|(_, meta_event)| meta_event.is_observer())
.map(|(event_index, _)| event_index.topological_index())
.min()
.unwrap_or(self.new_consensus_start_index),
self.new_consensus_start_index,
)
};
}
fn update_meta_events(&mut self, decided_keys: &[ObservationKey], peer_list_changed: bool) {
if peer_list_changed {
self.meta_events.clear();
} else {
let new_consensus_start_index = self.new_consensus_start_index;
self.meta_events.retain(|event_index, _| {
event_index.topological_index() >= new_consensus_start_index
});
let decided_keys_lookup: FnvHashSet<_> = decided_keys.iter().collect();
for meta_event in self.meta_events.values_mut() {
meta_event
.interesting_content
.retain(|payload_key| !decided_keys_lookup.contains(payload_key));
}
}
}
fn update_interesting_content<P: PublicId>(&mut self, graph: &Graph<P>) {
self.interesting_events.clear();
let continue_consensus_start_index = self.continue_consensus_start_index;
for event in graph
.iter_from(self.new_consensus_start_index)
.take_while(|event| event.topological_index() < continue_consensus_start_index)
{
let interesting = self
.meta_events
.get(&event.event_index())
.map(|meta_event| &meta_event.interesting_content)
.filter(|interesting_content| !interesting_content.is_empty())
.cloned();
if let Some(interesting) = interesting {
self.add_interesting_event(event.creator(), event.event_index(), &interesting)
}
}
}
}
#[cfg(any(all(test, feature = "mock"), feature = "dump-graphs"))]
pub(crate) mod snapshot {
use super::{super::meta_event::snapshot::MetaEventSnapshot, *};
use crate::{
gossip::{EventHash, Graph},
id::SecretId,
peer_list::PeerList,
};
use std::collections::BTreeMap;
#[serde(bound = "")]
#[derive(Eq, PartialEq, Debug, Serialize, Deserialize)]
pub(crate) struct MetaElectionSnapshot<P: PublicId> {
meta_events: BTreeMap<EventHash, MetaEventSnapshot<P>>,
voters: BTreeSet<P>,
interesting_events: BTreeMap<P, Vec<EventHash>>,
}
impl<P: PublicId> MetaElectionSnapshot<P> {
pub fn new<S>(
meta_election: &MetaElection,
graph: &Graph<P>,
peer_list: &PeerList<S>,
) -> Self
where
S: SecretId<PublicId = P>,
{
let meta_events = meta_election
.meta_events
.iter()
.filter_map(|(index, meta_event)| {
graph
.get(*index)
.map(|event| *event.hash())
.map(|hash| (hash, MetaEventSnapshot::new(meta_event, peer_list)))
})
.collect();
let interesting_events = meta_election
.interesting_events
.iter()
.filter_map(|(peer_index, event_indices)| {
peer_list
.get(peer_index)
.map(|peer| (peer.id(), event_indices))
})
.map(|(peer_id, (indices, _))| {
let hashes = indices
.iter()
.filter_map(|index| graph.get(*index).map(|event| *event.hash()))
.collect();
(peer_id.clone(), hashes)
})
.collect();
MetaElectionSnapshot {
meta_events,
voters: meta_election
.voters
.iter()
.filter_map(|index| peer_list.get(index).map(|peer| peer.id().clone()))
.collect(),
interesting_events,
}
}
}
}