Skip to main content

commonware_consensus/marshal/coding/shards/
mailbox.rs

1//! Mailbox for the shard buffer engine.
2
3use crate::{
4    marshal::coding::types::CodedBlock,
5    types::{coding::Commitment, Round},
6    CertifiableBlock,
7};
8use commonware_coding::Scheme as CodingScheme;
9use commonware_cryptography::{Hasher, PublicKey};
10use commonware_utils::channel::{fallible::AsyncFallibleExt, mpsc, oneshot};
11use std::sync::Arc;
12
13/// A message that can be sent to the coding [`Engine`].
14///
15/// [`Engine`]: super::Engine
16pub enum Message<B, C, H, P>
17where
18    B: CertifiableBlock,
19    C: CodingScheme,
20    H: Hasher,
21    P: PublicKey,
22{
23    /// A request to broadcast a proposed [`CodedBlock`] to all peers.
24    Proposed {
25        /// The erasure coded block.
26        block: CodedBlock<B, C, H>,
27        /// The round in which the block was proposed.
28        round: Round,
29    },
30    /// A notification from consensus that a [`Commitment`] has been discovered.
31    Discovered {
32        /// The [`Commitment`] of the proposed block.
33        commitment: Commitment,
34        /// The leader's public key.
35        leader: P,
36        /// The round in which the commitment was proposed.
37        round: Round,
38    },
39    /// A request to get a reconstructed block, if available.
40    GetByCommitment {
41        /// The [`Commitment`] of the block to get.
42        commitment: Commitment,
43        /// The response channel.
44        response: oneshot::Sender<Option<Arc<CodedBlock<B, C, H>>>>,
45    },
46    /// A request to get a reconstructed block by its digest, if available.
47    GetByDigest {
48        /// The digest of the block to get.
49        digest: B::Digest,
50        /// The response channel.
51        response: oneshot::Sender<Option<Arc<CodedBlock<B, C, H>>>>,
52    },
53    /// A request to open a subscription for assigned shard verification.
54    ///
55    /// For participants, this resolves once the leader-delivered shard for
56    /// the local participant index has been verified. Reconstructing the full
57    /// block from gossiped shards does not resolve this subscription: that
58    /// block may still be used for later certification, but it is not enough
59    /// to claim the participant received the shard it is expected to echo.
60    ///
61    /// For proposers, this resolves immediately after the locally built block
62    /// is cached because they trivially have all shards.
63    SubscribeAssignedShardVerified {
64        /// The block's commitment.
65        commitment: Commitment,
66        /// The response channel.
67        response: oneshot::Sender<()>,
68    },
69    /// A request to open a subscription for the reconstruction of a [`CodedBlock`]
70    /// by its [`Commitment`].
71    SubscribeByCommitment {
72        /// The block's digest.
73        commitment: Commitment,
74        /// The response channel.
75        response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
76    },
77    /// A request to open a subscription for the reconstruction of a [`CodedBlock`]
78    /// by its digest.
79    SubscribeByDigest {
80        /// The block's digest.
81        digest: B::Digest,
82        /// The response channel.
83        response: oneshot::Sender<Arc<CodedBlock<B, C, H>>>,
84    },
85    /// A request to prune all caches at and below the given commitment.
86    Prune {
87        /// Inclusive prune target [`Commitment`].
88        through: Commitment,
89    },
90}
91
92/// A mailbox for sending messages to the [`Engine`].
93///
94/// [`Engine`]: super::Engine
95#[derive(Clone)]
96pub struct Mailbox<B, C, H, P>
97where
98    B: CertifiableBlock,
99    C: CodingScheme,
100    H: Hasher,
101    P: PublicKey,
102{
103    pub(super) sender: mpsc::Sender<Message<B, C, H, P>>,
104}
105
106impl<B, C, H, P> Mailbox<B, C, H, P>
107where
108    B: CertifiableBlock,
109    C: CodingScheme,
110    H: Hasher,
111    P: PublicKey,
112{
113    /// Create a new [`Mailbox`] with the given sender.
114    pub const fn new(sender: mpsc::Sender<Message<B, C, H, P>>) -> Self {
115        Self { sender }
116    }
117
118    /// Broadcast a proposed erasure coded block's shards to the participants.
119    pub async fn proposed(&self, round: Round, block: CodedBlock<B, C, H>) {
120        let msg = Message::Proposed { block, round };
121        self.sender.send_lossy(msg).await;
122    }
123
124    /// Inform the engine of an externally proposed [`Commitment`].
125    pub async fn discovered(&self, commitment: Commitment, leader: P, round: Round) {
126        let msg = Message::Discovered {
127            commitment,
128            leader,
129            round,
130        };
131        self.sender.send_lossy(msg).await;
132    }
133
134    /// Request a reconstructed block by its [`Commitment`].
135    pub async fn get(&self, commitment: Commitment) -> Option<Arc<CodedBlock<B, C, H>>> {
136        self.sender
137            .request(|tx| Message::GetByCommitment {
138                commitment,
139                response: tx,
140            })
141            .await
142            .flatten()
143    }
144
145    /// Request a reconstructed block by its digest.
146    pub async fn get_by_digest(&self, digest: B::Digest) -> Option<Arc<CodedBlock<B, C, H>>> {
147        self.sender
148            .request(|tx| Message::GetByDigest {
149                digest,
150                response: tx,
151            })
152            .await
153            .flatten()
154    }
155
156    /// Subscribe to assigned shard verification for a commitment.
157    ///
158    /// For participants, this resolves once the leader-delivered shard for
159    /// the local participant index has been verified. Reconstructing the full
160    /// block from gossiped shards does not resolve this subscription: that
161    /// block may still be used for later certification, but it is not enough
162    /// to claim the participant received the shard it is expected to echo.
163    ///
164    /// For proposers, this resolves immediately after the locally built block
165    /// is cached because they trivially have all shards.
166    pub async fn subscribe_assigned_shard_verified(
167        &self,
168        commitment: Commitment,
169    ) -> oneshot::Receiver<()> {
170        let (responder, receiver) = oneshot::channel();
171        let msg = Message::SubscribeAssignedShardVerified {
172            commitment,
173            response: responder,
174        };
175        self.sender.send_lossy(msg).await;
176        receiver
177    }
178
179    /// Subscribe to the reconstruction of a [`CodedBlock`] by its [`Commitment`].
180    pub async fn subscribe(
181        &self,
182        commitment: Commitment,
183    ) -> oneshot::Receiver<Arc<CodedBlock<B, C, H>>> {
184        let (responder, receiver) = oneshot::channel();
185        let msg = Message::SubscribeByCommitment {
186            commitment,
187            response: responder,
188        };
189        self.sender.send_lossy(msg).await;
190        receiver
191    }
192
193    /// Subscribe to the reconstruction of a [`CodedBlock`] by its digest.
194    pub async fn subscribe_by_digest(
195        &self,
196        digest: B::Digest,
197    ) -> oneshot::Receiver<Arc<CodedBlock<B, C, H>>> {
198        let (responder, receiver) = oneshot::channel();
199        let msg = Message::SubscribeByDigest {
200            digest,
201            response: responder,
202        };
203        self.sender.send_lossy(msg).await;
204        receiver
205    }
206
207    /// Request to prune all caches at and below the given commitment.
208    pub async fn prune(&self, through: Commitment) {
209        let msg = Message::Prune { through };
210        self.sender.send_lossy(msg).await;
211    }
212}