commonware_consensus/marshal/ingress/mailbox.rs
1use crate::{
2 simplex::types::{Activity, Finalization, Notarization},
3 types::{Height, Round},
4 Block, Heightable, Reporter,
5};
6use commonware_cryptography::{certificate::Scheme, Digest};
7use commonware_storage::archive;
8use commonware_utils::{channels::fallible::AsyncFallibleExt, vec::NonEmptyVec};
9use futures::{
10 channel::{mpsc, oneshot},
11 future::BoxFuture,
12 stream::{FuturesOrdered, Stream},
13 FutureExt,
14};
15use pin_project::pin_project;
16use std::{
17 pin::Pin,
18 task::{Context, Poll},
19};
20
21/// An identifier for a block request.
22pub enum Identifier<D: Digest> {
23 /// The height of the block to retrieve.
24 Height(Height),
25 /// The commitment of the block to retrieve.
26 Commitment(D),
27 /// The highest finalized block. It may be the case that marshal does not have some of the
28 /// blocks below this height.
29 Latest,
30}
31
32// Allows using Height directly for convenience.
33impl<D: Digest> From<Height> for Identifier<D> {
34 fn from(src: Height) -> Self {
35 Self::Height(src)
36 }
37}
38
39// Allows using &Digest directly for convenience.
40impl<D: Digest> From<&D> for Identifier<D> {
41 fn from(src: &D) -> Self {
42 Self::Commitment(*src)
43 }
44}
45
46// Allows using archive identifiers directly for convenience.
47impl<D: Digest> From<archive::Identifier<'_, D>> for Identifier<D> {
48 fn from(src: archive::Identifier<'_, D>) -> Self {
49 match src {
50 archive::Identifier::Index(index) => Self::Height(Height::new(index)),
51 archive::Identifier::Key(key) => Self::Commitment(*key),
52 }
53 }
54}
55
56/// Messages sent to the marshal [Actor](super::super::actor::Actor).
57///
58/// These messages are sent from the consensus engine and other parts of the
59/// system to drive the state of the marshal.
60pub(crate) enum Message<S: Scheme, B: Block> {
61 // -------------------- Application Messages --------------------
62 /// A request to retrieve the (height, commitment) of a block by its identifier.
63 /// The block must be finalized; returns `None` if the block is not finalized.
64 GetInfo {
65 /// The identifier of the block to get the information of.
66 identifier: Identifier<B::Commitment>,
67 /// A channel to send the retrieved (height, commitment).
68 response: oneshot::Sender<Option<(Height, B::Commitment)>>,
69 },
70 /// A request to retrieve a block by its identifier.
71 ///
72 /// Requesting by [Identifier::Height] or [Identifier::Latest] will only return finalized
73 /// blocks, whereas requesting by commitment may return non-finalized or even unverified blocks.
74 GetBlock {
75 /// The identifier of the block to retrieve.
76 identifier: Identifier<B::Commitment>,
77 /// A channel to send the retrieved block.
78 response: oneshot::Sender<Option<B>>,
79 },
80 /// A request to retrieve a finalization by height.
81 GetFinalization {
82 /// The height of the finalization to retrieve.
83 height: Height,
84 /// A channel to send the retrieved finalization.
85 response: oneshot::Sender<Option<Finalization<S, B::Commitment>>>,
86 },
87 /// A hint that a finalized block may be available at a given height.
88 ///
89 /// This triggers a network fetch if the finalization is not available locally.
90 /// This is fire-and-forget: the finalization will be stored in marshal and
91 /// delivered via the normal finalization flow when available.
92 ///
93 /// Targets are required because this is typically called when a peer claims to
94 /// be ahead. If a target returns invalid data, the resolver will block them.
95 /// Sending this message multiple times with different targets adds to the
96 /// target set.
97 HintFinalized {
98 /// The height of the finalization to fetch.
99 height: Height,
100 /// Target peers to fetch from. Added to any existing targets for this height.
101 targets: NonEmptyVec<S::PublicKey>,
102 },
103 /// A request to retrieve a block by its commitment.
104 Subscribe {
105 /// The view in which the block was notarized. This is an optimization
106 /// to help locate the block.
107 round: Option<Round>,
108 /// The commitment of the block to retrieve.
109 commitment: B::Commitment,
110 /// A channel to send the retrieved block.
111 response: oneshot::Sender<B>,
112 },
113 /// A request to broadcast a proposed block to all peers.
114 Proposed {
115 /// The round in which the block was proposed.
116 round: Round,
117 /// The block to broadcast.
118 block: B,
119 },
120 /// A notification that a block has been verified by the application.
121 Verified {
122 /// The round in which the block was verified.
123 round: Round,
124 /// The verified block.
125 block: B,
126 },
127 /// Sets the sync starting point (advances if higher than current).
128 ///
129 /// Marshal will sync and deliver blocks starting at `floor + 1`. Data at or
130 /// below the floor is pruned.
131 ///
132 /// To prune data without affecting the sync starting point (say at some trailing depth
133 /// from tip), prune the finalized stores directly.
134 ///
135 /// The default floor is 0.
136 SetFloor {
137 /// The candidate floor height.
138 height: Height,
139 },
140
141 // -------------------- Consensus Engine Messages --------------------
142 /// A notarization from the consensus engine.
143 Notarization {
144 /// The notarization.
145 notarization: Notarization<S, B::Commitment>,
146 },
147 /// A finalization from the consensus engine.
148 Finalization {
149 /// The finalization.
150 finalization: Finalization<S, B::Commitment>,
151 },
152}
153
154/// A mailbox for sending messages to the marshal [Actor](super::super::actor::Actor).
155#[derive(Clone)]
156pub struct Mailbox<S: Scheme, B: Block> {
157 sender: mpsc::Sender<Message<S, B>>,
158}
159
160impl<S: Scheme, B: Block> Mailbox<S, B> {
161 /// Creates a new mailbox.
162 pub(crate) const fn new(sender: mpsc::Sender<Message<S, B>>) -> Self {
163 Self { sender }
164 }
165
166 /// A request to retrieve the information about the highest finalized block.
167 pub async fn get_info(
168 &mut self,
169 identifier: impl Into<Identifier<B::Commitment>>,
170 ) -> Option<(Height, B::Commitment)> {
171 let identifier = identifier.into();
172 self.sender
173 .request(|response| Message::GetInfo {
174 identifier,
175 response,
176 })
177 .await
178 .flatten()
179 }
180
181 /// A best-effort attempt to retrieve a given block from local
182 /// storage. It is not an indication to go fetch the block from the network.
183 pub async fn get_block(
184 &mut self,
185 identifier: impl Into<Identifier<B::Commitment>>,
186 ) -> Option<B> {
187 let identifier = identifier.into();
188 self.sender
189 .request(|response| Message::GetBlock {
190 identifier,
191 response,
192 })
193 .await
194 .flatten()
195 }
196
197 /// A best-effort attempt to retrieve a given [Finalization] from local
198 /// storage. It is not an indication to go fetch the [Finalization] from the network.
199 pub async fn get_finalization(
200 &mut self,
201 height: Height,
202 ) -> Option<Finalization<S, B::Commitment>> {
203 self.sender
204 .request(|response| Message::GetFinalization { height, response })
205 .await
206 .flatten()
207 }
208
209 /// Hints that a finalized block may be available at the given height.
210 ///
211 /// This method will request the finalization from the network via the resolver
212 /// if it is not available locally.
213 ///
214 /// Targets are required because this is typically called when a peer claims to be
215 /// ahead. By targeting only those peers, we limit who we ask. If a target returns
216 /// invalid data, they will be blocked by the resolver. If targets don't respond
217 /// or return "no data", they effectively rate-limit themselves.
218 ///
219 /// Calling this multiple times for the same height with different targets will
220 /// add to the target set if there is an ongoing fetch, allowing more peers to be tried.
221 ///
222 /// This is fire-and-forget: the finalization will be stored in marshal and delivered
223 /// via the normal finalization flow when available.
224 pub async fn hint_finalized(&mut self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
225 self.sender
226 .send_lossy(Message::HintFinalized { height, targets })
227 .await;
228 }
229
230 /// A request to retrieve a block by its commitment.
231 ///
232 /// If the block is found available locally, the block will be returned immediately.
233 ///
234 /// If the block is not available locally, the request will be registered and the caller will
235 /// be notified when the block is available. If the block is not finalized, it's possible that
236 /// it may never become available.
237 ///
238 /// The oneshot receiver should be dropped to cancel the subscription.
239 pub async fn subscribe(
240 &mut self,
241 round: Option<Round>,
242 commitment: B::Commitment,
243 ) -> oneshot::Receiver<B> {
244 let (tx, rx) = oneshot::channel();
245 self.sender
246 .send_lossy(Message::Subscribe {
247 round,
248 commitment,
249 response: tx,
250 })
251 .await;
252 rx
253 }
254
255 /// Returns an [AncestorStream] over the ancestry of a given block, leading up to genesis.
256 ///
257 /// If the starting block is not found, `None` is returned.
258 pub async fn ancestry(
259 &mut self,
260 (start_round, start_commitment): (Option<Round>, B::Commitment),
261 ) -> Option<AncestorStream<S, B>> {
262 self.subscribe(start_round, start_commitment)
263 .await
264 .await
265 .ok()
266 .map(|block| AncestorStream::new(self.clone(), [block]))
267 }
268
269 /// Proposed requests that a proposed block is sent to all peers.
270 pub async fn proposed(&mut self, round: Round, block: B) {
271 self.sender
272 .send_lossy(Message::Proposed { round, block })
273 .await;
274 }
275
276 /// Notifies the actor that a block has been verified.
277 pub async fn verified(&mut self, round: Round, block: B) {
278 self.sender
279 .send_lossy(Message::Verified { round, block })
280 .await;
281 }
282
283 /// Sets the sync starting point (advances if higher than current).
284 ///
285 /// Marshal will sync and deliver blocks starting at `floor + 1`. Data at or
286 /// below the floor is pruned.
287 ///
288 /// To prune data without affecting the sync starting point (say at some trailing depth
289 /// from tip), prune the finalized stores directly.
290 ///
291 /// The default floor is 0.
292 pub async fn set_floor(&mut self, height: Height) {
293 self.sender.send_lossy(Message::SetFloor { height }).await;
294 }
295}
296
297impl<S: Scheme, B: Block> Reporter for Mailbox<S, B> {
298 type Activity = Activity<S, B::Commitment>;
299
300 async fn report(&mut self, activity: Self::Activity) {
301 let message = match activity {
302 Activity::Notarization(notarization) => Message::Notarization { notarization },
303 Activity::Finalization(finalization) => Message::Finalization { finalization },
304 _ => {
305 // Ignore other activity types
306 return;
307 }
308 };
309 self.sender.send_lossy(message).await;
310 }
311}
312
313/// Returns a boxed subscription future for a block.
314#[inline]
315fn subscribe_block_future<S: Scheme, B: Block>(
316 mut marshal: Mailbox<S, B>,
317 commitment: B::Commitment,
318) -> BoxFuture<'static, Option<B>> {
319 async move {
320 let receiver = marshal.subscribe(None, commitment).await;
321 receiver.await.ok()
322 }
323 .boxed()
324}
325
326/// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
327///
328/// TODO(clabby): Once marshal can also yield the genesis block, this stream should end
329/// at block height 0 rather than 1.
330#[pin_project]
331pub struct AncestorStream<S: Scheme, B: Block> {
332 marshal: Mailbox<S, B>,
333 buffered: Vec<B>,
334 #[pin]
335 pending: FuturesOrdered<BoxFuture<'static, Option<B>>>,
336}
337
338impl<S: Scheme, B: Block> AncestorStream<S, B> {
339 /// Creates a new [AncestorStream] starting from the given ancestry.
340 ///
341 /// # Panics
342 ///
343 /// Panics if the initial blocks are not contiguous in height.
344 pub(crate) fn new(marshal: Mailbox<S, B>, initial: impl IntoIterator<Item = B>) -> Self {
345 let mut buffered = initial.into_iter().collect::<Vec<B>>();
346 buffered.sort_by_key(Heightable::height);
347
348 // Check that the initial blocks are contiguous in height.
349 buffered.windows(2).for_each(|window| {
350 assert_eq!(
351 window[0].height().next(),
352 window[1].height(),
353 "initial blocks must be contiguous in height"
354 );
355 });
356
357 Self {
358 marshal,
359 buffered,
360 pending: FuturesOrdered::new(),
361 }
362 }
363}
364
365impl<S: Scheme, B: Block> Stream for AncestorStream<S, B> {
366 type Item = B;
367
368 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
369 // Because marshal cannot currently yield the genesis block, we stop at height 1.
370 const END_BOUND: Height = Height::new(1);
371
372 let mut this = self.project();
373
374 // If a result has been buffered, return it and queue the parent fetch if needed.
375 if let Some(block) = this.buffered.pop() {
376 let height = block.height();
377 let should_fetch_parent = height > END_BOUND && this.buffered.is_empty();
378 if should_fetch_parent {
379 let parent_commitment = block.parent();
380 let future = subscribe_block_future(this.marshal.clone(), parent_commitment);
381 this.pending.push_back(future);
382
383 // Explicitly poll the pending futures to kick off the fetch. If it's already ready,
384 // buffer it for the next poll.
385 if let Poll::Ready(Some(Some(block))) = this.pending.as_mut().poll_next(cx) {
386 this.buffered.push(block);
387 }
388 }
389
390 return Poll::Ready(Some(block));
391 }
392
393 match this.pending.as_mut().poll_next(cx) {
394 Poll::Pending => Poll::Pending,
395 Poll::Ready(None) | Poll::Ready(Some(None)) => Poll::Ready(None),
396 Poll::Ready(Some(Some(block))) => {
397 let height = block.height();
398 let should_fetch_parent = height > END_BOUND;
399 if should_fetch_parent {
400 let parent_commitment = block.parent();
401 let future = subscribe_block_future(this.marshal.clone(), parent_commitment);
402 this.pending.push_back(future);
403
404 // Explicitly poll the pending futures to kick off the fetch. If it's already ready,
405 // buffer it for the next poll.
406 if let Poll::Ready(Some(Some(block))) = this.pending.as_mut().poll_next(cx) {
407 this.buffered.push(block);
408 }
409 }
410
411 Poll::Ready(Some(block))
412 }
413 }
414 }
415}