commonware_consensus/marshal/coding/shards/
mailbox.rs1use crate::{
4 marshal::coding::types::CodedBlock,
5 types::{coding::Commitment, Round},
6 CertifiableBlock,
7};
8use commonware_coding::Scheme as CodingScheme;
9use commonware_cryptography::{Hasher, PublicKey};
10use commonware_utils::channel::{fallible::AsyncFallibleExt, mpsc, oneshot};
11use std::sync::Arc;
12
13pub enum Message<B, C, H, P>
17where
18 B: CertifiableBlock,
19 C: CodingScheme,
20 H: Hasher,
21 P: PublicKey,
22{
23 Proposed {
25 block: CodedBlock<B, C, H>,
27 round: Round,
29 },
30 Discovered {
32 commitment: Commitment,
34 leader: P,
36 round: Round,
38 },
39 GetByCommitment {
41 commitment: Commitment,
43 response: oneshot::Sender<Option<Arc<CodedBlock<B, C, H>>>>,
45 },
46 GetByDigest {
48 digest: B::Digest,
50 response: oneshot::Sender<Option<Arc<CodedBlock<B, C, H>>>>,
52 },
53 SubscribeShard {
56 commitment: Commitment,
58 response: oneshot::Sender<()>,
60 },
61 SubscribeByCommitment {
64 commitment: Commitment,
66 response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
68 },
69 SubscribeByDigest {
72 digest: B::Digest,
74 response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
76 },
77 Prune {
79 through: Commitment,
81 },
82}
83
84#[derive(Clone)]
88pub struct Mailbox<B, C, H, P>
89where
90 B: CertifiableBlock,
91 C: CodingScheme,
92 H: Hasher,
93 P: PublicKey,
94{
95 pub(super) sender: mpsc::Sender<Message<B, C, H, P>>,
96}
97
98impl<B, C, H, P> Mailbox<B, C, H, P>
99where
100 B: CertifiableBlock,
101 C: CodingScheme,
102 H: Hasher,
103 P: PublicKey,
104{
105 pub const fn new(sender: mpsc::Sender<Message<B, C, H, P>>) -> Self {
107 Self { sender }
108 }
109
110 pub async fn proposed(&self, round: Round, block: CodedBlock<B, C, H>) {
112 let msg = Message::Proposed { block, round };
113 self.sender.send_lossy(msg).await;
114 }
115
116 pub async fn discovered(&self, commitment: Commitment, leader: P, round: Round) {
118 let msg = Message::Discovered {
119 commitment,
120 leader,
121 round,
122 };
123 self.sender.send_lossy(msg).await;
124 }
125
126 pub async fn get(&self, commitment: Commitment) -> Option<Arc<CodedBlock<B, C, H>>> {
128 self.sender
129 .request(|tx| Message::GetByCommitment {
130 commitment,
131 response: tx,
132 })
133 .await
134 .flatten()
135 }
136
137 pub async fn get_by_digest(&self, digest: B::Digest) -> Option<Arc<CodedBlock<B, C, H>>> {
139 self.sender
140 .request(|tx| Message::GetByDigest {
141 digest,
142 response: tx,
143 })
144 .await
145 .flatten()
146 }
147
148 pub async fn subscribe_shard(&self, commitment: Commitment) -> oneshot::Receiver<()> {
150 let (responder, receiver) = oneshot::channel();
151 let msg = Message::SubscribeShard {
152 commitment,
153 response: responder,
154 };
155 self.sender.send_lossy(msg).await;
156 receiver
157 }
158
159 pub async fn subscribe(
161 &self,
162 commitment: Commitment,
163 ) -> oneshot::Receiver<Arc<CodedBlock<B, C, H>>> {
164 let (responder, receiver) = oneshot::channel();
165 let msg = Message::SubscribeByCommitment {
166 commitment,
167 response: responder,
168 };
169 self.sender.send_lossy(msg).await;
170 receiver
171 }
172
173 pub async fn subscribe_by_digest(
175 &self,
176 digest: B::Digest,
177 ) -> oneshot::Receiver<Arc<CodedBlock<B, C, H>>> {
178 let (responder, receiver) = oneshot::channel();
179 let msg = Message::SubscribeByDigest {
180 digest,
181 response: responder,
182 };
183 self.sender.send_lossy(msg).await;
184 receiver
185 }
186
187 pub async fn prune(&self, through: Commitment) {
189 let msg = Message::Prune { through };
190 self.sender.send_lossy(msg).await;
191 }
192}