commonware_consensus/marshal/core/mailbox.rs
1use super::Variant;
2use crate::{
3 marshal::{
4 ancestry::{AncestorStream, BlockProvider},
5 Identifier,
6 },
7 simplex::types::{Activity, Finalization, Notarization},
8 types::{Height, Round},
9 Reporter,
10};
11use commonware_cryptography::{certificate::Scheme, Digestible};
12use commonware_utils::{
13 channel::{fallible::AsyncFallibleExt, mpsc, oneshot},
14 vec::NonEmptyVec,
15};
16
17/// Messages sent to the marshal [Actor](super::Actor).
18///
19/// These messages are sent from the consensus engine and other parts of the
20/// system to drive the state of the marshal.
21pub(crate) enum Message<S: Scheme, V: Variant> {
22 /// A request to retrieve the `(height, digest)` of a block by its identifier.
23 /// The block must be finalized; returns `None` if the block is not finalized.
24 GetInfo {
25 /// The identifier of the block to get the information of.
26 identifier: Identifier<<V::Block as Digestible>::Digest>,
27 /// A channel to send the retrieved `(height, digest)`.
28 response: oneshot::Sender<Option<(Height, <V::Block as Digestible>::Digest)>>,
29 },
30 /// A request to retrieve a block by its identifier.
31 ///
32 /// Requesting by [Identifier::Height] or [Identifier::Latest] will only return finalized
33 /// blocks, whereas requesting by [Identifier::Digest] may return non-finalized
34 /// or even unverified blocks.
35 GetBlock {
36 /// The identifier of the block to retrieve.
37 identifier: Identifier<<V::Block as Digestible>::Digest>,
38 /// A channel to send the retrieved block.
39 response: oneshot::Sender<Option<V::Block>>,
40 },
41 /// A request to retrieve a finalization by height.
42 GetFinalization {
43 /// The height of the finalization to retrieve.
44 height: Height,
45 /// A channel to send the retrieved finalization.
46 response: oneshot::Sender<Option<Finalization<S, V::Commitment>>>,
47 },
48 /// A hint that a finalized block may be available at a given height.
49 ///
50 /// This triggers a network fetch if the finalization is not available locally.
51 /// This is fire-and-forget: the finalization will be stored in marshal and
52 /// delivered via the normal finalization flow when available.
53 ///
54 /// The height must be covered by both the epocher and the provider. If the
55 /// epocher cannot map the height to an epoch, or the provider cannot supply
56 /// a scheme for that epoch, the hint is silently dropped.
57 ///
58 /// Targets are required because this is typically called when a peer claims to
59 /// be ahead. If a target returns invalid data, the resolver will block them.
60 /// Sending this message multiple times with different targets adds to the
61 /// target set.
62 HintFinalized {
63 /// The height of the finalization to fetch.
64 height: Height,
65 /// Target peers to fetch from. Added to any existing targets for this height.
66 targets: NonEmptyVec<S::PublicKey>,
67 },
68 /// A request to subscribe to a block by its digest.
69 SubscribeByDigest {
70 /// The round in which the block was notarized. This is an optimization
71 /// to help locate the block.
72 round: Option<Round>,
73 /// The digest of the block to retrieve.
74 digest: <V::Block as Digestible>::Digest,
75 /// A channel to send the retrieved block.
76 response: oneshot::Sender<V::Block>,
77 },
78 /// A request to subscribe to a block by its commitment.
79 SubscribeByCommitment {
80 /// The round in which the block was notarized. This is an optimization
81 /// to help locate the block.
82 round: Option<Round>,
83 /// The commitment of the block to retrieve.
84 commitment: V::Commitment,
85 /// A channel to send the retrieved block.
86 response: oneshot::Sender<V::Block>,
87 },
88 /// A request to broadcast a proposed block to peers.
89 Proposed {
90 /// The round in which the block was proposed.
91 round: Round,
92 /// The block to broadcast.
93 block: V::Block,
94 },
95 /// A notification that a block has been verified by the application.
96 Verified {
97 /// The round in which the block was verified.
98 round: Round,
99 /// The verified block.
100 block: V::Block,
101 },
102 /// Sets the sync starting point (advances if higher than current).
103 ///
104 /// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
105 /// the floor is pruned.
106 ///
107 /// To prune data without affecting the sync starting point (say at some trailing depth
108 /// from tip), use [Message::Prune] instead.
109 ///
110 /// The default floor is 0.
111 SetFloor {
112 /// The candidate floor height.
113 height: Height,
114 },
115 /// Prunes finalized blocks and certificates below the given height.
116 ///
117 /// Unlike [Message::SetFloor], this does not affect the sync starting point.
118 /// The height must be at or below the current floor (last processed height),
119 /// otherwise the prune request is ignored.
120 Prune {
121 /// The minimum height to keep (blocks below this are pruned).
122 height: Height,
123 },
124 /// A notarization from the consensus engine.
125 Notarization {
126 /// The notarization.
127 notarization: Notarization<S, V::Commitment>,
128 },
129 /// A finalization from the consensus engine.
130 Finalization {
131 /// The finalization.
132 finalization: Finalization<S, V::Commitment>,
133 },
134}
135
136/// A mailbox for sending messages to the marshal [Actor](super::Actor).
137#[derive(Clone)]
138pub struct Mailbox<S: Scheme, V: Variant> {
139 sender: mpsc::Sender<Message<S, V>>,
140}
141
142impl<S: Scheme, V: Variant> Mailbox<S, V> {
143 /// Creates a new mailbox.
144 pub(crate) const fn new(sender: mpsc::Sender<Message<S, V>>) -> Self {
145 Self { sender }
146 }
147
148 /// A request to retrieve the information about the highest finalized block.
149 pub async fn get_info(
150 &self,
151 identifier: impl Into<Identifier<<V::Block as Digestible>::Digest>>,
152 ) -> Option<(Height, <V::Block as Digestible>::Digest)> {
153 let identifier = identifier.into();
154 self.sender
155 .request(|response| Message::GetInfo {
156 identifier,
157 response,
158 })
159 .await
160 .flatten()
161 }
162
163 /// A best-effort attempt to retrieve a given block from local
164 /// storage. It is not an indication to go fetch the block from the network.
165 pub async fn get_block(
166 &self,
167 identifier: impl Into<Identifier<<V::Block as Digestible>::Digest>>,
168 ) -> Option<V::Block> {
169 let identifier = identifier.into();
170 self.sender
171 .request(|response| Message::GetBlock {
172 identifier,
173 response,
174 })
175 .await
176 .flatten()
177 }
178
179 /// A best-effort attempt to retrieve a given [Finalization] from local
180 /// storage. It is not an indication to go fetch the [Finalization] from the network.
181 pub async fn get_finalization(&self, height: Height) -> Option<Finalization<S, V::Commitment>> {
182 self.sender
183 .request(|response| Message::GetFinalization { height, response })
184 .await
185 .flatten()
186 }
187
188 /// Hints that a finalized block may be available at the given height.
189 ///
190 /// This method will request the finalization from the network via the resolver
191 /// if it is not available locally.
192 ///
193 /// Targets are required because this is typically called when a peer claims to be
194 /// ahead. By targeting only those peers, we limit who we ask. If a target returns
195 /// invalid data, they will be blocked by the resolver. If targets don't respond
196 /// or return "no data", they effectively rate-limit themselves.
197 ///
198 /// Calling this multiple times for the same height with different targets will
199 /// add to the target set if there is an ongoing fetch, allowing more peers to be tried.
200 ///
201 /// This is fire-and-forget: the finalization will be stored in marshal and delivered
202 /// via the normal finalization flow when available.
203 ///
204 /// The height must be covered by both the epocher and the provider. If the
205 /// epocher cannot map the height to an epoch, or the provider cannot supply
206 /// a scheme for that epoch, the hint is silently dropped.
207 pub async fn hint_finalized(&self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
208 self.sender
209 .send_lossy(Message::HintFinalized { height, targets })
210 .await;
211 }
212
213 /// Subscribe to a block by its digest.
214 ///
215 /// If the block is found available locally, the block will be returned immediately.
216 ///
217 /// If the block is not available locally, the request will be registered and the caller will
218 /// be notified when the block is available. If the block is not finalized, it's possible that
219 /// it may never become available.
220 ///
221 /// The oneshot receiver should be dropped to cancel the subscription.
222 pub async fn subscribe_by_digest(
223 &self,
224 round: Option<Round>,
225 digest: <V::Block as Digestible>::Digest,
226 ) -> oneshot::Receiver<V::Block> {
227 let (tx, rx) = oneshot::channel();
228 self.sender
229 .send_lossy(Message::SubscribeByDigest {
230 round,
231 digest,
232 response: tx,
233 })
234 .await;
235 rx
236 }
237
238 /// Subscribe to a block by its commitment.
239 ///
240 /// If the block is found available locally, the block will be returned immediately.
241 ///
242 /// If the block is not available locally, the request will be registered and the caller will
243 /// be notified when the block is available. If the block is not finalized, it's possible that
244 /// it may never become available.
245 ///
246 /// The oneshot receiver should be dropped to cancel the subscription.
247 pub async fn subscribe_by_commitment(
248 &self,
249 round: Option<Round>,
250 commitment: V::Commitment,
251 ) -> oneshot::Receiver<V::Block> {
252 let (tx, rx) = oneshot::channel();
253 self.sender
254 .send_lossy(Message::SubscribeByCommitment {
255 round,
256 commitment,
257 response: tx,
258 })
259 .await;
260 rx
261 }
262
263 /// Returns an [AncestorStream] over the ancestry of a given block, leading up to genesis.
264 ///
265 /// If the starting block is not found, `None` is returned.
266 pub async fn ancestry(
267 &self,
268 (start_round, start_digest): (Option<Round>, <V::Block as Digestible>::Digest),
269 ) -> Option<AncestorStream<Self, V::ApplicationBlock>> {
270 self.subscribe_by_digest(start_round, start_digest)
271 .await
272 .await
273 .ok()
274 .map(|block| AncestorStream::new(self.clone(), [V::into_inner(block)]))
275 }
276
277 /// Requests that a proposed block is sent to peers.
278 pub async fn proposed(&self, round: Round, block: V::Block) {
279 self.sender
280 .send_lossy(Message::Proposed { round, block })
281 .await;
282 }
283
284 /// Notifies the actor that a block has been verified.
285 pub async fn verified(&self, round: Round, block: V::Block) {
286 self.sender
287 .send_lossy(Message::Verified { round, block })
288 .await;
289 }
290
291 /// Sets the sync starting point (advances if higher than current).
292 ///
293 /// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
294 /// the floor is pruned.
295 ///
296 /// To prune data without affecting the sync starting point (say at some trailing depth
297 /// from tip), use [Self::prune] instead.
298 ///
299 /// The default floor is 0.
300 pub async fn set_floor(&self, height: Height) {
301 self.sender.send_lossy(Message::SetFloor { height }).await;
302 }
303
304 /// Prunes finalized blocks and certificates below the given height.
305 ///
306 /// Unlike [Self::set_floor], this does not affect the sync starting point.
307 /// The height must be at or below the current floor (last processed height),
308 /// otherwise the prune request is ignored.
309 pub async fn prune(&self, height: Height) {
310 self.sender.send_lossy(Message::Prune { height }).await;
311 }
312}
313
314impl<S: Scheme, V: Variant> BlockProvider for Mailbox<S, V> {
315 type Block = V::ApplicationBlock;
316
317 async fn fetch_block(self, digest: <V::Block as Digestible>::Digest) -> Option<Self::Block> {
318 let subscription = self.subscribe_by_digest(None, digest).await;
319 subscription.await.ok().map(V::into_inner)
320 }
321}
322
323impl<S: Scheme, V: Variant> Reporter for Mailbox<S, V> {
324 type Activity = Activity<S, V::Commitment>;
325
326 async fn report(&mut self, activity: Self::Activity) {
327 let message = match activity {
328 Activity::Notarization(notarization) => Message::Notarization { notarization },
329 Activity::Finalization(finalization) => Message::Finalization { finalization },
330 _ => {
331 // Ignore other activity types
332 return;
333 }
334 };
335 self.sender.send_lossy(message).await;
336 }
337}