Skip to main content

commonware_consensus/marshal/ingress/
mailbox.rs

1use crate::{
2    simplex::types::{Activity, Finalization, Notarization},
3    types::{Height, Round},
4    Block, Heightable, Reporter,
5};
6use commonware_cryptography::{certificate::Scheme, Digest};
7use commonware_storage::archive;
8use commonware_utils::{
9    channel::{fallible::AsyncFallibleExt, mpsc, oneshot},
10    vec::NonEmptyVec,
11};
12use futures::{
13    future::BoxFuture,
14    stream::{FuturesOrdered, Stream},
15    FutureExt,
16};
17use pin_project::pin_project;
18use std::{
19    pin::Pin,
20    task::{Context, Poll},
21};
22
23/// An identifier for a block request.
24pub enum Identifier<D: Digest> {
25    /// The height of the block to retrieve.
26    Height(Height),
27    /// The commitment of the block to retrieve.
28    Commitment(D),
29    /// The highest finalized block. It may be the case that marshal does not have some of the
30    /// blocks below this height.
31    Latest,
32}
33
34// Allows using Height directly for convenience.
35impl<D: Digest> From<Height> for Identifier<D> {
36    fn from(src: Height) -> Self {
37        Self::Height(src)
38    }
39}
40
41// Allows using &Digest directly for convenience.
42impl<D: Digest> From<&D> for Identifier<D> {
43    fn from(src: &D) -> Self {
44        Self::Commitment(*src)
45    }
46}
47
48// Allows using archive identifiers directly for convenience.
49impl<D: Digest> From<archive::Identifier<'_, D>> for Identifier<D> {
50    fn from(src: archive::Identifier<'_, D>) -> Self {
51        match src {
52            archive::Identifier::Index(index) => Self::Height(Height::new(index)),
53            archive::Identifier::Key(key) => Self::Commitment(*key),
54        }
55    }
56}
57
58/// Messages sent to the marshal [Actor](super::super::actor::Actor).
59///
60/// These messages are sent from the consensus engine and other parts of the
61/// system to drive the state of the marshal.
62pub(crate) enum Message<S: Scheme, B: Block> {
63    // -------------------- Application Messages --------------------
64    /// A request to retrieve the (height, commitment) of a block by its identifier.
65    /// The block must be finalized; returns `None` if the block is not finalized.
66    GetInfo {
67        /// The identifier of the block to get the information of.
68        identifier: Identifier<B::Commitment>,
69        /// A channel to send the retrieved (height, commitment).
70        response: oneshot::Sender<Option<(Height, B::Commitment)>>,
71    },
72    /// A request to retrieve a block by its identifier.
73    ///
74    /// Requesting by [Identifier::Height] or [Identifier::Latest] will only return finalized
75    /// blocks, whereas requesting by commitment may return non-finalized or even unverified blocks.
76    GetBlock {
77        /// The identifier of the block to retrieve.
78        identifier: Identifier<B::Commitment>,
79        /// A channel to send the retrieved block.
80        response: oneshot::Sender<Option<B>>,
81    },
82    /// A request to retrieve a finalization by height.
83    GetFinalization {
84        /// The height of the finalization to retrieve.
85        height: Height,
86        /// A channel to send the retrieved finalization.
87        response: oneshot::Sender<Option<Finalization<S, B::Commitment>>>,
88    },
89    /// A hint that a finalized block may be available at a given height.
90    ///
91    /// This triggers a network fetch if the finalization is not available locally.
92    /// This is fire-and-forget: the finalization will be stored in marshal and
93    /// delivered via the normal finalization flow when available.
94    ///
95    /// Targets are required because this is typically called when a peer claims to
96    /// be ahead. If a target returns invalid data, the resolver will block them.
97    /// Sending this message multiple times with different targets adds to the
98    /// target set.
99    HintFinalized {
100        /// The height of the finalization to fetch.
101        height: Height,
102        /// Target peers to fetch from. Added to any existing targets for this height.
103        targets: NonEmptyVec<S::PublicKey>,
104    },
105    /// A request to retrieve a block by its commitment.
106    Subscribe {
107        /// The view in which the block was notarized. This is an optimization
108        /// to help locate the block.
109        round: Option<Round>,
110        /// The commitment of the block to retrieve.
111        commitment: B::Commitment,
112        /// A channel to send the retrieved block.
113        response: oneshot::Sender<B>,
114    },
115    /// A request to broadcast a proposed block to all peers.
116    Proposed {
117        /// The round in which the block was proposed.
118        round: Round,
119        /// The block to broadcast.
120        block: B,
121    },
122    /// A notification that a block has been verified by the application.
123    Verified {
124        /// The round in which the block was verified.
125        round: Round,
126        /// The verified block.
127        block: B,
128    },
129    /// Sets the sync starting point (advances if higher than current).
130    ///
131    /// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
132    /// the floor is pruned.
133    ///
134    /// To prune data without affecting the sync starting point (say at some trailing depth
135    /// from tip), use [Message::Prune] instead.
136    ///
137    /// The default floor is 0.
138    SetFloor {
139        /// The candidate floor height.
140        height: Height,
141    },
142    /// Prunes finalized blocks and certificates below the given height.
143    ///
144    /// Unlike [Message::SetFloor], this does not affect the sync starting point.
145    /// The height must be at or below the current floor (last processed height),
146    /// otherwise the prune request is ignored.
147    Prune {
148        /// The minimum height to keep (blocks below this are pruned).
149        height: Height,
150    },
151
152    // -------------------- Consensus Engine Messages --------------------
153    /// A notarization from the consensus engine.
154    Notarization {
155        /// The notarization.
156        notarization: Notarization<S, B::Commitment>,
157    },
158    /// A finalization from the consensus engine.
159    Finalization {
160        /// The finalization.
161        finalization: Finalization<S, B::Commitment>,
162    },
163}
164
165/// A mailbox for sending messages to the marshal [Actor](super::super::actor::Actor).
166#[derive(Clone)]
167pub struct Mailbox<S: Scheme, B: Block> {
168    sender: mpsc::Sender<Message<S, B>>,
169}
170
171impl<S: Scheme, B: Block> Mailbox<S, B> {
172    /// Creates a new mailbox.
173    pub(crate) const fn new(sender: mpsc::Sender<Message<S, B>>) -> Self {
174        Self { sender }
175    }
176
177    /// A request to retrieve the information about the highest finalized block.
178    pub async fn get_info(
179        &mut self,
180        identifier: impl Into<Identifier<B::Commitment>>,
181    ) -> Option<(Height, B::Commitment)> {
182        let identifier = identifier.into();
183        self.sender
184            .request(|response| Message::GetInfo {
185                identifier,
186                response,
187            })
188            .await
189            .flatten()
190    }
191
192    /// A best-effort attempt to retrieve a given block from local
193    /// storage. It is not an indication to go fetch the block from the network.
194    pub async fn get_block(
195        &mut self,
196        identifier: impl Into<Identifier<B::Commitment>>,
197    ) -> Option<B> {
198        let identifier = identifier.into();
199        self.sender
200            .request(|response| Message::GetBlock {
201                identifier,
202                response,
203            })
204            .await
205            .flatten()
206    }
207
208    /// A best-effort attempt to retrieve a given [Finalization] from local
209    /// storage. It is not an indication to go fetch the [Finalization] from the network.
210    pub async fn get_finalization(
211        &mut self,
212        height: Height,
213    ) -> Option<Finalization<S, B::Commitment>> {
214        self.sender
215            .request(|response| Message::GetFinalization { height, response })
216            .await
217            .flatten()
218    }
219
220    /// Hints that a finalized block may be available at the given height.
221    ///
222    /// This method will request the finalization from the network via the resolver
223    /// if it is not available locally.
224    ///
225    /// Targets are required because this is typically called when a peer claims to be
226    /// ahead. By targeting only those peers, we limit who we ask. If a target returns
227    /// invalid data, they will be blocked by the resolver. If targets don't respond
228    /// or return "no data", they effectively rate-limit themselves.
229    ///
230    /// Calling this multiple times for the same height with different targets will
231    /// add to the target set if there is an ongoing fetch, allowing more peers to be tried.
232    ///
233    /// This is fire-and-forget: the finalization will be stored in marshal and delivered
234    /// via the normal finalization flow when available.
235    pub async fn hint_finalized(&mut self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
236        self.sender
237            .send_lossy(Message::HintFinalized { height, targets })
238            .await;
239    }
240
241    /// A request to retrieve a block by its commitment.
242    ///
243    /// If the block is found available locally, the block will be returned immediately.
244    ///
245    /// If the block is not available locally, the request will be registered and the caller will
246    /// be notified when the block is available. If the block is not finalized, it's possible that
247    /// it may never become available.
248    ///
249    /// The oneshot receiver should be dropped to cancel the subscription.
250    pub async fn subscribe(
251        &mut self,
252        round: Option<Round>,
253        commitment: B::Commitment,
254    ) -> oneshot::Receiver<B> {
255        let (tx, rx) = oneshot::channel();
256        self.sender
257            .send_lossy(Message::Subscribe {
258                round,
259                commitment,
260                response: tx,
261            })
262            .await;
263        rx
264    }
265
266    /// Returns an [AncestorStream] over the ancestry of a given block, leading up to genesis.
267    ///
268    /// If the starting block is not found, `None` is returned.
269    pub async fn ancestry(
270        &mut self,
271        (start_round, start_commitment): (Option<Round>, B::Commitment),
272    ) -> Option<AncestorStream<S, B>> {
273        self.subscribe(start_round, start_commitment)
274            .await
275            .await
276            .ok()
277            .map(|block| AncestorStream::new(self.clone(), [block]))
278    }
279
280    /// Proposed requests that a proposed block is sent to all peers.
281    pub async fn proposed(&mut self, round: Round, block: B) {
282        self.sender
283            .send_lossy(Message::Proposed { round, block })
284            .await;
285    }
286
287    /// Notifies the actor that a block has been verified.
288    pub async fn verified(&mut self, round: Round, block: B) {
289        self.sender
290            .send_lossy(Message::Verified { round, block })
291            .await;
292    }
293
294    /// Sets the sync starting point (advances if higher than current).
295    ///
296    /// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
297    /// the floor is pruned.
298    ///
299    /// To prune data without affecting the sync starting point (say at some trailing depth
300    /// from tip), use [Self::prune] instead.
301    ///
302    /// The default floor is 0.
303    pub async fn set_floor(&mut self, height: Height) {
304        self.sender.send_lossy(Message::SetFloor { height }).await;
305    }
306
307    /// Prunes finalized blocks and certificates below the given height.
308    ///
309    /// Unlike [Self::set_floor], this does not affect the sync starting point.
310    /// The height must be at or below the current floor (last processed height),
311    /// otherwise the prune request is ignored.
312    pub async fn prune(&mut self, height: Height) {
313        self.sender.send_lossy(Message::Prune { height }).await;
314    }
315}
316
317impl<S: Scheme, B: Block> Reporter for Mailbox<S, B> {
318    type Activity = Activity<S, B::Commitment>;
319
320    async fn report(&mut self, activity: Self::Activity) {
321        let message = match activity {
322            Activity::Notarization(notarization) => Message::Notarization { notarization },
323            Activity::Finalization(finalization) => Message::Finalization { finalization },
324            _ => {
325                // Ignore other activity types
326                return;
327            }
328        };
329        self.sender.send_lossy(message).await;
330    }
331}
332
333/// Returns a boxed subscription future for a block.
334#[inline]
335fn subscribe_block_future<S: Scheme, B: Block>(
336    mut marshal: Mailbox<S, B>,
337    commitment: B::Commitment,
338) -> BoxFuture<'static, Option<B>> {
339    async move {
340        let receiver = marshal.subscribe(None, commitment).await;
341        receiver.await.ok()
342    }
343    .boxed()
344}
345
346/// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
347///
348/// TODO(clabby): Once marshal can also yield the genesis block, this stream should end
349/// at block height 0 rather than 1.
350#[pin_project]
351pub struct AncestorStream<S: Scheme, B: Block> {
352    marshal: Mailbox<S, B>,
353    buffered: Vec<B>,
354    #[pin]
355    pending: FuturesOrdered<BoxFuture<'static, Option<B>>>,
356}
357
358impl<S: Scheme, B: Block> AncestorStream<S, B> {
359    /// Creates a new [AncestorStream] starting from the given ancestry.
360    ///
361    /// # Panics
362    ///
363    /// Panics if the initial blocks are not contiguous in height.
364    pub(crate) fn new(marshal: Mailbox<S, B>, initial: impl IntoIterator<Item = B>) -> Self {
365        let mut buffered = initial.into_iter().collect::<Vec<B>>();
366        buffered.sort_by_key(Heightable::height);
367
368        // Check that the initial blocks are contiguous in height.
369        buffered.windows(2).for_each(|window| {
370            assert_eq!(
371                window[0].height().next(),
372                window[1].height(),
373                "initial blocks must be contiguous in height"
374            );
375        });
376
377        Self {
378            marshal,
379            buffered,
380            pending: FuturesOrdered::new(),
381        }
382    }
383}
384
385impl<S: Scheme, B: Block> Stream for AncestorStream<S, B> {
386    type Item = B;
387
388    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
389        // Because marshal cannot currently yield the genesis block, we stop at height 1.
390        const END_BOUND: Height = Height::new(1);
391
392        let mut this = self.project();
393
394        // If a result has been buffered, return it and queue the parent fetch if needed.
395        if let Some(block) = this.buffered.pop() {
396            let height = block.height();
397            let should_fetch_parent = height > END_BOUND && this.buffered.is_empty();
398            if should_fetch_parent {
399                let parent_commitment = block.parent();
400                let future = subscribe_block_future(this.marshal.clone(), parent_commitment);
401                this.pending.push_back(future);
402
403                // Explicitly poll the pending futures to kick off the fetch. If it's already ready,
404                // buffer it for the next poll.
405                if let Poll::Ready(Some(Some(block))) = this.pending.as_mut().poll_next(cx) {
406                    this.buffered.push(block);
407                }
408            }
409
410            return Poll::Ready(Some(block));
411        }
412
413        match this.pending.as_mut().poll_next(cx) {
414            Poll::Pending => Poll::Pending,
415            Poll::Ready(None) | Poll::Ready(Some(None)) => Poll::Ready(None),
416            Poll::Ready(Some(Some(block))) => {
417                let height = block.height();
418                let should_fetch_parent = height > END_BOUND;
419                if should_fetch_parent {
420                    let parent_commitment = block.parent();
421                    let future = subscribe_block_future(this.marshal.clone(), parent_commitment);
422                    this.pending.push_back(future);
423
424                    // Explicitly poll the pending futures to kick off the fetch. If it's already ready,
425                    // buffer it for the next poll.
426                    if let Poll::Ready(Some(Some(block))) = this.pending.as_mut().poll_next(cx) {
427                        this.buffered.push(block);
428                    }
429                }
430
431                Poll::Ready(Some(block))
432            }
433        }
434    }
435}