use crate::{
aggregation::{
scheme,
types::{Ack, Activity, Certificate},
},
types::{Epoch, Height},
};
use commonware_codec::{Decode, DecodeExt, Encode};
use commonware_cryptography::{certificate::Scheme, Digest};
use commonware_parallel::Sequential;
use commonware_utils::channel::{mpsc, oneshot};
use rand_core::CryptoRngCore;
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
#[allow(clippy::large_enum_variant)]
enum Message<S: Scheme, D: Digest> {
Ack(Ack<S, D>),
Certified(Certificate<S, D>),
Tip(Height),
GetTip(oneshot::Sender<Option<(Height, Epoch)>>),
GetContiguousTip(oneshot::Sender<Option<Height>>),
Get(Height, oneshot::Sender<Option<(D, Epoch)>>),
}
pub struct Reporter<R: CryptoRngCore, S: Scheme, D: Digest> {
mailbox: mpsc::Receiver<Message<S, D>>,
rng: R,
scheme: S,
acks: HashSet<(Height, Epoch)>,
digests: BTreeMap<Height, (D, Epoch)>,
contiguous: Option<Height>,
highest: Option<(Height, Epoch)>,
current_epoch: Epoch,
}
impl<R, S, D> Reporter<R, S, D>
where
R: CryptoRngCore,
S: scheme::Scheme<D>,
D: Digest,
{
pub fn new(rng: R, scheme: S) -> (Self, Mailbox<S, D>) {
let (sender, receiver) = mpsc::channel(1024);
(
Self {
mailbox: receiver,
rng,
scheme,
acks: HashSet::new(),
digests: BTreeMap::new(),
contiguous: None,
highest: None,
current_epoch: Epoch::new(111), },
Mailbox { sender },
)
}
pub async fn run(mut self) {
while let Some(msg) = self.mailbox.recv().await {
match msg {
Message::Ack(ack) => {
assert!(ack.verify(&mut self.rng, &self.scheme, &Sequential));
let encoded = ack.encode();
Ack::<S, D>::decode(encoded).unwrap();
self.current_epoch = ack.epoch;
self.acks.insert((ack.item.height, ack.epoch));
}
Message::Certified(certificate) => {
assert!(certificate.verify(&mut self.rng, &self.scheme, &Sequential));
let encoded = certificate.encode();
let cfg = self.scheme.certificate_codec_config();
Certificate::<S, D>::decode_cfg(encoded, &cfg).unwrap();
let entry = self.digests.entry(certificate.item.height);
match entry {
Entry::Occupied(mut entry) => {
let (existing_payload, _existing_epoch) = entry.get();
assert_eq!(*existing_payload, certificate.item.digest);
entry.insert((certificate.item.digest, self.current_epoch));
}
Entry::Vacant(entry) => {
entry.insert((certificate.item.digest, self.current_epoch));
}
}
if self
.highest
.is_none_or(|(h, _)| certificate.item.height > h)
{
self.highest = Some((certificate.item.height, self.current_epoch));
}
let mut next_contiguous =
self.contiguous.map(|c| c.next()).unwrap_or(Height::zero());
while self.digests.contains_key(&next_contiguous) {
next_contiguous = next_contiguous.next();
}
if !next_contiguous.is_zero() {
self.contiguous = Some(next_contiguous.previous().unwrap());
}
}
Message::Tip(height) => {
if self.highest.is_none_or(|(h, _)| height > h) {
self.highest = Some((height, self.current_epoch));
}
}
Message::GetTip(sender) => {
sender.send(self.highest).unwrap();
}
Message::GetContiguousTip(sender) => {
sender.send(self.contiguous).unwrap();
}
Message::Get(height, sender) => {
let digest = self.digests.get(&height).cloned();
sender.send(digest).unwrap();
}
}
}
}
}
#[derive(Clone)]
pub struct Mailbox<S: Scheme, D: Digest> {
sender: mpsc::Sender<Message<S, D>>,
}
impl<S, D> crate::Reporter for Mailbox<S, D>
where
S: Scheme,
D: Digest,
{
type Activity = Activity<S, D>;
async fn report(&mut self, activity: Self::Activity) {
match activity {
Activity::Ack(ack) => {
self.sender
.send(Message::Ack(ack))
.await
.expect("Failed to send ack");
}
Activity::Certified(certificate) => {
self.sender
.send(Message::Certified(certificate))
.await
.expect("Failed to send certified signature");
}
Activity::Tip(height) => {
self.sender
.send(Message::Tip(height))
.await
.expect("Failed to send tip");
}
}
}
}
impl<S, D> Mailbox<S, D>
where
S: Scheme,
D: Digest,
{
pub async fn get_tip(&mut self) -> Option<(Height, Epoch)> {
let (sender, receiver) = oneshot::channel();
self.sender.send(Message::GetTip(sender)).await.unwrap();
receiver.await.unwrap()
}
pub async fn get_contiguous_tip(&mut self) -> Option<Height> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Message::GetContiguousTip(sender))
.await
.unwrap();
receiver.await.unwrap()
}
pub async fn get(&mut self, height: Height) -> Option<(D, Epoch)> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Message::Get(height, sender))
.await
.unwrap();
receiver.await.unwrap()
}
}