use crate::stateful::Application;
use commonware_actor::{
mailbox::{Overflow, Policy, Sender},
Feedback,
};
use commonware_consensus::{marshal::Update, Application as ConsensusApplication, Reporter};
use commonware_runtime::{Clock, Metrics, Spawner};
use commonware_utils::{acknowledgement::Exact, channel::oneshot};
use futures::Stream;
use rand::Rng;
use std::{collections::VecDeque, pin::Pin};
pub(crate) type ErasedAncestorStream<B> = Pin<Box<dyn Stream<Item = B> + Send>>;
pub(crate) enum Message<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
Propose {
context: (E, A::Context),
ancestry: ErasedAncestorStream<A::Block>,
response: oneshot::Sender<Option<A::Block>>,
},
Verify {
context: (E, A::Context),
ancestry: ErasedAncestorStream<A::Block>,
response: oneshot::Sender<bool>,
},
Finalized {
block: A::Block,
acknowledgement: Exact,
},
SubscribeDatabases {
response: oneshot::Sender<A::Databases>,
},
}
impl<E, A> Message<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
fn response_closed(&self) -> bool {
match self {
Self::Propose { response, .. } => response.is_closed(),
Self::Verify { response, .. } => response.is_closed(),
Self::SubscribeDatabases { response } => response.is_closed(),
Self::Finalized { .. } => false,
}
}
}
pub(crate) struct Pending<E, A>(VecDeque<Message<E, A>>)
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>;
impl<E, A> Default for Pending<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
fn default() -> Self {
Self(VecDeque::new())
}
}
impl<E, A> Overflow<Message<E, A>> for Pending<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
fn is_empty(&self) -> bool {
self.0.is_empty()
}
fn drain<F>(&mut self, mut push: F)
where
F: FnMut(Message<E, A>) -> Option<Message<E, A>>,
{
while let Some(message) = self.0.pop_front() {
if message.response_closed() {
continue;
}
if let Some(message) = push(message) {
self.0.push_front(message);
break;
}
}
}
}
impl<E, A> Policy for Message<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
type Overflow = Pending<E, A>;
fn handle(overflow: &mut Self::Overflow, message: Self) {
if message.response_closed() {
return;
}
overflow.0.push_back(message);
}
}
pub struct Mailbox<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
sender: Sender<Message<E, A>>,
}
impl<E, A> Clone for Mailbox<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
}
}
}
impl<E, A> Mailbox<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
pub(crate) const fn new(sender: Sender<Message<E, A>>) -> Self {
Self { sender }
}
}
impl<E, A> Mailbox<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
pub async fn subscribe_databases(&self) -> A::Databases {
let (response, receiver) = oneshot::channel();
let _ = self
.sender
.enqueue(Message::SubscribeDatabases { response });
receiver
.await
.expect("stateful actor dropped during subscribe_databases")
}
}
impl<E, A> ConsensusApplication<E> for Mailbox<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
type SigningScheme = A::SigningScheme;
type Context = A::Context;
type Block = A::Block;
async fn propose(
&mut self,
context: (E, Self::Context),
ancestry: impl Stream<Item = Self::Block> + Send + 'static,
) -> Option<Self::Block> {
let (response, receiver) = oneshot::channel();
let _ = self.sender.enqueue(Message::Propose {
context,
ancestry: Box::pin(ancestry),
response,
});
receiver.await.ok().flatten()
}
async fn verify(
&mut self,
context: (E, Self::Context),
ancestry: impl Stream<Item = Self::Block> + Send + 'static,
) -> bool {
let (response, receiver) = oneshot::channel();
let _ = self.sender.enqueue(Message::Verify {
context,
ancestry: Box::pin(ancestry),
response,
});
receiver
.await
.expect("stateful actor dropped during verify")
}
}
impl<E, A> Reporter for Mailbox<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
type Activity = Update<A::Block>;
fn report(&mut self, activity: Self::Activity) -> Feedback {
let message = match activity {
Update::Tip(_, _, _) => return Feedback::Ok,
Update::Block(block, acknowledgement) => Message::Finalized {
block,
acknowledgement,
},
};
self.sender.enqueue(message)
}
}