Skip to main content

commonware_consensus/marshal/coding/shards/
mailbox.rs

1//! Mailbox for the shard buffer engine.
2
3use crate::{
4    marshal::coding::types::CodedBlock,
5    types::{coding::Commitment, Round},
6    CertifiableBlock,
7};
8use commonware_actor::mailbox::{Overflow, Policy, Sender};
9use commonware_coding::Scheme as CodingScheme;
10use commonware_cryptography::{Hasher, PublicKey};
11use commonware_utils::channel::oneshot;
12use std::collections::VecDeque;
13
14/// A message that can be sent to the coding [`Engine`].
15///
16/// [`Engine`]: super::Engine
17pub(crate) enum Message<B, C, H, P>
18where
19    B: CertifiableBlock,
20    C: CodingScheme,
21    H: Hasher,
22    P: PublicKey,
23{
24    /// A request to broadcast a proposed [`CodedBlock`] to all peers.
25    Proposed {
26        /// The erasure coded block.
27        block: CodedBlock<B, C, H>,
28        /// The round in which the block was proposed.
29        round: Round,
30    },
31    /// A notification from consensus that a [`Commitment`] has been discovered.
32    Discovered {
33        /// The [`Commitment`] of the proposed block.
34        commitment: Commitment,
35        /// The leader's public key.
36        leader: P,
37        /// The round in which the commitment was proposed.
38        round: Round,
39    },
40    /// A notification from consensus that a [`Commitment`] has been notarized.
41    ///
42    /// This may arrive before the engine knows the round leader. It allows the
43    /// engine to reconstruct from sender-indexed gossip shards already buffered
44    /// for the commitment, but it does not satisfy assigned shard verification.
45    Notarized {
46        /// The [`Commitment`] of the notarized block.
47        commitment: Commitment,
48        /// The round in which the commitment was notarized.
49        round: Round,
50    },
51    /// A request to get a reconstructed block, if available.
52    GetByCommitment {
53        /// The [`Commitment`] of the block to get.
54        commitment: Commitment,
55        /// The response channel.
56        response: oneshot::Sender<Option<CodedBlock<B, C, H>>>,
57    },
58    /// A request to get a reconstructed block by its digest, if available.
59    GetByDigest {
60        /// The digest of the block to get.
61        digest: B::Digest,
62        /// The response channel.
63        response: oneshot::Sender<Option<CodedBlock<B, C, H>>>,
64    },
65    /// A request to open a subscription for assigned shard verification.
66    ///
67    /// For participants, this resolves once the leader-delivered shard for
68    /// the local participant index has been verified. Reconstructing the full
69    /// block from gossiped shards does not resolve this subscription: that
70    /// block may still be used for later certification, but it is not enough
71    /// to claim the participant received the shard it is expected to echo.
72    ///
73    /// For proposers, this resolves immediately after the locally built block
74    /// is cached because they trivially have all shards.
75    SubscribeAssignedShardVerified {
76        /// The block's commitment.
77        commitment: Commitment,
78        /// The response channel.
79        response: oneshot::Sender<()>,
80    },
81    /// A request to open a subscription for the reconstruction of a [`CodedBlock`]
82    /// by its [`Commitment`].
83    SubscribeByCommitment {
84        /// The block's digest.
85        commitment: Commitment,
86        /// The response channel.
87        response: oneshot::Sender<CodedBlock<B, C, H>>,
88    },
89    /// A request to open a subscription for the reconstruction of a [`CodedBlock`]
90    /// by its digest.
91    SubscribeByDigest {
92        /// The block's digest.
93        digest: B::Digest,
94        /// The response channel.
95        response: oneshot::Sender<CodedBlock<B, C, H>>,
96    },
97    /// A request to prune all caches at and below the given commitment.
98    Prune {
99        /// Inclusive prune target [`Commitment`].
100        through: Commitment,
101    },
102}
103
104impl<B, C, H, P> Message<B, C, H, P>
105where
106    B: CertifiableBlock,
107    C: CodingScheme,
108    H: Hasher,
109    P: PublicKey,
110{
111    pub(crate) fn response_closed(&self) -> bool {
112        match self {
113            Self::GetByCommitment { response, .. } | Self::GetByDigest { response, .. } => {
114                response.is_closed()
115            }
116            Self::SubscribeAssignedShardVerified { response, .. } => response.is_closed(),
117            Self::SubscribeByCommitment { response, .. }
118            | Self::SubscribeByDigest { response, .. } => response.is_closed(),
119            Self::Proposed { .. }
120            | Self::Discovered { .. }
121            | Self::Notarized { .. }
122            | Self::Prune { .. } => false,
123        }
124    }
125}
126
127pub(crate) struct Pending<B, C, H, P>(VecDeque<Message<B, C, H, P>>)
128where
129    B: CertifiableBlock,
130    C: CodingScheme,
131    H: Hasher,
132    P: PublicKey;
133
134impl<B, C, H, P> Default for Pending<B, C, H, P>
135where
136    B: CertifiableBlock,
137    C: CodingScheme,
138    H: Hasher,
139    P: PublicKey,
140{
141    fn default() -> Self {
142        Self(VecDeque::new())
143    }
144}
145
146impl<B, C, H, P> Overflow<Message<B, C, H, P>> for Pending<B, C, H, P>
147where
148    B: CertifiableBlock,
149    C: CodingScheme,
150    H: Hasher,
151    P: PublicKey,
152{
153    fn is_empty(&self) -> bool {
154        self.0.is_empty()
155    }
156
157    fn drain<F>(&mut self, mut push: F)
158    where
159        F: FnMut(Message<B, C, H, P>) -> Option<Message<B, C, H, P>>,
160    {
161        while let Some(message) = self.0.pop_front() {
162            if message.response_closed() {
163                continue;
164            }
165
166            if let Some(message) = push(message) {
167                self.0.push_front(message);
168                break;
169            }
170        }
171    }
172}
173
174impl<B, C, H, P> Policy for Message<B, C, H, P>
175where
176    B: CertifiableBlock,
177    C: CodingScheme,
178    H: Hasher,
179    P: PublicKey,
180{
181    type Overflow = Pending<B, C, H, P>;
182
183    fn handle(overflow: &mut Self::Overflow, message: Self) {
184        if message.response_closed() {
185            return;
186        }
187
188        overflow.0.push_back(message);
189    }
190}
191
192/// A mailbox for sending messages to the [`Engine`].
193///
194/// [`Engine`]: super::Engine
195#[derive(Clone)]
196pub struct Mailbox<B, C, H, P>
197where
198    B: CertifiableBlock,
199    C: CodingScheme,
200    H: Hasher,
201    P: PublicKey,
202{
203    pub(super) sender: Sender<Message<B, C, H, P>>,
204}
205
206impl<B, C, H, P> Mailbox<B, C, H, P>
207where
208    B: CertifiableBlock,
209    C: CodingScheme,
210    H: Hasher,
211    P: PublicKey,
212{
213    /// Create a new [`Mailbox`] with the given sender.
214    pub(crate) const fn new(sender: Sender<Message<B, C, H, P>>) -> Self {
215        Self { sender }
216    }
217
218    /// Broadcast a proposed erasure coded block's shards to the participants.
219    pub fn proposed(&self, round: Round, block: CodedBlock<B, C, H>) {
220        let _ = self.sender.enqueue(Message::Proposed { block, round });
221    }
222
223    /// Inform the engine of an externally proposed [`Commitment`].
224    pub fn discovered(&self, commitment: Commitment, leader: P, round: Round) {
225        let _ = self.sender.enqueue(Message::Discovered {
226            commitment,
227            leader,
228            round,
229        });
230    }
231
232    /// Inform the engine that a [`Commitment`] was notarized.
233    ///
234    /// This is the leaderless reconstruction signal used by certification. It
235    /// lets the engine drain sender-indexed gossip shards from its peer buffers
236    /// for the commitment. Leader-specific validation and assigned shard
237    /// verification still require a later [`Self::discovered`] call.
238    pub fn notarized(&self, commitment: Commitment, round: Round) {
239        let _ = self
240            .sender
241            .enqueue(Message::Notarized { commitment, round });
242    }
243
244    /// Request a reconstructed block by its [`Commitment`].
245    pub async fn get(&self, commitment: Commitment) -> Option<CodedBlock<B, C, H>> {
246        let (response, receiver) = oneshot::channel();
247        let _ = self.sender.enqueue(Message::GetByCommitment {
248            commitment,
249            response,
250        });
251        receiver.await.ok().flatten()
252    }
253
254    /// Request a reconstructed block by its digest.
255    pub async fn get_by_digest(&self, digest: B::Digest) -> Option<CodedBlock<B, C, H>> {
256        let (response, receiver) = oneshot::channel();
257        let _ = self
258            .sender
259            .enqueue(Message::GetByDigest { digest, response });
260        receiver.await.ok().flatten()
261    }
262
263    /// Subscribe to assigned shard verification for a commitment.
264    ///
265    /// For participants, this resolves once the leader-delivered shard for
266    /// the local participant index has been verified. Reconstructing the full
267    /// block from gossiped shards does not resolve this subscription: that
268    /// block may still be used for later certification, but it is not enough
269    /// to claim the participant received the shard it is expected to echo.
270    ///
271    /// For proposers, this resolves immediately after the locally built block
272    /// is cached because they trivially have all shards.
273    pub fn subscribe_assigned_shard_verified(
274        &self,
275        commitment: Commitment,
276    ) -> oneshot::Receiver<()> {
277        let (responder, receiver) = oneshot::channel();
278        let _ = self
279            .sender
280            .enqueue(Message::SubscribeAssignedShardVerified {
281                commitment,
282                response: responder,
283            });
284        receiver
285    }
286
287    /// Subscribe to the reconstruction of a [`CodedBlock`] by its [`Commitment`].
288    pub fn subscribe(&self, commitment: Commitment) -> oneshot::Receiver<CodedBlock<B, C, H>> {
289        let (responder, receiver) = oneshot::channel();
290        let _ = self.sender.enqueue(Message::SubscribeByCommitment {
291            commitment,
292            response: responder,
293        });
294        receiver
295    }
296
297    /// Subscribe to the reconstruction of a [`CodedBlock`] by its digest.
298    pub fn subscribe_by_digest(&self, digest: B::Digest) -> oneshot::Receiver<CodedBlock<B, C, H>> {
299        let (responder, receiver) = oneshot::channel();
300        let _ = self.sender.enqueue(Message::SubscribeByDigest {
301            digest,
302            response: responder,
303        });
304        receiver
305    }
306
307    /// Request to prune all caches at and below the given commitment.
308    pub fn prune(&self, through: Commitment) {
309        let _ = self.sender.enqueue(Message::Prune { through });
310    }
311}