Skip to main content

commonware_consensus/marshal/
actor.rs

1use super::{
2    cache,
3    config::Config,
4    ingress::{
5        handler::{self, Request},
6        mailbox::{Mailbox, Message},
7    },
8};
9use crate::{
10    marshal::{
11        ingress::mailbox::Identifier as BlockID,
12        store::{Blocks, Certificates},
13        Update,
14    },
15    simplex::{
16        scheme::Scheme,
17        types::{Finalization, Notarization},
18    },
19    types::{Epoch, Epocher, Height, Round, ViewDelta},
20    Block, Reporter,
21};
22use commonware_broadcast::{buffered, Broadcaster};
23use commonware_codec::{Decode, Encode};
24use commonware_cryptography::{
25    certificate::{Provider, Scheme as CertificateScheme},
26    PublicKey,
27};
28use commonware_macros::select_loop;
29use commonware_p2p::Recipients;
30use commonware_parallel::Strategy;
31use commonware_resolver::Resolver;
32use commonware_runtime::{
33    spawn_cell, telemetry::metrics::status::GaugeExt, Clock, ContextCell, Handle, Metrics, Spawner,
34    Storage,
35};
36use commonware_storage::{
37    archive::Identifier as ArchiveID,
38    metadata::{self, Metadata},
39};
40use commonware_utils::{
41    acknowledgement::Exact,
42    channel::{fallible::OneshotExt, mpsc, oneshot},
43    futures::{AbortablePool, Aborter, OptionFuture},
44    sequence::U64,
45    Acknowledgement, BoxedError,
46};
47use futures::try_join;
48use pin_project::pin_project;
49use prometheus_client::metrics::gauge::Gauge;
50use rand_core::CryptoRngCore;
51use std::{
52    collections::{btree_map::Entry, BTreeMap},
53    future::Future,
54    num::NonZeroUsize,
55    sync::Arc,
56};
57use tracing::{debug, error, info, warn};
58
59/// The key used to store the last processed height in the metadata store.
60const LATEST_KEY: U64 = U64::new(0xFF);
61
62/// A pending acknowledgement from the application for processing a block at the contained height/commitment.
63#[pin_project]
64struct PendingAck<B: Block, A: Acknowledgement> {
65    height: Height,
66    commitment: B::Commitment,
67    #[pin]
68    receiver: A::Waiter,
69}
70
71impl<B: Block, A: Acknowledgement> Future for PendingAck<B, A> {
72    type Output = <A::Waiter as Future>::Output;
73
74    fn poll(
75        self: std::pin::Pin<&mut Self>,
76        cx: &mut std::task::Context<'_>,
77    ) -> std::task::Poll<Self::Output> {
78        self.project().receiver.poll(cx)
79    }
80}
81
82/// A struct that holds multiple subscriptions for a block.
83struct BlockSubscription<B: Block> {
84    // The subscribers that are waiting for the block
85    subscribers: Vec<oneshot::Sender<B>>,
86    // Aborter that aborts the waiter future when dropped
87    _aborter: Aborter,
88}
89
90/// The [Actor] is responsible for receiving uncertified blocks from the broadcast mechanism,
91/// receiving notarizations and finalizations from consensus, and reconstructing a total order
92/// of blocks.
93///
94/// The actor is designed to be used in a view-based model. Each view corresponds to a
95/// potential block in the chain. The actor will only finalize a block if it has a
96/// corresponding finalization.
97///
98/// The actor also provides a backfill mechanism for missing blocks. If the actor receives a
99/// finalization for a block that is ahead of its current view, it will request the missing blocks
100/// from its peers. This ensures that the actor can catch up to the rest of the network if it falls
101/// behind.
102pub struct Actor<E, B, P, FC, FB, ES, T, A = Exact>
103where
104    E: CryptoRngCore + Spawner + Metrics + Clock + Storage,
105    B: Block,
106    P: Provider<Scope = Epoch, Scheme: Scheme<B::Commitment>>,
107    FC: Certificates<Commitment = B::Commitment, Scheme = P::Scheme>,
108    FB: Blocks<Block = B>,
109    ES: Epocher,
110    T: Strategy,
111    A: Acknowledgement,
112{
113    // ---------- Context ----------
114    context: ContextCell<E>,
115
116    // ---------- Message Passing ----------
117    // Mailbox
118    mailbox: mpsc::Receiver<Message<P::Scheme, B>>,
119
120    // ---------- Configuration ----------
121    // Provider for epoch-specific signing schemes
122    provider: P,
123    // Epoch configuration
124    epocher: ES,
125    // Minimum number of views to retain temporary data after the application processes a block
126    view_retention_timeout: ViewDelta,
127    // Maximum number of blocks to repair at once
128    max_repair: NonZeroUsize,
129    // Codec configuration for block type
130    block_codec_config: B::Cfg,
131    // Strategy for parallel operations
132    strategy: T,
133
134    // ---------- State ----------
135    // Last view processed
136    last_processed_round: Round,
137    // Last height processed by the application
138    last_processed_height: Height,
139    // Pending application acknowledgement, if any
140    pending_ack: OptionFuture<PendingAck<B, A>>,
141    // Highest known finalized height
142    tip: Height,
143    // Outstanding subscriptions for blocks
144    block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<B>>,
145
146    // ---------- Storage ----------
147    // Prunable cache
148    cache: cache::Manager<E, B, P::Scheme>,
149    // Metadata tracking application progress
150    application_metadata: Metadata<E, U64, Height>,
151    // Finalizations stored by height
152    finalizations_by_height: FC,
153    // Finalized blocks stored by height
154    finalized_blocks: FB,
155
156    // ---------- Metrics ----------
157    // Latest height metric
158    finalized_height: Gauge,
159    // Latest processed height
160    processed_height: Gauge,
161}
162
163impl<E, B, P, FC, FB, ES, T, A> Actor<E, B, P, FC, FB, ES, T, A>
164where
165    E: CryptoRngCore + Spawner + Metrics + Clock + Storage,
166    B: Block,
167    P: Provider<Scope = Epoch, Scheme: Scheme<B::Commitment>>,
168    FC: Certificates<Commitment = B::Commitment, Scheme = P::Scheme>,
169    FB: Blocks<Block = B>,
170    ES: Epocher,
171    T: Strategy,
172    A: Acknowledgement,
173{
174    /// Create a new application actor.
175    pub async fn init(
176        context: E,
177        finalizations_by_height: FC,
178        finalized_blocks: FB,
179        config: Config<B, P, ES, T>,
180    ) -> (Self, Mailbox<P::Scheme, B>, Height) {
181        // Initialize cache
182        let prunable_config = cache::Config {
183            partition_prefix: format!("{}-cache", config.partition_prefix.clone()),
184            prunable_items_per_section: config.prunable_items_per_section,
185            replay_buffer: config.replay_buffer,
186            key_write_buffer: config.key_write_buffer,
187            value_write_buffer: config.value_write_buffer,
188            key_page_cache: config.page_cache.clone(),
189        };
190        let cache = cache::Manager::init(
191            context.with_label("cache"),
192            prunable_config,
193            config.block_codec_config.clone(),
194        )
195        .await;
196
197        // Initialize metadata tracking application progress
198        let application_metadata = Metadata::init(
199            context.with_label("application_metadata"),
200            metadata::Config {
201                partition: format!("{}-application-metadata", config.partition_prefix),
202                codec_config: (),
203            },
204        )
205        .await
206        .expect("failed to initialize application metadata");
207        let last_processed_height = application_metadata
208            .get(&LATEST_KEY)
209            .copied()
210            .unwrap_or(Height::zero());
211
212        // Create metrics
213        let finalized_height = Gauge::default();
214        context.register(
215            "finalized_height",
216            "Finalized height of application",
217            finalized_height.clone(),
218        );
219        let processed_height = Gauge::default();
220        context.register(
221            "processed_height",
222            "Processed height of application",
223            processed_height.clone(),
224        );
225        let _ = processed_height.try_set(last_processed_height.get());
226
227        // Initialize mailbox
228        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
229        (
230            Self {
231                context: ContextCell::new(context),
232                mailbox,
233                provider: config.provider,
234                epocher: config.epocher,
235                view_retention_timeout: config.view_retention_timeout,
236                max_repair: config.max_repair,
237                block_codec_config: config.block_codec_config,
238                strategy: config.strategy,
239                last_processed_round: Round::zero(),
240                last_processed_height,
241                pending_ack: None.into(),
242                tip: Height::zero(),
243                block_subscriptions: BTreeMap::new(),
244                cache,
245                application_metadata,
246                finalizations_by_height,
247                finalized_blocks,
248                finalized_height,
249                processed_height,
250            },
251            Mailbox::new(sender),
252            last_processed_height,
253        )
254    }
255
256    /// Start the actor.
257    pub fn start<R, K>(
258        mut self,
259        application: impl Reporter<Activity = Update<B, A>>,
260        buffer: buffered::Mailbox<K, B>,
261        resolver: (mpsc::Receiver<handler::Message<B>>, R),
262    ) -> Handle<()>
263    where
264        R: Resolver<
265            Key = handler::Request<B>,
266            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
267        >,
268        K: PublicKey,
269    {
270        spawn_cell!(self.context, self.run(application, buffer, resolver).await)
271    }
272
273    /// Run the application actor.
274    async fn run<R, K>(
275        mut self,
276        mut application: impl Reporter<Activity = Update<B, A>>,
277        mut buffer: buffered::Mailbox<K, B>,
278        (mut resolver_rx, mut resolver): (mpsc::Receiver<handler::Message<B>>, R),
279    ) where
280        R: Resolver<
281            Key = handler::Request<B>,
282            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
283        >,
284        K: PublicKey,
285    {
286        // Create a local pool for waiter futures.
287        let mut waiters = AbortablePool::<(B::Commitment, B)>::default();
288
289        // Get tip and send to application
290        let tip = self.get_latest().await;
291        if let Some((height, commitment, round)) = tip {
292            application
293                .report(Update::Tip(round, height, commitment))
294                .await;
295            self.tip = height;
296            let _ = self.finalized_height.try_set(height.get());
297        }
298
299        // Attempt to dispatch the next finalized block to the application, if it is ready.
300        self.try_dispatch_block(&mut application).await;
301
302        // Attempt to repair any gaps in the finalized blocks archive, if there are any.
303        self.try_repair_gaps(&mut buffer, &mut resolver, &mut application)
304            .await;
305
306        select_loop! {
307            self.context,
308            on_start => {
309                // Remove any dropped subscribers. If all subscribers dropped, abort the waiter.
310                self.block_subscriptions.retain(|_, bs| {
311                    bs.subscribers.retain(|tx| !tx.is_closed());
312                    !bs.subscribers.is_empty()
313                });
314            },
315            on_stopped => {
316                debug!("context shutdown, stopping marshal");
317            },
318            // Handle waiter completions first (aborted futures are skipped)
319            Ok((commitment, block)) = waiters.next_completed() else continue => {
320                self.notify_subscribers(commitment, &block).await;
321            },
322            // Handle application acknowledgements next
323            ack = &mut self.pending_ack => {
324                let PendingAck {
325                    height, commitment, ..
326                } = self.pending_ack.take().expect("ack state must be present");
327
328                match ack {
329                    Ok(()) => {
330                        if let Err(e) = self
331                            .handle_block_processed(height, commitment, &mut resolver)
332                            .await
333                        {
334                            error!(?e, %height, "failed to update application progress");
335                            return;
336                        }
337                        self.try_dispatch_block(&mut application).await;
338                    }
339                    Err(e) => {
340                        error!(?e, %height, "application did not acknowledge block");
341                        return;
342                    }
343                }
344            },
345            // Handle consensus inputs before backfill or resolver traffic
346            Some(message) = self.mailbox.recv() else {
347                info!("mailbox closed, shutting down");
348                break;
349            } => {
350                match message {
351                    Message::GetInfo {
352                        identifier,
353                        response,
354                    } => {
355                        let info = match identifier {
356                            // TODO: Instead of pulling out the entire block, determine the
357                            // height directly from the archive by mapping the commitment to
358                            // the index, which is the same as the height.
359                            BlockID::Commitment(commitment) => self
360                                .finalized_blocks
361                                .get(ArchiveID::Key(&commitment))
362                                .await
363                                .ok()
364                                .flatten()
365                                .map(|b| (b.height(), commitment)),
366                            BlockID::Height(height) => self
367                                .finalizations_by_height
368                                .get(ArchiveID::Index(height.get()))
369                                .await
370                                .ok()
371                                .flatten()
372                                .map(|f| (height, f.proposal.payload)),
373                            BlockID::Latest => self.get_latest().await.map(|(h, c, _)| (h, c)),
374                        };
375                        response.send_lossy(info);
376                    }
377                    Message::Proposed { round, block } => {
378                        self.cache_verified(round, block.commitment(), block.clone())
379                            .await;
380                        let _peers = buffer.broadcast(Recipients::All, block).await;
381                    }
382                    Message::Verified { round, block } => {
383                        self.cache_verified(round, block.commitment(), block).await;
384                    }
385                    Message::Notarization { notarization } => {
386                        let round = notarization.round();
387                        let commitment = notarization.proposal.payload;
388
389                        // Store notarization by view
390                        self.cache
391                            .put_notarization(round, commitment, notarization.clone())
392                            .await;
393
394                        // Search for block locally, otherwise fetch it remotely
395                        if let Some(block) = self.find_block(&mut buffer, commitment).await {
396                            // If found, persist the block
397                            self.cache_block(round, commitment, block).await;
398                        } else {
399                            debug!(?round, "notarized block missing");
400                            resolver.fetch(Request::<B>::Notarized { round }).await;
401                        }
402                    }
403                    Message::Finalization { finalization } => {
404                        // Cache finalization by round
405                        let round = finalization.round();
406                        let commitment = finalization.proposal.payload;
407                        self.cache
408                            .put_finalization(round, commitment, finalization.clone())
409                            .await;
410
411                        // Search for block locally, otherwise fetch it remotely
412                        if let Some(block) = self.find_block(&mut buffer, commitment).await {
413                            // If found, persist the block
414                            let height = block.height();
415                            self.finalize(
416                                height,
417                                commitment,
418                                block,
419                                Some(finalization),
420                                &mut application,
421                                &mut buffer,
422                                &mut resolver,
423                            )
424                            .await;
425                            debug!(?round, %height, "finalized block stored");
426                        } else {
427                            // Otherwise, fetch the block from the network.
428                            debug!(?round, ?commitment, "finalized block missing");
429                            resolver.fetch(Request::<B>::Block(commitment)).await;
430                        }
431                    }
432                    Message::GetBlock {
433                        identifier,
434                        response,
435                    } => match identifier {
436                        BlockID::Commitment(commitment) => {
437                            let result = self.find_block(&mut buffer, commitment).await;
438                            response.send_lossy(result);
439                        }
440                        BlockID::Height(height) => {
441                            let result = self.get_finalized_block(height).await;
442                            response.send_lossy(result);
443                        }
444                        BlockID::Latest => {
445                            let block = match self.get_latest().await {
446                                Some((_, commitment, _)) => {
447                                    self.find_block(&mut buffer, commitment).await
448                                }
449                                None => None,
450                            };
451                            response.send_lossy(block);
452                        }
453                    },
454                    Message::GetFinalization { height, response } => {
455                        let finalization = self.get_finalization_by_height(height).await;
456                        response.send_lossy(finalization);
457                    }
458                    Message::HintFinalized { height, targets } => {
459                        // Skip if height is at or below the floor
460                        if height <= self.last_processed_height {
461                            continue;
462                        }
463
464                        // Skip if finalization is already available locally
465                        if self.get_finalization_by_height(height).await.is_some() {
466                            continue;
467                        }
468
469                        // Trigger a targeted fetch via the resolver
470                        let request = Request::<B>::Finalized { height };
471                        resolver.fetch_targeted(request, targets).await;
472                    }
473                    Message::Subscribe {
474                        round,
475                        commitment,
476                        response,
477                    } => {
478                        // Check for block locally
479                        if let Some(block) = self.find_block(&mut buffer, commitment).await {
480                            response.send_lossy(block);
481                            continue;
482                        }
483
484                        // We don't have the block locally, so fetch the block from the network
485                        // if we have an associated view. If we only have the digest, don't make
486                        // the request as we wouldn't know when to drop it, and the request may
487                        // never complete if the block is not finalized.
488                        if let Some(round) = round {
489                            if round < self.last_processed_round {
490                                // At this point, we have failed to find the block locally, and
491                                // we know that its round is less than the last processed round.
492                                // This means that something else was finalized in that round,
493                                // so we drop the response to indicate that the block may never
494                                // be available.
495                                continue;
496                            }
497                            // Attempt to fetch the block (with notarization) from the resolver.
498                            // If this is a valid view, this request should be fine to keep open
499                            // until resolution or pruning (even if the oneshot is canceled).
500                            debug!(?round, ?commitment, "requested block missing");
501                            resolver.fetch(Request::<B>::Notarized { round }).await;
502                        }
503
504                        // Register subscriber
505                        debug!(?round, ?commitment, "registering subscriber");
506                        match self.block_subscriptions.entry(commitment) {
507                            Entry::Occupied(mut entry) => {
508                                entry.get_mut().subscribers.push(response);
509                            }
510                            Entry::Vacant(entry) => {
511                                let (tx, rx) = oneshot::channel();
512                                buffer.subscribe_prepared(None, commitment, None, tx).await;
513                                let aborter = waiters.push(async move {
514                                    (commitment, rx.await.expect("buffer subscriber closed"))
515                                });
516                                entry.insert(BlockSubscription {
517                                    subscribers: vec![response],
518                                    _aborter: aborter,
519                                });
520                            }
521                        }
522                    }
523                    Message::SetFloor { height } => {
524                        if self.last_processed_height >= height {
525                            warn!(
526                                %height,
527                                existing = %self.last_processed_height,
528                                "floor not updated, lower than existing"
529                            );
530                            continue;
531                        }
532
533                        // Update the processed height
534                        if let Err(err) = self.set_processed_height(height, &mut resolver).await {
535                            error!(?err, %height, "failed to update floor");
536                            return;
537                        }
538
539                        // Drop the pending acknowledgement, if one exists. We must do this to prevent
540                        // an in-process block from being processed that is below the new floor
541                        // updating `last_processed_height`.
542                        self.pending_ack = None.into();
543
544                        // Prune the finalized block and finalization certificate archives in parallel.
545                        if let Err(err) = self.prune_finalized_archives(height).await {
546                            error!(?err, %height, "failed to prune finalized archives");
547                            return;
548                        }
549                    }
550                    Message::Prune { height } => {
551                        // Only allow pruning at or below the current floor
552                        if height > self.last_processed_height {
553                            warn!(%height, floor = %self.last_processed_height, "prune height above floor, ignoring");
554                            continue;
555                        }
556
557                        // Prune the finalized block and finalization certificate archives in parallel.
558                        if let Err(err) = self.prune_finalized_archives(height).await {
559                            error!(?err, %height, "failed to prune finalized archives");
560                            return;
561                        }
562                    }
563                }
564            },
565            // Handle resolver messages last
566            Some(message) = resolver_rx.recv() else {
567                info!("handler closed, shutting down");
568                break;
569            } => {
570                match message {
571                    handler::Message::Produce { key, response } => {
572                        match key {
573                            Request::Block(commitment) => {
574                                // Check for block locally
575                                let Some(block) = self.find_block(&mut buffer, commitment).await
576                                else {
577                                    debug!(?commitment, "block missing on request");
578                                    continue;
579                                };
580                                response.send_lossy(block.encode());
581                            }
582                            Request::Finalized { height } => {
583                                // Get finalization
584                                let Some(finalization) =
585                                    self.get_finalization_by_height(height).await
586                                else {
587                                    debug!(%height, "finalization missing on request");
588                                    continue;
589                                };
590
591                                // Get block
592                                let Some(block) = self.get_finalized_block(height).await else {
593                                    debug!(%height, "finalized block missing on request");
594                                    continue;
595                                };
596
597                                // Send finalization
598                                response.send_lossy((finalization, block).encode());
599                            }
600                            Request::Notarized { round } => {
601                                // Get notarization
602                                let Some(notarization) = self.cache.get_notarization(round).await
603                                else {
604                                    debug!(?round, "notarization missing on request");
605                                    continue;
606                                };
607
608                                // Get block
609                                let commitment = notarization.proposal.payload;
610                                let Some(block) = self.find_block(&mut buffer, commitment).await
611                                else {
612                                    debug!(?commitment, "block missing on request");
613                                    continue;
614                                };
615                                response.send_lossy((notarization, block).encode());
616                            }
617                        }
618                    }
619                    handler::Message::Deliver {
620                        key,
621                        value,
622                        response,
623                    } => {
624                        match key {
625                            Request::Block(commitment) => {
626                                // Parse block
627                                let Ok(block) =
628                                    B::decode_cfg(value.as_ref(), &self.block_codec_config)
629                                else {
630                                    response.send_lossy(false);
631                                    continue;
632                                };
633
634                                // Validation
635                                if block.commitment() != commitment {
636                                    response.send_lossy(false);
637                                    continue;
638                                }
639
640                                // Persist the block, also persisting the finalization if we have it
641                                let height = block.height();
642                                let finalization =
643                                    self.cache.get_finalization_for(commitment).await;
644                                self.finalize(
645                                    height,
646                                    commitment,
647                                    block,
648                                    finalization,
649                                    &mut application,
650                                    &mut buffer,
651                                    &mut resolver,
652                                )
653                                .await;
654                                debug!(?commitment, %height, "received block");
655                                response.send_lossy(true);
656                            }
657                            Request::Finalized { height } => {
658                                let Some(bounds) = self.epocher.containing(height) else {
659                                    response.send_lossy(false);
660                                    continue;
661                                };
662                                let Some(scheme) =
663                                    self.get_scheme_certificate_verifier(bounds.epoch())
664                                else {
665                                    response.send_lossy(false);
666                                    continue;
667                                };
668
669                                // Parse finalization
670                                let Ok((finalization, block)) =
671                                    <(Finalization<P::Scheme, B::Commitment>, B)>::decode_cfg(
672                                        value,
673                                        &(
674                                            scheme.certificate_codec_config(),
675                                            self.block_codec_config.clone(),
676                                        ),
677                                    )
678                                else {
679                                    response.send_lossy(false);
680                                    continue;
681                                };
682
683                                // Validation
684                                if block.height() != height
685                                    || finalization.proposal.payload != block.commitment()
686                                    || !finalization.verify(
687                                        &mut self.context,
688                                        &scheme,
689                                        &self.strategy,
690                                    )
691                                {
692                                    response.send_lossy(false);
693                                    continue;
694                                }
695
696                                // Valid finalization received
697                                debug!(%height, "received finalization");
698                                response.send_lossy(true);
699                                self.finalize(
700                                    height,
701                                    block.commitment(),
702                                    block,
703                                    Some(finalization),
704                                    &mut application,
705                                    &mut buffer,
706                                    &mut resolver,
707                                )
708                                .await;
709                            }
710                            Request::Notarized { round } => {
711                                let Some(scheme) =
712                                    self.get_scheme_certificate_verifier(round.epoch())
713                                else {
714                                    response.send_lossy(false);
715                                    continue;
716                                };
717
718                                // Parse notarization
719                                let Ok((notarization, block)) =
720                                    <(Notarization<P::Scheme, B::Commitment>, B)>::decode_cfg(
721                                        value,
722                                        &(
723                                            scheme.certificate_codec_config(),
724                                            self.block_codec_config.clone(),
725                                        ),
726                                    )
727                                else {
728                                    response.send_lossy(false);
729                                    continue;
730                                };
731
732                                // Validation
733                                if notarization.round() != round
734                                    || notarization.proposal.payload != block.commitment()
735                                    || !notarization.verify(
736                                        &mut self.context,
737                                        &scheme,
738                                        &self.strategy,
739                                    )
740                                {
741                                    response.send_lossy(false);
742                                    continue;
743                                }
744
745                                // Valid notarization received
746                                response.send_lossy(true);
747                                let commitment = block.commitment();
748                                debug!(?round, ?commitment, "received notarization");
749
750                                // If there exists a finalization certificate for this block, we
751                                // should finalize it. While not necessary, this could finalize
752                                // the block faster in the case where a notarization then a
753                                // finalization is received via the consensus engine and we
754                                // resolve the request for the notarization before we resolve
755                                // the request for the block.
756                                let height = block.height();
757                                if let Some(finalization) =
758                                    self.cache.get_finalization_for(commitment).await
759                                {
760                                    self.finalize(
761                                        height,
762                                        commitment,
763                                        block.clone(),
764                                        Some(finalization),
765                                        &mut application,
766                                        &mut buffer,
767                                        &mut resolver,
768                                    )
769                                    .await;
770                                }
771
772                                // Cache the notarization and block
773                                self.cache_block(round, commitment, block).await;
774                                self.cache
775                                    .put_notarization(round, commitment, notarization)
776                                    .await;
777                            }
778                        }
779                    }
780                }
781            },
782        }
783    }
784
785    /// Returns a scheme suitable for verifying certificates at the given epoch.
786    ///
787    /// Prefers a certificate verifier if available, otherwise falls back
788    /// to the scheme for the given epoch.
789    fn get_scheme_certificate_verifier(&self, epoch: Epoch) -> Option<Arc<P::Scheme>> {
790        self.provider.all().or_else(|| self.provider.scoped(epoch))
791    }
792
793    // -------------------- Waiters --------------------
794
795    /// Notify any subscribers for the given commitment with the provided block.
796    async fn notify_subscribers(&mut self, commitment: B::Commitment, block: &B) {
797        if let Some(mut bs) = self.block_subscriptions.remove(&commitment) {
798            for subscriber in bs.subscribers.drain(..) {
799                subscriber.send_lossy(block.clone());
800            }
801        }
802    }
803
804    // -------------------- Application Dispatch --------------------
805
806    /// Attempt to dispatch the next finalized block to the application if ready.
807    async fn try_dispatch_block(
808        &mut self,
809        application: &mut impl Reporter<Activity = Update<B, A>>,
810    ) {
811        if self.pending_ack.is_some() {
812            return;
813        }
814
815        let next_height = self.last_processed_height.next();
816        let Some(block) = self.get_finalized_block(next_height).await else {
817            return;
818        };
819        assert_eq!(
820            block.height(),
821            next_height,
822            "finalized block height mismatch"
823        );
824
825        let (height, commitment) = (block.height(), block.commitment());
826        let (ack, ack_waiter) = A::handle();
827        application.report(Update::Block(block, ack)).await;
828        self.pending_ack.replace(PendingAck {
829            height,
830            commitment,
831            receiver: ack_waiter,
832        });
833    }
834
835    /// Handle acknowledgement from the application that a block has been processed.
836    async fn handle_block_processed(
837        &mut self,
838        height: Height,
839        commitment: B::Commitment,
840        resolver: &mut impl Resolver<Key = Request<B>>,
841    ) -> Result<(), metadata::Error> {
842        // Update the processed height
843        self.set_processed_height(height, resolver).await?;
844
845        // Cancel any useless requests
846        resolver.cancel(Request::<B>::Block(commitment)).await;
847
848        if let Some(finalization) = self.get_finalization_by_height(height).await {
849            // Trail the previous processed finalized block by the timeout
850            let lpr = self.last_processed_round;
851            let prune_round = Round::new(
852                lpr.epoch(),
853                lpr.view().saturating_sub(self.view_retention_timeout),
854            );
855
856            // Prune archives
857            self.cache.prune(prune_round).await;
858
859            // Update the last processed round
860            let round = finalization.round();
861            self.last_processed_round = round;
862
863            // Cancel useless requests
864            resolver
865                .retain(Request::<B>::Notarized { round }.predicate())
866                .await;
867        }
868
869        Ok(())
870    }
871
872    // -------------------- Prunable Storage --------------------
873
874    /// Add a verified block to the prunable archive.
875    async fn cache_verified(&mut self, round: Round, commitment: B::Commitment, block: B) {
876        self.notify_subscribers(commitment, &block).await;
877        self.cache.put_verified(round, commitment, block).await;
878    }
879
880    /// Add a notarized block to the prunable archive.
881    async fn cache_block(&mut self, round: Round, commitment: B::Commitment, block: B) {
882        self.notify_subscribers(commitment, &block).await;
883        self.cache.put_block(round, commitment, block).await;
884    }
885
886    // -------------------- Immutable Storage --------------------
887
888    /// Get a finalized block from the immutable archive.
889    async fn get_finalized_block(&self, height: Height) -> Option<B> {
890        match self
891            .finalized_blocks
892            .get(ArchiveID::Index(height.get()))
893            .await
894        {
895            Ok(block) => block,
896            Err(e) => panic!("failed to get block: {e}"),
897        }
898    }
899
900    /// Get a finalization from the archive by height.
901    async fn get_finalization_by_height(
902        &self,
903        height: Height,
904    ) -> Option<Finalization<P::Scheme, B::Commitment>> {
905        match self
906            .finalizations_by_height
907            .get(ArchiveID::Index(height.get()))
908            .await
909        {
910            Ok(finalization) => finalization,
911            Err(e) => panic!("failed to get finalization: {e}"),
912        }
913    }
914
915    /// Add a finalized block, and optionally a finalization, to the archive, and
916    /// attempt to identify + repair any gaps in the archive.
917    #[allow(clippy::too_many_arguments)]
918    async fn finalize(
919        &mut self,
920        height: Height,
921        commitment: B::Commitment,
922        block: B,
923        finalization: Option<Finalization<P::Scheme, B::Commitment>>,
924        application: &mut impl Reporter<Activity = Update<B, A>>,
925        buffer: &mut buffered::Mailbox<impl PublicKey, B>,
926        resolver: &mut impl Resolver<Key = Request<B>>,
927    ) {
928        self.store_finalization(height, commitment, block, finalization, application)
929            .await;
930
931        self.try_repair_gaps(buffer, resolver, application).await;
932    }
933
934    /// Add a finalized block, and optionally a finalization, to the archive.
935    ///
936    /// After persisting the block, attempt to dispatch the next contiguous block to the
937    /// application.
938    async fn store_finalization(
939        &mut self,
940        height: Height,
941        commitment: B::Commitment,
942        block: B,
943        finalization: Option<Finalization<P::Scheme, B::Commitment>>,
944        application: &mut impl Reporter<Activity = Update<B, A>>,
945    ) {
946        self.notify_subscribers(commitment, &block).await;
947
948        // Extract round before finalization is moved into try_join
949        let round = finalization.as_ref().map(|f| f.round());
950
951        // In parallel, update the finalized blocks and finalizations archives
952        if let Err(e) = try_join!(
953            // Update the finalized blocks archive
954            async {
955                self.finalized_blocks.put(block).await.map_err(Box::new)?;
956                Ok::<_, BoxedError>(())
957            },
958            // Update the finalizations archive (if provided)
959            async {
960                if let Some(finalization) = finalization {
961                    self.finalizations_by_height
962                        .put(height, commitment, finalization)
963                        .await
964                        .map_err(Box::new)?;
965                }
966                Ok::<_, BoxedError>(())
967            }
968        ) {
969            panic!("failed to finalize: {e}");
970        }
971
972        // Update metrics and send tip update to application
973        if let Some(round) = round.filter(|_| height > self.tip) {
974            application
975                .report(Update::Tip(round, height, commitment))
976                .await;
977            self.tip = height;
978            let _ = self.finalized_height.try_set(height.get());
979        }
980
981        self.try_dispatch_block(application).await;
982    }
983
984    /// Get the latest finalized block information (height and commitment tuple).
985    ///
986    /// Blocks are only finalized directly with a finalization or indirectly via a descendant
987    /// block's finalization. Thus, the highest known finalized block must itself have a direct
988    /// finalization.
989    ///
990    /// We return the height and commitment using the highest known finalization that we know the
991    /// block height for. While it's possible that we have a later finalization, if we do not have
992    /// the full block for that finalization, we do not know it's height and therefore it would not
993    /// yet be found in the `finalizations_by_height` archive. While not checked explicitly, we
994    /// should have the associated block (in the `finalized_blocks` archive) for the information
995    /// returned.
996    async fn get_latest(&mut self) -> Option<(Height, B::Commitment, Round)> {
997        let height = self.finalizations_by_height.last_index()?;
998        let finalization = self
999            .get_finalization_by_height(height)
1000            .await
1001            .expect("finalization missing");
1002        Some((height, finalization.proposal.payload, finalization.round()))
1003    }
1004
1005    // -------------------- Mixed Storage --------------------
1006
1007    /// Looks for a block anywhere in local storage.
1008    async fn find_block<K: PublicKey>(
1009        &mut self,
1010        buffer: &mut buffered::Mailbox<K, B>,
1011        commitment: B::Commitment,
1012    ) -> Option<B> {
1013        // Check buffer.
1014        if let Some(block) = buffer.get(None, commitment, None).await.into_iter().next() {
1015            return Some(block);
1016        }
1017        // Check verified / notarized blocks via cache manager.
1018        if let Some(block) = self.cache.find_block(commitment).await {
1019            return Some(block);
1020        }
1021        // Check finalized blocks.
1022        match self.finalized_blocks.get(ArchiveID::Key(&commitment)).await {
1023            Ok(block) => block, // may be None
1024            Err(e) => panic!("failed to get block: {e}"),
1025        }
1026    }
1027
1028    /// Attempt to repair any identified gaps in the finalized blocks archive. The total
1029    /// number of missing heights that can be repaired at once is bounded by `self.max_repair`,
1030    /// though multiple gaps may be spanned.
1031    async fn try_repair_gaps<K: PublicKey>(
1032        &mut self,
1033        buffer: &mut buffered::Mailbox<K, B>,
1034        resolver: &mut impl Resolver<Key = Request<B>>,
1035        application: &mut impl Reporter<Activity = Update<B, A>>,
1036    ) {
1037        let start = self.last_processed_height.next();
1038        'cache_repair: loop {
1039            let (gap_start, Some(gap_end)) = self.finalized_blocks.next_gap(start) else {
1040                // No gaps detected
1041                return;
1042            };
1043
1044            // Attempt to repair the gap backwards from the end of the gap, using
1045            // blocks from our local storage.
1046            let Some(mut cursor) = self.get_finalized_block(gap_end).await else {
1047                panic!("gapped block missing that should exist: {gap_end}");
1048            };
1049
1050            // Compute the lower bound of the recursive repair. `gap_start` is `Some`
1051            // if `start` is not in a gap. We add one to it to ensure we don't
1052            // re-persist it to the database in the repair loop below.
1053            let gap_start = gap_start.map(|s| s.next()).unwrap_or(start);
1054
1055            // Iterate backwards, repairing blocks as we go.
1056            while cursor.height() > gap_start {
1057                let commitment = cursor.parent();
1058                if let Some(block) = self.find_block(buffer, commitment).await {
1059                    let finalization = self.cache.get_finalization_for(commitment).await;
1060                    self.store_finalization(
1061                        block.height(),
1062                        commitment,
1063                        block.clone(),
1064                        finalization,
1065                        application,
1066                    )
1067                    .await;
1068                    debug!(height = %block.height(), "repaired block");
1069                    cursor = block;
1070                } else {
1071                    // Request the next missing block digest
1072                    resolver.fetch(Request::<B>::Block(commitment)).await;
1073                    break 'cache_repair;
1074                }
1075            }
1076        }
1077
1078        // Request any finalizations for missing items in the archive, up to
1079        // the `max_repair` quota. This may help shrink the size of the gap
1080        // closest to the application's processed height if finalizations
1081        // for the requests' heights exist. If not, we rely on the recursive
1082        // digest fetches above.
1083        let missing_items = self
1084            .finalized_blocks
1085            .missing_items(start, self.max_repair.get());
1086        let requests = missing_items
1087            .into_iter()
1088            .map(|height| Request::<B>::Finalized { height })
1089            .collect::<Vec<_>>();
1090        if !requests.is_empty() {
1091            resolver.fetch_all(requests).await
1092        }
1093    }
1094
1095    /// Sets the processed height in storage, metrics, and in-memory state. Also cancels any
1096    /// outstanding requests below the new processed height.
1097    async fn set_processed_height(
1098        &mut self,
1099        height: Height,
1100        resolver: &mut impl Resolver<Key = Request<B>>,
1101    ) -> Result<(), metadata::Error> {
1102        self.application_metadata
1103            .put_sync(LATEST_KEY.clone(), height)
1104            .await?;
1105        self.last_processed_height = height;
1106        let _ = self
1107            .processed_height
1108            .try_set(self.last_processed_height.get());
1109
1110        // Cancel any existing requests below the new floor.
1111        resolver
1112            .retain(Request::<B>::Finalized { height }.predicate())
1113            .await;
1114
1115        Ok(())
1116    }
1117
1118    /// Prunes finalized blocks and certificates below the given height.
1119    async fn prune_finalized_archives(&mut self, height: Height) -> Result<(), BoxedError> {
1120        try_join!(
1121            async {
1122                self.finalized_blocks
1123                    .prune(height)
1124                    .await
1125                    .map_err(Box::new)?;
1126                Ok::<_, BoxedError>(())
1127            },
1128            async {
1129                self.finalizations_by_height
1130                    .prune(height)
1131                    .await
1132                    .map_err(Box::new)?;
1133                Ok::<_, BoxedError>(())
1134            }
1135        )?;
1136        Ok(())
1137    }
1138}