use crate::{
simplex::{
elector::{Config as ElectorConfig, Elector},
scheme,
types::{
Activity, Attributable, ConflictingFinalize, ConflictingNotarize, Finalization,
Finalize, Notarization, Notarize, Nullification, Nullify, NullifyFinalize, Subject,
},
},
types::{Round, View},
Monitor, Viewable,
};
use commonware_codec::{Decode, DecodeExt, Encode};
use commonware_cryptography::{certificate::Scheme, Digest};
use commonware_parallel::Sequential;
use commonware_utils::{
channel::{
fallible::AsyncFallibleExt,
mpsc::{Receiver, Sender},
},
ordered::{Quorum, Set},
sync::Mutex,
N3f1,
};
use rand_core::CryptoRngCore;
use std::{
collections::{HashMap, HashSet},
hash::Hash,
sync::Arc,
};
type Participation<P, D> = HashMap<View, HashMap<D, HashSet<P>>>;
type Faults<S, D> = HashMap<<S as Scheme>::PublicKey, HashMap<View, HashSet<Activity<S, D>>>>;
#[derive(Clone, Debug)]
pub struct Config<S: Scheme, L: ElectorConfig<S>> {
pub participants: Set<S::PublicKey>,
pub scheme: S,
pub elector: L,
}
#[derive(Clone)]
pub struct Reporter<E: CryptoRngCore, S: Scheme, L: ElectorConfig<S>, D: Digest> {
context: E,
pub participants: Set<S::PublicKey>,
scheme: S,
elector: L::Elector,
pub leaders: Arc<Mutex<HashMap<View, S::PublicKey>>>,
pub certified: Arc<Mutex<HashSet<View>>>,
pub notarizes: Arc<Mutex<Participation<S::PublicKey, D>>>,
pub notarizations: Arc<Mutex<HashMap<View, Notarization<S, D>>>>,
pub nullifies: Arc<Mutex<HashMap<View, HashSet<S::PublicKey>>>>,
pub nullifications: Arc<Mutex<HashMap<View, Nullification<S>>>>,
pub finalizes: Arc<Mutex<Participation<S::PublicKey, D>>>,
pub finalizations: Arc<Mutex<HashMap<View, Finalization<S, D>>>>,
pub faults: Arc<Mutex<Faults<S, D>>>,
pub invalid_votes: Arc<Mutex<usize>>,
pub invalid_certificates: Arc<Mutex<usize>>,
latest: Arc<Mutex<View>>,
subscribers: Arc<Mutex<Vec<Sender<View>>>>,
}
impl<E, S, L, D> Reporter<E, S, L, D>
where
E: CryptoRngCore,
S: Scheme,
L: ElectorConfig<S>,
D: Digest + Eq + Hash + Clone,
{
pub fn new(context: E, cfg: Config<S, L>) -> Self {
let elector = cfg.elector.build(&cfg.participants);
Self {
context,
participants: cfg.participants,
scheme: cfg.scheme,
elector,
leaders: Arc::new(Mutex::new(HashMap::new())),
certified: Arc::new(Mutex::new(HashSet::new())),
notarizes: Arc::new(Mutex::new(HashMap::new())),
notarizations: Arc::new(Mutex::new(HashMap::new())),
nullifies: Arc::new(Mutex::new(HashMap::new())),
nullifications: Arc::new(Mutex::new(HashMap::new())),
finalizes: Arc::new(Mutex::new(HashMap::new())),
finalizations: Arc::new(Mutex::new(HashMap::new())),
faults: Arc::new(Mutex::new(HashMap::new())),
invalid_votes: Arc::new(Mutex::new(0)),
invalid_certificates: Arc::new(Mutex::new(0)),
latest: Arc::new(Mutex::new(View::zero())),
subscribers: Arc::new(Mutex::new(Vec::new())),
}
}
fn certified(&self, round: Round, certificate: &S::Certificate) {
self.certified.lock().insert(round.view());
let next_round = Round::new(round.epoch(), round.view().next());
let mut leaders = self.leaders.lock();
leaders.entry(next_round.view()).or_insert_with(|| {
let leader = self.elector.elect(next_round, Some(certificate));
self.participants.key(leader).cloned().unwrap()
});
}
pub fn assert_no_faults(&self) {
let faults = self.faults.lock();
assert!(faults.is_empty(), "faults detected");
}
pub fn assert_no_invalid(&self) {
let invalid_votes = self.invalid_votes.lock();
let invalid_certificates = self.invalid_certificates.lock();
assert_eq!(*invalid_votes, 0, "invalid votes detected");
assert_eq!(*invalid_certificates, 0, "invalid certificates detected");
}
}
impl<E, S, L, D> crate::Reporter for Reporter<E, S, L, D>
where
E: Clone + CryptoRngCore + Send + Sync + 'static,
S: scheme::Scheme<D>,
L: ElectorConfig<S>,
D: Digest + Eq + Hash + Clone,
{
type Activity = Activity<S, D>;
async fn report(&mut self, activity: Self::Activity) {
match &activity {
Activity::Notarize(notarize) => {
if !notarize.verify(&mut self.context, &self.scheme, &Sequential) {
*self.invalid_votes.lock() += 1;
return;
}
let encoded = notarize.encode();
Notarize::<S, D>::decode(encoded).unwrap();
let public_key = self.participants.key(notarize.signer()).unwrap().clone();
self.notarizes
.lock()
.entry(notarize.view())
.or_default()
.entry(notarize.proposal.payload)
.or_default()
.insert(public_key);
}
Activity::Notarization(notarization) | Activity::Certification(notarization) => {
let view = notarization.view();
if !self.scheme.verify_certificate::<_, D, N3f1>(
&mut self.context,
Subject::Notarize {
proposal: ¬arization.proposal,
},
¬arization.certificate,
&Sequential,
) {
*self.invalid_certificates.lock() += 1;
return;
}
let encoded = notarization.encode();
Notarization::<S, D>::decode_cfg(encoded, &self.scheme.certificate_codec_config())
.unwrap();
self.notarizations.lock().insert(view, notarization.clone());
self.certified(notarization.round(), ¬arization.certificate);
}
Activity::Nullify(nullify) => {
if !nullify.verify(&mut self.context, &self.scheme, &Sequential) {
*self.invalid_votes.lock() += 1;
return;
}
let encoded = nullify.encode();
Nullify::<S>::decode(encoded).unwrap();
let public_key = self.participants.key(nullify.signer()).unwrap().clone();
self.nullifies
.lock()
.entry(nullify.view())
.or_default()
.insert(public_key);
}
Activity::Nullification(nullification) => {
let view = nullification.view();
if !self.scheme.verify_certificate::<_, D, N3f1>(
&mut self.context,
Subject::Nullify {
round: nullification.round,
},
&nullification.certificate,
&Sequential,
) {
*self.invalid_certificates.lock() += 1;
return;
}
let encoded = nullification.encode();
Nullification::<S>::decode_cfg(encoded, &self.scheme.certificate_codec_config())
.unwrap();
self.nullifications
.lock()
.insert(view, nullification.clone());
self.certified(nullification.round, &nullification.certificate);
}
Activity::Finalize(finalize) => {
if !finalize.verify(&mut self.context, &self.scheme, &Sequential) {
*self.invalid_votes.lock() += 1;
return;
}
let encoded = finalize.encode();
Finalize::<S, D>::decode(encoded).unwrap();
let public_key = self.participants.key(finalize.signer()).unwrap().clone();
self.finalizes
.lock()
.entry(finalize.view())
.or_default()
.entry(finalize.proposal.payload)
.or_default()
.insert(public_key);
}
Activity::Finalization(finalization) => {
let view = finalization.view();
if !self.scheme.verify_certificate::<_, D, N3f1>(
&mut self.context,
Subject::Finalize {
proposal: &finalization.proposal,
},
&finalization.certificate,
&Sequential,
) {
*self.invalid_certificates.lock() += 1;
return;
}
let encoded = finalization.encode();
Finalization::<S, D>::decode_cfg(encoded, &self.scheme.certificate_codec_config())
.unwrap();
self.finalizations.lock().insert(view, finalization.clone());
self.certified(finalization.round(), &finalization.certificate);
*self.latest.lock() = finalization.view();
let mut subscribers = self.subscribers.lock();
for subscriber in subscribers.iter_mut() {
subscriber.try_send_lossy(finalization.view());
}
}
Activity::ConflictingNotarize(conflicting) => {
let view = conflicting.view();
if !conflicting.verify(&mut self.context, &self.scheme, &Sequential) {
*self.invalid_votes.lock() += 1;
return;
}
let encoded = conflicting.encode();
ConflictingNotarize::<S, D>::decode(encoded).unwrap();
let public_key = self.participants.key(conflicting.signer()).unwrap().clone();
self.faults
.lock()
.entry(public_key)
.or_default()
.entry(view)
.or_default()
.insert(activity);
}
Activity::ConflictingFinalize(conflicting) => {
let view = conflicting.view();
if !conflicting.verify(&mut self.context, &self.scheme, &Sequential) {
*self.invalid_votes.lock() += 1;
return;
}
let encoded = conflicting.encode();
ConflictingFinalize::<S, D>::decode(encoded).unwrap();
let public_key = self.participants.key(conflicting.signer()).unwrap().clone();
self.faults
.lock()
.entry(public_key)
.or_default()
.entry(view)
.or_default()
.insert(activity);
}
Activity::NullifyFinalize(conflicting) => {
let view = conflicting.view();
if !conflicting.verify(&mut self.context, &self.scheme, &Sequential) {
*self.invalid_votes.lock() += 1;
return;
}
let encoded = conflicting.encode();
NullifyFinalize::<S, D>::decode(encoded).unwrap();
let public_key = self.participants.key(conflicting.signer()).unwrap().clone();
self.faults
.lock()
.entry(public_key)
.or_default()
.entry(view)
.or_default()
.insert(activity);
}
}
}
}
impl<E, S, L, D> Monitor for Reporter<E, S, L, D>
where
E: Clone + CryptoRngCore + Send + Sync + 'static,
S: Scheme,
L: ElectorConfig<S>,
D: Digest + Eq + Hash + Clone,
{
type Index = View;
async fn subscribe(&mut self) -> (Self::Index, Receiver<Self::Index>) {
let (tx, rx) = commonware_utils::channel::mpsc::channel(128);
self.subscribers.lock().push(tx);
let latest = *self.latest.lock();
(latest, rx)
}
}