commonware_consensus/marshal/ingress/mailbox.rs
1use crate::{
2 simplex::types::{Activity, Finalization, Notarization},
3 types::{Height, Round},
4 Block, Heightable, Reporter,
5};
6use commonware_cryptography::{certificate::Scheme, Digest};
7use commonware_storage::archive;
8use commonware_utils::{
9 channel::{fallible::AsyncFallibleExt, mpsc, oneshot},
10 vec::NonEmptyVec,
11};
12use futures::{
13 future::BoxFuture,
14 stream::{FuturesOrdered, Stream},
15 FutureExt,
16};
17use pin_project::pin_project;
18use std::{
19 pin::Pin,
20 task::{Context, Poll},
21};
22
23/// An identifier for a block request.
24pub enum Identifier<D: Digest> {
25 /// The height of the block to retrieve.
26 Height(Height),
27 /// The commitment of the block to retrieve.
28 Commitment(D),
29 /// The highest finalized block. It may be the case that marshal does not have some of the
30 /// blocks below this height.
31 Latest,
32}
33
34// Allows using Height directly for convenience.
35impl<D: Digest> From<Height> for Identifier<D> {
36 fn from(src: Height) -> Self {
37 Self::Height(src)
38 }
39}
40
41// Allows using &Digest directly for convenience.
42impl<D: Digest> From<&D> for Identifier<D> {
43 fn from(src: &D) -> Self {
44 Self::Commitment(*src)
45 }
46}
47
48// Allows using archive identifiers directly for convenience.
49impl<D: Digest> From<archive::Identifier<'_, D>> for Identifier<D> {
50 fn from(src: archive::Identifier<'_, D>) -> Self {
51 match src {
52 archive::Identifier::Index(index) => Self::Height(Height::new(index)),
53 archive::Identifier::Key(key) => Self::Commitment(*key),
54 }
55 }
56}
57
58/// Messages sent to the marshal [Actor](super::super::actor::Actor).
59///
60/// These messages are sent from the consensus engine and other parts of the
61/// system to drive the state of the marshal.
62pub(crate) enum Message<S: Scheme, B: Block> {
63 // -------------------- Application Messages --------------------
64 /// A request to retrieve the (height, commitment) of a block by its identifier.
65 /// The block must be finalized; returns `None` if the block is not finalized.
66 GetInfo {
67 /// The identifier of the block to get the information of.
68 identifier: Identifier<B::Commitment>,
69 /// A channel to send the retrieved (height, commitment).
70 response: oneshot::Sender<Option<(Height, B::Commitment)>>,
71 },
72 /// A request to retrieve a block by its identifier.
73 ///
74 /// Requesting by [Identifier::Height] or [Identifier::Latest] will only return finalized
75 /// blocks, whereas requesting by commitment may return non-finalized or even unverified blocks.
76 GetBlock {
77 /// The identifier of the block to retrieve.
78 identifier: Identifier<B::Commitment>,
79 /// A channel to send the retrieved block.
80 response: oneshot::Sender<Option<B>>,
81 },
82 /// A request to retrieve a finalization by height.
83 GetFinalization {
84 /// The height of the finalization to retrieve.
85 height: Height,
86 /// A channel to send the retrieved finalization.
87 response: oneshot::Sender<Option<Finalization<S, B::Commitment>>>,
88 },
89 /// A hint that a finalized block may be available at a given height.
90 ///
91 /// This triggers a network fetch if the finalization is not available locally.
92 /// This is fire-and-forget: the finalization will be stored in marshal and
93 /// delivered via the normal finalization flow when available.
94 ///
95 /// Targets are required because this is typically called when a peer claims to
96 /// be ahead. If a target returns invalid data, the resolver will block them.
97 /// Sending this message multiple times with different targets adds to the
98 /// target set.
99 HintFinalized {
100 /// The height of the finalization to fetch.
101 height: Height,
102 /// Target peers to fetch from. Added to any existing targets for this height.
103 targets: NonEmptyVec<S::PublicKey>,
104 },
105 /// A request to retrieve a block by its commitment.
106 Subscribe {
107 /// The view in which the block was notarized. This is an optimization
108 /// to help locate the block.
109 round: Option<Round>,
110 /// The commitment of the block to retrieve.
111 commitment: B::Commitment,
112 /// A channel to send the retrieved block.
113 response: oneshot::Sender<B>,
114 },
115 /// A request to broadcast a proposed block to all peers.
116 Proposed {
117 /// The round in which the block was proposed.
118 round: Round,
119 /// The block to broadcast.
120 block: B,
121 },
122 /// A notification that a block has been verified by the application.
123 Verified {
124 /// The round in which the block was verified.
125 round: Round,
126 /// The verified block.
127 block: B,
128 },
129 /// Sets the sync starting point (advances if higher than current).
130 ///
131 /// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
132 /// the floor is pruned.
133 ///
134 /// To prune data without affecting the sync starting point (say at some trailing depth
135 /// from tip), use [Message::Prune] instead.
136 ///
137 /// The default floor is 0.
138 SetFloor {
139 /// The candidate floor height.
140 height: Height,
141 },
142 /// Prunes finalized blocks and certificates below the given height.
143 ///
144 /// Unlike [Message::SetFloor], this does not affect the sync starting point.
145 /// The height must be at or below the current floor (last processed height),
146 /// otherwise the prune request is ignored.
147 Prune {
148 /// The minimum height to keep (blocks below this are pruned).
149 height: Height,
150 },
151
152 // -------------------- Consensus Engine Messages --------------------
153 /// A notarization from the consensus engine.
154 Notarization {
155 /// The notarization.
156 notarization: Notarization<S, B::Commitment>,
157 },
158 /// A finalization from the consensus engine.
159 Finalization {
160 /// The finalization.
161 finalization: Finalization<S, B::Commitment>,
162 },
163}
164
165/// A mailbox for sending messages to the marshal [Actor](super::super::actor::Actor).
166#[derive(Clone)]
167pub struct Mailbox<S: Scheme, B: Block> {
168 sender: mpsc::Sender<Message<S, B>>,
169}
170
171impl<S: Scheme, B: Block> Mailbox<S, B> {
172 /// Creates a new mailbox.
173 pub(crate) const fn new(sender: mpsc::Sender<Message<S, B>>) -> Self {
174 Self { sender }
175 }
176
177 /// A request to retrieve the information about the highest finalized block.
178 pub async fn get_info(
179 &mut self,
180 identifier: impl Into<Identifier<B::Commitment>>,
181 ) -> Option<(Height, B::Commitment)> {
182 let identifier = identifier.into();
183 self.sender
184 .request(|response| Message::GetInfo {
185 identifier,
186 response,
187 })
188 .await
189 .flatten()
190 }
191
192 /// A best-effort attempt to retrieve a given block from local
193 /// storage. It is not an indication to go fetch the block from the network.
194 pub async fn get_block(
195 &mut self,
196 identifier: impl Into<Identifier<B::Commitment>>,
197 ) -> Option<B> {
198 let identifier = identifier.into();
199 self.sender
200 .request(|response| Message::GetBlock {
201 identifier,
202 response,
203 })
204 .await
205 .flatten()
206 }
207
208 /// A best-effort attempt to retrieve a given [Finalization] from local
209 /// storage. It is not an indication to go fetch the [Finalization] from the network.
210 pub async fn get_finalization(
211 &mut self,
212 height: Height,
213 ) -> Option<Finalization<S, B::Commitment>> {
214 self.sender
215 .request(|response| Message::GetFinalization { height, response })
216 .await
217 .flatten()
218 }
219
220 /// Hints that a finalized block may be available at the given height.
221 ///
222 /// This method will request the finalization from the network via the resolver
223 /// if it is not available locally.
224 ///
225 /// Targets are required because this is typically called when a peer claims to be
226 /// ahead. By targeting only those peers, we limit who we ask. If a target returns
227 /// invalid data, they will be blocked by the resolver. If targets don't respond
228 /// or return "no data", they effectively rate-limit themselves.
229 ///
230 /// Calling this multiple times for the same height with different targets will
231 /// add to the target set if there is an ongoing fetch, allowing more peers to be tried.
232 ///
233 /// This is fire-and-forget: the finalization will be stored in marshal and delivered
234 /// via the normal finalization flow when available.
235 pub async fn hint_finalized(&mut self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
236 self.sender
237 .send_lossy(Message::HintFinalized { height, targets })
238 .await;
239 }
240
241 /// A request to retrieve a block by its commitment.
242 ///
243 /// If the block is found available locally, the block will be returned immediately.
244 ///
245 /// If the block is not available locally, the request will be registered and the caller will
246 /// be notified when the block is available. If the block is not finalized, it's possible that
247 /// it may never become available.
248 ///
249 /// The oneshot receiver should be dropped to cancel the subscription.
250 pub async fn subscribe(
251 &mut self,
252 round: Option<Round>,
253 commitment: B::Commitment,
254 ) -> oneshot::Receiver<B> {
255 let (tx, rx) = oneshot::channel();
256 self.sender
257 .send_lossy(Message::Subscribe {
258 round,
259 commitment,
260 response: tx,
261 })
262 .await;
263 rx
264 }
265
266 /// Returns an [AncestorStream] over the ancestry of a given block, leading up to genesis.
267 ///
268 /// If the starting block is not found, `None` is returned.
269 pub async fn ancestry(
270 &mut self,
271 (start_round, start_commitment): (Option<Round>, B::Commitment),
272 ) -> Option<AncestorStream<S, B>> {
273 self.subscribe(start_round, start_commitment)
274 .await
275 .await
276 .ok()
277 .map(|block| AncestorStream::new(self.clone(), [block]))
278 }
279
280 /// Proposed requests that a proposed block is sent to all peers.
281 pub async fn proposed(&mut self, round: Round, block: B) {
282 self.sender
283 .send_lossy(Message::Proposed { round, block })
284 .await;
285 }
286
287 /// Notifies the actor that a block has been verified.
288 pub async fn verified(&mut self, round: Round, block: B) {
289 self.sender
290 .send_lossy(Message::Verified { round, block })
291 .await;
292 }
293
294 /// Sets the sync starting point (advances if higher than current).
295 ///
296 /// Marshal will sync and deliver blocks starting at `floor + 1`. Data below
297 /// the floor is pruned.
298 ///
299 /// To prune data without affecting the sync starting point (say at some trailing depth
300 /// from tip), use [Self::prune] instead.
301 ///
302 /// The default floor is 0.
303 pub async fn set_floor(&mut self, height: Height) {
304 self.sender.send_lossy(Message::SetFloor { height }).await;
305 }
306
307 /// Prunes finalized blocks and certificates below the given height.
308 ///
309 /// Unlike [Self::set_floor], this does not affect the sync starting point.
310 /// The height must be at or below the current floor (last processed height),
311 /// otherwise the prune request is ignored.
312 pub async fn prune(&mut self, height: Height) {
313 self.sender.send_lossy(Message::Prune { height }).await;
314 }
315}
316
317impl<S: Scheme, B: Block> Reporter for Mailbox<S, B> {
318 type Activity = Activity<S, B::Commitment>;
319
320 async fn report(&mut self, activity: Self::Activity) {
321 let message = match activity {
322 Activity::Notarization(notarization) => Message::Notarization { notarization },
323 Activity::Finalization(finalization) => Message::Finalization { finalization },
324 _ => {
325 // Ignore other activity types
326 return;
327 }
328 };
329 self.sender.send_lossy(message).await;
330 }
331}
332
333/// Returns a boxed subscription future for a block.
334#[inline]
335fn subscribe_block_future<S: Scheme, B: Block>(
336 mut marshal: Mailbox<S, B>,
337 commitment: B::Commitment,
338) -> BoxFuture<'static, Option<B>> {
339 async move {
340 let receiver = marshal.subscribe(None, commitment).await;
341 receiver.await.ok()
342 }
343 .boxed()
344}
345
346/// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
347///
348/// TODO(clabby): Once marshal can also yield the genesis block, this stream should end
349/// at block height 0 rather than 1.
350#[pin_project]
351pub struct AncestorStream<S: Scheme, B: Block> {
352 marshal: Mailbox<S, B>,
353 buffered: Vec<B>,
354 #[pin]
355 pending: FuturesOrdered<BoxFuture<'static, Option<B>>>,
356}
357
358impl<S: Scheme, B: Block> AncestorStream<S, B> {
359 /// Creates a new [AncestorStream] starting from the given ancestry.
360 ///
361 /// # Panics
362 ///
363 /// Panics if the initial blocks are not contiguous in height.
364 pub(crate) fn new(marshal: Mailbox<S, B>, initial: impl IntoIterator<Item = B>) -> Self {
365 let mut buffered = initial.into_iter().collect::<Vec<B>>();
366 buffered.sort_by_key(Heightable::height);
367
368 // Check that the initial blocks are contiguous in height.
369 buffered.windows(2).for_each(|window| {
370 assert_eq!(
371 window[0].height().next(),
372 window[1].height(),
373 "initial blocks must be contiguous in height"
374 );
375 });
376
377 Self {
378 marshal,
379 buffered,
380 pending: FuturesOrdered::new(),
381 }
382 }
383}
384
385impl<S: Scheme, B: Block> Stream for AncestorStream<S, B> {
386 type Item = B;
387
388 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
389 // Because marshal cannot currently yield the genesis block, we stop at height 1.
390 const END_BOUND: Height = Height::new(1);
391
392 let mut this = self.project();
393
394 // If a result has been buffered, return it and queue the parent fetch if needed.
395 if let Some(block) = this.buffered.pop() {
396 let height = block.height();
397 let should_fetch_parent = height > END_BOUND && this.buffered.is_empty();
398 if should_fetch_parent {
399 let parent_commitment = block.parent();
400 let future = subscribe_block_future(this.marshal.clone(), parent_commitment);
401 this.pending.push_back(future);
402
403 // Explicitly poll the pending futures to kick off the fetch. If it's already ready,
404 // buffer it for the next poll.
405 if let Poll::Ready(Some(Some(block))) = this.pending.as_mut().poll_next(cx) {
406 this.buffered.push(block);
407 }
408 }
409
410 return Poll::Ready(Some(block));
411 }
412
413 match this.pending.as_mut().poll_next(cx) {
414 Poll::Pending => Poll::Pending,
415 Poll::Ready(None) | Poll::Ready(Some(None)) => Poll::Ready(None),
416 Poll::Ready(Some(Some(block))) => {
417 let height = block.height();
418 let should_fetch_parent = height > END_BOUND;
419 if should_fetch_parent {
420 let parent_commitment = block.parent();
421 let future = subscribe_block_future(this.marshal.clone(), parent_commitment);
422 this.pending.push_back(future);
423
424 // Explicitly poll the pending futures to kick off the fetch. If it's already ready,
425 // buffer it for the next poll.
426 if let Poll::Ready(Some(Some(block))) = this.pending.as_mut().poll_next(cx) {
427 this.buffered.push(block);
428 }
429 }
430
431 Poll::Ready(Some(block))
432 }
433 }
434 }
435}