commonware_broadcast/buffered/
ingress.rs1use crate::Broadcaster;
2use commonware_codec::{Codec, Config};
3use commonware_cryptography::{Digest, Digestible};
4use commonware_utils::Array;
5use futures::{
6 channel::{mpsc, oneshot},
7 SinkExt,
8};
9
10pub enum Message<P: Array, D: Digest, M: Digestible<D>> {
12 Broadcast {
16 message: M,
17 responder: oneshot::Sender<Vec<P>>,
18 },
19
20 Subscribe {
25 digest: D,
26 responder: oneshot::Sender<M>,
27 },
28
29 Get {
31 digest: D,
32 responder: oneshot::Sender<Option<M>>,
33 },
34}
35
36#[derive(Clone)]
38pub struct Mailbox<P: Array, D: Digest, M: Digestible<D>> {
39 sender: mpsc::Sender<Message<P, D, M>>,
40}
41
42impl<P: Array, D: Digest, M: Digestible<D>> Mailbox<P, D, M> {
43 pub(super) fn new(sender: mpsc::Sender<Message<P, D, M>>) -> Self {
44 Self { sender }
45 }
46}
47
48impl<P: Array, D: Digest, M: Digestible<D>> Mailbox<P, D, M> {
49 pub async fn subscribe(&mut self, digest: D) -> oneshot::Receiver<M> {
54 let (sender, receiver) = oneshot::channel();
55 self.sender
56 .send(Message::Subscribe {
57 digest,
58 responder: sender,
59 })
60 .await
61 .expect("mailbox closed");
62 receiver
63 }
64
65 pub async fn subscribe_prepared(&mut self, digest: D, sender: oneshot::Sender<M>) {
70 self.sender
71 .send(Message::Subscribe {
72 digest,
73 responder: sender,
74 })
75 .await
76 .expect("mailbox closed");
77 }
78
79 pub async fn get(&mut self, digest: D) -> Option<M> {
81 let (sender, receiver) = oneshot::channel();
82 self.sender
83 .send(Message::Get {
84 digest,
85 responder: sender,
86 })
87 .await
88 .expect("mailbox closed");
89 receiver.await.unwrap_or(None)
90 }
91}
92
93impl<Cfg: Config, P: Array, D: Digest, M: Codec<Cfg> + Digestible<D>> Broadcaster<Cfg>
94 for Mailbox<P, D, M>
95{
96 type Message = M;
97 type Response = Vec<P>;
98
99 async fn broadcast(&mut self, message: Self::Message) -> oneshot::Receiver<Vec<P>> {
100 let (sender, receiver) = oneshot::channel();
101 self.sender
102 .send(Message::Broadcast {
103 message,
104 responder: sender,
105 })
106 .await
107 .expect("mailbox closed");
108 receiver
109 }
110}