use crate::{
ordered_broadcast::{
scheme,
types::{Activity, Chunk, ChunkVerifier, Lock, Proposal},
},
types::{Epoch, Height},
};
use commonware_codec::{Decode, DecodeExt, Encode};
use commonware_cryptography::{certificate::Scheme, Digest, PublicKey};
use commonware_parallel::Sequential;
use commonware_utils::channel::{mpsc, oneshot};
use rand_core::CryptoRngCore;
use std::collections::{btree_map::Entry, BTreeMap, HashMap, HashSet};
#[allow(clippy::large_enum_variant)]
enum Message<C: PublicKey, S: Scheme, D: Digest> {
Proposal(Proposal<C, D>),
Locked(Lock<C, S, D>),
GetTip(C, oneshot::Sender<Option<(Height, Epoch)>>),
GetContiguousTip(C, oneshot::Sender<Option<Height>>),
Get(C, Height, oneshot::Sender<Option<(D, Epoch)>>),
}
pub struct Reporter<R: CryptoRngCore, C: PublicKey, S: Scheme, D: Digest> {
mailbox: mpsc::Receiver<Message<C, S, D>>,
rng: R,
chunk_verifier: ChunkVerifier,
scheme: S,
proposals: HashSet<Chunk<C, D>>,
limit_misses: Option<usize>,
digests: HashMap<C, BTreeMap<Height, (D, Epoch)>>,
contiguous: HashMap<C, Height>,
highest: HashMap<C, (Height, Epoch)>,
}
impl<R, C, S, D> Reporter<R, C, S, D>
where
R: CryptoRngCore,
C: PublicKey,
S: Scheme,
D: Digest,
{
pub fn new(
rng: R,
chunk_verifier: ChunkVerifier,
scheme: S,
limit_misses: Option<usize>,
) -> (Self, Mailbox<C, S, D>) {
let (sender, receiver) = mpsc::channel(1024);
(
Self {
rng,
mailbox: receiver,
chunk_verifier,
scheme,
proposals: HashSet::new(),
limit_misses,
digests: HashMap::new(),
contiguous: HashMap::new(),
highest: HashMap::new(),
},
Mailbox { sender },
)
}
pub async fn run(mut self)
where
S: scheme::Scheme<C, D>,
{
let mut misses = 0;
while let Some(msg) = self.mailbox.recv().await {
match msg {
Message::Proposal(proposal) => {
if !proposal.verify(&self.chunk_verifier) {
panic!("Invalid proof");
}
let encoded = proposal.encode();
Proposal::<C, D>::decode(encoded).unwrap();
self.proposals.insert(proposal.chunk);
}
Message::Locked(lock) => {
if !lock.verify(&mut self.rng, &self.scheme, &Sequential) {
panic!("Invalid proof");
}
let encoded = lock.encode();
Lock::<C, S, D>::decode_cfg(encoded, &self.scheme.certificate_codec_config())
.unwrap();
if let Some(misses_allowed) = self.limit_misses {
if !self.proposals.contains(&lock.chunk) {
misses += 1;
}
assert!(misses <= misses_allowed, "Missed too many proposals");
}
let chunk = lock.chunk;
let digests = self.digests.entry(chunk.sequencer.clone()).or_default();
let entry = digests.entry(chunk.height);
match entry {
Entry::Occupied(mut entry) => {
let (existing_payload, existing_epoch) = entry.get();
assert_eq!(*existing_payload, chunk.payload);
assert_ne!(*existing_epoch, lock.epoch);
if lock.epoch > *existing_epoch {
entry.insert((chunk.payload, lock.epoch));
}
}
Entry::Vacant(entry) => {
entry.insert((chunk.payload, lock.epoch));
}
}
let highest = self
.highest
.get(&chunk.sequencer)
.copied()
.unwrap_or((Height::zero(), Epoch::zero()));
if chunk.height > highest.0 {
self.highest
.insert(chunk.sequencer.clone(), (chunk.height, lock.epoch));
}
let highest = self.contiguous.get(&chunk.sequencer);
if (highest.is_none() && chunk.height.is_zero())
|| (highest.is_some() && chunk.height == highest.unwrap().next())
{
let mut contiguous = chunk.height;
while digests.contains_key(&contiguous.next()) {
contiguous = contiguous.next();
}
self.contiguous.insert(chunk.sequencer, contiguous);
}
}
Message::GetTip(sequencer, sender) => {
let hi = self.highest.get(&sequencer).copied();
sender.send(hi).unwrap();
}
Message::GetContiguousTip(sequencer, sender) => {
let contiguous = self.contiguous.get(&sequencer).copied();
sender.send(contiguous).unwrap();
}
Message::Get(sequencer, height, sender) => {
let digest = self
.digests
.get(&sequencer)
.and_then(|map| map.get(&height))
.cloned();
sender.send(digest).unwrap();
}
}
}
}
}
#[derive(Clone)]
pub struct Mailbox<C: PublicKey, S: Scheme, D: Digest> {
sender: mpsc::Sender<Message<C, S, D>>,
}
impl<C: PublicKey, S: Scheme, D: Digest> crate::Reporter for Mailbox<C, S, D> {
type Activity = Activity<C, S, D>;
async fn report(&mut self, activity: Self::Activity) {
match activity {
Activity::Tip(proposal) => {
self.sender
.send(Message::Proposal(proposal))
.await
.expect("Failed to send proposal");
}
Activity::Lock(lock) => {
self.sender
.send(Message::Locked(lock))
.await
.expect("Failed to send locked");
}
}
}
}
impl<C: PublicKey, S: Scheme, D: Digest> Mailbox<C, S, D> {
pub async fn get_tip(&mut self, sequencer: C) -> Option<(Height, Epoch)> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Message::GetTip(sequencer, sender))
.await
.unwrap();
receiver.await.unwrap()
}
pub async fn get_contiguous_tip(&mut self, sequencer: C) -> Option<Height> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Message::GetContiguousTip(sequencer, sender))
.await
.unwrap();
receiver.await.unwrap()
}
pub async fn get(&mut self, sequencer: C, height: Height) -> Option<(D, Epoch)> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Message::Get(sequencer, height, sender))
.await
.unwrap();
receiver.await.unwrap()
}
}