commonware_broadcast/buffered/
ingress.rs1use crate::Broadcaster;
2use commonware_codec::Codec;
3use commonware_cryptography::{Committable, Digestible, PublicKey};
4use commonware_p2p::Recipients;
5use futures::{
6 channel::{mpsc, oneshot},
7 SinkExt,
8};
9
10pub enum Message<P: PublicKey, M: Committable + Digestible> {
12 Broadcast {
16 recipients: Recipients<P>,
17 message: M,
18 responder: oneshot::Sender<Vec<P>>,
19 },
20
21 Subscribe {
27 peer: Option<P>,
28 commitment: M::Commitment,
29 digest: Option<M::Digest>,
30 responder: oneshot::Sender<M>,
31 },
32
33 Get {
35 peer: Option<P>,
36 commitment: M::Commitment,
37 digest: Option<M::Digest>,
38 responder: oneshot::Sender<Vec<M>>,
39 },
40}
41
42#[derive(Clone)]
44pub struct Mailbox<P: PublicKey, M: Committable + Digestible + Codec> {
45 sender: mpsc::Sender<Message<P, M>>,
46}
47
48impl<P: PublicKey, M: Committable + Digestible + Codec> Mailbox<P, M> {
49 pub(super) fn new(sender: mpsc::Sender<Message<P, M>>) -> Self {
50 Self { sender }
51 }
52}
53
54impl<P: PublicKey, M: Committable + Digestible + Codec> Mailbox<P, M> {
55 pub async fn subscribe(
61 &mut self,
62 peer: Option<P>,
63 commitment: M::Commitment,
64 digest: Option<M::Digest>,
65 ) -> oneshot::Receiver<M> {
66 let (responder, receiver) = oneshot::channel();
67 self.sender
68 .send(Message::Subscribe {
69 peer,
70 commitment,
71 digest,
72 responder,
73 })
74 .await
75 .expect("mailbox closed");
76 receiver
77 }
78
79 pub async fn subscribe_prepared(
86 &mut self,
87 peer: Option<P>,
88 commitment: M::Commitment,
89 digest: Option<M::Digest>,
90 responder: oneshot::Sender<M>,
91 ) {
92 self.sender
93 .send(Message::Subscribe {
94 peer,
95 commitment,
96 digest,
97 responder,
98 })
99 .await
100 .expect("mailbox closed");
101 }
102
103 pub async fn get(
105 &mut self,
106 peer: Option<P>,
107 commitment: M::Commitment,
108 digest: Option<M::Digest>,
109 ) -> Vec<M> {
110 let (responder, receiver) = oneshot::channel();
111 self.sender
112 .send(Message::Get {
113 peer,
114 commitment,
115 digest,
116 responder,
117 })
118 .await
119 .expect("mailbox closed");
120 receiver.await.expect("mailbox closed")
121 }
122}
123
124impl<P: PublicKey, M: Committable + Digestible + Codec> Broadcaster for Mailbox<P, M> {
125 type Recipients = Recipients<P>;
126 type Message = M;
127 type Response = Vec<P>;
128
129 async fn broadcast(
130 &mut self,
131 recipients: Self::Recipients,
132 message: Self::Message,
133 ) -> oneshot::Receiver<Self::Response> {
134 let (sender, receiver) = oneshot::channel();
135 self.sender
136 .send(Message::Broadcast {
137 recipients,
138 message,
139 responder: sender,
140 })
141 .await
142 .expect("mailbox closed");
143 receiver
144 }
145}