commonware_broadcast/buffered/
ingress.rs1use crate::Broadcaster;
2use commonware_codec::Codec;
3use commonware_cryptography::{Committable, Digest, Digestible};
4use commonware_p2p::Recipients;
5use commonware_utils::Array;
6use futures::{
7 channel::{mpsc, oneshot},
8 SinkExt,
9};
10
11pub enum Message<P: Array, Dc: Digest, Dd: Digest, M: Committable<Dc> + Digestible<Dd>> {
13 Broadcast {
17 recipients: Recipients<P>,
18 message: M,
19 responder: oneshot::Sender<Vec<P>>,
20 },
21
22 Subscribe {
28 peer: Option<P>,
29 commitment: Dc,
30 digest: Option<Dd>,
31 responder: oneshot::Sender<M>,
32 },
33
34 Get {
36 peer: Option<P>,
37 commitment: Dc,
38 digest: Option<Dd>,
39 responder: oneshot::Sender<Vec<M>>,
40 },
41}
42
43#[derive(Clone)]
45pub struct Mailbox<P: Array, Dc: Digest, Dd: Digest, M: Committable<Dc> + Digestible<Dd> + Codec> {
46 sender: mpsc::Sender<Message<P, Dc, Dd, M>>,
47}
48
49impl<P: Array, Dc: Digest, Dd: Digest, M: Committable<Dc> + Digestible<Dd> + Codec>
50 Mailbox<P, Dc, Dd, M>
51{
52 pub(super) fn new(sender: mpsc::Sender<Message<P, Dc, Dd, M>>) -> Self {
53 Self { sender }
54 }
55}
56
57impl<P: Array, Dc: Digest, Dd: Digest, M: Committable<Dc> + Digestible<Dd> + Codec>
58 Mailbox<P, Dc, Dd, M>
59{
60 pub async fn subscribe(
66 &mut self,
67 peer: Option<P>,
68 commitment: Dc,
69 digest: Option<Dd>,
70 ) -> oneshot::Receiver<M> {
71 let (responder, receiver) = oneshot::channel();
72 self.sender
73 .send(Message::Subscribe {
74 peer,
75 commitment,
76 digest,
77 responder,
78 })
79 .await
80 .expect("mailbox closed");
81 receiver
82 }
83
84 pub async fn subscribe_prepared(
91 &mut self,
92 peer: Option<P>,
93 commitment: Dc,
94 digest: Option<Dd>,
95 responder: oneshot::Sender<M>,
96 ) {
97 self.sender
98 .send(Message::Subscribe {
99 peer,
100 commitment,
101 digest,
102 responder,
103 })
104 .await
105 .expect("mailbox closed");
106 }
107
108 pub async fn get(&mut self, peer: Option<P>, commitment: Dc, digest: Option<Dd>) -> Vec<M> {
110 let (responder, receiver) = oneshot::channel();
111 self.sender
112 .send(Message::Get {
113 peer,
114 commitment,
115 digest,
116 responder,
117 })
118 .await
119 .expect("mailbox closed");
120 receiver.await.expect("mailbox closed")
121 }
122}
123
124impl<P: Array, Dc: Digest, Dd: Digest, M: Committable<Dc> + Digestible<Dd> + Codec> Broadcaster
125 for Mailbox<P, Dc, Dd, M>
126{
127 type Recipients = Recipients<P>;
128 type Message = M;
129 type Response = Vec<P>;
130
131 async fn broadcast(
132 &mut self,
133 recipients: Self::Recipients,
134 message: Self::Message,
135 ) -> oneshot::Receiver<Self::Response> {
136 let (sender, receiver) = oneshot::channel();
137 self.sender
138 .send(Message::Broadcast {
139 recipients,
140 message,
141 responder: sender,
142 })
143 .await
144 .expect("mailbox closed");
145 receiver
146 }
147}