commonware_broadcast/buffered/
ingress.rs1use crate::Broadcaster;
2use commonware_codec::Codec;
3use commonware_cryptography::{Digestible, PublicKey};
4use commonware_p2p::Recipients;
5use commonware_utils::channel::{fallible::AsyncFallibleExt, mpsc, oneshot};
6
7pub enum Message<P: PublicKey, M: Digestible> {
9 Broadcast {
13 recipients: Recipients<P>,
14 message: M,
15 responder: oneshot::Sender<Vec<P>>,
16 },
17
18 Subscribe {
24 digest: M::Digest,
25 responder: oneshot::Sender<M>,
26 },
27
28 Get {
30 digest: M::Digest,
31 responder: oneshot::Sender<Option<M>>,
32 },
33}
34
35#[derive(Clone)]
37pub struct Mailbox<P: PublicKey, M: Digestible + Codec> {
38 sender: mpsc::Sender<Message<P, M>>,
39}
40
41impl<P: PublicKey, M: Digestible + Codec> Mailbox<P, M> {
42 pub(super) const fn new(sender: mpsc::Sender<Message<P, M>>) -> Self {
43 Self { sender }
44 }
45
46 pub async fn subscribe(&self, digest: M::Digest) -> oneshot::Receiver<M> {
54 let (responder, receiver) = oneshot::channel();
55 self.sender
56 .send_lossy(Message::Subscribe { digest, responder })
57 .await;
58 receiver
59 }
60
61 pub async fn subscribe_prepared(&self, digest: M::Digest, responder: oneshot::Sender<M>) {
69 self.sender
70 .send_lossy(Message::Subscribe { digest, responder })
71 .await;
72 }
73
74 pub async fn get(&self, digest: M::Digest) -> Option<M> {
78 self.sender
79 .request(|responder| Message::Get { digest, responder })
80 .await
81 .unwrap_or_default()
82 }
83}
84
85impl<P: PublicKey, M: Digestible + Codec> Broadcaster for Mailbox<P, M> {
86 type Recipients = Recipients<P>;
87 type Message = M;
88 type Response = Vec<P>;
89
90 async fn broadcast(
94 &self,
95 recipients: Self::Recipients,
96 message: Self::Message,
97 ) -> oneshot::Receiver<Self::Response> {
98 let (sender, receiver) = oneshot::channel();
99 self.sender
100 .send_lossy(Message::Broadcast {
101 recipients,
102 message,
103 responder: sender,
104 })
105 .await;
106 receiver
107 }
108}