use super::{
ingress::{Handler, HandlerMessage, Mailbox, MailboxMessage},
Config,
};
use crate::{
simplex::{
actors::{resolver::state::State, voter},
scheme::Scheme,
types::Certificate,
},
types::{Epoch, View},
Epochable, Viewable,
};
use bytes::Bytes;
use commonware_codec::{Decode, Encode};
use commonware_cryptography::Digest;
use commonware_macros::select_loop;
use commonware_p2p::{utils::StaticProvider, Blocker, Receiver, Sender};
use commonware_parallel::Strategy;
use commonware_resolver::p2p;
use commonware_runtime::{spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner};
use commonware_utils::{
channel::{fallible::OneshotExt, mpsc},
ordered::Quorum,
sequence::U64,
};
use rand_core::CryptoRngCore;
use std::time::Duration;
use tracing::debug;
pub struct Actor<
E: BufferPooler + Clock + CryptoRngCore + Metrics + Spawner,
S: Scheme<D>,
B: Blocker<PublicKey = S::PublicKey>,
D: Digest,
T: Strategy,
> {
context: ContextCell<E>,
scheme: S,
blocker: Option<B>,
strategy: T,
epoch: Epoch,
mailbox_size: usize,
fetch_timeout: Duration,
state: State<S, D>,
mailbox_receiver: mpsc::Receiver<MailboxMessage<S, D>>,
}
impl<
E: BufferPooler + Clock + CryptoRngCore + Metrics + Spawner,
S: Scheme<D>,
B: Blocker<PublicKey = S::PublicKey>,
D: Digest,
T: Strategy,
> Actor<E, S, B, D, T>
{
pub fn new(context: E, cfg: Config<S, B, T>) -> (Self, Mailbox<S, D>) {
let (sender, receiver) = mpsc::channel(cfg.mailbox_size);
(
Self {
context: ContextCell::new(context),
scheme: cfg.scheme,
blocker: Some(cfg.blocker),
strategy: cfg.strategy,
epoch: cfg.epoch,
mailbox_size: cfg.mailbox_size,
fetch_timeout: cfg.fetch_timeout,
state: State::new(cfg.fetch_concurrent),
mailbox_receiver: receiver,
},
Mailbox::new(sender),
)
}
pub fn start(
mut self,
voter: voter::Mailbox<S, D>,
sender: impl Sender<PublicKey = S::PublicKey>,
receiver: impl Receiver<PublicKey = S::PublicKey>,
) -> Handle<()> {
spawn_cell!(self.context, self.run(voter, sender, receiver).await)
}
async fn run(
mut self,
mut voter: voter::Mailbox<S, D>,
sender: impl Sender<PublicKey = S::PublicKey>,
receiver: impl Receiver<PublicKey = S::PublicKey>,
) {
let participants = self.scheme.participants().clone();
let me = self
.scheme
.me()
.and_then(|index| participants.key(index))
.cloned();
let (handler_tx, mut handler_rx) = mpsc::channel(self.mailbox_size);
let handler = Handler::new(handler_tx);
let (resolver_engine, mut resolver) = p2p::Engine::new(
self.context.with_label("resolver"),
p2p::Config {
peer_provider: StaticProvider::new(self.epoch.get(), participants),
blocker: self.blocker.take().expect("blocker must be set"),
consumer: handler.clone(),
producer: handler,
mailbox_size: self.mailbox_size,
me,
initial: self.fetch_timeout / 2,
timeout: self.fetch_timeout,
fetch_retry_timeout: self.fetch_timeout,
priority_requests: true,
priority_responses: false,
},
);
let mut resolver_task = resolver_engine.start((sender, receiver));
select_loop! {
self.context,
on_stopped => {
debug!("context shutdown, stopping resolver");
},
_ = &mut resolver_task => {
break;
},
Some(message) = self.mailbox_receiver.recv() else break => {
match message {
MailboxMessage::Certificate(certificate) => {
self.state.handle(certificate, None, &mut resolver).await;
}
MailboxMessage::Certified { view, success } => {
self.state
.handle_certified(view, success, &mut resolver)
.await;
}
}
},
Some(message) = handler_rx.recv() else break => {
self.handle_resolver(message, &mut voter, &mut resolver)
.await;
},
}
}
fn validate(&mut self, view: View, data: Bytes) -> Option<Certificate<S, D>> {
let incoming =
Certificate::<S, D>::decode_cfg(data, &self.scheme.certificate_codec_config()).ok()?;
match incoming {
Certificate::Notarization(notarization) => {
let notarization_view = notarization.view();
if notarization.view() < view {
debug!(%view, received = %notarization.view(), "notarization below view");
return None;
}
if notarization.epoch() != self.epoch {
debug!(
epoch = %notarization.epoch(),
expected = %self.epoch,
"rejecting notarization from different epoch"
);
return None;
}
if self.state.is_failed(notarization_view) {
debug!(
%notarization_view,
"rejecting notarization for view with failed certification"
);
return None;
}
if !notarization.verify(&mut self.context, &self.scheme, &self.strategy) {
debug!(%view, "notarization failed verification");
return None;
}
debug!(%view, received = %notarization_view, "received notarization for request");
Some(Certificate::Notarization(notarization))
}
Certificate::Finalization(finalization) => {
if finalization.view() < view {
debug!(%view, received = %finalization.view(), "finalization below view");
return None;
}
if finalization.epoch() != self.epoch {
debug!(
epoch = %finalization.epoch(),
expected = %self.epoch,
"rejecting finalization from different epoch"
);
return None;
}
if !finalization.verify(&mut self.context, &self.scheme, &self.strategy) {
debug!(%view, "finalization failed verification");
return None;
}
debug!(%view, received = %finalization.view(), "received finalization for request");
Some(Certificate::Finalization(finalization))
}
Certificate::Nullification(nullification) => {
if nullification.view() != view {
debug!(%view, received = %nullification.view(), "nullification view mismatch");
return None;
}
if nullification.epoch() != self.epoch {
debug!(
epoch = %nullification.epoch(),
expected = %self.epoch,
"rejecting nullification from different epoch"
);
return None;
}
if !nullification.verify::<_, D>(&mut self.context, &self.scheme, &self.strategy) {
debug!(%view, "nullification failed verification");
return None;
}
debug!(%view, received = %nullification.view(), "received nullification for request");
Some(Certificate::Nullification(nullification))
}
}
}
async fn handle_resolver(
&mut self,
message: HandlerMessage,
voter: &mut voter::Mailbox<S, D>,
resolver: &mut p2p::Mailbox<U64, S::PublicKey>,
) {
match message {
HandlerMessage::Deliver {
view,
data,
response,
} => {
let Some(parsed) = self.validate(view, data) else {
response.send_lossy(false);
return;
};
response.send_lossy(true);
voter.resolved(parsed.clone()).await;
self.state.handle(parsed, Some(view), resolver).await;
}
HandlerMessage::Produce { view, response } => {
let Some(certificate) = self.state.get(view) else {
return;
};
response.send_lossy(certificate.encode());
}
}
}
}