commonware_consensus/marshal/actor.rs
1use super::{
2 cache,
3 config::Config,
4 ingress::{
5 handler::{self, Request},
6 mailbox::{Mailbox, Message},
7 },
8};
9use crate::{
10 marshal::{
11 ingress::mailbox::Identifier as BlockID,
12 store::{Blocks, Certificates},
13 Update,
14 },
15 simplex::{
16 scheme::Scheme,
17 types::{Finalization, Notarization},
18 },
19 types::{Epoch, Epocher, Height, Round, ViewDelta},
20 Block, Reporter,
21};
22use commonware_broadcast::{buffered, Broadcaster};
23use commonware_codec::{Decode, Encode};
24use commonware_cryptography::{
25 certificate::{Provider, Scheme as CertificateScheme},
26 PublicKey,
27};
28use commonware_macros::select_loop;
29use commonware_p2p::Recipients;
30use commonware_parallel::Strategy;
31use commonware_resolver::Resolver;
32use commonware_runtime::{
33 spawn_cell, telemetry::metrics::status::GaugeExt, Clock, ContextCell, Handle, Metrics, Spawner,
34 Storage,
35};
36use commonware_storage::{
37 archive::Identifier as ArchiveID,
38 metadata::{self, Metadata},
39};
40use commonware_utils::{
41 acknowledgement::Exact,
42 channel::{fallible::OneshotExt, mpsc, oneshot},
43 futures::{AbortablePool, Aborter, OptionFuture},
44 sequence::U64,
45 Acknowledgement, BoxedError,
46};
47use futures::try_join;
48use pin_project::pin_project;
49use prometheus_client::metrics::gauge::Gauge;
50use rand_core::CryptoRngCore;
51use std::{
52 collections::{btree_map::Entry, BTreeMap},
53 future::Future,
54 num::NonZeroUsize,
55 sync::Arc,
56};
57use tracing::{debug, error, info, warn};
58
59/// The key used to store the last processed height in the metadata store.
60const LATEST_KEY: U64 = U64::new(0xFF);
61
/// A pending acknowledgement from the application that it has processed the block at the
/// contained height/commitment.
63#[pin_project]
64struct PendingAck<B: Block, A: Acknowledgement> {
65 height: Height,
66 commitment: B::Commitment,
67 #[pin]
68 receiver: A::Waiter,
69}
70
71impl<B: Block, A: Acknowledgement> Future for PendingAck<B, A> {
72 type Output = <A::Waiter as Future>::Output;
73
74 fn poll(
75 self: std::pin::Pin<&mut Self>,
76 cx: &mut std::task::Context<'_>,
77 ) -> std::task::Poll<Self::Output> {
78 self.project().receiver.poll(cx)
79 }
80}
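
// A rough sketch of the acknowledgement round trip driven by `PendingAck`, using only names
// that appear in this module (the exact `Acknowledgement` API is assumed from its use in
// `try_dispatch_block` below):
//
//     let (ack, waiter) = A::handle();
//     application.report(Update::Block(block, ack)).await;
//     self.pending_ack.replace(PendingAck { height, commitment, receiver: waiter });
//
// The main loop then polls `&mut self.pending_ack`; once the application acknowledges the
// block, the waiter resolves and the actor records the height as processed. If the waiter
// resolves with an error (the block was not acknowledged), the actor shuts down.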
81
82/// A struct that holds multiple subscriptions for a block.
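///
/// When the last subscriber hangs up, the whole entry is dropped (see the `on_start` pass in
/// `run`), which drops the `Aborter` and cancels the waiter that was awaiting the block from
/// the buffer.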
83struct BlockSubscription<B: Block> {
84 // The subscribers that are waiting for the block
85 subscribers: Vec<oneshot::Sender<B>>,
86 // Aborter that aborts the waiter future when dropped
87 _aborter: Aborter,
88}
89
90/// The [Actor] is responsible for receiving uncertified blocks from the broadcast mechanism,
91/// receiving notarizations and finalizations from consensus, and reconstructing a total order
92/// of blocks.
93///
94/// The actor is designed to be used in a view-based model. Each view corresponds to a
95/// potential block in the chain. The actor will only finalize a block if it has a
96/// corresponding finalization.
97///
98/// The actor also provides a backfill mechanism for missing blocks. If the actor receives a
99/// finalization for a block that is ahead of its current view, it will request the missing blocks
100/// from its peers. This ensures that the actor can catch up to the rest of the network if it falls
101/// behind.
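///
/// A minimal wiring sketch (the stores, configuration, broadcast buffer, and resolver are
/// assumed to have been constructed elsewhere; see [Actor::init] and [Actor::start] for the
/// exact signatures):
///
/// ```ignore
/// let (actor, mailbox, last_processed_height) =
///     Actor::init(context, finalizations_by_height, finalized_blocks, config).await;
/// let handle = actor.start(application, buffer, (resolver_rx, resolver));
/// ```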
102pub struct Actor<E, B, P, FC, FB, ES, T, A = Exact>
103where
104 E: CryptoRngCore + Spawner + Metrics + Clock + Storage,
105 B: Block,
106 P: Provider<Scope = Epoch, Scheme: Scheme<B::Commitment>>,
107 FC: Certificates<Commitment = B::Commitment, Scheme = P::Scheme>,
108 FB: Blocks<Block = B>,
109 ES: Epocher,
110 T: Strategy,
111 A: Acknowledgement,
112{
113 // ---------- Context ----------
114 context: ContextCell<E>,
115
116 // ---------- Message Passing ----------
117 // Mailbox
118 mailbox: mpsc::Receiver<Message<P::Scheme, B>>,
119
120 // ---------- Configuration ----------
121 // Provider for epoch-specific signing schemes
122 provider: P,
123 // Epoch configuration
124 epocher: ES,
125 // Minimum number of views to retain temporary data after the application processes a block
126 view_retention_timeout: ViewDelta,
127 // Maximum number of blocks to repair at once
128 max_repair: NonZeroUsize,
129 // Codec configuration for block type
130 block_codec_config: B::Cfg,
131 // Strategy for parallel operations
132 strategy: T,
133
134 // ---------- State ----------
    // Last processed round
136 last_processed_round: Round,
137 // Last height processed by the application
138 last_processed_height: Height,
139 // Pending application acknowledgement, if any
140 pending_ack: OptionFuture<PendingAck<B, A>>,
141 // Highest known finalized height
142 tip: Height,
143 // Outstanding subscriptions for blocks
144 block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<B>>,
145
146 // ---------- Storage ----------
147 // Prunable cache
148 cache: cache::Manager<E, B, P::Scheme>,
149 // Metadata tracking application progress
150 application_metadata: Metadata<E, U64, Height>,
151 // Finalizations stored by height
152 finalizations_by_height: FC,
153 // Finalized blocks stored by height
154 finalized_blocks: FB,
155
156 // ---------- Metrics ----------
157 // Latest height metric
158 finalized_height: Gauge,
159 // Latest processed height
160 processed_height: Gauge,
161}
162
163impl<E, B, P, FC, FB, ES, T, A> Actor<E, B, P, FC, FB, ES, T, A>
164where
165 E: CryptoRngCore + Spawner + Metrics + Clock + Storage,
166 B: Block,
167 P: Provider<Scope = Epoch, Scheme: Scheme<B::Commitment>>,
168 FC: Certificates<Commitment = B::Commitment, Scheme = P::Scheme>,
169 FB: Blocks<Block = B>,
170 ES: Epocher,
171 T: Strategy,
172 A: Acknowledgement,
173{
174 /// Create a new application actor.
175 pub async fn init(
176 context: E,
177 finalizations_by_height: FC,
178 finalized_blocks: FB,
179 config: Config<B, P, ES, T>,
180 ) -> (Self, Mailbox<P::Scheme, B>, Height) {
181 // Initialize cache
182 let prunable_config = cache::Config {
183 partition_prefix: format!("{}-cache", config.partition_prefix.clone()),
184 prunable_items_per_section: config.prunable_items_per_section,
185 replay_buffer: config.replay_buffer,
186 key_write_buffer: config.key_write_buffer,
187 value_write_buffer: config.value_write_buffer,
188 key_page_cache: config.page_cache.clone(),
189 };
190 let cache = cache::Manager::init(
191 context.with_label("cache"),
192 prunable_config,
193 config.block_codec_config.clone(),
194 )
195 .await;
196
197 // Initialize metadata tracking application progress
198 let application_metadata = Metadata::init(
199 context.with_label("application_metadata"),
200 metadata::Config {
201 partition: format!("{}-application-metadata", config.partition_prefix),
202 codec_config: (),
203 },
204 )
205 .await
206 .expect("failed to initialize application metadata");
207 let last_processed_height = application_metadata
208 .get(&LATEST_KEY)
209 .copied()
210 .unwrap_or(Height::zero());
211
212 // Create metrics
213 let finalized_height = Gauge::default();
214 context.register(
215 "finalized_height",
216 "Finalized height of application",
217 finalized_height.clone(),
218 );
219 let processed_height = Gauge::default();
220 context.register(
221 "processed_height",
222 "Processed height of application",
223 processed_height.clone(),
224 );
225 let _ = processed_height.try_set(last_processed_height.get());
226
227 // Initialize mailbox
228 let (sender, mailbox) = mpsc::channel(config.mailbox_size);
229 (
230 Self {
231 context: ContextCell::new(context),
232 mailbox,
233 provider: config.provider,
234 epocher: config.epocher,
235 view_retention_timeout: config.view_retention_timeout,
236 max_repair: config.max_repair,
237 block_codec_config: config.block_codec_config,
238 strategy: config.strategy,
239 last_processed_round: Round::zero(),
240 last_processed_height,
241 pending_ack: None.into(),
242 tip: Height::zero(),
243 block_subscriptions: BTreeMap::new(),
244 cache,
245 application_metadata,
246 finalizations_by_height,
247 finalized_blocks,
248 finalized_height,
249 processed_height,
250 },
251 Mailbox::new(sender),
252 last_processed_height,
253 )
254 }
255
256 /// Start the actor.
257 pub fn start<R, K>(
258 mut self,
259 application: impl Reporter<Activity = Update<B, A>>,
260 buffer: buffered::Mailbox<K, B>,
261 resolver: (mpsc::Receiver<handler::Message<B>>, R),
262 ) -> Handle<()>
263 where
264 R: Resolver<
265 Key = handler::Request<B>,
266 PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
267 >,
268 K: PublicKey,
269 {
270 spawn_cell!(self.context, self.run(application, buffer, resolver).await)
271 }
272
273 /// Run the application actor.
274 async fn run<R, K>(
275 mut self,
276 mut application: impl Reporter<Activity = Update<B, A>>,
277 mut buffer: buffered::Mailbox<K, B>,
278 (mut resolver_rx, mut resolver): (mpsc::Receiver<handler::Message<B>>, R),
279 ) where
280 R: Resolver<
281 Key = handler::Request<B>,
282 PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
283 >,
284 K: PublicKey,
285 {
286 // Create a local pool for waiter futures.
287 let mut waiters = AbortablePool::<(B::Commitment, B)>::default();
288
289 // Get tip and send to application
290 let tip = self.get_latest().await;
291 if let Some((height, commitment, round)) = tip {
292 application
293 .report(Update::Tip(round, height, commitment))
294 .await;
295 self.tip = height;
296 let _ = self.finalized_height.try_set(height.get());
297 }
298
299 // Attempt to dispatch the next finalized block to the application, if it is ready.
300 self.try_dispatch_block(&mut application).await;
301
302 // Attempt to repair any gaps in the finalized blocks archive, if there are any.
303 self.try_repair_gaps(&mut buffer, &mut resolver, &mut application)
304 .await;
305
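        // The arms below are ordered by priority (see the per-arm comments): waiter
        // completions, then application acknowledgements, then consensus messages from the
        // mailbox, with resolver traffic handled last.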
306 select_loop! {
307 self.context,
308 on_start => {
309 // Remove any dropped subscribers. If all subscribers dropped, abort the waiter.
310 self.block_subscriptions.retain(|_, bs| {
311 bs.subscribers.retain(|tx| !tx.is_closed());
312 !bs.subscribers.is_empty()
313 });
314 },
315 on_stopped => {
316 debug!("context shutdown, stopping marshal");
317 },
318 // Handle waiter completions first (aborted futures are skipped)
319 Ok((commitment, block)) = waiters.next_completed() else continue => {
320 self.notify_subscribers(commitment, &block).await;
321 },
322 // Handle application acknowledgements next
323 ack = &mut self.pending_ack => {
324 let PendingAck {
325 height, commitment, ..
326 } = self.pending_ack.take().expect("ack state must be present");
327
328 match ack {
329 Ok(()) => {
330 if let Err(e) = self
331 .handle_block_processed(height, commitment, &mut resolver)
332 .await
333 {
334 error!(?e, %height, "failed to update application progress");
335 return;
336 }
337 self.try_dispatch_block(&mut application).await;
338 }
339 Err(e) => {
340 error!(?e, %height, "application did not acknowledge block");
341 return;
342 }
343 }
344 },
345 // Handle consensus inputs before backfill or resolver traffic
346 Some(message) = self.mailbox.recv() else {
347 info!("mailbox closed, shutting down");
348 break;
349 } => {
350 match message {
351 Message::GetInfo {
352 identifier,
353 response,
354 } => {
355 let info = match identifier {
356 // TODO: Instead of pulling out the entire block, determine the
357 // height directly from the archive by mapping the commitment to
358 // the index, which is the same as the height.
359 BlockID::Commitment(commitment) => self
360 .finalized_blocks
361 .get(ArchiveID::Key(&commitment))
362 .await
363 .ok()
364 .flatten()
365 .map(|b| (b.height(), commitment)),
366 BlockID::Height(height) => self
367 .finalizations_by_height
368 .get(ArchiveID::Index(height.get()))
369 .await
370 .ok()
371 .flatten()
372 .map(|f| (height, f.proposal.payload)),
373 BlockID::Latest => self.get_latest().await.map(|(h, c, _)| (h, c)),
374 };
375 response.send_lossy(info);
376 }
377 Message::Proposed { round, block } => {
378 self.cache_verified(round, block.commitment(), block.clone())
379 .await;
380 let _peers = buffer.broadcast(Recipients::All, block).await;
381 }
382 Message::Verified { round, block } => {
383 self.cache_verified(round, block.commitment(), block).await;
384 }
385 Message::Notarization { notarization } => {
386 let round = notarization.round();
387 let commitment = notarization.proposal.payload;
388
389 // Store notarization by view
390 self.cache
391 .put_notarization(round, commitment, notarization.clone())
392 .await;
393
394 // Search for block locally, otherwise fetch it remotely
395 if let Some(block) = self.find_block(&mut buffer, commitment).await {
396 // If found, persist the block
397 self.cache_block(round, commitment, block).await;
398 } else {
399 debug!(?round, "notarized block missing");
400 resolver.fetch(Request::<B>::Notarized { round }).await;
401 }
402 }
403 Message::Finalization { finalization } => {
404 // Cache finalization by round
405 let round = finalization.round();
406 let commitment = finalization.proposal.payload;
407 self.cache
408 .put_finalization(round, commitment, finalization.clone())
409 .await;
410
411 // Search for block locally, otherwise fetch it remotely
412 if let Some(block) = self.find_block(&mut buffer, commitment).await {
413 // If found, persist the block
414 let height = block.height();
415 self.finalize(
416 height,
417 commitment,
418 block,
419 Some(finalization),
420 &mut application,
421 &mut buffer,
422 &mut resolver,
423 )
424 .await;
425 debug!(?round, %height, "finalized block stored");
426 } else {
427 // Otherwise, fetch the block from the network.
428 debug!(?round, ?commitment, "finalized block missing");
429 resolver.fetch(Request::<B>::Block(commitment)).await;
430 }
431 }
432 Message::GetBlock {
433 identifier,
434 response,
435 } => match identifier {
436 BlockID::Commitment(commitment) => {
437 let result = self.find_block(&mut buffer, commitment).await;
438 response.send_lossy(result);
439 }
440 BlockID::Height(height) => {
441 let result = self.get_finalized_block(height).await;
442 response.send_lossy(result);
443 }
444 BlockID::Latest => {
445 let block = match self.get_latest().await {
446 Some((_, commitment, _)) => {
447 self.find_block(&mut buffer, commitment).await
448 }
449 None => None,
450 };
451 response.send_lossy(block);
452 }
453 },
454 Message::GetFinalization { height, response } => {
455 let finalization = self.get_finalization_by_height(height).await;
456 response.send_lossy(finalization);
457 }
458 Message::HintFinalized { height, targets } => {
459 // Skip if height is at or below the floor
460 if height <= self.last_processed_height {
461 continue;
462 }
463
464 // Skip if finalization is already available locally
465 if self.get_finalization_by_height(height).await.is_some() {
466 continue;
467 }
468
469 // Trigger a targeted fetch via the resolver
470 let request = Request::<B>::Finalized { height };
471 resolver.fetch_targeted(request, targets).await;
472 }
473 Message::Subscribe {
474 round,
475 commitment,
476 response,
477 } => {
478 // Check for block locally
479 if let Some(block) = self.find_block(&mut buffer, commitment).await {
480 response.send_lossy(block);
481 continue;
482 }
483
484 // We don't have the block locally, so fetch the block from the network
485 // if we have an associated view. If we only have the digest, don't make
486 // the request as we wouldn't know when to drop it, and the request may
487 // never complete if the block is not finalized.
488 if let Some(round) = round {
489 if round < self.last_processed_round {
490 // At this point, we have failed to find the block locally, and
491 // we know that its round is less than the last processed round.
492 // This means that something else was finalized in that round,
493 // so we drop the response to indicate that the block may never
494 // be available.
495 continue;
496 }
497 // Attempt to fetch the block (with notarization) from the resolver.
498 // If this is a valid view, this request should be fine to keep open
499 // until resolution or pruning (even if the oneshot is canceled).
500 debug!(?round, ?commitment, "requested block missing");
501 resolver.fetch(Request::<B>::Notarized { round }).await;
502 }
503
504 // Register subscriber
505 debug!(?round, ?commitment, "registering subscriber");
506 match self.block_subscriptions.entry(commitment) {
507 Entry::Occupied(mut entry) => {
508 entry.get_mut().subscribers.push(response);
509 }
510 Entry::Vacant(entry) => {
511 let (tx, rx) = oneshot::channel();
512 buffer.subscribe_prepared(None, commitment, None, tx).await;
513 let aborter = waiters.push(async move {
514 (commitment, rx.await.expect("buffer subscriber closed"))
515 });
516 entry.insert(BlockSubscription {
517 subscribers: vec![response],
518 _aborter: aborter,
519 });
520 }
521 }
522 }
523 Message::SetFloor { height } => {
524 if self.last_processed_height >= height {
525 warn!(
526 %height,
527 existing = %self.last_processed_height,
528 "floor not updated, lower than existing"
529 );
530 continue;
531 }
532
533 // Update the processed height
534 if let Err(err) = self.set_processed_height(height, &mut resolver).await {
535 error!(?err, %height, "failed to update floor");
536 return;
537 }
538
                        // Drop the pending acknowledgement, if one exists. Otherwise, an
                        // acknowledgement for an in-flight block below the new floor could
                        // later complete and move `last_processed_height` backwards.
542 self.pending_ack = None.into();
543
544 // Prune the finalized block and finalization certificate archives in parallel.
545 if let Err(err) = self.prune_finalized_archives(height).await {
546 error!(?err, %height, "failed to prune finalized archives");
547 return;
548 }
549 }
550 Message::Prune { height } => {
551 // Only allow pruning at or below the current floor
552 if height > self.last_processed_height {
553 warn!(%height, floor = %self.last_processed_height, "prune height above floor, ignoring");
554 continue;
555 }
556
557 // Prune the finalized block and finalization certificate archives in parallel.
558 if let Err(err) = self.prune_finalized_archives(height).await {
559 error!(?err, %height, "failed to prune finalized archives");
560 return;
561 }
562 }
563 }
564 },
565 // Handle resolver messages last
566 Some(message) = resolver_rx.recv() else {
567 info!("handler closed, shutting down");
568 break;
569 } => {
570 match message {
571 handler::Message::Produce { key, response } => {
572 match key {
573 Request::Block(commitment) => {
574 // Check for block locally
575 let Some(block) = self.find_block(&mut buffer, commitment).await
576 else {
577 debug!(?commitment, "block missing on request");
578 continue;
579 };
580 response.send_lossy(block.encode());
581 }
582 Request::Finalized { height } => {
583 // Get finalization
584 let Some(finalization) =
585 self.get_finalization_by_height(height).await
586 else {
587 debug!(%height, "finalization missing on request");
588 continue;
589 };
590
591 // Get block
592 let Some(block) = self.get_finalized_block(height).await else {
593 debug!(%height, "finalized block missing on request");
594 continue;
595 };
596
597 // Send finalization
598 response.send_lossy((finalization, block).encode());
599 }
600 Request::Notarized { round } => {
601 // Get notarization
602 let Some(notarization) = self.cache.get_notarization(round).await
603 else {
604 debug!(?round, "notarization missing on request");
605 continue;
606 };
607
608 // Get block
609 let commitment = notarization.proposal.payload;
610 let Some(block) = self.find_block(&mut buffer, commitment).await
611 else {
612 debug!(?commitment, "block missing on request");
613 continue;
614 };
615 response.send_lossy((notarization, block).encode());
616 }
617 }
618 }
619 handler::Message::Deliver {
620 key,
621 value,
622 response,
623 } => {
624 match key {
625 Request::Block(commitment) => {
626 // Parse block
627 let Ok(block) =
628 B::decode_cfg(value.as_ref(), &self.block_codec_config)
629 else {
630 response.send_lossy(false);
631 continue;
632 };
633
634 // Validation
635 if block.commitment() != commitment {
636 response.send_lossy(false);
637 continue;
638 }
639
640 // Persist the block, also persisting the finalization if we have it
641 let height = block.height();
642 let finalization =
643 self.cache.get_finalization_for(commitment).await;
644 self.finalize(
645 height,
646 commitment,
647 block,
648 finalization,
649 &mut application,
650 &mut buffer,
651 &mut resolver,
652 )
653 .await;
654 debug!(?commitment, %height, "received block");
655 response.send_lossy(true);
656 }
657 Request::Finalized { height } => {
658 let Some(bounds) = self.epocher.containing(height) else {
659 response.send_lossy(false);
660 continue;
661 };
662 let Some(scheme) =
663 self.get_scheme_certificate_verifier(bounds.epoch())
664 else {
665 response.send_lossy(false);
666 continue;
667 };
668
669 // Parse finalization
670 let Ok((finalization, block)) =
671 <(Finalization<P::Scheme, B::Commitment>, B)>::decode_cfg(
672 value,
673 &(
674 scheme.certificate_codec_config(),
675 self.block_codec_config.clone(),
676 ),
677 )
678 else {
679 response.send_lossy(false);
680 continue;
681 };
682
683 // Validation
684 if block.height() != height
685 || finalization.proposal.payload != block.commitment()
686 || !finalization.verify(
687 &mut self.context,
688 &scheme,
689 &self.strategy,
690 )
691 {
692 response.send_lossy(false);
693 continue;
694 }
695
696 // Valid finalization received
697 debug!(%height, "received finalization");
698 response.send_lossy(true);
699 self.finalize(
700 height,
701 block.commitment(),
702 block,
703 Some(finalization),
704 &mut application,
705 &mut buffer,
706 &mut resolver,
707 )
708 .await;
709 }
710 Request::Notarized { round } => {
711 let Some(scheme) =
712 self.get_scheme_certificate_verifier(round.epoch())
713 else {
714 response.send_lossy(false);
715 continue;
716 };
717
718 // Parse notarization
719 let Ok((notarization, block)) =
720 <(Notarization<P::Scheme, B::Commitment>, B)>::decode_cfg(
721 value,
722 &(
723 scheme.certificate_codec_config(),
724 self.block_codec_config.clone(),
725 ),
726 )
727 else {
728 response.send_lossy(false);
729 continue;
730 };
731
732 // Validation
733 if notarization.round() != round
734 || notarization.proposal.payload != block.commitment()
735 || !notarization.verify(
736 &mut self.context,
737 &scheme,
738 &self.strategy,
739 )
740 {
741 response.send_lossy(false);
742 continue;
743 }
744
745 // Valid notarization received
746 response.send_lossy(true);
747 let commitment = block.commitment();
748 debug!(?round, ?commitment, "received notarization");
749
                                // If there exists a finalization certificate for this block,
                                // finalize it now. While not strictly necessary, this can
                                // finalize the block faster when a notarization and then a
                                // finalization arrive via the consensus engine and the request
                                // for the notarization resolves before the request for the
                                // block itself.
756 let height = block.height();
757 if let Some(finalization) =
758 self.cache.get_finalization_for(commitment).await
759 {
760 self.finalize(
761 height,
762 commitment,
763 block.clone(),
764 Some(finalization),
765 &mut application,
766 &mut buffer,
767 &mut resolver,
768 )
769 .await;
770 }
771
772 // Cache the notarization and block
773 self.cache_block(round, commitment, block).await;
774 self.cache
775 .put_notarization(round, commitment, notarization)
776 .await;
777 }
778 }
779 }
780 }
781 },
782 }
783 }
784
785 /// Returns a scheme suitable for verifying certificates at the given epoch.
786 ///
    /// Prefers a scheme that can verify certificates for any epoch, falling back to the scheme
    /// scoped to the given epoch.
789 fn get_scheme_certificate_verifier(&self, epoch: Epoch) -> Option<Arc<P::Scheme>> {
790 self.provider.all().or_else(|| self.provider.scoped(epoch))
791 }
792
793 // -------------------- Waiters --------------------
794
795 /// Notify any subscribers for the given commitment with the provided block.
796 async fn notify_subscribers(&mut self, commitment: B::Commitment, block: &B) {
797 if let Some(mut bs) = self.block_subscriptions.remove(&commitment) {
798 for subscriber in bs.subscribers.drain(..) {
799 subscriber.send_lossy(block.clone());
800 }
801 }
802 }
803
804 // -------------------- Application Dispatch --------------------
805
806 /// Attempt to dispatch the next finalized block to the application if ready.
807 async fn try_dispatch_block(
808 &mut self,
809 application: &mut impl Reporter<Activity = Update<B, A>>,
810 ) {
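        // Only one block is dispatched to the application at a time; if an acknowledgement is
        // still outstanding, wait for it before dispatching the next block.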
811 if self.pending_ack.is_some() {
812 return;
813 }
814
815 let next_height = self.last_processed_height.next();
816 let Some(block) = self.get_finalized_block(next_height).await else {
817 return;
818 };
819 assert_eq!(
820 block.height(),
821 next_height,
822 "finalized block height mismatch"
823 );
824
825 let (height, commitment) = (block.height(), block.commitment());
826 let (ack, ack_waiter) = A::handle();
827 application.report(Update::Block(block, ack)).await;
828 self.pending_ack.replace(PendingAck {
829 height,
830 commitment,
831 receiver: ack_waiter,
832 });
833 }
834
835 /// Handle acknowledgement from the application that a block has been processed.
836 async fn handle_block_processed(
837 &mut self,
838 height: Height,
839 commitment: B::Commitment,
840 resolver: &mut impl Resolver<Key = Request<B>>,
841 ) -> Result<(), metadata::Error> {
842 // Update the processed height
843 self.set_processed_height(height, resolver).await?;
844
845 // Cancel any useless requests
846 resolver.cancel(Request::<B>::Block(commitment)).await;
847
848 if let Some(finalization) = self.get_finalization_by_height(height).await {
            // Prune the cache up to a round that trails the previously processed round by the
            // view retention timeout
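            // For example, with a last processed round of (epoch, view 100) and a retention
            // timeout of 10 views, the cache is pruned up to round (epoch, view 90).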
850 let lpr = self.last_processed_round;
851 let prune_round = Round::new(
852 lpr.epoch(),
853 lpr.view().saturating_sub(self.view_retention_timeout),
854 );
855
856 // Prune archives
857 self.cache.prune(prune_round).await;
858
859 // Update the last processed round
860 let round = finalization.round();
861 self.last_processed_round = round;
862
863 // Cancel useless requests
864 resolver
865 .retain(Request::<B>::Notarized { round }.predicate())
866 .await;
867 }
868
869 Ok(())
870 }
871
872 // -------------------- Prunable Storage --------------------
873
874 /// Add a verified block to the prunable archive.
875 async fn cache_verified(&mut self, round: Round, commitment: B::Commitment, block: B) {
876 self.notify_subscribers(commitment, &block).await;
877 self.cache.put_verified(round, commitment, block).await;
878 }
879
880 /// Add a notarized block to the prunable archive.
881 async fn cache_block(&mut self, round: Round, commitment: B::Commitment, block: B) {
882 self.notify_subscribers(commitment, &block).await;
883 self.cache.put_block(round, commitment, block).await;
884 }
885
886 // -------------------- Immutable Storage --------------------
887
888 /// Get a finalized block from the immutable archive.
889 async fn get_finalized_block(&self, height: Height) -> Option<B> {
890 match self
891 .finalized_blocks
892 .get(ArchiveID::Index(height.get()))
893 .await
894 {
895 Ok(block) => block,
896 Err(e) => panic!("failed to get block: {e}"),
897 }
898 }
899
900 /// Get a finalization from the archive by height.
901 async fn get_finalization_by_height(
902 &self,
903 height: Height,
904 ) -> Option<Finalization<P::Scheme, B::Commitment>> {
905 match self
906 .finalizations_by_height
907 .get(ArchiveID::Index(height.get()))
908 .await
909 {
910 Ok(finalization) => finalization,
911 Err(e) => panic!("failed to get finalization: {e}"),
912 }
913 }
914
915 /// Add a finalized block, and optionally a finalization, to the archive, and
916 /// attempt to identify + repair any gaps in the archive.
917 #[allow(clippy::too_many_arguments)]
918 async fn finalize(
919 &mut self,
920 height: Height,
921 commitment: B::Commitment,
922 block: B,
923 finalization: Option<Finalization<P::Scheme, B::Commitment>>,
924 application: &mut impl Reporter<Activity = Update<B, A>>,
925 buffer: &mut buffered::Mailbox<impl PublicKey, B>,
926 resolver: &mut impl Resolver<Key = Request<B>>,
927 ) {
928 self.store_finalization(height, commitment, block, finalization, application)
929 .await;
930
931 self.try_repair_gaps(buffer, resolver, application).await;
932 }
933
934 /// Add a finalized block, and optionally a finalization, to the archive.
935 ///
936 /// After persisting the block, attempt to dispatch the next contiguous block to the
937 /// application.
938 async fn store_finalization(
939 &mut self,
940 height: Height,
941 commitment: B::Commitment,
942 block: B,
943 finalization: Option<Finalization<P::Scheme, B::Commitment>>,
944 application: &mut impl Reporter<Activity = Update<B, A>>,
945 ) {
946 self.notify_subscribers(commitment, &block).await;
947
948 // Extract round before finalization is moved into try_join
949 let round = finalization.as_ref().map(|f| f.round());
950
951 // In parallel, update the finalized blocks and finalizations archives
952 if let Err(e) = try_join!(
953 // Update the finalized blocks archive
954 async {
955 self.finalized_blocks.put(block).await.map_err(Box::new)?;
956 Ok::<_, BoxedError>(())
957 },
958 // Update the finalizations archive (if provided)
959 async {
960 if let Some(finalization) = finalization {
961 self.finalizations_by_height
962 .put(height, commitment, finalization)
963 .await
964 .map_err(Box::new)?;
965 }
966 Ok::<_, BoxedError>(())
967 }
968 ) {
969 panic!("failed to finalize: {e}");
970 }
971
972 // Update metrics and send tip update to application
973 if let Some(round) = round.filter(|_| height > self.tip) {
974 application
975 .report(Update::Tip(round, height, commitment))
976 .await;
977 self.tip = height;
978 let _ = self.finalized_height.try_set(height.get());
979 }
980
981 self.try_dispatch_block(application).await;
982 }
983
984 /// Get the latest finalized block information (height and commitment tuple).
985 ///
986 /// Blocks are only finalized directly with a finalization or indirectly via a descendant
987 /// block's finalization. Thus, the highest known finalized block must itself have a direct
988 /// finalization.
989 ///
    /// We return the height and commitment from the highest finalization whose block height we
    /// know. While a later finalization may exist, without its full block we do not know its
    /// height, so it cannot yet appear in the `finalizations_by_height` archive. While not
    /// checked explicitly, we should also have the associated block (in the `finalized_blocks`
    /// archive) for the information returned.
996 async fn get_latest(&mut self) -> Option<(Height, B::Commitment, Round)> {
997 let height = self.finalizations_by_height.last_index()?;
998 let finalization = self
999 .get_finalization_by_height(height)
1000 .await
1001 .expect("finalization missing");
1002 Some((height, finalization.proposal.payload, finalization.round()))
1003 }
1004
1005 // -------------------- Mixed Storage --------------------
1006
1007 /// Looks for a block anywhere in local storage.
1008 async fn find_block<K: PublicKey>(
1009 &mut self,
1010 buffer: &mut buffered::Mailbox<K, B>,
1011 commitment: B::Commitment,
1012 ) -> Option<B> {
1013 // Check buffer.
1014 if let Some(block) = buffer.get(None, commitment, None).await.into_iter().next() {
1015 return Some(block);
1016 }
1017 // Check verified / notarized blocks via cache manager.
1018 if let Some(block) = self.cache.find_block(commitment).await {
1019 return Some(block);
1020 }
1021 // Check finalized blocks.
1022 match self.finalized_blocks.get(ArchiveID::Key(&commitment)).await {
1023 Ok(block) => block, // may be None
1024 Err(e) => panic!("failed to get block: {e}"),
1025 }
1026 }
1027
    /// Attempt to repair any identified gaps in the finalized blocks archive. Blocks already
    /// available locally are repaired immediately; the number of missing heights requested from
    /// the network at once is bounded by `self.max_repair`, though multiple gaps may be spanned.
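    ///
    /// For example, if blocks exist at heights 9 and 15 but 10..=14 are missing, the repair
    /// walks parent links backwards from the block at 15, persisting any of 14, 13, ... that
    /// are found locally and requesting the first missing parent digest (plus any missing
    /// finalizations, up to the quota) from the network.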
1031 async fn try_repair_gaps<K: PublicKey>(
1032 &mut self,
1033 buffer: &mut buffered::Mailbox<K, B>,
1034 resolver: &mut impl Resolver<Key = Request<B>>,
1035 application: &mut impl Reporter<Activity = Update<B, A>>,
1036 ) {
1037 let start = self.last_processed_height.next();
1038 'cache_repair: loop {
1039 let (gap_start, Some(gap_end)) = self.finalized_blocks.next_gap(start) else {
1040 // No gaps detected
1041 return;
1042 };
1043
1044 // Attempt to repair the gap backwards from the end of the gap, using
1045 // blocks from our local storage.
1046 let Some(mut cursor) = self.get_finalized_block(gap_end).await else {
1047 panic!("gapped block missing that should exist: {gap_end}");
1048 };
1049
1050 // Compute the lower bound of the recursive repair. `gap_start` is `Some`
1051 // if `start` is not in a gap. We add one to it to ensure we don't
1052 // re-persist it to the database in the repair loop below.
1053 let gap_start = gap_start.map(|s| s.next()).unwrap_or(start);
1054
1055 // Iterate backwards, repairing blocks as we go.
1056 while cursor.height() > gap_start {
1057 let commitment = cursor.parent();
1058 if let Some(block) = self.find_block(buffer, commitment).await {
1059 let finalization = self.cache.get_finalization_for(commitment).await;
1060 self.store_finalization(
1061 block.height(),
1062 commitment,
1063 block.clone(),
1064 finalization,
1065 application,
1066 )
1067 .await;
1068 debug!(height = %block.height(), "repaired block");
1069 cursor = block;
1070 } else {
1071 // Request the next missing block digest
1072 resolver.fetch(Request::<B>::Block(commitment)).await;
1073 break 'cache_repair;
1074 }
1075 }
1076 }
1077
1078 // Request any finalizations for missing items in the archive, up to
1079 // the `max_repair` quota. This may help shrink the size of the gap
1080 // closest to the application's processed height if finalizations
1081 // for the requests' heights exist. If not, we rely on the recursive
1082 // digest fetches above.
1083 let missing_items = self
1084 .finalized_blocks
1085 .missing_items(start, self.max_repair.get());
1086 let requests = missing_items
1087 .into_iter()
1088 .map(|height| Request::<B>::Finalized { height })
1089 .collect::<Vec<_>>();
1090 if !requests.is_empty() {
1091 resolver.fetch_all(requests).await
1092 }
1093 }
1094
1095 /// Sets the processed height in storage, metrics, and in-memory state. Also cancels any
1096 /// outstanding requests below the new processed height.
1097 async fn set_processed_height(
1098 &mut self,
1099 height: Height,
1100 resolver: &mut impl Resolver<Key = Request<B>>,
1101 ) -> Result<(), metadata::Error> {
1102 self.application_metadata
1103 .put_sync(LATEST_KEY.clone(), height)
1104 .await?;
1105 self.last_processed_height = height;
1106 let _ = self
1107 .processed_height
1108 .try_set(self.last_processed_height.get());
1109
1110 // Cancel any existing requests below the new floor.
1111 resolver
1112 .retain(Request::<B>::Finalized { height }.predicate())
1113 .await;
1114
1115 Ok(())
1116 }
1117
1118 /// Prunes finalized blocks and certificates below the given height.
1119 async fn prune_finalized_archives(&mut self, height: Height) -> Result<(), BoxedError> {
1120 try_join!(
1121 async {
1122 self.finalized_blocks
1123 .prune(height)
1124 .await
1125 .map_err(Box::new)?;
1126 Ok::<_, BoxedError>(())
1127 },
1128 async {
1129 self.finalizations_by_height
1130 .prune(height)
1131 .await
1132 .map_err(Box::new)?;
1133 Ok::<_, BoxedError>(())
1134 }
1135 )?;
1136 Ok(())
1137 }
1138}