commonware_broadcast/buffered/
ingress.rs1use crate::Broadcaster;
2use commonware_codec::Codec;
3use commonware_cryptography::{Committable, Digestible};
4use commonware_p2p::Recipients;
5use commonware_utils::Array;
6use futures::{
7 channel::{mpsc, oneshot},
8 SinkExt,
9};
10
11pub enum Message<P: Array, M: Committable + Digestible> {
13 Broadcast {
17 recipients: Recipients<P>,
18 message: M,
19 responder: oneshot::Sender<Vec<P>>,
20 },
21
22 Subscribe {
28 peer: Option<P>,
29 commitment: M::Commitment,
30 digest: Option<M::Digest>,
31 responder: oneshot::Sender<M>,
32 },
33
34 Get {
36 peer: Option<P>,
37 commitment: M::Commitment,
38 digest: Option<M::Digest>,
39 responder: oneshot::Sender<Vec<M>>,
40 },
41}
42
43#[derive(Clone)]
45pub struct Mailbox<P: Array, M: Committable + Digestible + Codec> {
46 sender: mpsc::Sender<Message<P, M>>,
47}
48
49impl<P: Array, M: Committable + Digestible + Codec> Mailbox<P, M> {
50 pub(super) fn new(sender: mpsc::Sender<Message<P, M>>) -> Self {
51 Self { sender }
52 }
53}
54
55impl<P: Array, M: Committable + Digestible + Codec> Mailbox<P, M> {
56 pub async fn subscribe(
62 &mut self,
63 peer: Option<P>,
64 commitment: M::Commitment,
65 digest: Option<M::Digest>,
66 ) -> oneshot::Receiver<M> {
67 let (responder, receiver) = oneshot::channel();
68 self.sender
69 .send(Message::Subscribe {
70 peer,
71 commitment,
72 digest,
73 responder,
74 })
75 .await
76 .expect("mailbox closed");
77 receiver
78 }
79
80 pub async fn subscribe_prepared(
87 &mut self,
88 peer: Option<P>,
89 commitment: M::Commitment,
90 digest: Option<M::Digest>,
91 responder: oneshot::Sender<M>,
92 ) {
93 self.sender
94 .send(Message::Subscribe {
95 peer,
96 commitment,
97 digest,
98 responder,
99 })
100 .await
101 .expect("mailbox closed");
102 }
103
104 pub async fn get(
106 &mut self,
107 peer: Option<P>,
108 commitment: M::Commitment,
109 digest: Option<M::Digest>,
110 ) -> Vec<M> {
111 let (responder, receiver) = oneshot::channel();
112 self.sender
113 .send(Message::Get {
114 peer,
115 commitment,
116 digest,
117 responder,
118 })
119 .await
120 .expect("mailbox closed");
121 receiver.await.expect("mailbox closed")
122 }
123}
124
125impl<P: Array, M: Committable + Digestible + Codec> Broadcaster for Mailbox<P, M> {
126 type Recipients = Recipients<P>;
127 type Message = M;
128 type Response = Vec<P>;
129
130 async fn broadcast(
131 &mut self,
132 recipients: Self::Recipients,
133 message: Self::Message,
134 ) -> oneshot::Receiver<Self::Response> {
135 let (sender, receiver) = oneshot::channel();
136 self.sender
137 .send(Message::Broadcast {
138 recipients,
139 message,
140 responder: sender,
141 })
142 .await
143 .expect("mailbox closed");
144 receiver
145 }
146}