commonware_consensus/marshal/actor.rs
use super::{
    cache,
    config::Config,
    ingress::{
        handler::{self, Request},
        mailbox::{Mailbox, Message},
    },
};
use crate::{
    marshal::{
        ingress::mailbox::Identifier as BlockID,
        store::{Blocks, Certificates},
        Update,
    },
    simplex::{
        scheme::Scheme,
        types::{Finalization, Notarization},
    },
    types::{Epoch, Epocher, Height, Round, ViewDelta},
    Block, Reporter,
};
use commonware_broadcast::{buffered, Broadcaster};
use commonware_codec::{Decode, Encode};
use commonware_cryptography::{
    certificate::{Provider, Scheme as CertificateScheme},
    PublicKey,
};
use commonware_macros::select;
use commonware_p2p::Recipients;
use commonware_parallel::Strategy;
use commonware_resolver::Resolver;
use commonware_runtime::{
    spawn_cell, telemetry::metrics::status::GaugeExt, Clock, ContextCell, Handle, Metrics, Spawner,
    Storage,
};
use commonware_storage::{
    archive::Identifier as ArchiveID,
    metadata::{self, Metadata},
};
use commonware_utils::{
    acknowledgement::Exact,
    channels::fallible::OneshotExt,
    futures::{AbortablePool, Aborter, OptionFuture},
    sequence::U64,
    Acknowledgement, BoxedError,
};
use futures::{
    channel::{mpsc, oneshot},
    try_join, StreamExt,
};
use pin_project::pin_project;
use prometheus_client::metrics::gauge::Gauge;
use rand_core::CryptoRngCore;
use std::{
    collections::{btree_map::Entry, BTreeMap},
    future::Future,
    num::NonZeroUsize,
    sync::Arc,
};
use tracing::{debug, error, info, warn};

/// The key used to store the last processed height in the metadata store.
const LATEST_KEY: U64 = U64::new(0xFF);

/// A pending acknowledgement from the application for processing a block at the contained height/commitment.
#[pin_project]
struct PendingAck<B: Block, A: Acknowledgement> {
    height: Height,
    commitment: B::Commitment,
    #[pin]
    receiver: A::Waiter,
}

impl<B: Block, A: Acknowledgement> Future for PendingAck<B, A> {
    type Output = <A::Waiter as Future>::Output;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        self.project().receiver.poll(cx)
    }
}

/// A struct that holds multiple subscriptions for a block.
struct BlockSubscription<B: Block> {
    // The subscribers that are waiting for the block
    subscribers: Vec<oneshot::Sender<B>>,
    // Aborter that aborts the waiter future when dropped
    _aborter: Aborter,
}

/// The [Actor] is responsible for receiving uncertified blocks from the broadcast mechanism,
/// receiving notarizations and finalizations from consensus, and reconstructing a total order
/// of blocks.
///
/// The actor is designed to be used in a view-based model. Each view corresponds to a
/// potential block in the chain. The actor will only finalize a block if it has a
/// corresponding finalization.
///
/// The actor also provides a backfill mechanism for missing blocks. If the actor receives a
/// finalization for a block that is ahead of its current view, it will request the missing blocks
/// from its peers. This ensures that the actor can catch up to the rest of the network if it falls
/// behind.
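///
/// # Example
///
/// A rough wiring sketch, for illustration only: the runtime context, archives, broadcast
/// buffer, resolver, and application reporter are assumed to be constructed elsewhere, and
/// the concrete type parameters depend on the deployment.
///
/// ```ignore
/// // `finalizations_by_height` implements `Certificates`, `finalized_blocks` implements
/// // `Blocks`, and `config` is a `Config<B, P, ES, T>`.
/// let (actor, mailbox, last_processed) =
///     Actor::init(context, finalizations_by_height, finalized_blocks, config).await;
///
/// // Consensus reports activity through `mailbox`; the actor feeds ordered blocks to
/// // `application` (a `Reporter<Activity = Update<B, A>>`) and backfills via `resolver`.
/// let handle = actor.start(application, buffer, (resolver_rx, resolver));
/// ```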
pub struct Actor<E, B, P, FC, FB, ES, T, A = Exact>
where
    E: CryptoRngCore + Spawner + Metrics + Clock + Storage,
    B: Block,
    P: Provider<Scope = Epoch, Scheme: Scheme<B::Commitment>>,
    FC: Certificates<Commitment = B::Commitment, Scheme = P::Scheme>,
    FB: Blocks<Block = B>,
    ES: Epocher,
    T: Strategy,
    A: Acknowledgement,
{
    // ---------- Context ----------
    context: ContextCell<E>,

    // ---------- Message Passing ----------
    // Mailbox
    mailbox: mpsc::Receiver<Message<P::Scheme, B>>,

    // ---------- Configuration ----------
    // Provider for epoch-specific signing schemes
    provider: P,
    // Epoch configuration
    epocher: ES,
    // Minimum number of views to retain temporary data after the application processes a block
    view_retention_timeout: ViewDelta,
    // Maximum number of blocks to repair at once
    max_repair: NonZeroUsize,
    // Codec configuration for block type
    block_codec_config: B::Cfg,
    // Strategy for parallel operations
    strategy: T,

    // ---------- State ----------
    // Last view processed
    last_processed_round: Round,
    // Last height processed by the application
    last_processed_height: Height,
    // Pending application acknowledgement, if any
    pending_ack: OptionFuture<PendingAck<B, A>>,
    // Highest known finalized height
    tip: Height,
    // Outstanding subscriptions for blocks
    block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<B>>,

    // ---------- Storage ----------
    // Prunable cache
    cache: cache::Manager<E, B, P::Scheme>,
    // Metadata tracking application progress
    application_metadata: Metadata<E, U64, Height>,
    // Finalizations stored by height
    finalizations_by_height: FC,
    // Finalized blocks stored by height
    finalized_blocks: FB,

    // ---------- Metrics ----------
    // Latest height metric
    finalized_height: Gauge,
    // Latest processed height
    processed_height: Gauge,
}

impl<E, B, P, FC, FB, ES, T, A> Actor<E, B, P, FC, FB, ES, T, A>
where
    E: CryptoRngCore + Spawner + Metrics + Clock + Storage,
    B: Block,
    P: Provider<Scope = Epoch, Scheme: Scheme<B::Commitment>>,
    FC: Certificates<Commitment = B::Commitment, Scheme = P::Scheme>,
    FB: Blocks<Block = B>,
    ES: Epocher,
    T: Strategy,
    A: Acknowledgement,
{
    /// Create a new application actor.
    pub async fn init(
        context: E,
        finalizations_by_height: FC,
        finalized_blocks: FB,
        config: Config<B, P, ES, T>,
    ) -> (Self, Mailbox<P::Scheme, B>, Height) {
        // Initialize cache
        let prunable_config = cache::Config {
            partition_prefix: format!("{}-cache", config.partition_prefix.clone()),
            prunable_items_per_section: config.prunable_items_per_section,
            replay_buffer: config.replay_buffer,
            key_write_buffer: config.key_write_buffer,
            value_write_buffer: config.value_write_buffer,
            key_buffer_pool: config.buffer_pool.clone(),
        };
        let cache = cache::Manager::init(
            context.with_label("cache"),
            prunable_config,
            config.block_codec_config.clone(),
        )
        .await;

        // Initialize metadata tracking application progress
        let application_metadata = Metadata::init(
            context.with_label("application_metadata"),
            metadata::Config {
                partition: format!("{}-application-metadata", config.partition_prefix),
                codec_config: (),
            },
        )
        .await
        .expect("failed to initialize application metadata");
        let last_processed_height = application_metadata
            .get(&LATEST_KEY)
            .copied()
            .unwrap_or(Height::zero());

        // Create metrics
        let finalized_height = Gauge::default();
        context.register(
            "finalized_height",
            "Finalized height of application",
            finalized_height.clone(),
        );
        let processed_height = Gauge::default();
        context.register(
            "processed_height",
            "Processed height of application",
            processed_height.clone(),
        );
        let _ = processed_height.try_set(last_processed_height.get());

        // Initialize mailbox
        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
        (
            Self {
                context: ContextCell::new(context),
                mailbox,
                provider: config.provider,
                epocher: config.epocher,
                view_retention_timeout: config.view_retention_timeout,
                max_repair: config.max_repair,
                block_codec_config: config.block_codec_config,
                strategy: config.strategy,
                last_processed_round: Round::zero(),
                last_processed_height,
                pending_ack: None.into(),
                tip: Height::zero(),
                block_subscriptions: BTreeMap::new(),
                cache,
                application_metadata,
                finalizations_by_height,
                finalized_blocks,
                finalized_height,
                processed_height,
            },
            Mailbox::new(sender),
            last_processed_height,
        )
    }

    /// Start the actor.
    pub fn start<R, K>(
        mut self,
        application: impl Reporter<Activity = Update<B, A>>,
        buffer: buffered::Mailbox<K, B>,
        resolver: (mpsc::Receiver<handler::Message<B>>, R),
    ) -> Handle<()>
    where
        R: Resolver<
            Key = handler::Request<B>,
            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
        >,
        K: PublicKey,
    {
        spawn_cell!(self.context, self.run(application, buffer, resolver).await)
    }

    /// Run the application actor.
    async fn run<R, K>(
        mut self,
        mut application: impl Reporter<Activity = Update<B, A>>,
        mut buffer: buffered::Mailbox<K, B>,
        (mut resolver_rx, mut resolver): (mpsc::Receiver<handler::Message<B>>, R),
    ) where
        R: Resolver<
            Key = handler::Request<B>,
            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
        >,
        K: PublicKey,
    {
        // Create a local pool for waiter futures.
        let mut waiters = AbortablePool::<(B::Commitment, B)>::default();

        // Get tip and send to application
        let tip = self.get_latest().await;
        if let Some((height, commitment)) = tip {
            application.report(Update::Tip(height, commitment)).await;
            self.tip = height;
            let _ = self.finalized_height.try_set(height.get());
        }

        // Attempt to dispatch the next finalized block to the application, if it is ready.
        self.try_dispatch_block(&mut application).await;

        // Attempt to repair any gaps in the finalized blocks archive, if there are any.
        self.try_repair_gaps(&mut buffer, &mut resolver, &mut application)
            .await;

        loop {
            // Remove any dropped subscribers. If all subscribers dropped, abort the waiter.
            self.block_subscriptions.retain(|_, bs| {
                bs.subscribers.retain(|tx| !tx.is_canceled());
                !bs.subscribers.is_empty()
            });

            // Select messages
            select! {
                // Handle waiter completions first
                result = waiters.next_completed() => {
                    let Ok((commitment, block)) = result else {
                        continue; // Aborted future
                    };
                    self.notify_subscribers(commitment, &block).await;
                },
                // Handle application acknowledgements next
                ack = &mut self.pending_ack => {
                    let PendingAck { height, commitment, .. } = self.pending_ack.take().expect("ack state must be present");

                    match ack {
                        Ok(()) => {
                            if let Err(e) = self
                                .handle_block_processed(height, commitment, &mut resolver)
                                .await
                            {
                                error!(?e, %height, "failed to update application progress");
                                return;
                            }
                            self.try_dispatch_block(&mut application).await;
                        }
                        Err(e) => {
                            error!(?e, %height, "application did not acknowledge block");
                            return;
                        }
                    }
                },
                // Handle consensus inputs before backfill or resolver traffic
                mailbox_message = self.mailbox.next() => {
                    let Some(message) = mailbox_message else {
                        info!("mailbox closed, shutting down");
                        return;
                    };
                    match message {
                        Message::GetInfo { identifier, response } => {
                            let info = match identifier {
                                // TODO: Instead of pulling out the entire block, determine the
                                // height directly from the archive by mapping the commitment to
                                // the index, which is the same as the height.
                                BlockID::Commitment(commitment) => self
                                    .finalized_blocks
                                    .get(ArchiveID::Key(&commitment))
                                    .await
                                    .ok()
                                    .flatten()
                                    .map(|b| (b.height(), commitment)),
                                BlockID::Height(height) => self
                                    .finalizations_by_height
                                    .get(ArchiveID::Index(height.get()))
                                    .await
                                    .ok()
                                    .flatten()
                                    .map(|f| (height, f.proposal.payload)),
                                BlockID::Latest => self.get_latest().await,
                            };
                            response.send_lossy(info);
                        }
                        Message::Proposed { round, block } => {
                            self.cache_verified(round, block.commitment(), block.clone()).await;
                            let _peers = buffer.broadcast(Recipients::All, block).await;
                        }
                        Message::Verified { round, block } => {
                            self.cache_verified(round, block.commitment(), block).await;
                        }
                        Message::Notarization { notarization } => {
                            let round = notarization.round();
                            let commitment = notarization.proposal.payload;

                            // Store notarization by view
                            self.cache.put_notarization(round, commitment, notarization.clone()).await;

                            // Search for block locally, otherwise fetch it remotely
                            if let Some(block) = self.find_block(&mut buffer, commitment).await {
                                // If found, persist the block
                                self.cache_block(round, commitment, block).await;
                            } else {
                                debug!(?round, "notarized block missing");
                                resolver.fetch(Request::<B>::Notarized { round }).await;
                            }
                        }
                        Message::Finalization { finalization } => {
                            // Cache finalization by round
                            let round = finalization.round();
                            let commitment = finalization.proposal.payload;
                            self.cache.put_finalization(round, commitment, finalization.clone()).await;

                            // Search for block locally, otherwise fetch it remotely
                            if let Some(block) = self.find_block(&mut buffer, commitment).await {
                                // If found, persist the block
                                let height = block.height();
                                self.finalize(
                                    height,
                                    commitment,
                                    block,
                                    Some(finalization),
                                    &mut application,
                                    &mut buffer,
                                    &mut resolver,
                                )
                                .await;
                                debug!(?round, %height, "finalized block stored");
                            } else {
                                // Otherwise, fetch the block from the network.
                                debug!(?round, ?commitment, "finalized block missing");
                                resolver.fetch(Request::<B>::Block(commitment)).await;
                            }
                        }
                        Message::GetBlock { identifier, response } => {
                            match identifier {
                                BlockID::Commitment(commitment) => {
                                    let result = self.find_block(&mut buffer, commitment).await;
                                    response.send_lossy(result);
                                }
                                BlockID::Height(height) => {
                                    let result = self.get_finalized_block(height).await;
                                    response.send_lossy(result);
                                }
                                BlockID::Latest => {
                                    let block = match self.get_latest().await {
                                        Some((_, commitment)) => self.find_block(&mut buffer, commitment).await,
                                        None => None,
                                    };
                                    response.send_lossy(block);
                                }
                            }
                        }
                        Message::GetFinalization { height, response } => {
                            let finalization = self.get_finalization_by_height(height).await;
                            response.send_lossy(finalization);
                        }
                        Message::HintFinalized { height, targets } => {
                            // Skip if height is at or below the floor
                            if height <= self.last_processed_height {
                                continue;
                            }

                            // Skip if finalization is already available locally
                            if self.get_finalization_by_height(height).await.is_some() {
                                continue;
                            }

                            // Trigger a targeted fetch via the resolver
                            let request = Request::<B>::Finalized { height };
                            resolver.fetch_targeted(request, targets).await;
                        }
                        Message::Subscribe { round, commitment, response } => {
                            // Check for block locally
                            if let Some(block) = self.find_block(&mut buffer, commitment).await {
                                response.send_lossy(block);
                                continue;
                            }

                            // We don't have the block locally, so fetch the block from the network
                            // if we have an associated view. If we only have the digest, don't make
                            // the request as we wouldn't know when to drop it, and the request may
                            // never complete if the block is not finalized.
                            if let Some(round) = round {
                                if round < self.last_processed_round {
                                    // At this point, we have failed to find the block locally, and
                                    // we know that its round is less than the last processed round.
                                    // This means that something else was finalized in that round,
                                    // so we drop the response to indicate that the block may never
                                    // be available.
                                    continue;
                                }
                                // Attempt to fetch the block (with notarization) from the resolver.
                                // If this is a valid view, this request should be fine to keep open
                                // until resolution or pruning (even if the oneshot is canceled).
                                debug!(?round, ?commitment, "requested block missing");
                                resolver.fetch(Request::<B>::Notarized { round }).await;
                            }

                            // Register subscriber
                            debug!(?round, ?commitment, "registering subscriber");
                            match self.block_subscriptions.entry(commitment) {
                                Entry::Occupied(mut entry) => {
                                    entry.get_mut().subscribers.push(response);
                                }
                                Entry::Vacant(entry) => {
                                    let (tx, rx) = oneshot::channel();
                                    buffer.subscribe_prepared(None, commitment, None, tx).await;
                                    let aborter = waiters.push(async move {
                                        (commitment, rx.await.expect("buffer subscriber closed"))
                                    });
                                    entry.insert(BlockSubscription {
                                        subscribers: vec![response],
                                        _aborter: aborter,
                                    });
                                }
                            }
                        }
                        Message::SetFloor { height } => {
                            if let Some(stored_height) = self.application_metadata.get(&LATEST_KEY) {
                                if *stored_height >= height {
                                    warn!(%height, existing = %stored_height, "floor not updated, lower than existing");
                                    continue;
                                }
                            }

                            // Update the processed height
                            if let Err(err) = self.set_processed_height(height, &mut resolver).await {
                                error!(?err, %height, "failed to update floor");
                                return;
                            }

                            // Drop the pending acknowledgement, if one exists. We must do this to
                            // prevent an in-flight block below the new floor from being processed
                            // and updating `last_processed_height`.
                            self.pending_ack = None.into();

                            // Prune the finalized block and finalization certificate archives in parallel.
                            if let Err(err) = try_join!(
                                // Prune the finalized blocks archive
                                async {
                                    self.finalized_blocks.prune(height).await.map_err(Box::new)?;
                                    Ok::<_, BoxedError>(())
                                },
                                // Prune the finalization certificate archive
                                async {
                                    self.finalizations_by_height
                                        .prune(height)
                                        .await
                                        .map_err(Box::new)?;
                                    Ok::<_, BoxedError>(())
                                }
                            ) {
                                error!(?err, %height, "failed to prune finalized archives");
                                return;
                            }
                        }
                    }
                },
                // Handle resolver messages last
                message = resolver_rx.next() => {
                    let Some(message) = message else {
                        info!("handler closed, shutting down");
                        return;
                    };
                    match message {
                        handler::Message::Produce { key, response } => {
                            match key {
                                Request::Block(commitment) => {
                                    // Check for block locally
                                    let Some(block) = self.find_block(&mut buffer, commitment).await else {
                                        debug!(?commitment, "block missing on request");
                                        continue;
                                    };
                                    response.send_lossy(block.encode());
                                }
                                Request::Finalized { height } => {
                                    // Get finalization
                                    let Some(finalization) = self.get_finalization_by_height(height).await else {
                                        debug!(%height, "finalization missing on request");
                                        continue;
                                    };

                                    // Get block
                                    let Some(block) = self.get_finalized_block(height).await else {
                                        debug!(%height, "finalized block missing on request");
                                        continue;
                                    };

                                    // Send finalization
                                    response.send_lossy((finalization, block).encode());
                                }
                                Request::Notarized { round } => {
                                    // Get notarization
                                    let Some(notarization) = self.cache.get_notarization(round).await else {
                                        debug!(?round, "notarization missing on request");
                                        continue;
                                    };

                                    // Get block
                                    let commitment = notarization.proposal.payload;
                                    let Some(block) = self.find_block(&mut buffer, commitment).await else {
                                        debug!(?commitment, "block missing on request");
                                        continue;
                                    };
                                    response.send_lossy((notarization, block).encode());
                                }
                            }
                        },
                        handler::Message::Deliver { key, value, response } => {
                            match key {
                                Request::Block(commitment) => {
                                    // Parse block
                                    let Ok(block) = B::decode_cfg(value.as_ref(), &self.block_codec_config) else {
                                        response.send_lossy(false);
                                        continue;
                                    };

                                    // Validation
                                    if block.commitment() != commitment {
                                        response.send_lossy(false);
                                        continue;
                                    }

                                    // Persist the block, also persisting the finalization if we have it
                                    let height = block.height();
                                    let finalization = self.cache.get_finalization_for(commitment).await;
                                    self.finalize(
                                        height,
                                        commitment,
                                        block,
                                        finalization,
                                        &mut application,
                                        &mut buffer,
                                        &mut resolver,
                                    )
                                    .await;
                                    debug!(?commitment, %height, "received block");
                                    response.send_lossy(true);
                                },
                                Request::Finalized { height } => {
                                    let Some(bounds) = self.epocher.containing(height) else {
                                        response.send_lossy(false);
                                        continue;
                                    };
                                    let Some(scheme) = self.get_scheme_certificate_verifier(bounds.epoch()) else {
                                        response.send_lossy(false);
                                        continue;
                                    };

                                    // Parse finalization
                                    let Ok((finalization, block)) =
                                        <(Finalization<P::Scheme, B::Commitment>, B)>::decode_cfg(
                                            value,
                                            &(scheme.certificate_codec_config(), self.block_codec_config.clone()),
                                        )
                                    else {
                                        response.send_lossy(false);
                                        continue;
                                    };

                                    // Validation
                                    if block.height() != height
                                        || finalization.proposal.payload != block.commitment()
                                        || !finalization.verify(&mut self.context, &scheme, &self.strategy)
                                    {
                                        response.send_lossy(false);
                                        continue;
                                    }

                                    // Valid finalization received
                                    debug!(%height, "received finalization");
                                    response.send_lossy(true);
                                    self.finalize(
                                        height,
                                        block.commitment(),
                                        block,
                                        Some(finalization),
                                        &mut application,
                                        &mut buffer,
                                        &mut resolver,
                                    )
                                    .await;
                                },
                                Request::Notarized { round } => {
                                    let Some(scheme) = self.get_scheme_certificate_verifier(round.epoch()) else {
                                        response.send_lossy(false);
                                        continue;
                                    };

                                    // Parse notarization
                                    let Ok((notarization, block)) =
                                        <(Notarization<P::Scheme, B::Commitment>, B)>::decode_cfg(
                                            value,
                                            &(scheme.certificate_codec_config(), self.block_codec_config.clone()),
                                        )
                                    else {
                                        response.send_lossy(false);
                                        continue;
                                    };

                                    // Validation
                                    if notarization.round() != round
                                        || notarization.proposal.payload != block.commitment()
                                        || !notarization.verify(&mut self.context, &scheme, &self.strategy)
                                    {
                                        response.send_lossy(false);
                                        continue;
                                    }

                                    // Valid notarization received
                                    response.send_lossy(true);
                                    let commitment = block.commitment();
                                    debug!(?round, ?commitment, "received notarization");

                                    // If there exists a finalization certificate for this block, we
                                    // should finalize it. While not strictly necessary, this can
                                    // finalize the block faster in the case where a notarization and
                                    // then a finalization are received via the consensus engine, and
                                    // we resolve the request for the notarization before the request
                                    // for the block.
                                    let height = block.height();
                                    if let Some(finalization) = self.cache.get_finalization_for(commitment).await {
                                        self.finalize(
                                            height,
                                            commitment,
                                            block.clone(),
                                            Some(finalization),
                                            &mut application,
                                            &mut buffer,
                                            &mut resolver,
                                        )
                                        .await;
                                    }

                                    // Cache the notarization and block
                                    self.cache_block(round, commitment, block).await;
                                    self.cache.put_notarization(round, commitment, notarization).await;
                                },
                            }
                        },
                    }
                },
            }
        }
    }

    /// Returns a scheme suitable for verifying certificates at the given epoch.
    ///
    /// Prefers a certificate verifier if available, otherwise falls back
    /// to the scheme for the given epoch.
    fn get_scheme_certificate_verifier(&self, epoch: Epoch) -> Option<Arc<P::Scheme>> {
        self.provider.all().or_else(|| self.provider.scoped(epoch))
    }

    // -------------------- Waiters --------------------

    /// Notify any subscribers for the given commitment with the provided block.
    async fn notify_subscribers(&mut self, commitment: B::Commitment, block: &B) {
        if let Some(mut bs) = self.block_subscriptions.remove(&commitment) {
            for subscriber in bs.subscribers.drain(..) {
                subscriber.send_lossy(block.clone());
            }
        }
    }

    // -------------------- Application Dispatch --------------------

    /// Attempt to dispatch the next finalized block to the application if ready.
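    ///
    /// Illustrative only: the application's `Reporter` is expected to apply the delivered block
    /// and then complete the acknowledgement so the next height can be dispatched. A sketch of
    /// the receiving side, assuming a `Reporter` whose `Activity` is `Update<B, A>` (how the
    /// acknowledgement handle is completed depends on the chosen `Acknowledgement` impl):
    ///
    /// ```ignore
    /// match update {
    ///     Update::Block(block, ack) => {
    ///         // Apply `block`, then signal completion through `ack`; the marshal waits on the
    ///         // corresponding waiter before advancing `last_processed_height`.
    ///     }
    ///     Update::Tip(height, commitment) => {
    ///         // Observe the new highest-known finalized height.
    ///     }
    /// }
    /// ```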
    async fn try_dispatch_block(
        &mut self,
        application: &mut impl Reporter<Activity = Update<B, A>>,
    ) {
        if self.pending_ack.is_some() {
            return;
        }

        let next_height = self.last_processed_height.next();
        let Some(block) = self.get_finalized_block(next_height).await else {
            return;
        };
        assert_eq!(
            block.height(),
            next_height,
            "finalized block height mismatch"
        );

        let (height, commitment) = (block.height(), block.commitment());
        let (ack, ack_waiter) = A::handle();
        application.report(Update::Block(block, ack)).await;
        self.pending_ack.replace(PendingAck {
            height,
            commitment,
            receiver: ack_waiter,
        });
    }

    /// Handle acknowledgement from the application that a block has been processed.
    async fn handle_block_processed(
        &mut self,
        height: Height,
        commitment: B::Commitment,
        resolver: &mut impl Resolver<Key = Request<B>>,
    ) -> Result<(), metadata::Error> {
        // Update the processed height
        self.set_processed_height(height, resolver).await?;

        // Cancel any useless requests
        resolver.cancel(Request::<B>::Block(commitment)).await;

        if let Some(finalization) = self.get_finalization_by_height(height).await {
            // Trail the previous processed finalized block by the timeout
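            // (E.g., with a last processed view of 120 and a retention timeout of 20 views,
            // the prune target computed below is view 100 of the same epoch.)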
            let lpr = self.last_processed_round;
            let prune_round = Round::new(
                lpr.epoch(),
                lpr.view().saturating_sub(self.view_retention_timeout),
            );

            // Prune archives
            self.cache.prune(prune_round).await;

            // Update the last processed round
            let round = finalization.round();
            self.last_processed_round = round;

            // Cancel useless requests
            resolver
                .retain(Request::<B>::Notarized { round }.predicate())
                .await;
        }

        Ok(())
    }

    // -------------------- Prunable Storage --------------------

    /// Add a verified block to the prunable archive.
    async fn cache_verified(&mut self, round: Round, commitment: B::Commitment, block: B) {
        self.notify_subscribers(commitment, &block).await;
        self.cache.put_verified(round, commitment, block).await;
    }

    /// Add a notarized block to the prunable archive.
    async fn cache_block(&mut self, round: Round, commitment: B::Commitment, block: B) {
        self.notify_subscribers(commitment, &block).await;
        self.cache.put_block(round, commitment, block).await;
    }

    // -------------------- Immutable Storage --------------------

    /// Get a finalized block from the immutable archive.
    async fn get_finalized_block(&self, height: Height) -> Option<B> {
        match self
            .finalized_blocks
            .get(ArchiveID::Index(height.get()))
            .await
        {
            Ok(block) => block,
            Err(e) => panic!("failed to get block: {e}"),
        }
    }

    /// Get a finalization from the archive by height.
    async fn get_finalization_by_height(
        &self,
        height: Height,
    ) -> Option<Finalization<P::Scheme, B::Commitment>> {
        match self
            .finalizations_by_height
            .get(ArchiveID::Index(height.get()))
            .await
        {
            Ok(finalization) => finalization,
            Err(e) => panic!("failed to get finalization: {e}"),
        }
    }

    /// Add a finalized block, and optionally a finalization, to the archive, and
    /// attempt to identify + repair any gaps in the archive.
    #[allow(clippy::too_many_arguments)]
    async fn finalize(
        &mut self,
        height: Height,
        commitment: B::Commitment,
        block: B,
        finalization: Option<Finalization<P::Scheme, B::Commitment>>,
        application: &mut impl Reporter<Activity = Update<B, A>>,
        buffer: &mut buffered::Mailbox<impl PublicKey, B>,
        resolver: &mut impl Resolver<Key = Request<B>>,
    ) {
        self.store_finalization(height, commitment, block, finalization, application)
            .await;

        self.try_repair_gaps(buffer, resolver, application).await;
    }

    /// Add a finalized block, and optionally a finalization, to the archive.
    ///
    /// After persisting the block, attempt to dispatch the next contiguous block to the
    /// application.
    async fn store_finalization(
        &mut self,
        height: Height,
        commitment: B::Commitment,
        block: B,
        finalization: Option<Finalization<P::Scheme, B::Commitment>>,
        application: &mut impl Reporter<Activity = Update<B, A>>,
    ) {
        self.notify_subscribers(commitment, &block).await;

        // In parallel, update the finalized blocks and finalizations archives
        if let Err(e) = try_join!(
            // Update the finalized blocks archive
            async {
                self.finalized_blocks.put(block).await.map_err(Box::new)?;
                Ok::<_, BoxedError>(())
            },
            // Update the finalizations archive (if provided)
            async {
                if let Some(finalization) = finalization {
                    self.finalizations_by_height
                        .put(height, commitment, finalization)
                        .await
                        .map_err(Box::new)?;
                }
                Ok::<_, BoxedError>(())
            }
        ) {
            panic!("failed to finalize: {e}");
        }

        // Update metrics and send tip update to application
        if height > self.tip {
            application.report(Update::Tip(height, commitment)).await;
            self.tip = height;
            let _ = self.finalized_height.try_set(height.get());
        }

        self.try_dispatch_block(application).await;
    }

    /// Get the latest finalized block information (height and commitment tuple).
    ///
    /// Blocks are only finalized directly with a finalization or indirectly via a descendant
    /// block's finalization. Thus, the highest known finalized block must itself have a direct
    /// finalization.
    ///
    /// We return the height and commitment using the highest known finalization that we know the
    /// block height for. While it's possible that we have a later finalization, if we do not have
    /// the full block for that finalization, we do not know its height and therefore it would not
    /// yet be found in the `finalizations_by_height` archive. While not checked explicitly, we
    /// should have the associated block (in the `finalized_blocks` archive) for the information
    /// returned.
    async fn get_latest(&mut self) -> Option<(Height, B::Commitment)> {
        let height = self.finalizations_by_height.last_index()?;
        let finalization = self
            .get_finalization_by_height(height)
            .await
            .expect("finalization missing");
        Some((height, finalization.proposal.payload))
    }

    // -------------------- Mixed Storage --------------------

    /// Looks for a block anywhere in local storage.
    async fn find_block<K: PublicKey>(
        &mut self,
        buffer: &mut buffered::Mailbox<K, B>,
        commitment: B::Commitment,
    ) -> Option<B> {
        // Check buffer.
        if let Some(block) = buffer.get(None, commitment, None).await.into_iter().next() {
            return Some(block);
        }
        // Check verified / notarized blocks via cache manager.
        if let Some(block) = self.cache.find_block(commitment).await {
            return Some(block);
        }
        // Check finalized blocks.
        match self.finalized_blocks.get(ArchiveID::Key(&commitment)).await {
            Ok(block) => block, // may be None
            Err(e) => panic!("failed to get block: {e}"),
        }
    }

    /// Attempt to repair any identified gaps in the finalized blocks archive. The total
    /// number of missing heights that can be repaired at once is bounded by `self.max_repair`,
    /// though multiple gaps may be spanned.
    async fn try_repair_gaps<K: PublicKey>(
        &mut self,
        buffer: &mut buffered::Mailbox<K, B>,
        resolver: &mut impl Resolver<Key = Request<B>>,
        application: &mut impl Reporter<Activity = Update<B, A>>,
    ) {
        let start = self.last_processed_height.next();
        'cache_repair: loop {
            let (gap_start, Some(gap_end)) = self.finalized_blocks.next_gap(start) else {
                // No gaps detected
                return;
            };

            // Attempt to repair the gap backwards from the end of the gap, using
            // blocks from our local storage.
            let Some(mut cursor) = self.get_finalized_block(gap_end).await else {
                panic!("gapped block missing that should exist: {gap_end}");
            };

            // Compute the lower bound of the recursive repair. `gap_start` is `Some`
            // if `start` is not in a gap. We add one to it to ensure we don't
            // re-persist it to the database in the repair loop below.
            let gap_start = gap_start.map(|s| s.next()).unwrap_or(start);

            // Iterate backwards, repairing blocks as we go.
            while cursor.height() > gap_start {
                let commitment = cursor.parent();
                if let Some(block) = self.find_block(buffer, commitment).await {
                    let finalization = self.cache.get_finalization_for(commitment).await;
                    self.store_finalization(
                        block.height(),
                        commitment,
                        block.clone(),
                        finalization,
                        application,
                    )
                    .await;
                    debug!(height = %block.height(), "repaired block");
                    cursor = block;
                } else {
                    // Request the next missing block digest
                    resolver.fetch(Request::<B>::Block(commitment)).await;
                    break 'cache_repair;
                }
            }
        }

        // Request any finalizations for missing items in the archive, up to
        // the `max_repair` quota. This may help shrink the size of the gap
        // closest to the application's processed height if finalizations
        // for the requests' heights exist. If not, we rely on the recursive
        // digest fetches above.
        let missing_items = self
            .finalized_blocks
            .missing_items(start, self.max_repair.get());
        let requests = missing_items
            .into_iter()
            .map(|height| Request::<B>::Finalized { height })
            .collect::<Vec<_>>();
        if !requests.is_empty() {
            resolver.fetch_all(requests).await
        }
    }

    /// Sets the processed height in storage, metrics, and in-memory state. Also cancels any
    /// outstanding requests below the new processed height.
    async fn set_processed_height(
        &mut self,
        height: Height,
        resolver: &mut impl Resolver<Key = Request<B>>,
    ) -> Result<(), metadata::Error> {
        self.application_metadata
            .put_sync(LATEST_KEY.clone(), height)
            .await?;
        self.last_processed_height = height;
        let _ = self
            .processed_height
            .try_set(self.last_processed_height.get());

        // Cancel any existing requests below the new floor.
        resolver
            .retain(Request::<B>::Finalized { height }.predicate())
            .await;

        Ok(())
1063 }
1064}