use super::relay::Relay;
use crate::{
simplex::{types::Context, Plan},
types::{Epoch, Round},
Automaton as Au, CertifiableAutomaton as CAu, Relay as Re,
};
use bytes::Bytes;
use commonware_codec::{DecodeExt, Encode};
use commonware_cryptography::{Digest, Hasher, PublicKey};
use commonware_macros::select_loop;
use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Spawner};
use commonware_utils::channel::{
fallible::{AsyncFallibleExt, OneshotExt},
mpsc, oneshot,
};
use rand::{Rng, RngCore};
use rand_distr::{Distribution, Normal};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use tracing::debug;
pub enum Message<D: Digest, P: PublicKey> {
Genesis {
epoch: Epoch,
response: oneshot::Sender<D>,
},
Propose {
context: Context<D, P>,
response: oneshot::Sender<D>,
},
Verify {
context: Context<D, P>,
payload: D,
response: oneshot::Sender<bool>,
},
Certify {
round: Round,
payload: D,
response: oneshot::Sender<bool>,
},
Broadcast {
payload: D,
},
}
#[derive(Clone)]
pub struct Mailbox<D: Digest, P: PublicKey> {
sender: mpsc::Sender<Message<D, P>>,
}
impl<D: Digest, P: PublicKey> Mailbox<D, P> {
pub(super) const fn new(sender: mpsc::Sender<Message<D, P>>) -> Self {
Self { sender }
}
}
impl<D: Digest, P: PublicKey> Au for Mailbox<D, P> {
type Digest = D;
type Context = Context<D, P>;
async fn genesis(&mut self, epoch: Epoch) -> Self::Digest {
let (response, receiver) = oneshot::channel();
self.sender
.send_lossy(Message::Genesis { epoch, response })
.await;
receiver.await.expect("Failed to receive genesis")
}
async fn propose(&mut self, context: Self::Context) -> oneshot::Receiver<Self::Digest> {
let (response, receiver) = oneshot::channel();
self.sender
.send_lossy(Message::Propose { context, response })
.await;
receiver
}
async fn verify(
&mut self,
context: Self::Context,
payload: Self::Digest,
) -> oneshot::Receiver<bool> {
let (response, receiver) = oneshot::channel();
self.sender
.send_lossy(Message::Verify {
context,
payload,
response,
})
.await;
receiver
}
}
impl<D: Digest, P: PublicKey> CAu for Mailbox<D, P> {
async fn certify(&mut self, round: Round, payload: Self::Digest) -> oneshot::Receiver<bool> {
let (tx, rx) = oneshot::channel();
self.sender
.send_lossy(Message::Certify {
round,
payload,
response: tx,
})
.await;
rx
}
}
impl<D: Digest, P: PublicKey> Re for Mailbox<D, P> {
type Digest = D;
type PublicKey = P;
type Plan = Plan<P>;
async fn broadcast(&mut self, payload: Self::Digest, _plan: Plan<P>) {
self.sender.send_lossy(Message::Broadcast { payload }).await;
}
}
const GENESIS_BYTES: &[u8] = b"genesis";
type Latency = (f64, f64);
type ProposeObserver<H, P> = Box<dyn Fn(Context<<H as Hasher>::Digest, P>) + Send + 'static>;
type VerifyObserver<H, P> =
Box<dyn Fn(Context<<H as Hasher>::Digest, P>, <H as Hasher>::Digest) + Send + 'static>;
pub enum Certifier<D: Digest> {
Always,
Sometimes,
Custom(Box<dyn Fn(D) -> bool + Send + 'static>),
Cancel,
Pending,
}
pub struct Config<H: Hasher, P: PublicKey> {
pub hasher: H,
pub relay: Arc<Relay<H::Digest, P>>,
pub me: P,
pub propose_latency: Latency,
pub verify_latency: Latency,
pub certify_latency: Latency,
pub should_certify: Certifier<H::Digest>,
}
pub struct Application<E: Clock + RngCore + Spawner, H: Hasher, P: PublicKey> {
context: ContextCell<E>,
hasher: H,
me: P,
relay: Arc<Relay<H::Digest, P>>,
broadcast: mpsc::UnboundedReceiver<(H::Digest, Bytes)>,
mailbox: mpsc::Receiver<Message<H::Digest, P>>,
propose_latency: Normal<f64>,
verify_latency: Normal<f64>,
certify_latency: Normal<f64>,
fail_verification: bool,
drop_proposals: bool,
drop_verifications: bool,
should_certify: Certifier<H::Digest>,
pending: HashMap<H::Digest, Bytes>,
verified: HashSet<H::Digest>,
propose_observer: Option<ProposeObserver<H, P>>,
verify_observer: Option<VerifyObserver<H, P>>,
pending_certifications: Vec<oneshot::Sender<bool>>,
}
impl<E: Clock + RngCore + Spawner, H: Hasher, P: PublicKey> Application<E, H, P> {
pub fn new(context: E, cfg: Config<H, P>) -> (Self, Mailbox<H::Digest, P>) {
let broadcast = cfg.relay.register(cfg.me.clone());
let propose_latency = Normal::new(cfg.propose_latency.0, cfg.propose_latency.1).unwrap();
let verify_latency = Normal::new(cfg.verify_latency.0, cfg.verify_latency.1).unwrap();
let certify_latency = Normal::new(cfg.certify_latency.0, cfg.certify_latency.1).unwrap();
let (sender, receiver) = mpsc::channel(1024);
(
Self {
context: ContextCell::new(context),
hasher: cfg.hasher,
me: cfg.me,
relay: cfg.relay,
broadcast,
mailbox: receiver,
propose_latency,
verify_latency,
certify_latency,
fail_verification: false,
drop_proposals: false,
drop_verifications: false,
should_certify: cfg.should_certify,
pending: HashMap::new(),
verified: HashSet::new(),
propose_observer: None,
verify_observer: None,
pending_certifications: Vec::new(),
},
Mailbox::new(sender),
)
}
pub const fn set_fail_verification(&mut self, fail: bool) {
self.fail_verification = fail;
}
pub const fn set_drop_proposals(&mut self, drop: bool) {
self.drop_proposals = drop;
}
pub const fn set_drop_verifications(&mut self, drop: bool) {
self.drop_verifications = drop;
}
pub fn set_propose_observer(&mut self, observer: ProposeObserver<H, P>) {
self.propose_observer = Some(observer);
}
pub fn set_verify_observer(&mut self, observer: VerifyObserver<H, P>) {
self.verify_observer = Some(observer);
}
#[cfg(not(feature = "mocks"))]
fn panic(&self, msg: &str) -> ! {
panic!("[{:?}] {}", self.me, msg);
}
fn genesis(&mut self, epoch: Epoch) -> H::Digest {
self.hasher
.update(&(Bytes::from(GENESIS_BYTES), epoch).encode());
let digest = self.hasher.finalize();
self.verified.insert(digest);
digest
}
async fn propose(&mut self, context: Context<H::Digest, P>) -> H::Digest {
let duration = self.propose_latency.sample(&mut self.context);
self.context
.sleep(Duration::from_millis(duration as u64))
.await;
let rand = self.context.gen::<u64>();
let payload = (context.round, context.parent.1, rand).encode();
self.hasher.update(&payload);
let digest = self.hasher.finalize();
self.verified.insert(digest);
self.pending.insert(digest, payload);
digest
}
async fn verify(
&mut self,
context: Context<H::Digest, P>,
payload: H::Digest,
mut contents: Bytes,
) -> bool {
let duration = self.verify_latency.sample(&mut self.context);
self.context
.sleep(Duration::from_millis(duration as u64))
.await;
if self.fail_verification {
return false;
}
let Ok((parsed_round, parent, _)) = <(Round, H::Digest, u64)>::decode(&mut contents) else {
#[cfg(feature = "mocks")]
{
return false;
}
#[cfg(not(feature = "mocks"))]
panic!("[{:?}] invalid payload", self.me);
};
if parsed_round != context.round {
#[cfg(feature = "mocks")]
{
return false;
}
#[cfg(not(feature = "mocks"))]
self.panic(&format!(
"invalid round (in payload): {} != {}",
parsed_round, context.round
));
}
if parent != context.parent.1 {
#[cfg(feature = "mocks")]
{
return false;
}
#[cfg(not(feature = "mocks"))]
self.panic(&format!(
"invalid parent (in payload): {:?} != {:?}",
parent, context.parent.1
));
}
self.verified.insert(payload);
true
}
async fn certify(&mut self, payload: H::Digest, _contents: Bytes) -> Option<bool> {
let duration = self.certify_latency.sample(&mut self.context);
self.context
.sleep(Duration::from_millis(duration as u64))
.await;
match &self.should_certify {
Certifier::Always => Some(true),
Certifier::Sometimes => Some((payload.as_ref().last().copied().unwrap_or(0) % 11) < 9),
Certifier::Custom(func) => Some(func(payload)),
Certifier::Cancel | Certifier::Pending => None,
}
}
fn broadcast(&mut self, payload: H::Digest) {
let contents = self.pending.remove(&payload).expect("missing payload");
self.relay.broadcast(&self.me, (payload, contents));
}
pub fn start(mut self) -> Handle<()> {
spawn_cell!(self.context, self.run().await)
}
async fn run(mut self) {
#[allow(clippy::type_complexity)]
let mut waiters: HashMap<
H::Digest,
Vec<(Context<H::Digest, P>, oneshot::Sender<bool>)>,
> = HashMap::new();
let mut seen: HashMap<H::Digest, Bytes> = HashMap::new();
select_loop! {
self.context,
on_stopped => {
debug!("context shutdown, stopping application");
},
Some(message) = self.mailbox.recv() else break => {
match message {
Message::Genesis { epoch, response } => {
let digest = self.genesis(epoch);
response.send_lossy(digest);
}
Message::Propose { context, response } => {
if let Some(observer) = &self.propose_observer {
observer(context.clone());
}
if self.drop_proposals {
continue;
}
let digest = self.propose(context).await;
response.send_lossy(digest);
}
Message::Verify {
context,
payload,
response,
} => {
if let Some(observer) = &self.verify_observer {
observer(context.clone(), payload);
}
if self.drop_verifications {
continue;
}
if let Some(contents) = seen.get(&payload) {
let verified = self.verify(context, payload, contents.clone()).await;
response.send_lossy(verified);
} else {
waiters
.entry(payload)
.or_default()
.push((context, response));
}
}
Message::Certify {
round: _,
payload,
response,
} => {
let contents = seen.get(&payload).cloned().unwrap_or_default();
if let Some(certified) = self.certify(payload, contents).await {
response.send_lossy(certified);
} else if matches!(self.should_certify, Certifier::Pending) {
self.pending_certifications.push(response);
}
}
Message::Broadcast { payload } => {
self.broadcast(payload);
}
}
},
Some((digest, contents)) = self.broadcast.recv() else break => {
seen.insert(digest, contents.clone());
if let Some(waiters) = waiters.remove(&digest) {
for (context, sender) in waiters {
let verified = self.verify(context, digest, contents.clone()).await;
sender.send_lossy(verified);
}
}
},
}
}
}