commonware_consensus/marshal/ingress/
mailbox.rs

1use crate::{
2    simplex::types::{Activity, Finalization, Notarization},
3    types::{Height, Round},
4    Block, Heightable, Reporter,
5};
6use commonware_cryptography::{certificate::Scheme, Digest};
7use commonware_storage::archive;
8use commonware_utils::{channels::fallible::AsyncFallibleExt, vec::NonEmptyVec};
9use futures::{
10    channel::{mpsc, oneshot},
11    future::BoxFuture,
12    stream::{FuturesOrdered, Stream},
13    FutureExt,
14};
15use pin_project::pin_project;
16use std::{
17    pin::Pin,
18    task::{Context, Poll},
19};
20
/// An identifier for a block request.
///
/// Besides being constructed directly, an identifier can be produced from a [Height],
/// a `&D`, or an [archive::Identifier] through the `From` implementations in this module.
pub enum Identifier<D: Digest> {
    /// The height of the block to retrieve.
    Height(Height),
    /// The commitment of the block to retrieve.
    Commitment(D),
    /// The highest finalized block. It may be the case that marshal does not have some of the
    /// blocks below this height.
    Latest,
}
31
32// Allows using Height directly for convenience.
33impl<D: Digest> From<Height> for Identifier<D> {
34    fn from(src: Height) -> Self {
35        Self::Height(src)
36    }
37}
38
39// Allows using &Digest directly for convenience.
40impl<D: Digest> From<&D> for Identifier<D> {
41    fn from(src: &D) -> Self {
42        Self::Commitment(*src)
43    }
44}
45
46// Allows using archive identifiers directly for convenience.
47impl<D: Digest> From<archive::Identifier<'_, D>> for Identifier<D> {
48    fn from(src: archive::Identifier<'_, D>) -> Self {
49        match src {
50            archive::Identifier::Index(index) => Self::Height(Height::new(index)),
51            archive::Identifier::Key(key) => Self::Commitment(*key),
52        }
53    }
54}
55
/// Messages sent to the marshal [Actor](super::super::actor::Actor).
///
/// These messages are sent from the consensus engine and other parts of the
/// system to drive the state of the marshal.
pub(crate) enum Message<S: Scheme, B: Block> {
    // -------------------- Application Messages --------------------
    /// A request to retrieve the (height, commitment) of a block by its identifier.
    /// The block must be finalized; returns `None` if the block is not finalized.
    GetInfo {
        /// The identifier of the block to get the information of.
        identifier: Identifier<B::Commitment>,
        /// A channel to send the retrieved (height, commitment).
        response: oneshot::Sender<Option<(Height, B::Commitment)>>,
    },
    /// A request to retrieve a block by its identifier.
    ///
    /// Requesting by [Identifier::Height] or [Identifier::Latest] will only return finalized
    /// blocks, whereas requesting by commitment may return non-finalized or even unverified blocks.
    GetBlock {
        /// The identifier of the block to retrieve.
        identifier: Identifier<B::Commitment>,
        /// A channel to send the retrieved block (`None` if it could not be retrieved).
        response: oneshot::Sender<Option<B>>,
    },
    /// A request to retrieve a finalization by height.
    GetFinalization {
        /// The height of the finalization to retrieve.
        height: Height,
        /// A channel to send the retrieved finalization (`None` if it could not be retrieved).
        response: oneshot::Sender<Option<Finalization<S, B::Commitment>>>,
    },
    /// A hint that a finalized block may be available at a given height.
    ///
    /// This triggers a network fetch if the finalization is not available locally.
    /// This is fire-and-forget: the finalization will be stored in marshal and
    /// delivered via the normal finalization flow when available.
    ///
    /// Targets are required because this is typically called when a peer claims to
    /// be ahead. If a target returns invalid data, the resolver will block them.
    /// Sending this message multiple times with different targets adds to the
    /// target set.
    HintFinalized {
        /// The height of the finalization to fetch.
        height: Height,
        /// Target peers to fetch from. Added to any existing targets for this height.
        targets: NonEmptyVec<S::PublicKey>,
    },
    /// A request to be handed a block, identified by its commitment, over a channel.
    ///
    /// Unlike [Message::GetBlock], the response channel carries a `B` rather than an
    /// `Option<B>`.
    Subscribe {
        /// The round in which the block was notarized, if known. This is an optimization
        /// to help locate the block.
        round: Option<Round>,
        /// The commitment of the block to retrieve.
        commitment: B::Commitment,
        /// A channel to send the retrieved block.
        response: oneshot::Sender<B>,
    },
    /// A request to broadcast a proposed block to all peers.
    Proposed {
        /// The round in which the block was proposed.
        round: Round,
        /// The block to broadcast.
        block: B,
    },
    /// A notification that a block has been verified by the application.
    Verified {
        /// The round in which the block was verified.
        round: Round,
        /// The verified block.
        block: B,
    },
    /// Sets the sync starting point (advances if higher than current).
    ///
    /// Marshal will sync and deliver blocks starting at `floor + 1`. Data at or
    /// below the floor is pruned.
    ///
    /// To prune data without affecting the sync starting point (say at some trailing depth
    /// from tip), prune the finalized stores directly.
    ///
    /// The default floor is 0.
    SetFloor {
        /// The candidate floor height.
        height: Height,
    },

    // -------------------- Consensus Engine Messages --------------------
    /// A notarization from the consensus engine.
    Notarization {
        /// The notarization.
        notarization: Notarization<S, B::Commitment>,
    },
    /// A finalization from the consensus engine.
    Finalization {
        /// The finalization.
        finalization: Finalization<S, B::Commitment>,
    },
}
153
/// A mailbox for sending messages to the marshal [Actor](super::super::actor::Actor).
///
/// The mailbox is `Clone`; each clone holds its own copy of the channel sender.
#[derive(Clone)]
pub struct Mailbox<S: Scheme, B: Block> {
    /// Sending half of the channel that carries [Message]s.
    sender: mpsc::Sender<Message<S, B>>,
}
159
160impl<S: Scheme, B: Block> Mailbox<S, B> {
161    /// Creates a new mailbox.
162    pub(crate) const fn new(sender: mpsc::Sender<Message<S, B>>) -> Self {
163        Self { sender }
164    }
165
166    /// A request to retrieve the information about the highest finalized block.
167    pub async fn get_info(
168        &mut self,
169        identifier: impl Into<Identifier<B::Commitment>>,
170    ) -> Option<(Height, B::Commitment)> {
171        let identifier = identifier.into();
172        self.sender
173            .request(|response| Message::GetInfo {
174                identifier,
175                response,
176            })
177            .await
178            .flatten()
179    }
180
181    /// A best-effort attempt to retrieve a given block from local
182    /// storage. It is not an indication to go fetch the block from the network.
183    pub async fn get_block(
184        &mut self,
185        identifier: impl Into<Identifier<B::Commitment>>,
186    ) -> Option<B> {
187        let identifier = identifier.into();
188        self.sender
189            .request(|response| Message::GetBlock {
190                identifier,
191                response,
192            })
193            .await
194            .flatten()
195    }
196
197    /// A best-effort attempt to retrieve a given [Finalization] from local
198    /// storage. It is not an indication to go fetch the [Finalization] from the network.
199    pub async fn get_finalization(
200        &mut self,
201        height: Height,
202    ) -> Option<Finalization<S, B::Commitment>> {
203        self.sender
204            .request(|response| Message::GetFinalization { height, response })
205            .await
206            .flatten()
207    }
208
209    /// Hints that a finalized block may be available at the given height.
210    ///
211    /// This method will request the finalization from the network via the resolver
212    /// if it is not available locally.
213    ///
214    /// Targets are required because this is typically called when a peer claims to be
215    /// ahead. By targeting only those peers, we limit who we ask. If a target returns
216    /// invalid data, they will be blocked by the resolver. If targets don't respond
217    /// or return "no data", they effectively rate-limit themselves.
218    ///
219    /// Calling this multiple times for the same height with different targets will
220    /// add to the target set if there is an ongoing fetch, allowing more peers to be tried.
221    ///
222    /// This is fire-and-forget: the finalization will be stored in marshal and delivered
223    /// via the normal finalization flow when available.
224    pub async fn hint_finalized(&mut self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
225        self.sender
226            .send_lossy(Message::HintFinalized { height, targets })
227            .await;
228    }
229
230    /// A request to retrieve a block by its commitment.
231    ///
232    /// If the block is found available locally, the block will be returned immediately.
233    ///
234    /// If the block is not available locally, the request will be registered and the caller will
235    /// be notified when the block is available. If the block is not finalized, it's possible that
236    /// it may never become available.
237    ///
238    /// The oneshot receiver should be dropped to cancel the subscription.
239    pub async fn subscribe(
240        &mut self,
241        round: Option<Round>,
242        commitment: B::Commitment,
243    ) -> oneshot::Receiver<B> {
244        let (tx, rx) = oneshot::channel();
245        self.sender
246            .send_lossy(Message::Subscribe {
247                round,
248                commitment,
249                response: tx,
250            })
251            .await;
252        rx
253    }
254
255    /// Returns an [AncestorStream] over the ancestry of a given block, leading up to genesis.
256    ///
257    /// If the starting block is not found, `None` is returned.
258    pub async fn ancestry(
259        &mut self,
260        (start_round, start_commitment): (Option<Round>, B::Commitment),
261    ) -> Option<AncestorStream<S, B>> {
262        self.subscribe(start_round, start_commitment)
263            .await
264            .await
265            .ok()
266            .map(|block| AncestorStream::new(self.clone(), [block]))
267    }
268
269    /// Proposed requests that a proposed block is sent to all peers.
270    pub async fn proposed(&mut self, round: Round, block: B) {
271        self.sender
272            .send_lossy(Message::Proposed { round, block })
273            .await;
274    }
275
276    /// Notifies the actor that a block has been verified.
277    pub async fn verified(&mut self, round: Round, block: B) {
278        self.sender
279            .send_lossy(Message::Verified { round, block })
280            .await;
281    }
282
283    /// Sets the sync starting point (advances if higher than current).
284    ///
285    /// Marshal will sync and deliver blocks starting at `floor + 1`. Data at or
286    /// below the floor is pruned.
287    ///
288    /// To prune data without affecting the sync starting point (say at some trailing depth
289    /// from tip), prune the finalized stores directly.
290    ///
291    /// The default floor is 0.
292    pub async fn set_floor(&mut self, height: Height) {
293        self.sender.send_lossy(Message::SetFloor { height }).await;
294    }
295}
296
297impl<S: Scheme, B: Block> Reporter for Mailbox<S, B> {
298    type Activity = Activity<S, B::Commitment>;
299
300    async fn report(&mut self, activity: Self::Activity) {
301        let message = match activity {
302            Activity::Notarization(notarization) => Message::Notarization { notarization },
303            Activity::Finalization(finalization) => Message::Finalization { finalization },
304            _ => {
305                // Ignore other activity types
306                return;
307            }
308        };
309        self.sender.send_lossy(message).await;
310    }
311}
312
313/// Returns a boxed subscription future for a block.
314#[inline]
315fn subscribe_block_future<S: Scheme, B: Block>(
316    mut marshal: Mailbox<S, B>,
317    commitment: B::Commitment,
318) -> BoxFuture<'static, Option<B>> {
319    async move {
320        let receiver = marshal.subscribe(None, commitment).await;
321        receiver.await.ok()
322    }
323    .boxed()
324}
325
/// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
///
/// TODO(clabby): Once marshal can also yield the genesis block, this stream should end
/// at block height 0 rather than 1.
#[pin_project]
pub struct AncestorStream<S: Scheme, B: Block> {
    /// Mailbox that is cloned for each parent subscription issued while streaming.
    marshal: Mailbox<S, B>,
    /// Blocks ready to be yielded. Sorted by ascending height at construction; the stream
    /// pops from the back, so the highest buffered block is yielded first.
    buffered: Vec<B>,
    /// In-flight parent subscriptions. A future resolves to `None` when its subscription's
    /// receiver returns an error instead of a block.
    #[pin]
    pending: FuturesOrdered<BoxFuture<'static, Option<B>>>,
}
337
338impl<S: Scheme, B: Block> AncestorStream<S, B> {
339    /// Creates a new [AncestorStream] starting from the given ancestry.
340    ///
341    /// # Panics
342    ///
343    /// Panics if the initial blocks are not contiguous in height.
344    pub(crate) fn new(marshal: Mailbox<S, B>, initial: impl IntoIterator<Item = B>) -> Self {
345        let mut buffered = initial.into_iter().collect::<Vec<B>>();
346        buffered.sort_by_key(Heightable::height);
347
348        // Check that the initial blocks are contiguous in height.
349        buffered.windows(2).for_each(|window| {
350            assert_eq!(
351                window[0].height().next(),
352                window[1].height(),
353                "initial blocks must be contiguous in height"
354            );
355        });
356
357        Self {
358            marshal,
359            buffered,
360            pending: FuturesOrdered::new(),
361        }
362    }
363}
364
365impl<S: Scheme, B: Block> Stream for AncestorStream<S, B> {
366    type Item = B;
367
368    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
369        // Because marshal cannot currently yield the genesis block, we stop at height 1.
370        const END_BOUND: Height = Height::new(1);
371
372        let mut this = self.project();
373
374        // If a result has been buffered, return it and queue the parent fetch if needed.
375        if let Some(block) = this.buffered.pop() {
376            let height = block.height();
377            let should_fetch_parent = height > END_BOUND && this.buffered.is_empty();
378            if should_fetch_parent {
379                let parent_commitment = block.parent();
380                let future = subscribe_block_future(this.marshal.clone(), parent_commitment);
381                this.pending.push_back(future);
382
383                // Explicitly poll the pending futures to kick off the fetch. If it's already ready,
384                // buffer it for the next poll.
385                if let Poll::Ready(Some(Some(block))) = this.pending.as_mut().poll_next(cx) {
386                    this.buffered.push(block);
387                }
388            }
389
390            return Poll::Ready(Some(block));
391        }
392
393        match this.pending.as_mut().poll_next(cx) {
394            Poll::Pending => Poll::Pending,
395            Poll::Ready(None) | Poll::Ready(Some(None)) => Poll::Ready(None),
396            Poll::Ready(Some(Some(block))) => {
397                let height = block.height();
398                let should_fetch_parent = height > END_BOUND;
399                if should_fetch_parent {
400                    let parent_commitment = block.parent();
401                    let future = subscribe_block_future(this.marshal.clone(), parent_commitment);
402                    this.pending.push_back(future);
403
404                    // Explicitly poll the pending futures to kick off the fetch. If it's already ready,
405                    // buffer it for the next poll.
406                    if let Poll::Ready(Some(Some(block))) = this.pending.as_mut().poll_next(cx) {
407                        this.buffered.push(block);
408                    }
409                }
410
411                Poll::Ready(Some(block))
412            }
413        }
414    }
415}