commonware_broadcast/buffered/
ingress.rs1use crate::Broadcaster;
2use commonware_codec::Codec;
3use commonware_cryptography::{Committable, Digestible, PublicKey};
4use commonware_p2p::Recipients;
5use commonware_utils::channels::fallible::AsyncFallibleExt;
6use futures::channel::{mpsc, oneshot};
7
8pub enum Message<P: PublicKey, M: Committable + Digestible> {
10 Broadcast {
14 recipients: Recipients<P>,
15 message: M,
16 responder: oneshot::Sender<Vec<P>>,
17 },
18
19 Subscribe {
25 peer: Option<P>,
26 commitment: M::Commitment,
27 digest: Option<M::Digest>,
28 responder: oneshot::Sender<M>,
29 },
30
31 Get {
33 peer: Option<P>,
34 commitment: M::Commitment,
35 digest: Option<M::Digest>,
36 responder: oneshot::Sender<Vec<M>>,
37 },
38}
39
40#[derive(Clone)]
42pub struct Mailbox<P: PublicKey, M: Committable + Digestible + Codec> {
43 sender: mpsc::Sender<Message<P, M>>,
44}
45
46impl<P: PublicKey, M: Committable + Digestible + Codec> Mailbox<P, M> {
47 pub(super) const fn new(sender: mpsc::Sender<Message<P, M>>) -> Self {
48 Self { sender }
49 }
50
51 pub async fn subscribe(
59 &mut self,
60 peer: Option<P>,
61 commitment: M::Commitment,
62 digest: Option<M::Digest>,
63 ) -> oneshot::Receiver<M> {
64 let (responder, receiver) = oneshot::channel();
65 self.sender
66 .send_lossy(Message::Subscribe {
67 peer,
68 commitment,
69 digest,
70 responder,
71 })
72 .await;
73 receiver
74 }
75
76 pub async fn subscribe_prepared(
85 &mut self,
86 peer: Option<P>,
87 commitment: M::Commitment,
88 digest: Option<M::Digest>,
89 responder: oneshot::Sender<M>,
90 ) {
91 self.sender
92 .send_lossy(Message::Subscribe {
93 peer,
94 commitment,
95 digest,
96 responder,
97 })
98 .await;
99 }
100
101 pub async fn get(
105 &mut self,
106 peer: Option<P>,
107 commitment: M::Commitment,
108 digest: Option<M::Digest>,
109 ) -> Vec<M> {
110 self.sender
111 .request(|responder| Message::Get {
112 peer,
113 commitment,
114 digest,
115 responder,
116 })
117 .await
118 .unwrap_or_default()
119 }
120}
121
122impl<P: PublicKey, M: Committable + Digestible + Codec> Broadcaster for Mailbox<P, M> {
123 type Recipients = Recipients<P>;
124 type Message = M;
125 type Response = Vec<P>;
126
127 async fn broadcast(
131 &mut self,
132 recipients: Self::Recipients,
133 message: Self::Message,
134 ) -> oneshot::Receiver<Self::Response> {
135 let (sender, receiver) = oneshot::channel();
136 self.sender
137 .send_lossy(Message::Broadcast {
138 recipients,
139 message,
140 responder: sender,
141 })
142 .await;
143 receiver
144 }
145}