use crate::Broadcaster;
use commonware_actor::{
mailbox::{Overflow, Policy, Sender},
Feedback,
};
use commonware_codec::Codec;
use commonware_cryptography::{Digestible, PublicKey};
use commonware_p2p::Recipients;
use commonware_utils::channel::oneshot;
use std::collections::VecDeque;
/// Requests that a [`Mailbox`] enqueues on its underlying sender.
///
/// `P` is the public key type used to address recipients and `M` is the
/// message type, identified by its `M::Digest`.
pub(crate) enum Message<P, M>
where
    P: PublicKey,
    M: Digestible,
{
    /// Send `message` to `recipients`.
    Broadcast {
        /// Intended recipients of the message.
        recipients: Recipients<P>,
        /// Payload to broadcast.
        message: M,
    },
    /// Ask for the message identified by `digest`; the reply is the message itself.
    ///
    /// NOTE(review): presumably the reply is deferred until the message is
    /// known, whereas `Get` answers right away; confirm against the consumer
    /// of this enum.
    Subscribe {
        /// Digest of the requested message.
        digest: M::Digest,
        /// One-shot channel on which the message is delivered.
        responder: oneshot::Sender<M>,
    },
    /// Look up the message identified by `digest`; the reply is an `Option<M>`.
    Get {
        /// Digest of the requested message.
        digest: M::Digest,
        /// One-shot channel on which the lookup result is delivered.
        responder: oneshot::Sender<Option<M>>,
    },
}
impl<P, M> Message<P, M>
where
    P: PublicKey,
    M: Digestible,
{
    /// Returns `true` when this message carries a responder that reports
    /// itself closed.
    ///
    /// `Broadcast` has no responder, so it is never considered closed.
    fn response_closed(&self) -> bool {
        match self {
            Self::Broadcast { .. } => false,
            // The two responders have different payload types, so the arms
            // cannot be merged into a single or-pattern.
            Self::Subscribe { responder: reply, .. } => reply.is_closed(),
            Self::Get { responder: reply, .. } => reply.is_closed(),
        }
    }
}
/// Queue of [`Message`]s held as the overflow storage for the mailbox.
///
/// Messages are appended at the back and drained from the front.
pub(crate) struct Pending<P, M>(VecDeque<Message<P, M>>)
where
    P: PublicKey,
    M: Digestible;
impl<P: PublicKey, M: Digestible> Default for Pending<P, M> {
fn default() -> Self {
Self(VecDeque::new())
}
}
impl<P, M> Overflow<Message<P, M>> for Pending<P, M>
where
    P: PublicKey,
    M: Digestible,
{
    /// Returns `true` when no messages are queued.
    fn is_empty(&self) -> bool {
        let Self(queue) = self;
        queue.is_empty()
    }

    /// Offers queued messages to `push`, oldest first.
    ///
    /// Messages whose responder is already closed are discarded without being
    /// offered. Draining stops either when the queue is empty or as soon as
    /// `push` hands a message back; that message is put back at the front so
    /// the original order is kept for the next drain.
    fn drain<F>(&mut self, mut push: F)
    where
        F: FnMut(Message<P, M>) -> Option<Message<P, M>>,
    {
        loop {
            let queued = match self.0.pop_front() {
                Some(queued) => queued,
                None => return,
            };

            // Nobody is waiting for the answer any more: drop the request.
            if queued.response_closed() {
                continue;
            }

            match push(queued) {
                None => {}
                Some(rejected) => {
                    // Not accepted: restore it at the head and stop.
                    self.0.push_front(rejected);
                    return;
                }
            }
        }
    }
}
impl<P, M> Policy for Message<P, M>
where
    P: PublicKey,
    M: Digestible,
{
    type Overflow = Pending<P, M>;

    /// Appends `message` to the back of `overflow`, unless its responder is
    /// already closed, in which case the message is dropped.
    ///
    /// NOTE(review): presumably called by the mailbox when a message cannot be
    /// enqueued directly; confirm against `commonware_actor::mailbox`.
    fn handle(overflow: &mut Self::Overflow, message: Self) {
        let still_wanted = !message.response_closed();
        if still_wanted {
            overflow.0.push_back(message);
        }
    }
}
/// Cloneable handle that enqueues [`Message`]s on the wrapped sender.
#[derive(Clone)]
pub struct Mailbox<P, M>
where
    P: PublicKey,
    M: Digestible + Codec,
{
    /// Sending half on which requests are enqueued.
    sender: Sender<Message<P, M>>,
}
impl<P: PublicKey, M: Digestible + Codec> Mailbox<P, M> {
pub(super) const fn new(sender: Sender<Message<P, M>>) -> Self {
Self { sender }
}
pub fn subscribe(&self, digest: M::Digest) -> oneshot::Receiver<M> {
let (responder, receiver) = oneshot::channel();
let _ = self
.sender
.enqueue(Message::Subscribe { digest, responder });
receiver
}
pub fn subscribe_prepared(&self, digest: M::Digest, responder: oneshot::Sender<M>) {
let _ = self
.sender
.enqueue(Message::Subscribe { digest, responder });
}
pub async fn get(&self, digest: M::Digest) -> Option<M> {
let (responder, receiver) = oneshot::channel();
let _ = self.sender.enqueue(Message::Get { digest, responder });
receiver.await.unwrap_or_default()
}
}
impl<P, M> Broadcaster for Mailbox<P, M>
where
    P: PublicKey,
    M: Digestible + Codec,
{
    type Recipients = Recipients<P>;
    type Message = M;

    /// Enqueues a `Broadcast` request carrying `message` for `recipients`
    /// and returns the sender's feedback from enqueueing it.
    fn broadcast(&self, recipients: Self::Recipients, message: Self::Message) -> Feedback {
        let request = Message::Broadcast {
            recipients,
            message,
        };
        self.sender.enqueue(request)
    }
}