use crate::Broadcaster;
use commonware_codec::Codec;
use commonware_cryptography::{Digestible, PublicKey};
use commonware_p2p::Recipients;
use commonware_utils::channel::{fallible::AsyncFallibleExt, mpsc, oneshot};
pub enum Message<P: PublicKey, M: Digestible> {
Broadcast {
recipients: Recipients<P>,
message: M,
responder: oneshot::Sender<Vec<P>>,
},
Subscribe {
digest: M::Digest,
responder: oneshot::Sender<M>,
},
Get {
digest: M::Digest,
responder: oneshot::Sender<Option<M>>,
},
}
#[derive(Clone)]
pub struct Mailbox<P: PublicKey, M: Digestible + Codec> {
sender: mpsc::Sender<Message<P, M>>,
}
impl<P: PublicKey, M: Digestible + Codec> Mailbox<P, M> {
pub(super) const fn new(sender: mpsc::Sender<Message<P, M>>) -> Self {
Self { sender }
}
pub async fn subscribe(&self, digest: M::Digest) -> oneshot::Receiver<M> {
let (responder, receiver) = oneshot::channel();
self.sender
.send_lossy(Message::Subscribe { digest, responder })
.await;
receiver
}
pub async fn subscribe_prepared(&self, digest: M::Digest, responder: oneshot::Sender<M>) {
self.sender
.send_lossy(Message::Subscribe { digest, responder })
.await;
}
pub async fn get(&self, digest: M::Digest) -> Option<M> {
self.sender
.request(|responder| Message::Get { digest, responder })
.await
.unwrap_or_default()
}
}
impl<P: PublicKey, M: Digestible + Codec> Broadcaster for Mailbox<P, M> {
type Recipients = Recipients<P>;
type Message = M;
type Response = Vec<P>;
async fn broadcast(
&self,
recipients: Self::Recipients,
message: Self::Message,
) -> oneshot::Receiver<Self::Response> {
let (sender, receiver) = oneshot::channel();
self.sender
.send_lossy(Message::Broadcast {
recipients,
message,
responder: sender,
})
.await;
receiver
}
}