Skip to main content

commonware_consensus/marshal/core/
mailbox.rs

1use super::Variant;
2use crate::{
3    marshal::{
4        ancestry::{AncestorStream, BlockProvider},
5        Identifier,
6    },
7    simplex::types::{Activity, Finalization, Notarization},
8    types::{Height, Round},
9    Reporter,
10};
11use commonware_cryptography::{certificate::Scheme, Digestible};
12use commonware_utils::{
13    channel::{fallible::AsyncFallibleExt, mpsc, oneshot},
14    vec::NonEmptyVec,
15};
16
17/// Messages sent to the marshal [Actor](super::Actor).
18///
19/// These messages are sent from the consensus engine and other parts of the
20/// system to drive the state of the marshal.
21pub(crate) enum Message<S: Scheme, V: Variant> {
22    /// A request to retrieve the `(height, digest)` of a block by its identifier.
23    /// The block must be finalized; returns `None` if the block is not finalized.
24    GetInfo {
25        /// The identifier of the block to get the information of.
26        identifier: Identifier<<V::Block as Digestible>::Digest>,
27        /// A channel to send the retrieved `(height, digest)`.
28        response: oneshot::Sender<Option<(Height, <V::Block as Digestible>::Digest)>>,
29    },
30    /// A request to retrieve a block by its identifier.
31    ///
32    /// Requesting by [Identifier::Height] or [Identifier::Latest] will only return finalized
33    /// blocks, whereas requesting by [Identifier::Digest] may return non-finalized
34    /// or even unverified blocks.
35    GetBlock {
36        /// The identifier of the block to retrieve.
37        identifier: Identifier<<V::Block as Digestible>::Digest>,
38        /// A channel to send the retrieved block.
39        response: oneshot::Sender<Option<V::Block>>,
40    },
41    /// A request to retrieve a finalization by height.
42    GetFinalization {
43        /// The height of the finalization to retrieve.
44        height: Height,
45        /// A channel to send the retrieved finalization.
46        response: oneshot::Sender<Option<Finalization<S, V::Commitment>>>,
47    },
48    /// A hint that a finalized block may be available at a given height.
49    ///
50    /// This triggers a network fetch if the finalization is not available locally.
51    /// This is fire-and-forget: the finalization will be stored in marshal and
52    /// delivered via the normal finalization flow when available.
53    ///
54    /// The height must be covered by both the epocher and the provider. If the
55    /// epocher cannot map the height to an epoch, or the provider cannot supply
56    /// a scheme for that epoch, the hint is silently dropped.
57    ///
58    /// Targets are required because this is typically called when a peer claims to
59    /// be ahead. If a target returns invalid data, the resolver will block them.
60    /// Sending this message multiple times with different targets adds to the
61    /// target set.
62    HintFinalized {
63        /// The height of the finalization to fetch.
64        height: Height,
65        /// Target peers to fetch from. Added to any existing targets for this height.
66        targets: NonEmptyVec<S::PublicKey>,
67    },
68    /// A request to subscribe to a block by its digest.
69    SubscribeByDigest {
70        /// The round in which the block was notarized. This is an optimization
71        /// to help locate the block.
72        round: Option<Round>,
73        /// The digest of the block to retrieve.
74        digest: <V::Block as Digestible>::Digest,
75        /// A channel to send the retrieved block.
76        response: oneshot::Sender<V::Block>,
77    },
78    /// A request to subscribe to a block by its commitment.
79    SubscribeByCommitment {
80        /// The round in which the block was notarized. This is an optimization
81        /// to help locate the block.
82        round: Option<Round>,
83        /// The commitment of the block to retrieve.
84        commitment: V::Commitment,
85        /// A channel to send the retrieved block.
86        response: oneshot::Sender<V::Block>,
87    },
88    /// A request to broadcast a proposed block to peers.
89    Proposed {
90        /// The round in which the block was proposed.
91        round: Round,
92        /// The block to broadcast.
93        block: V::Block,
94    },
95    /// A request to forward a block to a set of peers.
96    Forward {
97        /// The round in which the block was proposed.
98        round: Round,
99        /// The commitment of the block to forward.
100        commitment: V::Commitment,
101        /// The peers to forward the block to.
102        peers: Vec<S::PublicKey>,
103    },
104    /// A notification that a block has been verified by the application.
105    Verified {
106        /// The round in which the block was verified.
107        round: Round,
108        /// The verified block.
109        block: V::Block,
110    },
111    /// Sets the sync starting point (advances if higher than current).
112    ///
113    /// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
114    /// the floor is pruned.
115    ///
116    /// To prune data without affecting the sync starting point (say at some trailing depth
117    /// from tip), use [Message::Prune] instead.
118    ///
119    /// The default floor is 0.
120    SetFloor {
121        /// The candidate floor height.
122        height: Height,
123    },
124    /// Prunes finalized blocks and certificates below the given height.
125    ///
126    /// Unlike [Message::SetFloor], this does not affect the sync starting point.
127    /// The height must be at or below the current floor (last processed height),
128    /// otherwise the prune request is ignored.
129    Prune {
130        /// The minimum height to keep (blocks below this are pruned).
131        height: Height,
132    },
133    /// A notarization from the consensus engine.
134    Notarization {
135        /// The notarization.
136        notarization: Notarization<S, V::Commitment>,
137    },
138    /// A finalization from the consensus engine.
139    Finalization {
140        /// The finalization.
141        finalization: Finalization<S, V::Commitment>,
142    },
143}
144
145/// A mailbox for sending messages to the marshal [Actor](super::Actor).
146#[derive(Clone)]
147pub struct Mailbox<S: Scheme, V: Variant> {
148    sender: mpsc::Sender<Message<S, V>>,
149}
150
151impl<S: Scheme, V: Variant> Mailbox<S, V> {
152    /// Creates a new mailbox.
153    pub(crate) const fn new(sender: mpsc::Sender<Message<S, V>>) -> Self {
154        Self { sender }
155    }
156
157    /// A request to retrieve the information about the highest finalized block.
158    pub async fn get_info(
159        &self,
160        identifier: impl Into<Identifier<<V::Block as Digestible>::Digest>>,
161    ) -> Option<(Height, <V::Block as Digestible>::Digest)> {
162        let identifier = identifier.into();
163        self.sender
164            .request(|response| Message::GetInfo {
165                identifier,
166                response,
167            })
168            .await
169            .flatten()
170    }
171
172    /// A best-effort attempt to retrieve a given block from local
173    /// storage. It is not an indication to go fetch the block from the network.
174    pub async fn get_block(
175        &self,
176        identifier: impl Into<Identifier<<V::Block as Digestible>::Digest>>,
177    ) -> Option<V::Block> {
178        let identifier = identifier.into();
179        self.sender
180            .request(|response| Message::GetBlock {
181                identifier,
182                response,
183            })
184            .await
185            .flatten()
186    }
187
188    /// A best-effort attempt to retrieve a given [Finalization] from local
189    /// storage. It is not an indication to go fetch the [Finalization] from the network.
190    pub async fn get_finalization(&self, height: Height) -> Option<Finalization<S, V::Commitment>> {
191        self.sender
192            .request(|response| Message::GetFinalization { height, response })
193            .await
194            .flatten()
195    }
196
197    /// Hints that a finalized block may be available at the given height.
198    ///
199    /// This method will request the finalization from the network via the resolver
200    /// if it is not available locally.
201    ///
202    /// Targets are required because this is typically called when a peer claims to be
203    /// ahead. By targeting only those peers, we limit who we ask. If a target returns
204    /// invalid data, they will be blocked by the resolver. If targets don't respond
205    /// or return "no data", they effectively rate-limit themselves.
206    ///
207    /// Calling this multiple times for the same height with different targets will
208    /// add to the target set if there is an ongoing fetch, allowing more peers to be tried.
209    ///
210    /// This is fire-and-forget: the finalization will be stored in marshal and delivered
211    /// via the normal finalization flow when available.
212    ///
213    /// The height must be covered by both the epocher and the provider. If the
214    /// epocher cannot map the height to an epoch, or the provider cannot supply
215    /// a scheme for that epoch, the hint is silently dropped.
216    pub async fn hint_finalized(&self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
217        self.sender
218            .send_lossy(Message::HintFinalized { height, targets })
219            .await;
220    }
221
222    /// Subscribe to a block by its digest.
223    ///
224    /// If the block is found available locally, the block will be returned immediately.
225    ///
226    /// If the block is not available locally, the request will be registered and the caller will
227    /// be notified when the block is available. If the block is not finalized, it's possible that
228    /// it may never become available.
229    ///
230    /// The oneshot receiver should be dropped to cancel the subscription.
231    pub async fn subscribe_by_digest(
232        &self,
233        round: Option<Round>,
234        digest: <V::Block as Digestible>::Digest,
235    ) -> oneshot::Receiver<V::Block> {
236        let (tx, rx) = oneshot::channel();
237        self.sender
238            .send_lossy(Message::SubscribeByDigest {
239                round,
240                digest,
241                response: tx,
242            })
243            .await;
244        rx
245    }
246
247    /// Subscribe to a block by its commitment.
248    ///
249    /// If the block is found available locally, the block will be returned immediately.
250    ///
251    /// If the block is not available locally, the request will be registered and the caller will
252    /// be notified when the block is available. If the block is not finalized, it's possible that
253    /// it may never become available.
254    ///
255    /// The oneshot receiver should be dropped to cancel the subscription.
256    pub async fn subscribe_by_commitment(
257        &self,
258        round: Option<Round>,
259        commitment: V::Commitment,
260    ) -> oneshot::Receiver<V::Block> {
261        let (tx, rx) = oneshot::channel();
262        self.sender
263            .send_lossy(Message::SubscribeByCommitment {
264                round,
265                commitment,
266                response: tx,
267            })
268            .await;
269        rx
270    }
271
272    /// Returns an [AncestorStream] over the ancestry of a given block, leading up to genesis.
273    ///
274    /// If the starting block is not found, `None` is returned.
275    pub async fn ancestry(
276        &self,
277        (start_round, start_digest): (Option<Round>, <V::Block as Digestible>::Digest),
278    ) -> Option<AncestorStream<Self, V::ApplicationBlock>> {
279        self.subscribe_by_digest(start_round, start_digest)
280            .await
281            .await
282            .ok()
283            .map(|block| AncestorStream::new(self.clone(), [V::into_inner(block)]))
284    }
285
286    /// Requests that a proposed block is sent to peers.
287    pub async fn proposed(&self, round: Round, block: V::Block) {
288        self.sender
289            .send_lossy(Message::Proposed { round, block })
290            .await;
291    }
292
293    /// Notifies the actor that a block has been verified.
294    pub async fn verified(&self, round: Round, block: V::Block) {
295        self.sender
296            .send_lossy(Message::Verified { round, block })
297            .await;
298    }
299
300    /// Sets the sync starting point (advances if higher than current).
301    ///
302    /// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
303    /// the floor is pruned.
304    ///
305    /// To prune data without affecting the sync starting point (say at some trailing depth
306    /// from tip), use [Self::prune] instead.
307    ///
308    /// The default floor is 0.
309    pub async fn set_floor(&self, height: Height) {
310        self.sender.send_lossy(Message::SetFloor { height }).await;
311    }
312
313    /// Prunes finalized blocks and certificates below the given height.
314    ///
315    /// Unlike [Self::set_floor], this does not affect the sync starting point.
316    /// The height must be at or below the current floor (last processed height),
317    /// otherwise the prune request is ignored.
318    ///
319    /// A `prune` request for a height above marshal's current floor is dropped.
320    pub async fn prune(&self, height: Height) {
321        self.sender.send_lossy(Message::Prune { height }).await;
322    }
323
324    /// Forward a block to a set of peers.
325    pub async fn forward(&self, round: Round, commitment: V::Commitment, peers: Vec<S::PublicKey>) {
326        self.sender
327            .send_lossy(Message::Forward {
328                round,
329                commitment,
330                peers,
331            })
332            .await;
333    }
334}
335
336impl<S: Scheme, V: Variant> BlockProvider for Mailbox<S, V> {
337    type Block = V::ApplicationBlock;
338
339    async fn fetch_block(self, digest: <V::Block as Digestible>::Digest) -> Option<Self::Block> {
340        let subscription = self.subscribe_by_digest(None, digest).await;
341        subscription.await.ok().map(V::into_inner)
342    }
343}
344
345impl<S: Scheme, V: Variant> Reporter for Mailbox<S, V> {
346    type Activity = Activity<S, V::Commitment>;
347
348    async fn report(&mut self, activity: Self::Activity) {
349        let message = match activity {
350            Activity::Notarization(notarization) => Message::Notarization { notarization },
351            Activity::Finalization(finalization) => Message::Finalization { finalization },
352            _ => {
353                // Ignore other activity types
354                return;
355            }
356        };
357        self.sender.send_lossy(message).await;
358    }
359}