commonware_consensus/marshal/coding/shards/mailbox.rs
1//! Mailbox for the shard buffer engine.
2
3use crate::{
4 marshal::coding::types::CodedBlock,
5 types::{coding::Commitment, Round},
6 CertifiableBlock,
7};
8use commonware_coding::Scheme as CodingScheme;
9use commonware_cryptography::{Hasher, PublicKey};
10use commonware_utils::channel::{fallible::AsyncFallibleExt, mpsc, oneshot};
11use std::sync::Arc;
12
13/// A message that can be sent to the coding [`Engine`].
14///
15/// [`Engine`]: super::Engine
16pub enum Message<B, C, H, P>
17where
18 B: CertifiableBlock,
19 C: CodingScheme,
20 H: Hasher,
21 P: PublicKey,
22{
23 /// A request to broadcast a proposed [`CodedBlock`] to all peers.
24 Proposed {
25 /// The erasure coded block.
26 block: CodedBlock<B, C, H>,
27 /// The round in which the block was proposed.
28 round: Round,
29 },
30 /// A notification from consensus that a [`Commitment`] has been discovered.
31 Discovered {
32 /// The [`Commitment`] of the proposed block.
33 commitment: Commitment,
34 /// The leader's public key.
35 leader: P,
36 /// The round in which the commitment was proposed.
37 round: Round,
38 },
39 /// A request to get a reconstructed block, if available.
40 GetByCommitment {
41 /// The [`Commitment`] of the block to get.
42 commitment: Commitment,
43 /// The response channel.
44 response: oneshot::Sender<Option<Arc<CodedBlock<B, C, H>>>>,
45 },
46 /// A request to get a reconstructed block by its digest, if available.
47 GetByDigest {
48 /// The digest of the block to get.
49 digest: B::Digest,
50 /// The response channel.
51 response: oneshot::Sender<Option<Arc<CodedBlock<B, C, H>>>>,
52 },
53 /// A request to open a subscription for assigned shard verification.
54 ///
55 /// For participants, this resolves once the leader-delivered shard for
56 /// the local participant index has been verified. Reconstructing the full
57 /// block from gossiped shards does not resolve this subscription: that
58 /// block may still be used for later certification, but it is not enough
59 /// to claim the participant received the shard it is expected to echo.
60 ///
61 /// For proposers, this resolves immediately after the locally built block
62 /// is cached because they trivially have all shards.
63 SubscribeAssignedShardVerified {
64 /// The block's commitment.
65 commitment: Commitment,
66 /// The response channel.
67 response: oneshot::Sender<()>,
68 },
69 /// A request to open a subscription for the reconstruction of a [`CodedBlock`]
70 /// by its [`Commitment`].
71 SubscribeByCommitment {
72 /// The block's digest.
73 commitment: Commitment,
74 /// The response channel.
75 response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
76 },
77 /// A request to open a subscription for the reconstruction of a [`CodedBlock`]
78 /// by its digest.
79 SubscribeByDigest {
80 /// The block's digest.
81 digest: B::Digest,
82 /// The response channel.
83 response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
84 },
85 /// A request to prune all caches at and below the given commitment.
86 Prune {
87 /// Inclusive prune target [`Commitment`].
88 through: Commitment,
89 },
90}
91
92/// A mailbox for sending messages to the [`Engine`].
93///
94/// [`Engine`]: super::Engine
95#[derive(Clone)]
96pub struct Mailbox<B, C, H, P>
97where
98 B: CertifiableBlock,
99 C: CodingScheme,
100 H: Hasher,
101 P: PublicKey,
102{
103 pub(super) sender: mpsc::Sender<Message<B, C, H, P>>,
104}
105
106impl<B, C, H, P> Mailbox<B, C, H, P>
107where
108 B: CertifiableBlock,
109 C: CodingScheme,
110 H: Hasher,
111 P: PublicKey,
112{
113 /// Create a new [`Mailbox`] with the given sender.
114 pub const fn new(sender: mpsc::Sender<Message<B, C, H, P>>) -> Self {
115 Self { sender }
116 }
117
118 /// Broadcast a proposed erasure coded block's shards to the participants.
119 pub async fn proposed(&self, round: Round, block: CodedBlock<B, C, H>) {
120 let msg = Message::Proposed { block, round };
121 self.sender.send_lossy(msg).await;
122 }
123
124 /// Inform the engine of an externally proposed [`Commitment`].
125 pub async fn discovered(&self, commitment: Commitment, leader: P, round: Round) {
126 let msg = Message::Discovered {
127 commitment,
128 leader,
129 round,
130 };
131 self.sender.send_lossy(msg).await;
132 }
133
134 /// Request a reconstructed block by its [`Commitment`].
135 pub async fn get(&self, commitment: Commitment) -> Option<Arc<CodedBlock<B, C, H>>> {
136 self.sender
137 .request(|tx| Message::GetByCommitment {
138 commitment,
139 response: tx,
140 })
141 .await
142 .flatten()
143 }
144
145 /// Request a reconstructed block by its digest.
146 pub async fn get_by_digest(&self, digest: B::Digest) -> Option<Arc<CodedBlock<B, C, H>>> {
147 self.sender
148 .request(|tx| Message::GetByDigest {
149 digest,
150 response: tx,
151 })
152 .await
153 .flatten()
154 }
155
156 /// Subscribe to assigned shard verification for a commitment.
157 ///
158 /// For participants, this resolves once the leader-delivered shard for
159 /// the local participant index has been verified. Reconstructing the full
160 /// block from gossiped shards does not resolve this subscription: that
161 /// block may still be used for later certification, but it is not enough
162 /// to claim the participant received the shard it is expected to echo.
163 ///
164 /// For proposers, this resolves immediately after the locally built block
165 /// is cached because they trivially have all shards.
166 pub async fn subscribe_assigned_shard_verified(
167 &self,
168 commitment: Commitment,
169 ) -> oneshot::Receiver<()> {
170 let (responder, receiver) = oneshot::channel();
171 let msg = Message::SubscribeAssignedShardVerified {
172 commitment,
173 response: responder,
174 };
175 self.sender.send_lossy(msg).await;
176 receiver
177 }
178
179 /// Subscribe to the reconstruction of a [`CodedBlock`] by its [`Commitment`].
180 pub async fn subscribe(
181 &self,
182 commitment: Commitment,
183 ) -> oneshot::Receiver<Arc<CodedBlock<B, C, H>>> {
184 let (responder, receiver) = oneshot::channel();
185 let msg = Message::SubscribeByCommitment {
186 commitment,
187 response: responder,
188 };
189 self.sender.send_lossy(msg).await;
190 receiver
191 }
192
193 /// Subscribe to the reconstruction of a [`CodedBlock`] by its digest.
194 pub async fn subscribe_by_digest(
195 &self,
196 digest: B::Digest,
197 ) -> oneshot::Receiver<Arc<CodedBlock<B, C, H>>> {
198 let (responder, receiver) = oneshot::channel();
199 let msg = Message::SubscribeByDigest {
200 digest,
201 response: responder,
202 };
203 self.sender.send_lossy(msg).await;
204 receiver
205 }
206
207 /// Request to prune all caches at and below the given commitment.
208 pub async fn prune(&self, through: Commitment) {
209 let msg = Message::Prune { through };
210 self.sender.send_lossy(msg).await;
211 }
212}