commonware_consensus/marshal/ingress/mailbox.rs
1use crate::{
2 simplex::types::{Activity, Finalization, Notarization},
3 types::Round,
4 Block, Reporter,
5};
6use commonware_cryptography::{certificate::Scheme, Digest};
7use commonware_storage::archive;
8use commonware_utils::vec::NonEmptyVec;
9use futures::{
10 channel::{mpsc, oneshot},
11 future::BoxFuture,
12 stream::{FuturesOrdered, Stream},
13 FutureExt, SinkExt,
14};
15use pin_project::pin_project;
16use std::{
17 pin::Pin,
18 task::{Context, Poll},
19};
20use tracing::error;
21
22/// An identifier for a block request.
23pub enum Identifier<D: Digest> {
24 /// The height of the block to retrieve.
25 Height(u64),
26 /// The commitment of the block to retrieve.
27 Commitment(D),
28 /// The highest finalized block. It may be the case that marshal does not have some of the
29 /// blocks below this height.
30 Latest,
31}
32
33// Allows using u64 directly for convenience.
34impl<D: Digest> From<u64> for Identifier<D> {
35 fn from(src: u64) -> Self {
36 Self::Height(src)
37 }
38}
39
40// Allows using &Digest directly for convenience.
41impl<D: Digest> From<&D> for Identifier<D> {
42 fn from(src: &D) -> Self {
43 Self::Commitment(*src)
44 }
45}
46
47// Allows using archive identifiers directly for convenience.
48impl<D: Digest> From<archive::Identifier<'_, D>> for Identifier<D> {
49 fn from(src: archive::Identifier<'_, D>) -> Self {
50 match src {
51 archive::Identifier::Index(index) => Self::Height(index),
52 archive::Identifier::Key(key) => Self::Commitment(*key),
53 }
54 }
55}
56
57/// Messages sent to the marshal [Actor](super::super::actor::Actor).
58///
59/// These messages are sent from the consensus engine and other parts of the
60/// system to drive the state of the marshal.
61pub(crate) enum Message<S: Scheme, B: Block> {
62 // -------------------- Application Messages --------------------
63 /// A request to retrieve the (height, commitment) of a block by its identifier.
64 /// The block must be finalized; returns `None` if the block is not finalized.
65 GetInfo {
66 /// The identifier of the block to get the information of.
67 identifier: Identifier<B::Commitment>,
68 /// A channel to send the retrieved (height, commitment).
69 response: oneshot::Sender<Option<(u64, B::Commitment)>>,
70 },
71 /// A request to retrieve a block by its identifier.
72 ///
73 /// Requesting by [Identifier::Height] or [Identifier::Latest] will only return finalized
74 /// blocks, whereas requesting by commitment may return non-finalized or even unverified blocks.
75 GetBlock {
76 /// The identifier of the block to retrieve.
77 identifier: Identifier<B::Commitment>,
78 /// A channel to send the retrieved block.
79 response: oneshot::Sender<Option<B>>,
80 },
81 /// A request to retrieve a finalization by height.
82 GetFinalization {
83 /// The height of the finalization to retrieve.
84 height: u64,
85 /// A channel to send the retrieved finalization.
86 response: oneshot::Sender<Option<Finalization<S, B::Commitment>>>,
87 },
88 /// A hint that a finalized block may be available at a given height.
89 ///
90 /// This triggers a network fetch if the finalization is not available locally.
91 /// This is fire-and-forget: the finalization will be stored in marshal and
92 /// delivered via the normal finalization flow when available.
93 ///
94 /// Targets are required because this is typically called when a peer claims to
95 /// be ahead. If a target returns invalid data, the resolver will block them.
96 /// Sending this message multiple times with different targets adds to the
97 /// target set.
98 HintFinalized {
99 /// The height of the finalization to fetch.
100 height: u64,
101 /// Target peers to fetch from. Added to any existing targets for this height.
102 targets: NonEmptyVec<S::PublicKey>,
103 },
104 /// A request to retrieve a block by its commitment.
105 Subscribe {
106 /// The view in which the block was notarized. This is an optimization
107 /// to help locate the block.
108 round: Option<Round>,
109 /// The commitment of the block to retrieve.
110 commitment: B::Commitment,
111 /// A channel to send the retrieved block.
112 response: oneshot::Sender<B>,
113 },
114 /// A request to broadcast a proposed block to all peers.
115 Proposed {
116 /// The round in which the block was proposed.
117 round: Round,
118 /// The block to broadcast.
119 block: B,
120 },
121 /// A notification that a block has been verified by the application.
122 Verified {
123 /// The round in which the block was verified.
124 round: Round,
125 /// The verified block.
126 block: B,
127 },
128 /// A request to set the sync floor.
129 ///
130 /// The sync floor is the latest block that the application has processed. Marshal
131 /// will not attempt to sync blocks below this height nor deliver blocks below
132 /// this height to the application.
133 ///
134 /// This sets the sync floor only if the provided height is higher than the
135 /// previously recorded floor.
136 ///
137 /// The default sync floor is height 0.
138 SetFloor {
139 /// The candidate sync floor height.
140 height: u64,
141 },
142
143 // -------------------- Consensus Engine Messages --------------------
144 /// A notarization from the consensus engine.
145 Notarization {
146 /// The notarization.
147 notarization: Notarization<S, B::Commitment>,
148 },
149 /// A finalization from the consensus engine.
150 Finalization {
151 /// The finalization.
152 finalization: Finalization<S, B::Commitment>,
153 },
154}
155
156/// A mailbox for sending messages to the marshal [Actor](super::super::actor::Actor).
157#[derive(Clone)]
158pub struct Mailbox<S: Scheme, B: Block> {
159 sender: mpsc::Sender<Message<S, B>>,
160}
161
162impl<S: Scheme, B: Block> Mailbox<S, B> {
163 /// Creates a new mailbox.
164 pub(crate) const fn new(sender: mpsc::Sender<Message<S, B>>) -> Self {
165 Self { sender }
166 }
167
168 /// A request to retrieve the information about the highest finalized block.
169 pub async fn get_info(
170 &mut self,
171 identifier: impl Into<Identifier<B::Commitment>>,
172 ) -> Option<(u64, B::Commitment)> {
173 let (tx, rx) = oneshot::channel();
174 if self
175 .sender
176 .send(Message::GetInfo {
177 identifier: identifier.into(),
178 response: tx,
179 })
180 .await
181 .is_err()
182 {
183 error!("failed to send get info message to actor: receiver dropped");
184 }
185 rx.await.unwrap_or_else(|_| {
186 error!("failed to get block info: receiver dropped");
187 None
188 })
189 }
190
191 /// A best-effort attempt to retrieve a given block from local
192 /// storage. It is not an indication to go fetch the block from the network.
193 pub async fn get_block(
194 &mut self,
195 identifier: impl Into<Identifier<B::Commitment>>,
196 ) -> Option<B> {
197 let (tx, rx) = oneshot::channel();
198 if self
199 .sender
200 .send(Message::GetBlock {
201 identifier: identifier.into(),
202 response: tx,
203 })
204 .await
205 .is_err()
206 {
207 error!("failed to send get block message to actor: receiver dropped");
208 }
209 rx.await.unwrap_or_else(|_| {
210 error!("failed to get block: receiver dropped");
211 None
212 })
213 }
214
215 /// A best-effort attempt to retrieve a given [Finalization] from local
216 /// storage. It is not an indication to go fetch the [Finalization] from the network.
217 pub async fn get_finalization(
218 &mut self,
219 height: u64,
220 ) -> Option<Finalization<S, B::Commitment>> {
221 let (tx, rx) = oneshot::channel();
222 if self
223 .sender
224 .send(Message::GetFinalization {
225 height,
226 response: tx,
227 })
228 .await
229 .is_err()
230 {
231 error!("failed to send get finalization message to actor: receiver dropped");
232 }
233 rx.await.unwrap_or_else(|_| {
234 error!("failed to get finalization: receiver dropped");
235 None
236 })
237 }
238
239 /// Hints that a finalized block may be available at the given height.
240 ///
241 /// This method will request the finalization from the network via the resolver
242 /// if it is not available locally.
243 ///
244 /// Targets are required because this is typically called when a peer claims to be
245 /// ahead. By targeting only those peers, we limit who we ask. If a target returns
246 /// invalid data, they will be blocked by the resolver. If targets don't respond
247 /// or return "no data", they effectively rate-limit themselves.
248 ///
249 /// Calling this multiple times for the same height with different targets will
250 /// add to the target set if there is an ongoing fetch, allowing more peers to be tried.
251 ///
252 /// This is fire-and-forget: the finalization will be stored in marshal and delivered
253 /// via the normal finalization flow when available.
254 pub async fn hint_finalized(&mut self, height: u64, targets: NonEmptyVec<S::PublicKey>) {
255 if self
256 .sender
257 .send(Message::HintFinalized { height, targets })
258 .await
259 .is_err()
260 {
261 error!("failed to send hint finalized message to actor: receiver dropped");
262 }
263 }
264
265 /// A request to retrieve a block by its commitment.
266 ///
267 /// If the block is found available locally, the block will be returned immediately.
268 ///
269 /// If the block is not available locally, the request will be registered and the caller will
270 /// be notified when the block is available. If the block is not finalized, it's possible that
271 /// it may never become available.
272 ///
273 /// The oneshot receiver should be dropped to cancel the subscription.
274 pub async fn subscribe(
275 &mut self,
276 round: Option<Round>,
277 commitment: B::Commitment,
278 ) -> oneshot::Receiver<B> {
279 let (tx, rx) = oneshot::channel();
280 if self
281 .sender
282 .send(Message::Subscribe {
283 round,
284 commitment,
285 response: tx,
286 })
287 .await
288 .is_err()
289 {
290 error!("failed to send subscribe message to actor: receiver dropped");
291 }
292 rx
293 }
294
295 /// Returns an [AncestorStream] over the ancestry of a given block, leading up to genesis.
296 ///
297 /// If the starting block is not found, `None` is returned.
298 pub async fn ancestry(
299 &mut self,
300 (start_round, start_commitment): (Option<Round>, B::Commitment),
301 ) -> Option<AncestorStream<S, B>> {
302 self.subscribe(start_round, start_commitment)
303 .await
304 .await
305 .ok()
306 .map(|block| AncestorStream::new(self.clone(), [block]))
307 }
308
309 /// Proposed requests that a proposed block is sent to all peers.
310 pub async fn proposed(&mut self, round: Round, block: B) {
311 if self
312 .sender
313 .send(Message::Proposed { round, block })
314 .await
315 .is_err()
316 {
317 error!("failed to send proposed message to actor: receiver dropped");
318 }
319 }
320
321 /// Notifies the actor that a block has been verified.
322 pub async fn verified(&mut self, round: Round, block: B) {
323 if self
324 .sender
325 .send(Message::Verified { round, block })
326 .await
327 .is_err()
328 {
329 error!("failed to send verified message to actor: receiver dropped");
330 }
331 }
332
333 /// A request to set the sync floor (conditionally advances if higher).
334 ///
335 /// The sync floor is the latest block that the application has processed. Marshal
336 /// will not attempt to sync blocks below this height nor deliver blocks below
337 /// this height to the application.
338 ///
339 /// The default sync floor is height 0.
340 pub async fn set_floor(&mut self, height: u64) {
341 if self
342 .sender
343 .send(Message::SetFloor { height })
344 .await
345 .is_err()
346 {
347 error!("failed to send set sync floor message to actor: receiver dropped");
348 }
349 }
350}
351
352impl<S: Scheme, B: Block> Reporter for Mailbox<S, B> {
353 type Activity = Activity<S, B::Commitment>;
354
355 async fn report(&mut self, activity: Self::Activity) {
356 let message = match activity {
357 Activity::Notarization(notarization) => Message::Notarization { notarization },
358 Activity::Finalization(finalization) => Message::Finalization { finalization },
359 _ => {
360 // Ignore other activity types
361 return;
362 }
363 };
364 if self.sender.send(message).await.is_err() {
365 error!("failed to report activity to actor: receiver dropped");
366 }
367 }
368}
369
370/// Returns a boxed subscription future for a block.
371#[inline]
372fn subscribe_block_future<S: Scheme, B: Block>(
373 mut marshal: Mailbox<S, B>,
374 commitment: B::Commitment,
375) -> BoxFuture<'static, Option<B>> {
376 async move {
377 let receiver = marshal.subscribe(None, commitment).await;
378 receiver.await.ok()
379 }
380 .boxed()
381}
382
383/// Yields the ancestors of a block while prefetching parents, _not_ including the genesis block.
384///
385/// TODO(clabby): Once marshal can also yield the genesis block, this stream should end
386/// at block height 0 rather than 1.
387#[pin_project]
388pub struct AncestorStream<S: Scheme, B: Block> {
389 marshal: Mailbox<S, B>,
390 buffered: Vec<B>,
391 #[pin]
392 pending: FuturesOrdered<BoxFuture<'static, Option<B>>>,
393}
394
395impl<S: Scheme, B: Block> AncestorStream<S, B> {
396 /// Creates a new [AncestorStream] starting from the given ancestry.
397 ///
398 /// # Panics
399 ///
400 /// Panics if the initial blocks are not contiguous in height.
401 pub(crate) fn new(marshal: Mailbox<S, B>, initial: impl IntoIterator<Item = B>) -> Self {
402 let mut buffered = initial.into_iter().collect::<Vec<B>>();
403 buffered.sort_by_key(Block::height);
404
405 // Check that the initial blocks are contiguous in height.
406 buffered.windows(2).for_each(|window| {
407 assert_eq!(
408 window[0].height() + 1,
409 window[1].height(),
410 "initial blocks must be contiguous in height"
411 );
412 });
413
414 Self {
415 marshal,
416 buffered,
417 pending: FuturesOrdered::new(),
418 }
419 }
420}
421
422impl<S: Scheme, B: Block> Stream for AncestorStream<S, B> {
423 type Item = B;
424
425 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
426 // Because marshal cannot currently yield the genesis block, we stop at height 1.
427 const END_BOUND: u64 = 1;
428
429 let mut this = self.project();
430
431 // If a result has been buffered, return it and queue the parent fetch if needed.
432 if let Some(block) = this.buffered.pop() {
433 let height = block.height();
434 let should_fetch_parent = height > END_BOUND && this.buffered.is_empty();
435 if should_fetch_parent {
436 let parent_commitment = block.parent();
437 let future = subscribe_block_future(this.marshal.clone(), parent_commitment);
438 this.pending.push_back(future);
439
440 // Explicitly poll the pending futures to kick off the fetch. If it's already ready,
441 // buffer it for the next poll.
442 if let Poll::Ready(Some(Some(block))) = this.pending.as_mut().poll_next(cx) {
443 this.buffered.push(block);
444 }
445 }
446
447 return Poll::Ready(Some(block));
448 }
449
450 match this.pending.as_mut().poll_next(cx) {
451 Poll::Pending => Poll::Pending,
452 Poll::Ready(None) | Poll::Ready(Some(None)) => Poll::Ready(None),
453 Poll::Ready(Some(Some(block))) => {
454 let height = block.height();
455 let should_fetch_parent = height > END_BOUND;
456 if should_fetch_parent {
457 let parent_commitment = block.parent();
458 let future = subscribe_block_future(this.marshal.clone(), parent_commitment);
459 this.pending.push_back(future);
460
461 // Explicitly poll the pending futures to kick off the fetch. If it's already ready,
462 // buffer it for the next poll.
463 if let Poll::Ready(Some(Some(block))) = this.pending.as_mut().poll_next(cx) {
464 this.buffered.push(block);
465 }
466 }
467
468 Poll::Ready(Some(block))
469 }
470 }
471 }
472}