commonware_consensus/marshal/ingress/
mailbox.rs

1use crate::{
2    simplex::types::{Activity, Finalization, Notarization},
3    types::Round,
4    Block, Reporter,
5};
6use commonware_cryptography::{certificate::Scheme, Digest};
7use commonware_storage::archive;
8use commonware_utils::vec::NonEmptyVec;
9use futures::{
10    channel::{mpsc, oneshot},
11    future::BoxFuture,
12    stream::{FuturesOrdered, Stream},
13    FutureExt, SinkExt,
14};
15use pin_project::pin_project;
16use std::{
17    pin::Pin,
18    task::{Context, Poll},
19};
20use tracing::error;
21
22/// An identifier for a block request.
23pub enum Identifier<D: Digest> {
24    /// The height of the block to retrieve.
25    Height(u64),
26    /// The commitment of the block to retrieve.
27    Commitment(D),
28    /// The highest finalized block. It may be the case that marshal does not have some of the
29    /// blocks below this height.
30    Latest,
31}
32
33// Allows using u64 directly for convenience.
34impl<D: Digest> From<u64> for Identifier<D> {
35    fn from(src: u64) -> Self {
36        Self::Height(src)
37    }
38}
39
40// Allows using &Digest directly for convenience.
41impl<D: Digest> From<&D> for Identifier<D> {
42    fn from(src: &D) -> Self {
43        Self::Commitment(*src)
44    }
45}
46
47// Allows using archive identifiers directly for convenience.
48impl<D: Digest> From<archive::Identifier<'_, D>> for Identifier<D> {
49    fn from(src: archive::Identifier<'_, D>) -> Self {
50        match src {
51            archive::Identifier::Index(index) => Self::Height(index),
52            archive::Identifier::Key(key) => Self::Commitment(*key),
53        }
54    }
55}
56
57/// Messages sent to the marshal [Actor](super::super::actor::Actor).
58///
59/// These messages are sent from the consensus engine and other parts of the
60/// system to drive the state of the marshal.
61pub(crate) enum Message<S: Scheme, B: Block> {
62    // -------------------- Application Messages --------------------
63    /// A request to retrieve the (height, commitment) of a block by its identifier.
64    /// The block must be finalized; returns `None` if the block is not finalized.
65    GetInfo {
66        /// The identifier of the block to get the information of.
67        identifier: Identifier<B::Commitment>,
68        /// A channel to send the retrieved (height, commitment).
69        response: oneshot::Sender<Option<(u64, B::Commitment)>>,
70    },
71    /// A request to retrieve a block by its identifier.
72    ///
73    /// Requesting by [Identifier::Height] or [Identifier::Latest] will only return finalized
74    /// blocks, whereas requesting by commitment may return non-finalized or even unverified blocks.
75    GetBlock {
76        /// The identifier of the block to retrieve.
77        identifier: Identifier<B::Commitment>,
78        /// A channel to send the retrieved block.
79        response: oneshot::Sender<Option<B>>,
80    },
81    /// A request to retrieve a finalization by height.
82    GetFinalization {
83        /// The height of the finalization to retrieve.
84        height: u64,
85        /// A channel to send the retrieved finalization.
86        response: oneshot::Sender<Option<Finalization<S, B::Commitment>>>,
87    },
88    /// A hint that a finalized block may be available at a given height.
89    ///
90    /// This triggers a network fetch if the finalization is not available locally.
91    /// This is fire-and-forget: the finalization will be stored in marshal and
92    /// delivered via the normal finalization flow when available.
93    ///
94    /// Targets are required because this is typically called when a peer claims to
95    /// be ahead. If a target returns invalid data, the resolver will block them.
96    /// Sending this message multiple times with different targets adds to the
97    /// target set.
98    HintFinalized {
99        /// The height of the finalization to fetch.
100        height: u64,
101        /// Target peers to fetch from. Added to any existing targets for this height.
102        targets: NonEmptyVec<S::PublicKey>,
103    },
104    /// A request to retrieve a block by its commitment.
105    Subscribe {
106        /// The view in which the block was notarized. This is an optimization
107        /// to help locate the block.
108        round: Option<Round>,
109        /// The commitment of the block to retrieve.
110        commitment: B::Commitment,
111        /// A channel to send the retrieved block.
112        response: oneshot::Sender<B>,
113    },
114    /// A request to broadcast a proposed block to all peers.
115    Proposed {
116        /// The round in which the block was proposed.
117        round: Round,
118        /// The block to broadcast.
119        block: B,
120    },
121    /// A notification that a block has been verified by the application.
122    Verified {
123        /// The round in which the block was verified.
124        round: Round,
125        /// The verified block.
126        block: B,
127    },
128    /// A request to set the sync floor.
129    ///
130    /// The sync floor is the latest block that the application has processed. Marshal
131    /// will not attempt to sync blocks below this height nor deliver blocks below
132    /// this height to the application.
133    ///
134    /// This sets the sync floor only if the provided height is higher than the
135    /// previously recorded floor.
136    ///
137    /// The default sync floor is height 0.
138    SetFloor {
139        /// The candidate sync floor height.
140        height: u64,
141    },
142
143    // -------------------- Consensus Engine Messages --------------------
144    /// A notarization from the consensus engine.
145    Notarization {
146        /// The notarization.
147        notarization: Notarization<S, B::Commitment>,
148    },
149    /// A finalization from the consensus engine.
150    Finalization {
151        /// The finalization.
152        finalization: Finalization<S, B::Commitment>,
153    },
154}
155
156/// A mailbox for sending messages to the marshal [Actor](super::super::actor::Actor).
157#[derive(Clone)]
158pub struct Mailbox<S: Scheme, B: Block> {
159    sender: mpsc::Sender<Message<S, B>>,
160}
161
162impl<S: Scheme, B: Block> Mailbox<S, B> {
163    /// Creates a new mailbox.
164    pub(crate) const fn new(sender: mpsc::Sender<Message<S, B>>) -> Self {
165        Self { sender }
166    }
167
168    /// A request to retrieve the information about the highest finalized block.
169    pub async fn get_info(
170        &mut self,
171        identifier: impl Into<Identifier<B::Commitment>>,
172    ) -> Option<(u64, B::Commitment)> {
173        let (tx, rx) = oneshot::channel();
174        if self
175            .sender
176            .send(Message::GetInfo {
177                identifier: identifier.into(),
178                response: tx,
179            })
180            .await
181            .is_err()
182        {
183            error!("failed to send get info message to actor: receiver dropped");
184        }
185        rx.await.unwrap_or_else(|_| {
186            error!("failed to get block info: receiver dropped");
187            None
188        })
189    }
190
191    /// A best-effort attempt to retrieve a given block from local
192    /// storage. It is not an indication to go fetch the block from the network.
193    pub async fn get_block(
194        &mut self,
195        identifier: impl Into<Identifier<B::Commitment>>,
196    ) -> Option<B> {
197        let (tx, rx) = oneshot::channel();
198        if self
199            .sender
200            .send(Message::GetBlock {
201                identifier: identifier.into(),
202                response: tx,
203            })
204            .await
205            .is_err()
206        {
207            error!("failed to send get block message to actor: receiver dropped");
208        }
209        rx.await.unwrap_or_else(|_| {
210            error!("failed to get block: receiver dropped");
211            None
212        })
213    }
214
215    /// A best-effort attempt to retrieve a given [Finalization] from local
216    /// storage. It is not an indication to go fetch the [Finalization] from the network.
217    pub async fn get_finalization(
218        &mut self,
219        height: u64,
220    ) -> Option<Finalization<S, B::Commitment>> {
221        let (tx, rx) = oneshot::channel();
222        if self
223            .sender
224            .send(Message::GetFinalization {
225                height,
226                response: tx,
227            })
228            .await
229            .is_err()
230        {
231            error!("failed to send get finalization message to actor: receiver dropped");
232        }
233        rx.await.unwrap_or_else(|_| {
234            error!("failed to get finalization: receiver dropped");
235            None
236        })
237    }
238
239    /// Hints that a finalized block may be available at the given height.
240    ///
241    /// This method will request the finalization from the network via the resolver
242    /// if it is not available locally.
243    ///
244    /// Targets are required because this is typically called when a peer claims to be
245    /// ahead. By targeting only those peers, we limit who we ask. If a target returns
246    /// invalid data, they will be blocked by the resolver. If targets don't respond
247    /// or return "no data", they effectively rate-limit themselves.
248    ///
249    /// Calling this multiple times for the same height with different targets will
250    /// add to the target set if there is an ongoing fetch, allowing more peers to be tried.
251    ///
252    /// This is fire-and-forget: the finalization will be stored in marshal and delivered
253    /// via the normal finalization flow when available.
254    pub async fn hint_finalized(&mut self, height: u64, targets: NonEmptyVec<S::PublicKey>) {
255        if self
256            .sender
257            .send(Message::HintFinalized { height, targets })
258            .await
259            .is_err()
260        {
261            error!("failed to send hint finalized message to actor: receiver dropped");
262        }
263    }
264
265    /// A request to retrieve a block by its commitment.
266    ///
267    /// If the block is found available locally, the block will be returned immediately.
268    ///
269    /// If the block is not available locally, the request will be registered and the caller will
270    /// be notified when the block is available. If the block is not finalized, it's possible that
271    /// it may never become available.
272    ///
273    /// The oneshot receiver should be dropped to cancel the subscription.
274    pub async fn subscribe(
275        &mut self,
276        round: Option<Round>,
277        commitment: B::Commitment,
278    ) -> oneshot::Receiver<B> {
279        let (tx, rx) = oneshot::channel();
280        if self
281            .sender
282            .send(Message::Subscribe {
283                round,
284                commitment,
285                response: tx,
286            })
287            .await
288            .is_err()
289        {
290            error!("failed to send subscribe message to actor: receiver dropped");
291        }
292        rx
293    }
294
295    /// Returns an [AncestorStream] over the ancestry of a given block, leading up to genesis.
296    ///
297    /// If the starting block is not found, `None` is returned.
298    pub async fn ancestry(
299        &mut self,
300        (start_round, start_commitment): (Option<Round>, B::Commitment),
301    ) -> Option<AncestorStream<S, B>> {
302        self.subscribe(start_round, start_commitment)
303            .await
304            .await
305            .ok()
306            .map(|block| AncestorStream::new(self.clone(), [block]))
307    }
308
309    /// Proposed requests that a proposed block is sent to all peers.
310    pub async fn proposed(&mut self, round: Round, block: B) {
311        if self
312            .sender
313            .send(Message::Proposed { round, block })
314            .await
315            .is_err()
316        {
317            error!("failed to send proposed message to actor: receiver dropped");
318        }
319    }
320
321    /// Notifies the actor that a block has been verified.
322    pub async fn verified(&mut self, round: Round, block: B) {
323        if self
324            .sender
325            .send(Message::Verified { round, block })
326            .await
327            .is_err()
328        {
329            error!("failed to send verified message to actor: receiver dropped");
330        }
331    }
332
333    /// A request to set the sync floor (conditionally advances if higher).
334    ///
335    /// The sync floor is the latest block that the application has processed. Marshal
336    /// will not attempt to sync blocks below this height nor deliver blocks below
337    /// this height to the application.
338    ///
339    /// The default sync floor is height 0.
340    pub async fn set_floor(&mut self, height: u64) {
341        if self
342            .sender
343            .send(Message::SetFloor { height })
344            .await
345            .is_err()
346        {
347            error!("failed to send set sync floor message to actor: receiver dropped");
348        }
349    }
350}
351
352impl<S: Scheme, B: Block> Reporter for Mailbox<S, B> {
353    type Activity = Activity<S, B::Commitment>;
354
355    async fn report(&mut self, activity: Self::Activity) {
356        let message = match activity {
357            Activity::Notarization(notarization) => Message::Notarization { notarization },
358            Activity::Finalization(finalization) => Message::Finalization { finalization },
359            _ => {
360                // Ignore other activity types
361                return;
362            }
363        };
364        if self.sender.send(message).await.is_err() {
365            error!("failed to report activity to actor: receiver dropped");
366        }
367    }
368}
369
370/// Returns a boxed subscription future for a block.
371#[inline]
372fn subscribe_block_future<S: Scheme, B: Block>(
373    mut marshal: Mailbox<S, B>,
374    commitment: B::Commitment,
375) -> BoxFuture<'static, Option<B>> {
376    async move {
377        let receiver = marshal.subscribe(None, commitment).await;
378        receiver.await.ok()
379    }
380    .boxed()
381}
382
383/// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
384///
385/// TODO(clabby): Once marshal can also yield the genesis block, this stream should end
386/// at block height 0 rather than 1.
387#[pin_project]
388pub struct AncestorStream<S: Scheme, B: Block> {
389    marshal: Mailbox<S, B>,
390    buffered: Vec<B>,
391    #[pin]
392    pending: FuturesOrdered<BoxFuture<'static, Option<B>>>,
393}
394
395impl<S: Scheme, B: Block> AncestorStream<S, B> {
396    /// Creates a new [AncestorStream] starting from the given ancestry.
397    ///
398    /// # Panics
399    ///
400    /// Panics if the initial blocks are not contiguous in height.
401    pub(crate) fn new(marshal: Mailbox<S, B>, initial: impl IntoIterator<Item = B>) -> Self {
402        let mut buffered = initial.into_iter().collect::<Vec<B>>();
403        buffered.sort_by_key(Block::height);
404
405        // Check that the initial blocks are contiguous in height.
406        buffered.windows(2).for_each(|window| {
407            assert_eq!(
408                window[0].height() + 1,
409                window[1].height(),
410                "initial blocks must be contiguous in height"
411            );
412        });
413
414        Self {
415            marshal,
416            buffered,
417            pending: FuturesOrdered::new(),
418        }
419    }
420}
421
422impl<S: Scheme, B: Block> Stream for AncestorStream<S, B> {
423    type Item = B;
424
425    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
426        // Because marshal cannot currently yield the genesis block, we stop at height 1.
427        const END_BOUND: u64 = 1;
428
429        let mut this = self.project();
430
431        // If a result has been buffered, return it and queue the parent fetch if needed.
432        if let Some(block) = this.buffered.pop() {
433            let height = block.height();
434            let should_fetch_parent = height > END_BOUND && this.buffered.is_empty();
435            if should_fetch_parent {
436                let parent_commitment = block.parent();
437                let future = subscribe_block_future(this.marshal.clone(), parent_commitment);
438                this.pending.push_back(future);
439
440                // Explicitly poll the pending futures to kick off the fetch. If it's already ready,
441                // buffer it for the next poll.
442                if let Poll::Ready(Some(Some(block))) = this.pending.as_mut().poll_next(cx) {
443                    this.buffered.push(block);
444                }
445            }
446
447            return Poll::Ready(Some(block));
448        }
449
450        match this.pending.as_mut().poll_next(cx) {
451            Poll::Pending => Poll::Pending,
452            Poll::Ready(None) | Poll::Ready(Some(None)) => Poll::Ready(None),
453            Poll::Ready(Some(Some(block))) => {
454                let height = block.height();
455                let should_fetch_parent = height > END_BOUND;
456                if should_fetch_parent {
457                    let parent_commitment = block.parent();
458                    let future = subscribe_block_future(this.marshal.clone(), parent_commitment);
459                    this.pending.push_back(future);
460
461                    // Explicitly poll the pending futures to kick off the fetch. If it's already ready,
462                    // buffer it for the next poll.
463                    if let Poll::Ready(Some(Some(block))) = this.pending.as_mut().poll_next(cx) {
464                        this.buffered.push(block);
465                    }
466                }
467
468                Poll::Ready(Some(block))
469            }
470        }
471    }
472}