1use crate::{
4 marshal::coding::types::CodedBlock,
5 types::{coding::Commitment, Round},
6 CertifiableBlock,
7};
8use commonware_actor::mailbox::{Overflow, Policy, Sender};
9use commonware_coding::Scheme as CodingScheme;
10use commonware_cryptography::{Hasher, PublicKey};
11use commonware_utils::channel::oneshot;
12use std::collections::VecDeque;
13
14pub(crate) enum Message<B, C, H, P>
18where
19 B: CertifiableBlock,
20 C: CodingScheme,
21 H: Hasher,
22 P: PublicKey,
23{
24 Proposed {
26 block: CodedBlock<B, C, H>,
28 round: Round,
30 },
31 Discovered {
33 commitment: Commitment,
35 leader: P,
37 round: Round,
39 },
40 Notarized {
46 commitment: Commitment,
48 round: Round,
50 },
51 GetByCommitment {
53 commitment: Commitment,
55 response: oneshot::Sender<Option<CodedBlock<B, C, H>>>,
57 },
58 GetByDigest {
60 digest: B::Digest,
62 response: oneshot::Sender<Option<CodedBlock<B, C, H>>>,
64 },
65 SubscribeAssignedShardVerified {
76 commitment: Commitment,
78 response: oneshot::Sender<()>,
80 },
81 SubscribeByCommitment {
84 commitment: Commitment,
86 response: oneshot::Sender<CodedBlock<B, C, H>>,
88 },
89 SubscribeByDigest {
92 digest: B::Digest,
94 response: oneshot::Sender<CodedBlock<B, C, H>>,
96 },
97 Prune {
99 through: Commitment,
101 },
102}
103
104impl<B, C, H, P> Message<B, C, H, P>
105where
106 B: CertifiableBlock,
107 C: CodingScheme,
108 H: Hasher,
109 P: PublicKey,
110{
111 pub(crate) fn response_closed(&self) -> bool {
112 match self {
113 Self::GetByCommitment { response, .. } | Self::GetByDigest { response, .. } => {
114 response.is_closed()
115 }
116 Self::SubscribeAssignedShardVerified { response, .. } => response.is_closed(),
117 Self::SubscribeByCommitment { response, .. }
118 | Self::SubscribeByDigest { response, .. } => response.is_closed(),
119 Self::Proposed { .. }
120 | Self::Discovered { .. }
121 | Self::Notarized { .. }
122 | Self::Prune { .. } => false,
123 }
124 }
125}
126
127pub(crate) struct Pending<B, C, H, P>(VecDeque<Message<B, C, H, P>>)
128where
129 B: CertifiableBlock,
130 C: CodingScheme,
131 H: Hasher,
132 P: PublicKey;
133
134impl<B, C, H, P> Default for Pending<B, C, H, P>
135where
136 B: CertifiableBlock,
137 C: CodingScheme,
138 H: Hasher,
139 P: PublicKey,
140{
141 fn default() -> Self {
142 Self(VecDeque::new())
143 }
144}
145
146impl<B, C, H, P> Overflow<Message<B, C, H, P>> for Pending<B, C, H, P>
147where
148 B: CertifiableBlock,
149 C: CodingScheme,
150 H: Hasher,
151 P: PublicKey,
152{
153 fn is_empty(&self) -> bool {
154 self.0.is_empty()
155 }
156
157 fn drain<F>(&mut self, mut push: F)
158 where
159 F: FnMut(Message<B, C, H, P>) -> Option<Message<B, C, H, P>>,
160 {
161 while let Some(message) = self.0.pop_front() {
162 if message.response_closed() {
163 continue;
164 }
165
166 if let Some(message) = push(message) {
167 self.0.push_front(message);
168 break;
169 }
170 }
171 }
172}
173
174impl<B, C, H, P> Policy for Message<B, C, H, P>
175where
176 B: CertifiableBlock,
177 C: CodingScheme,
178 H: Hasher,
179 P: PublicKey,
180{
181 type Overflow = Pending<B, C, H, P>;
182
183 fn handle(overflow: &mut Self::Overflow, message: Self) {
184 if message.response_closed() {
185 return;
186 }
187
188 overflow.0.push_back(message);
189 }
190}
191
192#[derive(Clone)]
196pub struct Mailbox<B, C, H, P>
197where
198 B: CertifiableBlock,
199 C: CodingScheme,
200 H: Hasher,
201 P: PublicKey,
202{
203 pub(super) sender: Sender<Message<B, C, H, P>>,
204}
205
206impl<B, C, H, P> Mailbox<B, C, H, P>
207where
208 B: CertifiableBlock,
209 C: CodingScheme,
210 H: Hasher,
211 P: PublicKey,
212{
213 pub(crate) const fn new(sender: Sender<Message<B, C, H, P>>) -> Self {
215 Self { sender }
216 }
217
218 pub fn proposed(&self, round: Round, block: CodedBlock<B, C, H>) {
220 let _ = self.sender.enqueue(Message::Proposed { block, round });
221 }
222
223 pub fn discovered(&self, commitment: Commitment, leader: P, round: Round) {
225 let _ = self.sender.enqueue(Message::Discovered {
226 commitment,
227 leader,
228 round,
229 });
230 }
231
232 pub fn notarized(&self, commitment: Commitment, round: Round) {
239 let _ = self
240 .sender
241 .enqueue(Message::Notarized { commitment, round });
242 }
243
244 pub async fn get(&self, commitment: Commitment) -> Option<CodedBlock<B, C, H>> {
246 let (response, receiver) = oneshot::channel();
247 let _ = self.sender.enqueue(Message::GetByCommitment {
248 commitment,
249 response,
250 });
251 receiver.await.ok().flatten()
252 }
253
254 pub async fn get_by_digest(&self, digest: B::Digest) -> Option<CodedBlock<B, C, H>> {
256 let (response, receiver) = oneshot::channel();
257 let _ = self
258 .sender
259 .enqueue(Message::GetByDigest { digest, response });
260 receiver.await.ok().flatten()
261 }
262
263 pub fn subscribe_assigned_shard_verified(
274 &self,
275 commitment: Commitment,
276 ) -> oneshot::Receiver<()> {
277 let (responder, receiver) = oneshot::channel();
278 let _ = self
279 .sender
280 .enqueue(Message::SubscribeAssignedShardVerified {
281 commitment,
282 response: responder,
283 });
284 receiver
285 }
286
287 pub fn subscribe(&self, commitment: Commitment) -> oneshot::Receiver<CodedBlock<B, C, H>> {
289 let (responder, receiver) = oneshot::channel();
290 let _ = self.sender.enqueue(Message::SubscribeByCommitment {
291 commitment,
292 response: responder,
293 });
294 receiver
295 }
296
297 pub fn subscribe_by_digest(&self, digest: B::Digest) -> oneshot::Receiver<CodedBlock<B, C, H>> {
299 let (responder, receiver) = oneshot::channel();
300 let _ = self.sender.enqueue(Message::SubscribeByDigest {
301 digest,
302 response: responder,
303 });
304 receiver
305 }
306
307 pub fn prune(&self, through: Commitment) {
309 let _ = self.sender.enqueue(Message::Prune { through });
310 }
311}