// commonware_consensus/marshal/core/mailbox.rs

1use super::Variant;
2use crate::{
3    marshal::{
4        ancestry::{AncestorStream, BlockProvider},
5        Identifier,
6    },
7    simplex::types::{Activity, Finalization, Notarization},
8    types::{Height, Round},
9    Reporter,
10};
11use commonware_cryptography::{certificate::Scheme, Digestible};
12use commonware_utils::{
13    channel::{fallible::AsyncFallibleExt, mpsc, oneshot},
14    vec::NonEmptyVec,
15};
16
17/// Messages sent to the marshal [Actor](super::Actor).
18///
19/// These messages are sent from the consensus engine and other parts of the
20/// system to drive the state of the marshal.
21pub(crate) enum Message<S: Scheme, V: Variant> {
22    /// A request to retrieve the `(height, digest)` of a block by its identifier.
23    /// The block must be finalized; returns `None` if the block is not finalized.
24    GetInfo {
25        /// The identifier of the block to get the information of.
26        identifier: Identifier<<V::Block as Digestible>::Digest>,
27        /// A channel to send the retrieved `(height, digest)`.
28        response: oneshot::Sender<Option<(Height, <V::Block as Digestible>::Digest)>>,
29    },
30    /// A request to retrieve a block by its identifier.
31    ///
32    /// Requesting by [Identifier::Height] or [Identifier::Latest] will only return finalized
33    /// blocks, whereas requesting by [Identifier::Digest] may return non-finalized
34    /// or even unverified blocks.
35    GetBlock {
36        /// The identifier of the block to retrieve.
37        identifier: Identifier<<V::Block as Digestible>::Digest>,
38        /// A channel to send the retrieved block.
39        response: oneshot::Sender<Option<V::Block>>,
40    },
41    /// A request to retrieve a finalization by height.
42    GetFinalization {
43        /// The height of the finalization to retrieve.
44        height: Height,
45        /// A channel to send the retrieved finalization.
46        response: oneshot::Sender<Option<Finalization<S, V::Commitment>>>,
47    },
48    /// A hint that a finalized block may be available at a given height.
49    ///
50    /// This triggers a network fetch if the finalization is not available locally.
51    /// This is fire-and-forget: the finalization will be stored in marshal and
52    /// delivered via the normal finalization flow when available.
53    ///
54    /// The height must be covered by both the epocher and the provider. If the
55    /// epocher cannot map the height to an epoch, or the provider cannot supply
56    /// a scheme for that epoch, the hint is silently dropped.
57    ///
58    /// Targets are required because this is typically called when a peer claims to
59    /// be ahead. If a target returns invalid data, the resolver will block them.
60    /// Sending this message multiple times with different targets adds to the
61    /// target set.
62    HintFinalized {
63        /// The height of the finalization to fetch.
64        height: Height,
65        /// Target peers to fetch from. Added to any existing targets for this height.
66        targets: NonEmptyVec<S::PublicKey>,
67    },
68    /// A request to subscribe to a block by its digest.
69    SubscribeByDigest {
70        /// The round in which the block was notarized. This is an optimization
71        /// to help locate the block.
72        round: Option<Round>,
73        /// The digest of the block to retrieve.
74        digest: <V::Block as Digestible>::Digest,
75        /// A channel to send the retrieved block.
76        response: oneshot::Sender<V::Block>,
77    },
78    /// A request to subscribe to a block by its commitment.
79    SubscribeByCommitment {
80        /// The round in which the block was notarized. This is an optimization
81        /// to help locate the block.
82        round: Option<Round>,
83        /// The commitment of the block to retrieve.
84        commitment: V::Commitment,
85        /// A channel to send the retrieved block.
86        response: oneshot::Sender<V::Block>,
87    },
88    /// A request to broadcast a proposed block to peers.
89    Proposed {
90        /// The round in which the block was proposed.
91        round: Round,
92        /// The block to broadcast.
93        block: V::Block,
94    },
95    /// A notification that a block has been verified by the application.
96    Verified {
97        /// The round in which the block was verified.
98        round: Round,
99        /// The verified block.
100        block: V::Block,
101    },
102    /// Sets the sync starting point (advances if higher than current).
103    ///
104    /// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
105    /// the floor is pruned.
106    ///
107    /// To prune data without affecting the sync starting point (say at some trailing depth
108    /// from tip), use [Message::Prune] instead.
109    ///
110    /// The default floor is 0.
111    SetFloor {
112        /// The candidate floor height.
113        height: Height,
114    },
115    /// Prunes finalized blocks and certificates below the given height.
116    ///
117    /// Unlike [Message::SetFloor], this does not affect the sync starting point.
118    /// The height must be at or below the current floor (last processed height),
119    /// otherwise the prune request is ignored.
120    Prune {
121        /// The minimum height to keep (blocks below this are pruned).
122        height: Height,
123    },
124    /// A notarization from the consensus engine.
125    Notarization {
126        /// The notarization.
127        notarization: Notarization<S, V::Commitment>,
128    },
129    /// A finalization from the consensus engine.
130    Finalization {
131        /// The finalization.
132        finalization: Finalization<S, V::Commitment>,
133    },
134}
135
136/// A mailbox for sending messages to the marshal [Actor](super::Actor).
137#[derive(Clone)]
138pub struct Mailbox<S: Scheme, V: Variant> {
139    sender: mpsc::Sender<Message<S, V>>,
140}
141
142impl<S: Scheme, V: Variant> Mailbox<S, V> {
143    /// Creates a new mailbox.
144    pub(crate) const fn new(sender: mpsc::Sender<Message<S, V>>) -> Self {
145        Self { sender }
146    }
147
148    /// A request to retrieve the information about the highest finalized block.
149    pub async fn get_info(
150        &self,
151        identifier: impl Into<Identifier<<V::Block as Digestible>::Digest>>,
152    ) -> Option<(Height, <V::Block as Digestible>::Digest)> {
153        let identifier = identifier.into();
154        self.sender
155            .request(|response| Message::GetInfo {
156                identifier,
157                response,
158            })
159            .await
160            .flatten()
161    }
162
163    /// A best-effort attempt to retrieve a given block from local
164    /// storage. It is not an indication to go fetch the block from the network.
165    pub async fn get_block(
166        &self,
167        identifier: impl Into<Identifier<<V::Block as Digestible>::Digest>>,
168    ) -> Option<V::Block> {
169        let identifier = identifier.into();
170        self.sender
171            .request(|response| Message::GetBlock {
172                identifier,
173                response,
174            })
175            .await
176            .flatten()
177    }
178
179    /// A best-effort attempt to retrieve a given [Finalization] from local
180    /// storage. It is not an indication to go fetch the [Finalization] from the network.
181    pub async fn get_finalization(&self, height: Height) -> Option<Finalization<S, V::Commitment>> {
182        self.sender
183            .request(|response| Message::GetFinalization { height, response })
184            .await
185            .flatten()
186    }
187
188    /// Hints that a finalized block may be available at the given height.
189    ///
190    /// This method will request the finalization from the network via the resolver
191    /// if it is not available locally.
192    ///
193    /// Targets are required because this is typically called when a peer claims to be
194    /// ahead. By targeting only those peers, we limit who we ask. If a target returns
195    /// invalid data, they will be blocked by the resolver. If targets don't respond
196    /// or return "no data", they effectively rate-limit themselves.
197    ///
198    /// Calling this multiple times for the same height with different targets will
199    /// add to the target set if there is an ongoing fetch, allowing more peers to be tried.
200    ///
201    /// This is fire-and-forget: the finalization will be stored in marshal and delivered
202    /// via the normal finalization flow when available.
203    ///
204    /// The height must be covered by both the epocher and the provider. If the
205    /// epocher cannot map the height to an epoch, or the provider cannot supply
206    /// a scheme for that epoch, the hint is silently dropped.
207    pub async fn hint_finalized(&self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
208        self.sender
209            .send_lossy(Message::HintFinalized { height, targets })
210            .await;
211    }
212
213    /// Subscribe to a block by its digest.
214    ///
215    /// If the block is found available locally, the block will be returned immediately.
216    ///
217    /// If the block is not available locally, the request will be registered and the caller will
218    /// be notified when the block is available. If the block is not finalized, it's possible that
219    /// it may never become available.
220    ///
221    /// The oneshot receiver should be dropped to cancel the subscription.
222    pub async fn subscribe_by_digest(
223        &self,
224        round: Option<Round>,
225        digest: <V::Block as Digestible>::Digest,
226    ) -> oneshot::Receiver<V::Block> {
227        let (tx, rx) = oneshot::channel();
228        self.sender
229            .send_lossy(Message::SubscribeByDigest {
230                round,
231                digest,
232                response: tx,
233            })
234            .await;
235        rx
236    }
237
238    /// Subscribe to a block by its commitment.
239    ///
240    /// If the block is found available locally, the block will be returned immediately.
241    ///
242    /// If the block is not available locally, the request will be registered and the caller will
243    /// be notified when the block is available. If the block is not finalized, it's possible that
244    /// it may never become available.
245    ///
246    /// The oneshot receiver should be dropped to cancel the subscription.
247    pub async fn subscribe_by_commitment(
248        &self,
249        round: Option<Round>,
250        commitment: V::Commitment,
251    ) -> oneshot::Receiver<V::Block> {
252        let (tx, rx) = oneshot::channel();
253        self.sender
254            .send_lossy(Message::SubscribeByCommitment {
255                round,
256                commitment,
257                response: tx,
258            })
259            .await;
260        rx
261    }
262
263    /// Returns an [AncestorStream] over the ancestry of a given block, leading up to genesis.
264    ///
265    /// If the starting block is not found, `None` is returned.
266    pub async fn ancestry(
267        &self,
268        (start_round, start_digest): (Option<Round>, <V::Block as Digestible>::Digest),
269    ) -> Option<AncestorStream<Self, V::ApplicationBlock>> {
270        self.subscribe_by_digest(start_round, start_digest)
271            .await
272            .await
273            .ok()
274            .map(|block| AncestorStream::new(self.clone(), [V::into_inner(block)]))
275    }
276
277    /// Requests that a proposed block is sent to peers.
278    pub async fn proposed(&self, round: Round, block: V::Block) {
279        self.sender
280            .send_lossy(Message::Proposed { round, block })
281            .await;
282    }
283
284    /// Notifies the actor that a block has been verified.
285    pub async fn verified(&self, round: Round, block: V::Block) {
286        self.sender
287            .send_lossy(Message::Verified { round, block })
288            .await;
289    }
290
291    /// Sets the sync starting point (advances if higher than current).
292    ///
293    /// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
294    /// the floor is pruned.
295    ///
296    /// To prune data without affecting the sync starting point (say at some trailing depth
297    /// from tip), use [Self::prune] instead.
298    ///
299    /// The default floor is 0.
300    pub async fn set_floor(&self, height: Height) {
301        self.sender.send_lossy(Message::SetFloor { height }).await;
302    }
303
304    /// Prunes finalized blocks and certificates below the given height.
305    ///
306    /// Unlike [Self::set_floor], this does not affect the sync starting point.
307    /// The height must be at or below the current floor (last processed height),
308    /// otherwise the prune request is ignored.
309    pub async fn prune(&self, height: Height) {
310        self.sender.send_lossy(Message::Prune { height }).await;
311    }
312}
313
314impl<S: Scheme, V: Variant> BlockProvider for Mailbox<S, V> {
315    type Block = V::ApplicationBlock;
316
317    async fn fetch_block(self, digest: <V::Block as Digestible>::Digest) -> Option<Self::Block> {
318        let subscription = self.subscribe_by_digest(None, digest).await;
319        subscription.await.ok().map(V::into_inner)
320    }
321}
322
323impl<S: Scheme, V: Variant> Reporter for Mailbox<S, V> {
324    type Activity = Activity<S, V::Commitment>;
325
326    async fn report(&mut self, activity: Self::Activity) {
327        let message = match activity {
328            Activity::Notarization(notarization) => Message::Notarization { notarization },
329            Activity::Finalization(finalization) => Message::Finalization { finalization },
330            _ => {
331                // Ignore other activity types
332                return;
333            }
334        };
335        self.sender.send_lossy(message).await;
336    }
337}