commonware_consensus/marshal/core/mailbox.rs
1use super::Variant;
2use crate::{
3 marshal::{
4 ancestry::{AncestorStream, BlockProvider},
5 Identifier,
6 },
7 simplex::types::{Activity, Finalization, Notarization},
8 types::{Height, Round},
9 Reporter,
10};
11use commonware_cryptography::{certificate::Scheme, Digestible};
12use commonware_utils::{
13 channel::{fallible::AsyncFallibleExt, mpsc, oneshot},
14 vec::NonEmptyVec,
15};
16
17/// Messages sent to the marshal [Actor](super::Actor).
18///
19/// These messages are sent from the consensus engine and other parts of the
20/// system to drive the state of the marshal.
21pub(crate) enum Message<S: Scheme, V: Variant> {
22 /// A request to retrieve the `(height, digest)` of a block by its identifier.
23 /// The block must be finalized; returns `None` if the block is not finalized.
24 GetInfo {
25 /// The identifier of the block to get the information of.
26 identifier: Identifier<<V::Block as Digestible>::Digest>,
27 /// A channel to send the retrieved `(height, digest)`.
28 response: oneshot::Sender<Option<(Height, <V::Block as Digestible>::Digest)>>,
29 },
30 /// A request to retrieve a block by its identifier.
31 ///
32 /// Requesting by [Identifier::Height] or [Identifier::Latest] will only return finalized
33 /// blocks, whereas requesting by [Identifier::Digest] may return non-finalized
34 /// or even unverified blocks.
35 GetBlock {
36 /// The identifier of the block to retrieve.
37 identifier: Identifier<<V::Block as Digestible>::Digest>,
38 /// A channel to send the retrieved block.
39 response: oneshot::Sender<Option<V::Block>>,
40 },
41 /// A request to retrieve a finalization by height.
42 GetFinalization {
43 /// The height of the finalization to retrieve.
44 height: Height,
45 /// A channel to send the retrieved finalization.
46 response: oneshot::Sender<Option<Finalization<S, V::Commitment>>>,
47 },
48 /// A hint that a finalized block may be available at a given height.
49 ///
50 /// This triggers a network fetch if the finalization is not available locally.
51 /// This is fire-and-forget: the finalization will be stored in marshal and
52 /// delivered via the normal finalization flow when available.
53 ///
54 /// The height must be covered by both the epocher and the provider. If the
55 /// epocher cannot map the height to an epoch, or the provider cannot supply
56 /// a scheme for that epoch, the hint is silently dropped.
57 ///
58 /// Targets are required because this is typically called when a peer claims to
59 /// be ahead. If a target returns invalid data, the resolver will block them.
60 /// Sending this message multiple times with different targets adds to the
61 /// target set.
62 HintFinalized {
63 /// The height of the finalization to fetch.
64 height: Height,
65 /// Target peers to fetch from. Added to any existing targets for this height.
66 targets: NonEmptyVec<S::PublicKey>,
67 },
68 /// A request to subscribe to a block by its digest.
69 SubscribeByDigest {
70 /// The round in which the block was notarized. This is an optimization
71 /// to help locate the block.
72 round: Option<Round>,
73 /// The digest of the block to retrieve.
74 digest: <V::Block as Digestible>::Digest,
75 /// A channel to send the retrieved block.
76 response: oneshot::Sender<V::Block>,
77 },
78 /// A request to subscribe to a block by its commitment.
79 SubscribeByCommitment {
80 /// The round in which the block was notarized. This is an optimization
81 /// to help locate the block.
82 round: Option<Round>,
83 /// The commitment of the block to retrieve.
84 commitment: V::Commitment,
85 /// A channel to send the retrieved block.
86 response: oneshot::Sender<V::Block>,
87 },
88 /// A request to broadcast a proposed block to peers.
89 Proposed {
90 /// The round in which the block was proposed.
91 round: Round,
92 /// The block to broadcast.
93 block: V::Block,
94 },
95 /// A request to forward a block to a set of peers.
96 Forward {
97 /// The round in which the block was proposed.
98 round: Round,
99 /// The commitment of the block to forward.
100 commitment: V::Commitment,
101 /// The peers to forward the block to.
102 peers: Vec<S::PublicKey>,
103 },
104 /// A notification that a block has been verified by the application.
105 Verified {
106 /// The round in which the block was verified.
107 round: Round,
108 /// The verified block.
109 block: V::Block,
110 },
111 /// Sets the sync starting point (advances if higher than current).
112 ///
113 /// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
114 /// the floor is pruned.
115 ///
116 /// To prune data without affecting the sync starting point (say at some trailing depth
117 /// from tip), use [Message::Prune] instead.
118 ///
119 /// The default floor is 0.
120 SetFloor {
121 /// The candidate floor height.
122 height: Height,
123 },
124 /// Prunes finalized blocks and certificates below the given height.
125 ///
126 /// Unlike [Message::SetFloor], this does not affect the sync starting point.
127 /// The height must be at or below the current floor (last processed height),
128 /// otherwise the prune request is ignored.
129 Prune {
130 /// The minimum height to keep (blocks below this are pruned).
131 height: Height,
132 },
133 /// A notarization from the consensus engine.
134 Notarization {
135 /// The notarization.
136 notarization: Notarization<S, V::Commitment>,
137 },
138 /// A finalization from the consensus engine.
139 Finalization {
140 /// The finalization.
141 finalization: Finalization<S, V::Commitment>,
142 },
143}
144
145/// A mailbox for sending messages to the marshal [Actor](super::Actor).
146#[derive(Clone)]
147pub struct Mailbox<S: Scheme, V: Variant> {
148 sender: mpsc::Sender<Message<S, V>>,
149}
150
151impl<S: Scheme, V: Variant> Mailbox<S, V> {
152 /// Creates a new mailbox.
153 pub(crate) const fn new(sender: mpsc::Sender<Message<S, V>>) -> Self {
154 Self { sender }
155 }
156
157 /// A request to retrieve the information about the highest finalized block.
158 pub async fn get_info(
159 &self,
160 identifier: impl Into<Identifier<<V::Block as Digestible>::Digest>>,
161 ) -> Option<(Height, <V::Block as Digestible>::Digest)> {
162 let identifier = identifier.into();
163 self.sender
164 .request(|response| Message::GetInfo {
165 identifier,
166 response,
167 })
168 .await
169 .flatten()
170 }
171
172 /// A best-effort attempt to retrieve a given block from local
173 /// storage. It is not an indication to go fetch the block from the network.
174 pub async fn get_block(
175 &self,
176 identifier: impl Into<Identifier<<V::Block as Digestible>::Digest>>,
177 ) -> Option<V::Block> {
178 let identifier = identifier.into();
179 self.sender
180 .request(|response| Message::GetBlock {
181 identifier,
182 response,
183 })
184 .await
185 .flatten()
186 }
187
188 /// A best-effort attempt to retrieve a given [Finalization] from local
189 /// storage. It is not an indication to go fetch the [Finalization] from the network.
190 pub async fn get_finalization(&self, height: Height) -> Option<Finalization<S, V::Commitment>> {
191 self.sender
192 .request(|response| Message::GetFinalization { height, response })
193 .await
194 .flatten()
195 }
196
197 /// Hints that a finalized block may be available at the given height.
198 ///
199 /// This method will request the finalization from the network via the resolver
200 /// if it is not available locally.
201 ///
202 /// Targets are required because this is typically called when a peer claims to be
203 /// ahead. By targeting only those peers, we limit who we ask. If a target returns
204 /// invalid data, they will be blocked by the resolver. If targets don't respond
205 /// or return "no data", they effectively rate-limit themselves.
206 ///
207 /// Calling this multiple times for the same height with different targets will
208 /// add to the target set if there is an ongoing fetch, allowing more peers to be tried.
209 ///
210 /// This is fire-and-forget: the finalization will be stored in marshal and delivered
211 /// via the normal finalization flow when available.
212 ///
213 /// The height must be covered by both the epocher and the provider. If the
214 /// epocher cannot map the height to an epoch, or the provider cannot supply
215 /// a scheme for that epoch, the hint is silently dropped.
216 pub async fn hint_finalized(&self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
217 self.sender
218 .send_lossy(Message::HintFinalized { height, targets })
219 .await;
220 }
221
222 /// Subscribe to a block by its digest.
223 ///
224 /// If the block is found available locally, the block will be returned immediately.
225 ///
226 /// If the block is not available locally, the request will be registered and the caller will
227 /// be notified when the block is available. If the block is not finalized, it's possible that
228 /// it may never become available.
229 ///
230 /// The oneshot receiver should be dropped to cancel the subscription.
231 pub async fn subscribe_by_digest(
232 &self,
233 round: Option<Round>,
234 digest: <V::Block as Digestible>::Digest,
235 ) -> oneshot::Receiver<V::Block> {
236 let (tx, rx) = oneshot::channel();
237 self.sender
238 .send_lossy(Message::SubscribeByDigest {
239 round,
240 digest,
241 response: tx,
242 })
243 .await;
244 rx
245 }
246
247 /// Subscribe to a block by its commitment.
248 ///
249 /// If the block is found available locally, the block will be returned immediately.
250 ///
251 /// If the block is not available locally, the request will be registered and the caller will
252 /// be notified when the block is available. If the block is not finalized, it's possible that
253 /// it may never become available.
254 ///
255 /// The oneshot receiver should be dropped to cancel the subscription.
256 pub async fn subscribe_by_commitment(
257 &self,
258 round: Option<Round>,
259 commitment: V::Commitment,
260 ) -> oneshot::Receiver<V::Block> {
261 let (tx, rx) = oneshot::channel();
262 self.sender
263 .send_lossy(Message::SubscribeByCommitment {
264 round,
265 commitment,
266 response: tx,
267 })
268 .await;
269 rx
270 }
271
272 /// Returns an [AncestorStream] over the ancestry of a given block, leading up to genesis.
273 ///
274 /// If the starting block is not found, `None` is returned.
275 pub async fn ancestry(
276 &self,
277 (start_round, start_digest): (Option<Round>, <V::Block as Digestible>::Digest),
278 ) -> Option<AncestorStream<Self, V::ApplicationBlock>> {
279 self.subscribe_by_digest(start_round, start_digest)
280 .await
281 .await
282 .ok()
283 .map(|block| AncestorStream::new(self.clone(), [V::into_inner(block)]))
284 }
285
286 /// Requests that a proposed block is sent to peers.
287 pub async fn proposed(&self, round: Round, block: V::Block) {
288 self.sender
289 .send_lossy(Message::Proposed { round, block })
290 .await;
291 }
292
293 /// Notifies the actor that a block has been verified.
294 pub async fn verified(&self, round: Round, block: V::Block) {
295 self.sender
296 .send_lossy(Message::Verified { round, block })
297 .await;
298 }
299
300 /// Sets the sync starting point (advances if higher than current).
301 ///
302 /// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
303 /// the floor is pruned.
304 ///
305 /// To prune data without affecting the sync starting point (say at some trailing depth
306 /// from tip), use [Self::prune] instead.
307 ///
308 /// The default floor is 0.
309 pub async fn set_floor(&self, height: Height) {
310 self.sender.send_lossy(Message::SetFloor { height }).await;
311 }
312
313 /// Prunes finalized blocks and certificates below the given height.
314 ///
315 /// Unlike [Self::set_floor], this does not affect the sync starting point.
316 /// The height must be at or below the current floor (last processed height),
317 /// otherwise the prune request is ignored.
318 ///
319 /// A `prune` request for a height above marshal's current floor is dropped.
320 pub async fn prune(&self, height: Height) {
321 self.sender.send_lossy(Message::Prune { height }).await;
322 }
323
324 /// Forward a block to a set of peers.
325 pub async fn forward(&self, round: Round, commitment: V::Commitment, peers: Vec<S::PublicKey>) {
326 self.sender
327 .send_lossy(Message::Forward {
328 round,
329 commitment,
330 peers,
331 })
332 .await;
333 }
334}
335
336impl<S: Scheme, V: Variant> BlockProvider for Mailbox<S, V> {
337 type Block = V::ApplicationBlock;
338
339 async fn fetch_block(self, digest: <V::Block as Digestible>::Digest) -> Option<Self::Block> {
340 let subscription = self.subscribe_by_digest(None, digest).await;
341 subscription.await.ok().map(V::into_inner)
342 }
343}
344
345impl<S: Scheme, V: Variant> Reporter for Mailbox<S, V> {
346 type Activity = Activity<S, V::Commitment>;
347
348 async fn report(&mut self, activity: Self::Activity) {
349 let message = match activity {
350 Activity::Notarization(notarization) => Message::Notarization { notarization },
351 Activity::Finalization(finalization) => Message::Finalization { finalization },
352 _ => {
353 // Ignore other activity types
354 return;
355 }
356 };
357 self.sender.send_lossy(message).await;
358 }
359}