Skip to main content

commonware_consensus/marshal/core/
actor.rs

1use super::{
2    cache,
3    mailbox::{Mailbox, Message},
4    Buffer, IntoBlock, Variant,
5};
6use crate::{
7    marshal::{
8        resolver::handler::{self, Request},
9        store::{Blocks, Certificates},
10        Config, Identifier as BlockID, Update,
11    },
12    simplex::{
13        scheme::Scheme,
14        types::{verify_certificates, Finalization, Notarization, Subject},
15    },
16    types::{Epoch, Epocher, Height, Round, ViewDelta},
17    Block, Epochable, Heightable, Reporter,
18};
19use bytes::Bytes;
20use commonware_codec::{Decode, Encode, Read};
21use commonware_cryptography::{
22    certificate::{Provider, Scheme as CertificateScheme},
23    Digestible,
24};
25use commonware_macros::select_loop;
26use commonware_p2p::Recipients;
27use commonware_parallel::Strategy;
28use commonware_resolver::Resolver;
29use commonware_runtime::{
30    spawn_cell, telemetry::metrics::status::GaugeExt, BufferPooler, Clock, ContextCell, Handle,
31    Metrics, Spawner, Storage,
32};
33use commonware_storage::{
34    archive::Identifier as ArchiveID,
35    metadata::{self, Metadata},
36};
37use commonware_utils::{
38    acknowledgement::Exact,
39    channel::{fallible::OneshotExt, mpsc, oneshot},
40    futures::{AbortablePool, Aborter, OptionFuture},
41    sequence::U64,
42    Acknowledgement, BoxedError,
43};
44use futures::{future::join_all, try_join, FutureExt};
45use pin_project::pin_project;
46use prometheus_client::metrics::gauge::Gauge;
47use rand_core::CryptoRngCore;
48use std::{
49    collections::{btree_map::Entry, BTreeMap, VecDeque},
50    future::Future,
51    num::NonZeroUsize,
52    pin::Pin,
53    sync::Arc,
54};
55use tracing::{debug, error, info, warn};
56
/// The key used to store the last processed height in the metadata store.
///
/// A single arbitrary fixed key (0xFF): the metadata store tracks only this one value.
const LATEST_KEY: U64 = U64::new(0xFF);
59
/// A parsed-but-unverified resolver delivery awaiting batch certificate verification.
enum PendingVerification<S: CertificateScheme, V: Variant> {
    /// A notarization certificate delivered together with its block.
    Notarized {
        // The notarization certificate to verify.
        notarization: Notarization<S, V::Commitment>,
        // The block the certificate's proposal refers to.
        block: V::Block,
        // Resolver response channel; presumably signals whether the delivery
        // was accepted — TODO confirm against verify_delivered.
        response: oneshot::Sender<bool>,
    },
    /// A finalization certificate delivered together with its block.
    Finalized {
        // The finalization certificate to verify.
        finalization: Finalization<S, V::Commitment>,
        // The block the certificate's proposal refers to.
        block: V::Block,
        // Resolver response channel; presumably signals whether the delivery
        // was accepted — TODO confirm against verify_delivered.
        response: oneshot::Sender<bool>,
    },
}
73
/// A pending acknowledgement from the application for a block at the contained height/commitment.
#[pin_project]
struct PendingAck<V: Variant, A: Acknowledgement> {
    // Height of the block dispatched to the application.
    height: Height,
    // Commitment of the block dispatched to the application.
    commitment: V::Commitment,
    // Future that resolves when the application acknowledges (or fails) the block.
    #[pin]
    receiver: A::Waiter,
}
82
83impl<V: Variant, A: Acknowledgement> Future for PendingAck<V, A> {
84    type Output = <A::Waiter as Future>::Output;
85
86    fn poll(
87        self: std::pin::Pin<&mut Self>,
88        cx: &mut std::task::Context<'_>,
89    ) -> std::task::Poll<Self::Output> {
90        self.project().receiver.poll(cx)
91    }
92}
93
/// Tracks in-flight application acknowledgements with FIFO semantics.
struct PendingAcks<V: Variant, A: Acknowledgement> {
    // The ack currently armed for polling in the main `select_loop!` (empty when idle).
    current: OptionFuture<PendingAck<V, A>>,
    // Acks dispatched behind `current`; completed strictly in dispatch (FIFO) order.
    queue: VecDeque<PendingAck<V, A>>,
    // Maximum number of in-flight acks (armed + queued).
    max: usize,
}
100
101impl<V: Variant, A: Acknowledgement> PendingAcks<V, A> {
102    /// Creates a new pending-ack tracker with a maximum in-flight capacity.
103    fn new(max: usize) -> Self {
104        Self {
105            current: None.into(),
106            queue: VecDeque::with_capacity(max),
107            max,
108        }
109    }
110
111    /// Drops the current ack and all queued acks.
112    fn clear(&mut self) {
113        self.current = None.into();
114        self.queue.clear();
115    }
116
117    /// Returns the currently armed ack future (if any) for `select_loop!`.
118    const fn current(&mut self) -> &mut OptionFuture<PendingAck<V, A>> {
119        &mut self.current
120    }
121
122    /// Returns whether we can dispatch another block without exceeding capacity.
123    fn has_capacity(&self) -> bool {
124        let reserved = usize::from(self.current.is_some());
125        self.queue.len() < self.max - reserved
126    }
127
128    /// Returns the next height to dispatch while preserving sequential order.
129    fn next_dispatch_height(&self, last_processed_height: Height) -> Height {
130        self.queue
131            .back()
132            .map(|ack| ack.height.next())
133            .or_else(|| self.current.as_ref().map(|ack| ack.height.next()))
134            .unwrap_or_else(|| last_processed_height.next())
135    }
136
137    /// Enqueues a newly dispatched ack, arming it immediately when idle.
138    fn enqueue(&mut self, ack: PendingAck<V, A>) {
139        if self.current.is_none() {
140            self.current.replace(ack);
141            return;
142        }
143        self.queue.push_back(ack);
144    }
145
146    /// Returns metadata for a completed current ack and arms the next queued ack.
147    fn complete_current(
148        &mut self,
149        result: <A::Waiter as Future>::Output,
150    ) -> (Height, V::Commitment, <A::Waiter as Future>::Output) {
151        let PendingAck {
152            height, commitment, ..
153        } = self.current.take().expect("ack state must be present");
154        if let Some(next) = self.queue.pop_front() {
155            self.current.replace(next);
156        }
157        (height, commitment, result)
158    }
159
160    /// If the current ack is already resolved, takes it and arms the next ack.
161    fn pop_ready(&mut self) -> Option<(Height, V::Commitment, <A::Waiter as Future>::Output)> {
162        let pending = self.current.as_mut()?;
163        let result = Pin::new(&mut pending.receiver).now_or_never()?;
164        Some(self.complete_current(result))
165    }
166}
167
/// A struct that holds multiple subscriptions for a block.
struct BlockSubscription<V: Variant> {
    // The subscribers that are waiting for the block.
    subscribers: Vec<oneshot::Sender<V::Block>>,
    // Aborter that aborts the waiter future when dropped, so the shared
    // waiter is cancelled once this subscription entry is removed.
    _aborter: Aborter,
}
175
/// The key used to track block subscriptions.
///
/// Digest-scoped and commitment-scoped subscriptions are intentionally distinct
/// so a block that aliases on digest cannot satisfy a different commitment wait.
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)]
enum BlockSubscriptionKey<C, D> {
    // Subscription keyed by the block's digest.
    Digest(D),
    // Subscription keyed by the consensus commitment.
    Commitment(C),
}
185
/// Shorthand binding [BlockSubscriptionKey]'s parameters to a [Variant]'s
/// commitment and block-digest types.
type BlockSubscriptionKeyFor<V> =
    BlockSubscriptionKey<<V as Variant>::Commitment, <<V as Variant>::Block as Digestible>::Digest>;
188
/// The [Actor] is responsible for receiving uncertified blocks from the broadcast mechanism,
/// receiving notarizations and finalizations from consensus, and reconstructing a total order
/// of blocks.
///
/// The actor is designed to be used in a view-based model. Each view corresponds to a
/// potential block in the chain. The actor will only finalize a block if it has a
/// corresponding finalization.
///
/// The actor also provides a backfill mechanism for missing blocks. If the actor receives a
/// finalization for a block that is ahead of its current view, it will request the missing blocks
/// from its peers. This ensures that the actor can catch up to the rest of the network if it falls
/// behind.
pub struct Actor<E, V, P, FC, FB, ES, T, A = Exact>
where
    E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage,
    V: Variant,
    P: Provider<Scope = Epoch, Scheme: Scheme<V::Commitment>>,
    FC: Certificates<
        BlockDigest = <V::Block as Digestible>::Digest,
        Commitment = V::Commitment,
        Scheme = P::Scheme,
    >,
    FB: Blocks<Block = V::StoredBlock>,
    ES: Epocher,
    T: Strategy,
    A: Acknowledgement,
{
    // ---------- Context ----------
    // Runtime context, held in a cell so it can be moved into the spawned task
    context: ContextCell<E>,

    // ---------- Message Passing ----------
    // Mailbox receiving messages from consensus and other actors
    mailbox: mpsc::Receiver<Message<P::Scheme, V>>,

    // ---------- Configuration ----------
    // Provider for epoch-specific signing schemes
    provider: P,
    // Epoch configuration
    epocher: ES,
    // Minimum number of views to retain temporary data after the application processes a block
    view_retention_timeout: ViewDelta,
    // Maximum number of blocks to repair at once
    max_repair: NonZeroUsize,
    // Codec configuration for block type
    block_codec_config: <V::Block as Read>::Cfg,
    // Strategy for parallel operations
    strategy: T,

    // ---------- State ----------
    // Last view processed
    last_processed_round: Round,
    // Last height processed by the application
    last_processed_height: Height,
    // Pending application acknowledgements (FIFO, bounded)
    pending_acks: PendingAcks<V, A>,
    // Highest known finalized height
    tip: Height,
    // Outstanding subscriptions for blocks, keyed by digest or commitment
    block_subscriptions: BTreeMap<BlockSubscriptionKeyFor<V>, BlockSubscription<V>>,

    // ---------- Storage ----------
    // Prunable cache for per-round notarizations/finalizations/blocks
    cache: cache::Manager<E, V, P::Scheme>,
    // Metadata tracking application progress (persists `last_processed_height`)
    application_metadata: Metadata<E, U64, Height>,
    // Finalizations stored by height
    finalizations_by_height: FC,
    // Finalized blocks stored by height
    finalized_blocks: FB,

    // ---------- Metrics ----------
    // Latest finalized height metric
    finalized_height: Gauge,
    // Latest processed height metric
    processed_height: Gauge,
}
265
266impl<E, V, P, FC, FB, ES, T, A> Actor<E, V, P, FC, FB, ES, T, A>
267where
268    E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage,
269    V: Variant,
270    P: Provider<Scope = Epoch, Scheme: Scheme<V::Commitment>>,
271    FC: Certificates<
272        BlockDigest = <V::Block as Digestible>::Digest,
273        Commitment = V::Commitment,
274        Scheme = P::Scheme,
275    >,
276    FB: Blocks<Block = V::StoredBlock>,
277    ES: Epocher,
278    T: Strategy,
279    A: Acknowledgement,
280{
281    /// Create a new application actor.
282    pub async fn init(
283        context: E,
284        finalizations_by_height: FC,
285        finalized_blocks: FB,
286        config: Config<V::Block, P, ES, T>,
287    ) -> (Self, Mailbox<P::Scheme, V>, Height) {
288        // Initialize cache
289        let prunable_config = cache::Config {
290            partition_prefix: format!("{}-cache", config.partition_prefix),
291            prunable_items_per_section: config.prunable_items_per_section,
292            replay_buffer: config.replay_buffer,
293            key_write_buffer: config.key_write_buffer,
294            value_write_buffer: config.value_write_buffer,
295            key_page_cache: config.page_cache.clone(),
296        };
297        let cache = cache::Manager::init(
298            context.with_label("cache"),
299            prunable_config,
300            config.block_codec_config.clone(),
301        )
302        .await;
303
304        // Initialize metadata tracking application progress
305        let application_metadata = Metadata::init(
306            context.with_label("application_metadata"),
307            metadata::Config {
308                partition: format!("{}-application-metadata", config.partition_prefix),
309                codec_config: (),
310            },
311        )
312        .await
313        .expect("failed to initialize application metadata");
314        let last_processed_height = application_metadata
315            .get(&LATEST_KEY)
316            .copied()
317            .unwrap_or(Height::zero());
318
319        // Create metrics
320        let finalized_height = Gauge::default();
321        context.register(
322            "finalized_height",
323            "Finalized height of application",
324            finalized_height.clone(),
325        );
326        let processed_height = Gauge::default();
327        context.register(
328            "processed_height",
329            "Processed height of application",
330            processed_height.clone(),
331        );
332        let _ = processed_height.try_set(last_processed_height.get());
333
334        // Initialize mailbox
335        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
336        (
337            Self {
338                context: ContextCell::new(context),
339                mailbox,
340                provider: config.provider,
341                epocher: config.epocher,
342                view_retention_timeout: config.view_retention_timeout,
343                max_repair: config.max_repair,
344                block_codec_config: config.block_codec_config,
345                strategy: config.strategy,
346                last_processed_round: Round::zero(),
347                last_processed_height,
348                pending_acks: PendingAcks::new(config.max_pending_acks.get()),
349                tip: Height::zero(),
350                block_subscriptions: BTreeMap::new(),
351                cache,
352                application_metadata,
353                finalizations_by_height,
354                finalized_blocks,
355                finalized_height,
356                processed_height,
357            },
358            Mailbox::new(sender),
359            last_processed_height,
360        )
361    }
362
    /// Start the actor.
    ///
    /// Spawns [Self::run] on the runtime via `spawn_cell!` (which takes the
    /// context out of the cell) and returns a [Handle] that resolves when the
    /// main loop exits.
    ///
    /// - `application`: reporter notified of tips and finalized blocks.
    /// - `buffer`: broadcast buffer used to disseminate and locate blocks.
    /// - `resolver`: backfill handler receiver paired with the resolver client.
    pub fn start<R, Buf>(
        mut self,
        application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
        buffer: Buf,
        resolver: (mpsc::Receiver<handler::Message<V::Commitment>>, R),
    ) -> Handle<()>
    where
        R: Resolver<
            Key = handler::Request<V::Commitment>,
            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
        >,
        Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
    {
        spawn_cell!(self.context, self.run(application, buffer, resolver).await)
    }
379
380    /// Run the application actor.
381    async fn run<R, Buf>(
382        mut self,
383        mut application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
384        mut buffer: Buf,
385        (mut resolver_rx, mut resolver): (mpsc::Receiver<handler::Message<V::Commitment>>, R),
386    ) where
387        R: Resolver<
388            Key = handler::Request<V::Commitment>,
389            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
390        >,
391        Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
392    {
393        // Create a local pool for waiter futures.
394        let mut waiters = AbortablePool::<Result<V::Block, BlockSubscriptionKeyFor<V>>>::default();
395
396        // Get tip and send to application
397        let tip = self.get_latest().await;
398        if let Some((height, digest, round)) = tip {
399            application.report(Update::Tip(round, height, digest)).await;
400            self.tip = height;
401            let _ = self.finalized_height.try_set(height.get());
402        }
403
404        // Load persisted cache epochs so find_block can discover blocks
405        // written before the last shutdown.
406        self.cache.load_persisted_epochs().await;
407
408        // Attempt to repair any gaps in the finalized blocks archive, if there are any.
409        if self
410            .try_repair_gaps(&mut buffer, &mut resolver, &mut application)
411            .await
412        {
413            self.sync_finalized().await;
414        }
415
416        // Attempt to dispatch the next finalized block to the application, if it is ready.
417        self.try_dispatch_blocks(&mut application).await;
418
419        select_loop! {
420            self.context,
421            on_start => {
422                // Remove any dropped subscribers. If all subscribers dropped, abort the waiter.
423                self.block_subscriptions.retain(|_, bs| {
424                    bs.subscribers.retain(|tx| !tx.is_closed());
425                    !bs.subscribers.is_empty()
426                });
427            },
428            on_stopped => {
429                debug!("context shutdown, stopping marshal");
430            },
431            // Handle waiter completions first
432            Ok(completion) = waiters.next_completed() else continue => match completion {
433                Ok(block) => self.notify_subscribers(&block),
434                Err(key) => {
435                    match key {
436                        BlockSubscriptionKey::Digest(digest) => {
437                            debug!(
438                                ?digest,
439                                "buffer subscription closed, canceling local subscribers"
440                            );
441                        }
442                        BlockSubscriptionKey::Commitment(commitment) => {
443                            debug!(
444                                ?commitment,
445                                "buffer subscription closed, canceling local subscribers"
446                            );
447                        }
448                    }
449                    self.block_subscriptions.remove(&key);
450                }
451            },
452            // Handle application acknowledgements (drain all ready acks, sync once)
453            result = self.pending_acks.current() => {
454                // Start with the ack that woke this `select_loop!` arm.
455                let mut pending = Some(self.pending_acks.complete_current(result));
456                loop {
457                    let (height, commitment, result) =
458                        pending.take().expect("pending ack must exist");
459                    match result {
460                        Ok(()) => {
461                            // Apply in-memory progress updates for this acknowledged block.
462                            self.handle_block_processed(height, commitment, &mut resolver)
463                                .await;
464                        }
465                        Err(e) => {
466                            // Ack failures are fatal for marshal/application coordination.
467                            error!(e = ?e, height = %height, "application did not acknowledge block");
468                            return;
469                        }
470                    }
471
472                    // Opportunistically drain any additional already-ready acks so we
473                    // can persist one metadata sync for the whole batch below.
474                    let Some(next) = self.pending_acks.pop_ready() else {
475                        break;
476                    };
477                    pending = Some(next);
478                }
479
480                // Persist buffered processed-height updates once after draining all ready acks.
481                if let Err(e) = self.application_metadata.sync().await {
482                    error!(?e, "failed to sync application progress");
483                    return;
484                }
485
486                // Fill the pipeline
487                self.try_dispatch_blocks(&mut application).await;
488            },
489            // Handle consensus inputs before backfill or resolver traffic
490            Some(message) = self.mailbox.recv() else {
491                info!("mailbox closed, shutting down");
492                break;
493            } => {
494                match message {
495                    Message::GetInfo {
496                        identifier,
497                        response,
498                    } => {
499                        let info = match identifier {
500                            // TODO: Instead of pulling out the entire block, determine the
501                            // height directly from the archive by mapping the digest to
502                            // the index, which is the same as the height.
503                            BlockID::Digest(digest) => self
504                                .finalized_blocks
505                                .get(ArchiveID::Key(&digest))
506                                .await
507                                .ok()
508                                .flatten()
509                                .map(|b| (b.height(), digest)),
510                            BlockID::Height(height) => self
511                                .finalizations_by_height
512                                .get(ArchiveID::Index(height.get()))
513                                .await
514                                .ok()
515                                .flatten()
516                                .map(|f| (height, V::commitment_to_inner(f.proposal.payload))),
517                            BlockID::Latest => self.get_latest().await.map(|(h, d, _)| (h, d)),
518                        };
519                        response.send_lossy(info);
520                    }
521                    Message::Proposed { round, block } => {
522                        self.cache_verified(round, block.digest(), block.clone())
523                            .await;
524                        buffer.send(round, block, Recipients::All).await;
525                    }
526                    Message::Forward {
527                        round,
528                        commitment,
529                        peers,
530                    } => {
531                        if peers.is_empty() {
532                            continue;
533                        }
534                        let Some(block) = self.find_block_by_commitment(&buffer, commitment).await
535                        else {
536                            debug!(?commitment, "block not found for forwarding");
537                            continue;
538                        };
539                        buffer.send(round, block, Recipients::Some(peers)).await;
540                    }
541                    Message::Verified { round, block } => {
542                        self.cache_verified(round, block.digest(), block).await;
543                    }
544                    Message::Notarization { notarization } => {
545                        let round = notarization.round();
546                        let commitment = notarization.proposal.payload;
547                        let digest = V::commitment_to_inner(commitment);
548
549                        // Store notarization by view
550                        self.cache
551                            .put_notarization(round, digest, notarization.clone())
552                            .await;
553
554                        // Search for block locally, otherwise fetch it remotely.
555                        if let Some(block) =
556                            self.find_block_by_commitment(&buffer, commitment).await
557                        {
558                            // If found, persist the block
559                            self.cache_block(round, digest, block).await;
560                        } else {
561                            debug!(?round, "notarized block missing");
562                            resolver
563                                .fetch(Request::<V::Commitment>::Notarized { round })
564                                .await;
565                        }
566                    }
567                    Message::Finalization { finalization } => {
568                        // Cache finalization by round
569                        let round = finalization.round();
570                        let commitment = finalization.proposal.payload;
571                        let digest = V::commitment_to_inner(commitment);
572                        self.cache
573                            .put_finalization(round, digest, finalization.clone())
574                            .await;
575
576                        // Search for block locally, otherwise fetch it remotely.
577                        if let Some(block) =
578                            self.find_block_by_commitment(&buffer, commitment).await
579                        {
580                            // If found, persist the block
581                            let height = block.height();
582                            if self
583                                .store_finalization(
584                                    height,
585                                    digest,
586                                    block,
587                                    Some(finalization),
588                                    &mut application,
589                                    &mut buffer,
590                                )
591                                .await
592                            {
593                                self.try_repair_gaps(&mut buffer, &mut resolver, &mut application)
594                                    .await;
595                                self.sync_finalized().await;
596                                debug!(?round, %height, "finalized block stored");
597                            }
598                        } else {
599                            // Otherwise, fetch the block from the network.
600                            debug!(?round, ?commitment, "finalized block missing");
601                            resolver
602                                .fetch(Request::<V::Commitment>::Block(commitment))
603                                .await;
604                        }
605                    }
606                    Message::GetBlock {
607                        identifier,
608                        response,
609                    } => match identifier {
610                        BlockID::Digest(digest) => {
611                            let result = self.find_block_by_digest(&mut buffer, digest).await;
612                            response.send_lossy(result);
613                        }
614                        BlockID::Height(height) => {
615                            let result = self.get_finalized_block(height).await;
616                            response.send_lossy(result);
617                        }
618                        BlockID::Latest => {
619                            let block = match self.get_latest().await {
620                                Some((_, digest, _)) => {
621                                    self.find_block_by_digest(&mut buffer, digest).await
622                                }
623                                None => None,
624                            };
625                            response.send_lossy(block);
626                        }
627                    },
628                    Message::GetFinalization { height, response } => {
629                        let finalization = self.get_finalization_by_height(height).await;
630                        response.send_lossy(finalization);
631                    }
632                    Message::HintFinalized { height, targets } => {
633                        // Skip if height is at or below the floor
634                        if height <= self.last_processed_height {
635                            continue;
636                        }
637
638                        // Skip if finalization is already available locally
639                        if self.get_finalization_by_height(height).await.is_some() {
640                            continue;
641                        }
642
643                        // Trigger a targeted fetch via the resolver
644                        let request = Request::<V::Commitment>::Finalized { height };
645                        resolver.fetch_targeted(request, targets).await;
646                    }
647                    Message::SubscribeByDigest {
648                        round,
649                        digest,
650                        response,
651                    } => {
652                        self.handle_subscribe(
653                            round,
654                            BlockSubscriptionKey::Digest(digest),
655                            response,
656                            &mut resolver,
657                            &mut waiters,
658                            &mut buffer,
659                        )
660                        .await;
661                    }
662                    Message::SubscribeByCommitment {
663                        round,
664                        commitment,
665                        response,
666                    } => {
667                        self.handle_subscribe(
668                            round,
669                            BlockSubscriptionKey::Commitment(commitment),
670                            response,
671                            &mut resolver,
672                            &mut waiters,
673                            &mut buffer,
674                        )
675                        .await;
676                    }
677                    Message::SetFloor { height } => {
678                        if self.last_processed_height >= height {
679                            warn!(
680                                %height,
681                                existing = %self.last_processed_height,
682                                "floor not updated, lower than existing"
683                            );
684                            continue;
685                        }
686
687                        // Update the processed height
688                        self.update_processed_height(height, &mut resolver).await;
689                        if let Err(err) = self.application_metadata.sync().await {
690                            error!(?err, %height, "failed to update floor");
691                            return;
692                        }
693
694                        // Drop all pending acknowledgements. We must do this to prevent
695                        // an in-process block from being processed that is below the new floor
696                        // updating `last_processed_height`.
697                        self.pending_acks.clear();
698
699                        // Prune data in the finalized archives below the new floor.
700                        if let Err(err) = self.prune_finalized_archives(height).await {
701                            error!(?err, %height, "failed to prune finalized archives");
702                            return;
703                        }
704
705                        // Intentionally keep existing block subscriptions alive. Canceling
706                        // waiters can have catastrophic consequences (nodes can get stuck in
707                        // different views) as actors do not retry subscriptions on failed channels.
708                    }
709                    Message::Prune { height } => {
710                        // Only allow pruning at or below the current floor
711                        if height > self.last_processed_height {
712                            warn!(%height, floor = %self.last_processed_height, "prune height above floor, ignoring");
713                            continue;
714                        }
715
716                        // Prune the finalized block and finalization certificate archives in parallel.
717                        if let Err(err) = self.prune_finalized_archives(height).await {
718                            error!(?err, %height, "failed to prune finalized archives");
719                            return;
720                        }
721
722                        // Intentionally keep existing block subscriptions alive. Canceling
723                        // waiters can have catastrophic consequences (nodes can get stuck in
724                        // different views) as actors do not retry subscriptions on failed channels.
725                    }
726                }
727            },
728            // Handle resolver messages last (batched up to max_repair, sync once)
729            Some(message) = resolver_rx.recv() else {
730                info!("handler closed, shutting down");
731                return;
732            } => {
733                // Drain up to max_repair messages: blocks handled immediately,
734                // certificates batched for verification, produces deferred.
735                let mut needs_sync = false;
736                let mut produces = Vec::new();
737                let mut delivers = Vec::new();
738                for msg in std::iter::once(message)
739                    .chain(std::iter::from_fn(|| resolver_rx.try_recv().ok()))
740                    .take(self.max_repair.get())
741                {
742                    match msg {
743                        handler::Message::Produce { key, response } => {
744                            produces.push((key, response));
745                        }
746                        handler::Message::Deliver {
747                            key,
748                            value,
749                            response,
750                        } => {
751                            needs_sync |= self
752                                .handle_deliver(
753                                    key,
754                                    value,
755                                    response,
756                                    &mut delivers,
757                                    &mut application,
758                                    &mut buffer,
759                                )
760                                .await;
761                        }
762                    }
763                }
764
765                // Batch verify and process all delivers.
766                needs_sync |= self
767                    .verify_delivered(delivers, &mut application, &mut buffer)
768                    .await;
769
770                // Attempt to fill gaps before handling produce requests (so we
771                // can serve data we just received).
772                needs_sync |= self
773                    .try_repair_gaps(&mut buffer, &mut resolver, &mut application)
774                    .await;
775
776                // Sync archives before responding to peers (prioritize our own
777                // durability).
778                if needs_sync {
779                    self.sync_finalized().await;
780                }
781
782                // Handle produce requests in parallel.
783                join_all(
784                    produces
785                        .into_iter()
786                        .map(|(key, response)| self.handle_produce(key, response, &buffer)),
787                )
788                .await;
789            },
790        }
791    }
792
793    /// Handle a produce request from a remote peer.
794    async fn handle_produce<Buf: Buffer<V>>(
795        &self,
796        key: Request<V::Commitment>,
797        response: oneshot::Sender<Bytes>,
798        buffer: &Buf,
799    ) {
800        match key {
801            Request::Block(commitment) => {
802                let Some(block) = self.find_block_by_commitment(buffer, commitment).await else {
803                    debug!(?commitment, "block missing on request");
804                    return;
805                };
806                response.send_lossy(block.encode());
807            }
808            Request::Finalized { height } => {
809                let Some(finalization) = self.get_finalization_by_height(height).await else {
810                    debug!(%height, "finalization missing on request");
811                    return;
812                };
813                let Some(block) = self.get_finalized_block(height).await else {
814                    debug!(%height, "finalized block missing on request");
815                    return;
816                };
817                response.send_lossy((finalization, block).encode());
818            }
819            Request::Notarized { round } => {
820                let Some(notarization) = self.cache.get_notarization(round).await else {
821                    debug!(?round, "notarization missing on request");
822                    return;
823                };
824                let commitment = notarization.proposal.payload;
825                let Some(block) = self.find_block_by_commitment(buffer, commitment).await else {
826                    debug!(?commitment, "block missing on request");
827                    return;
828                };
829                response.send_lossy((notarization, block).encode());
830            }
831        }
832    }
833
    /// Handle a local subscription request for a block, identified either by
    /// its digest or by its commitment.
    ///
    /// Resolution proceeds in three steps:
    /// 1. Serve the response immediately if the block is already available
    ///    locally.
    /// 2. Otherwise, if a `round` was provided and is still current, request
    ///    the corresponding notarization (which carries the block) from the
    ///    resolver.
    /// 3. Register the response channel as a subscriber so it is fulfilled
    ///    when the block eventually arrives.
    ///
    /// If `round` is provided but is below the last processed round, the
    /// response sender is dropped without registering: something else was
    /// finalized in that round, so the requested block may never be available.
    async fn handle_subscribe<Buf: Buffer<V>>(
        &mut self,
        round: Option<Round>,
        key: BlockSubscriptionKeyFor<V>,
        response: oneshot::Sender<V::Block>,
        resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
        waiters: &mut AbortablePool<Result<V::Block, BlockSubscriptionKeyFor<V>>>,
        buffer: &mut Buf,
    ) {
        // Derive a digest for logging: a commitment key maps to its inner digest.
        let digest = match key {
            BlockSubscriptionKey::Digest(digest) => digest,
            BlockSubscriptionKey::Commitment(commitment) => V::commitment_to_inner(commitment),
        };

        // Check for block locally.
        let block = match key {
            BlockSubscriptionKey::Digest(digest) => self.find_block_by_digest(buffer, digest).await,
            BlockSubscriptionKey::Commitment(commitment) => {
                self.find_block_by_commitment(buffer, commitment).await
            }
        };
        if let Some(block) = block {
            response.send_lossy(block);
            return;
        }

        // We don't have the block locally, so fetch by round if we have one.
        // If we only have a digest (no round), do not issue a resolver request:
        // we would not know when to drop it, and it could stay pending forever
        // for a block that never finalizes.
        if let Some(round) = round {
            if round < self.last_processed_round {
                // At this point, we have failed to find the block locally, and
                // we know that its round is less than the last processed round.
                // This means that something else was finalized in that round,
                // so we drop the response to indicate that the block may never
                // be available.
                return;
            }
            // Attempt to fetch the block (with notarization) from the resolver.
            // If this is a valid view, this request should be fine to keep open
            // until resolution or pruning (even if the oneshot is canceled).
            debug!(?round, ?digest, "requested block missing");
            resolver
                .fetch(Request::<V::Commitment>::Notarized { round })
                .await;
        }

        // Register subscriber.
        match key {
            BlockSubscriptionKey::Digest(digest) => {
                debug!(?round, ?digest, "registering subscriber");
            }
            BlockSubscriptionKey::Commitment(commitment) => {
                debug!(?round, ?commitment, ?digest, "registering subscriber");
            }
        }
        match self.block_subscriptions.entry(key) {
            Entry::Occupied(mut entry) => {
                // An existing subscription for this key: just add another waiter.
                entry.get_mut().subscribers.push(response);
            }
            Entry::Vacant(entry) => {
                // First subscriber for this key: open a buffer subscription and
                // park it in the waiter pool.
                let rx = match key {
                    BlockSubscriptionKey::Digest(digest) => {
                        buffer.subscribe_by_digest(digest).await
                    }
                    BlockSubscriptionKey::Commitment(commitment) => {
                        buffer.subscribe_by_commitment(commitment).await
                    }
                };
                // On channel failure, surface the key so the caller can clean
                // up the subscription entry.
                let waiter_key = key;
                let aborter = waiters.push(async move {
                    rx.await
                        .map_or_else(|_| Err(waiter_key), |block| Ok(block.into_block()))
                });
                // NOTE(review): the stored aborter presumably cancels the pooled
                // waiter when this entry is removed — confirm `Aborter` semantics.
                entry.insert(BlockSubscription {
                    subscribers: vec![response],
                    _aborter: aborter,
                });
            }
        }
    }
917
    /// Handle a deliver message from the resolver.
    ///
    /// Block delivers are handled immediately. Finalized/Notarized delivers
    /// are parsed and structurally validated, then collected into `delivers`
    /// for batch certificate verification (see [`Self::verify_delivered`]).
    ///
    /// The `response` channel tells the resolver whether the delivery was
    /// acceptable: `false` rejects malformed or mismatched data, `true`
    /// acknowledges valid data (even if stale). For batched items, the
    /// response is answered later by [`Self::verify_delivered`].
    ///
    /// Returns true if finalization archives were written and need syncing.
    async fn handle_deliver<Buf: Buffer<V>>(
        &mut self,
        key: Request<V::Commitment>,
        value: Bytes,
        response: oneshot::Sender<bool>,
        delivers: &mut Vec<PendingVerification<P::Scheme, V>>,
        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
        buffer: &mut Buf,
    ) -> bool {
        match key {
            Request::Block(commitment) => {
                // Reject deliveries that fail to decode or whose commitment
                // does not match the requested one.
                let Ok(block) = V::Block::decode_cfg(value.as_ref(), &self.block_codec_config)
                else {
                    response.send_lossy(false);
                    return false;
                };
                if V::commitment(&block) != commitment {
                    response.send_lossy(false);
                    return false;
                }

                // Persist the block, also storing the finalization if we have it.
                let height = block.height();
                let digest = block.digest();
                let finalization = self.cache.get_finalization_for(digest).await;
                let wrote = self
                    .store_finalization(height, digest, block, finalization, application, buffer)
                    .await;
                debug!(?digest, %height, "received block");
                response.send_lossy(true); // if a valid block is received, we should still send true (even if it was stale)
                wrote
            }
            Request::Finalized { height } => {
                // Map the height to its epoch; no bounds means the delivery is
                // stale, which we acknowledge (`true`) without using it.
                let Some(bounds) = self.epocher.containing(height) else {
                    debug!(
                        %height,
                        floor = %self.last_processed_height,
                        "ignoring stale delivery"
                    );
                    response.send_lossy(true);
                    return false;
                };
                // Without a verifier for this epoch we cannot check the
                // certificate; treat as stale.
                let Some(scheme) = self.get_scheme_certificate_verifier(bounds.epoch()) else {
                    debug!(
                        %height,
                        floor = %self.last_processed_height,
                        "ignoring stale delivery"
                    );
                    response.send_lossy(true);
                    return false;
                };

                let Ok((finalization, block)) =
                    <(Finalization<P::Scheme, V::Commitment>, V::Block)>::decode_cfg(
                        value,
                        &(
                            scheme.certificate_codec_config(),
                            self.block_codec_config.clone(),
                        ),
                    )
                else {
                    response.send_lossy(false);
                    return false;
                };

                // Structural validation: block must sit at the requested
                // height, match the finalized payload, and the finalization
                // must belong to the epoch containing that height.
                let commitment = finalization.proposal.payload;
                if block.height() != height
                    || V::commitment(&block) != commitment
                    || finalization.epoch() != bounds.epoch()
                {
                    response.send_lossy(false);
                    return false;
                }
                // Defer certificate verification (and the response) to the batch.
                delivers.push(PendingVerification::Finalized {
                    finalization,
                    block,
                    response,
                });
                false
            }
            Request::Notarized { round } => {
                // Without a verifier for this round's epoch we cannot check
                // the certificate; treat as stale.
                let Some(scheme) = self.get_scheme_certificate_verifier(round.epoch()) else {
                    debug!(
                        ?round,
                        floor = %self.last_processed_height,
                        "ignoring stale delivery"
                    );
                    response.send_lossy(true);
                    return false;
                };

                let Ok((notarization, block)) =
                    <(Notarization<P::Scheme, V::Commitment>, V::Block)>::decode_cfg(
                        value,
                        &(
                            scheme.certificate_codec_config(),
                            self.block_codec_config.clone(),
                        ),
                    )
                else {
                    response.send_lossy(false);
                    return false;
                };

                // Structural validation: the notarization must be for the
                // requested round and the block must match its payload.
                if notarization.round() != round
                    || V::commitment(&block) != notarization.proposal.payload
                {
                    response.send_lossy(false);
                    return false;
                }
                // Defer certificate verification (and the response) to the batch.
                delivers.push(PendingVerification::Notarized {
                    notarization,
                    block,
                    response,
                });
                false
            }
        }
    }
1041
    /// Batch verify pending certificates and process valid items.
    ///
    /// Certificates are verified in a single batch when an all-epoch verifier
    /// is available; otherwise items are grouped by epoch and each group is
    /// verified with that epoch's scoped verifier. Items whose certificate
    /// fails verification (or whose epoch has no scoped verifier) are rejected
    /// via their response channel; valid items are acknowledged and persisted.
    ///
    /// Returns true if finalization archives were written and need syncing.
    async fn verify_delivered<Buf: Buffer<V>>(
        &mut self,
        mut delivers: Vec<PendingVerification<P::Scheme, V>>,
        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
        buffer: &mut Buf,
    ) -> bool {
        if delivers.is_empty() {
            return false;
        }

        // Extract (subject, certificate) pairs for batch verification.
        // `certs[i]` corresponds to `delivers[i]`.
        let certs: Vec<_> = delivers
            .iter()
            .map(|item| match item {
                PendingVerification::Finalized { finalization, .. } => (
                    Subject::Finalize {
                        proposal: &finalization.proposal,
                    },
                    &finalization.certificate,
                ),
                PendingVerification::Notarized { notarization, .. } => (
                    Subject::Notarize {
                        proposal: &notarization.proposal,
                    },
                    &notarization.certificate,
                ),
            })
            .collect();

        // Batch verify using the all-epoch verifier if available, otherwise
        // batch verify per epoch using scoped verifiers.
        let verified = if let Some(scheme) = self.provider.all() {
            verify_certificates(&mut self.context, scheme.as_ref(), &certs, &self.strategy)
        } else {
            // Default every slot to false: items in epochs with no scoped
            // verifier stay unverified and are rejected below.
            let mut verified = vec![false; delivers.len()];

            // Group indices by epoch.
            let mut by_epoch: BTreeMap<Epoch, Vec<usize>> = BTreeMap::new();
            for (i, item) in delivers.iter().enumerate() {
                let epoch = match item {
                    PendingVerification::Notarized { notarization, .. } => notarization.epoch(),
                    PendingVerification::Finalized { finalization, .. } => finalization.epoch(),
                };
                by_epoch.entry(epoch).or_default().push(i);
            }

            // Batch verify each epoch group.
            for (epoch, indices) in &by_epoch {
                let Some(scheme) = self.provider.scoped(*epoch) else {
                    continue;
                };
                let group: Vec<_> = indices.iter().map(|&i| certs[i]).collect();
                let results =
                    verify_certificates(&mut self.context, scheme.as_ref(), &group, &self.strategy);
                // Scatter the group's results back to their original positions.
                for (j, &idx) in indices.iter().enumerate() {
                    verified[idx] = results[j];
                }
            }
            verified
        };

        // Process each verified item, rejecting unverified ones.
        let mut wrote = false;
        for (index, item) in delivers.drain(..).enumerate() {
            if !verified[index] {
                match item {
                    PendingVerification::Finalized { response, .. }
                    | PendingVerification::Notarized { response, .. } => {
                        response.send_lossy(false);
                    }
                }
                continue;
            }
            match item {
                PendingVerification::Finalized {
                    finalization,
                    block,
                    response,
                } => {
                    // Valid finalization received.
                    response.send_lossy(true);
                    let round = finalization.round();
                    let height = block.height();
                    let digest = block.digest();
                    debug!(?round, %height, "received finalization");

                    wrote |= self
                        .store_finalization(
                            height,
                            digest,
                            block,
                            Some(finalization),
                            application,
                            buffer,
                        )
                        .await;
                }
                PendingVerification::Notarized {
                    notarization,
                    block,
                    response,
                } => {
                    // Valid notarization received.
                    response.send_lossy(true);
                    let round = notarization.round();
                    let commitment = notarization.proposal.payload;
                    let digest = V::commitment_to_inner(commitment);
                    debug!(?round, ?digest, "received notarization");

                    // If there exists a finalization certificate for this block, we
                    // should finalize it. This could finalize the block faster when
                    // a notarization then a finalization are received via consensus
                    // and we resolve the notarization request before the block request.
                    let height = block.height();
                    if let Some(finalization) = self.cache.get_finalization_for(digest).await {
                        // SAFETY: `digest` identifies a unique `commitment`, so this
                        // cached finalization payload must match `V::commitment(&block)`.
                        wrote |= self
                            .store_finalization(
                                height,
                                digest,
                                block.clone(),
                                Some(finalization),
                                application,
                                buffer,
                            )
                            .await;
                    }

                    // Cache the notarization and block.
                    self.cache_block(round, digest, block).await;
                    self.cache
                        .put_notarization(round, digest, notarization)
                        .await;
                }
            }
        }

        wrote
    }
1184
1185    /// Returns a scheme suitable for verifying certificates at the given epoch.
1186    ///
1187    /// Prefers a certificate verifier if available, otherwise falls back
1188    /// to the scheme for the given epoch.
1189    fn get_scheme_certificate_verifier(&self, epoch: Epoch) -> Option<Arc<P::Scheme>> {
1190        self.provider.all().or_else(|| self.provider.scoped(epoch))
1191    }
1192
1193    // -------------------- Waiters --------------------
1194
1195    /// Notify any subscribers for the given digest with the provided block.
1196    fn notify_subscribers(&mut self, block: &V::Block) {
1197        if let Some(mut bs) = self
1198            .block_subscriptions
1199            .remove(&BlockSubscriptionKey::Digest(block.digest()))
1200        {
1201            for subscriber in bs.subscribers.drain(..) {
1202                subscriber.send_lossy(block.clone());
1203            }
1204        }
1205        if let Some(mut bs) = self
1206            .block_subscriptions
1207            .remove(&BlockSubscriptionKey::Commitment(V::commitment(block)))
1208        {
1209            for subscriber in bs.subscribers.drain(..) {
1210                subscriber.send_lossy(block.clone());
1211            }
1212        }
1213    }
1214
1215    // -------------------- Application Dispatch --------------------
1216
    /// Attempt to dispatch the next finalized block to the application if ready.
    ///
    /// Dispatch finalized blocks to the application until the pipeline is full
    /// or no more blocks are available.
    ///
    /// This does NOT advance `last_processed_height` or sync metadata. It only
    /// sends blocks to the application and enqueues pending acks. Metadata is
    /// updated later, in a subsequent `select_loop!` iteration, when acks
    /// arrive and [`Self::handle_block_processed`] calls
    /// [`Self::update_processed_height`].
    ///
    /// Acks are processed in FIFO order so `last_processed_height` always
    /// advances sequentially.
    ///
    /// # Crash safety
    ///
    /// Because `select_loop!` arms run to completion, the caller's
    /// [`Self::sync_finalized`] always executes before the ack handler runs. This
    /// guarantees archive data is durable before `last_processed_height`
    /// advances:
    ///
    /// ```text
    /// Iteration N (caller):
    ///   store_finalization  ->  Archive::put (buffered)
    ///   try_dispatch_blocks  ->  sends blocks to app, enqueues pending acks
    ///   sync_finalized      ->  archive durable
    ///
    /// Iteration M (ack handler, M > N):
    ///   handle_block_processed   ->  update_processed_height  ->  metadata buffered
    ///   application_metadata.sync ->  metadata durable
    /// ```
    async fn try_dispatch_blocks(
        &mut self,
        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
    ) {
        // Fill the ack pipeline; stop at the first gap in contiguous
        // finalized blocks or when the pipeline is full.
        while self.pending_acks.has_capacity() {
            let next_height = self
                .pending_acks
                .next_dispatch_height(self.last_processed_height);
            let Some(block) = self.get_finalized_block(next_height).await else {
                return;
            };
            // The archive is indexed by height, so the stored block's height
            // must equal the requested index.
            assert_eq!(
                block.height(),
                next_height,
                "finalized block height mismatch"
            );

            let (height, commitment) = (block.height(), V::commitment(&block));
            let (ack, ack_waiter) = A::handle();
            application
                .report(Update::Block(V::into_inner(block), ack))
                .await;
            // Enqueue in FIFO order so `last_processed_height` advances
            // sequentially as acks arrive.
            self.pending_acks.enqueue(PendingAck {
                height,
                commitment,
                receiver: ack_waiter,
            });
        }
    }
1277
1278    /// Handle acknowledgement from the application that a block has been processed.
1279    ///
1280    /// Buffers the processed height update but does NOT sync to durable storage.
1281    /// The caller must sync metadata after processing all ready acks.
1282    async fn handle_block_processed(
1283        &mut self,
1284        height: Height,
1285        commitment: V::Commitment,
1286        resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
1287    ) {
1288        // Update the processed height (buffered, not synced)
1289        self.update_processed_height(height, resolver).await;
1290
1291        // Cancel any useless requests
1292        resolver
1293            .cancel(Request::<V::Commitment>::Block(commitment))
1294            .await;
1295
1296        if let Some(finalization) = self.get_finalization_by_height(height).await {
1297            // Trail the previous processed finalized block by the timeout
1298            let lpr = self.last_processed_round;
1299            let prune_round = Round::new(
1300                lpr.epoch(),
1301                lpr.view().saturating_sub(self.view_retention_timeout),
1302            );
1303
1304            // Prune archives
1305            self.cache.prune(prune_round).await;
1306
1307            // Update the last processed round
1308            let round = finalization.round();
1309            self.last_processed_round = round;
1310
1311            // Cancel useless requests
1312            resolver
1313                .retain(Request::<V::Commitment>::Notarized { round }.predicate())
1314                .await;
1315        }
1316    }
1317
1318    // -------------------- Prunable Storage --------------------
1319
1320    /// Add a verified block to the prunable archive.
1321    async fn cache_verified(
1322        &mut self,
1323        round: Round,
1324        digest: <V::Block as Digestible>::Digest,
1325        block: V::Block,
1326    ) {
1327        self.notify_subscribers(&block);
1328        self.cache.put_verified(round, digest, block.into()).await;
1329    }
1330
1331    /// Add a notarized block to the prunable archive.
1332    async fn cache_block(
1333        &mut self,
1334        round: Round,
1335        digest: <V::Block as Digestible>::Digest,
1336        block: V::Block,
1337    ) {
1338        self.notify_subscribers(&block);
1339        self.cache.put_block(round, digest, block.into()).await;
1340    }
1341
1342    /// Sync both finalization archives to durable storage.
1343    ///
1344    /// Must be called within the same `select_loop!` arm as any preceding
1345    /// [`Self::store_finalization`] / [`Self::try_repair_gaps`] writes, before yielding back
1346    /// to the loop. This ensures archives are durable before the ack handler
1347    /// advances `last_processed_height`. See [`Self::try_dispatch_blocks`] for details.
1348    async fn sync_finalized(&mut self) {
1349        if let Err(e) = try_join!(
1350            async {
1351                self.finalized_blocks.sync().await.map_err(Box::new)?;
1352                Ok::<_, BoxedError>(())
1353            },
1354            async {
1355                self.finalizations_by_height
1356                    .sync()
1357                    .await
1358                    .map_err(Box::new)?;
1359                Ok::<_, BoxedError>(())
1360            },
1361        ) {
1362            panic!("failed to sync finalization archives: {e}");
1363        }
1364    }
1365
1366    // -------------------- Immutable Storage --------------------
1367
1368    /// Get a finalized block from the immutable archive.
1369    async fn get_finalized_block(&self, height: Height) -> Option<V::Block> {
1370        match self
1371            .finalized_blocks
1372            .get(ArchiveID::Index(height.get()))
1373            .await
1374        {
1375            Ok(stored) => stored.map(|stored| stored.into()),
1376            Err(e) => panic!("failed to get block: {e}"),
1377        }
1378    }
1379
1380    /// Get a finalization from the archive by height.
1381    async fn get_finalization_by_height(
1382        &self,
1383        height: Height,
1384    ) -> Option<Finalization<P::Scheme, V::Commitment>> {
1385        match self
1386            .finalizations_by_height
1387            .get(ArchiveID::Index(height.get()))
1388            .await
1389        {
1390            Ok(finalization) => finalization,
1391            Err(e) => panic!("failed to get finalization: {e}"),
1392        }
1393    }
1394
    /// Add a finalized block, and optionally a finalization, to the archive.
    ///
    /// After persisting the block, attempt to dispatch the next contiguous block to the application.
    ///
    /// Writes are buffered and not synced. The caller must call
    /// [sync_finalized](Self::sync_finalized) before yielding to the
    /// `select_loop!` so that archive data is durable before the ack handler
    /// advances `last_processed_height`. See [`Self::try_dispatch_blocks`] for the
    /// crash safety invariant.
    ///
    /// Returns true if data was written to the finalized archives (and
    /// therefore needs syncing); false if the block was dropped as stale.
    async fn store_finalization<Buf: Buffer<V>>(
        &mut self,
        height: Height,
        digest: <V::Block as Digestible>::Digest,
        block: V::Block,
        finalization: Option<Finalization<P::Scheme, V::Commitment>>,
        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
        buffer: &mut Buf,
    ) -> bool {
        // Blocks below the last processed height are not useful to us, so we ignore them (this
        // has the nice byproduct of ensuring we don't call a backing store with a block below the
        // pruning boundary)
        if height <= self.last_processed_height {
            debug!(
                %height,
                floor = %self.last_processed_height,
                ?digest,
                "dropping finalization at or below processed height floor"
            );
            return false;
        }
        // Wake any subscribers before the block is consumed below.
        self.notify_subscribers(&block);

        // Convert block to storage format
        let commitment = V::commitment(&block);
        let stored: V::StoredBlock = block.into();
        // Remember the finalization's round (if any) for the tip update below.
        let round = finalization.as_ref().map(|f| f.round());

        // In parallel, update the finalized blocks and finalizations archives
        if let Err(e) = try_join!(
            // Update the finalized blocks archive
            async {
                self.finalized_blocks.put(stored).await.map_err(Box::new)?;
                Ok::<_, BoxedError>(())
            },
            // Update the finalizations archive (if provided)
            async {
                if let Some(finalization) = finalization {
                    self.finalizations_by_height
                        .put(height, digest, finalization)
                        .await
                        .map_err(Box::new)?;
                }
                Ok::<_, BoxedError>(())
            }
        ) {
            panic!("failed to finalize: {e}");
        }

        // Update metrics, buffer, and application
        // Only report a new tip when a finalization was provided and this
        // height exceeds the previously reported tip.
        if let Some(round) = round.filter(|_| height > self.tip) {
            application.report(Update::Tip(round, height, digest)).await;
            self.tip = height;
            let _ = self.finalized_height.try_set(height.get());
        }
        buffer.finalized(commitment).await;
        self.try_dispatch_blocks(application).await;

        true
    }
1464
1465    /// Get the latest finalized block information (height and digest tuple).
1466    ///
1467    /// Blocks are only finalized directly with a finalization or indirectly via a descendant
1468    /// block's finalization. Thus, the highest known finalized block must itself have a direct
1469    /// finalization.
1470    ///
1471    /// We return the height and digest using the highest known finalization that we know the
1472    /// block height for. While it's possible that we have a later finalization, if we do not have
1473    /// the full block for that finalization, we do not know its height and therefore it would not
1474    /// yet be found in the `finalizations_by_height` archive. While not checked explicitly, we
1475    /// should have the associated block (in the `finalized_blocks` archive) for the information
1476    /// returned.
1477    async fn get_latest(&mut self) -> Option<(Height, <V::Block as Digestible>::Digest, Round)> {
1478        let height = self.finalizations_by_height.last_index()?;
1479        let finalization = self
1480            .get_finalization_by_height(height)
1481            .await
1482            .expect("finalization missing");
1483        Some((
1484            height,
1485            V::commitment_to_inner(finalization.proposal.payload),
1486            finalization.round(),
1487        ))
1488    }
1489
1490    // -------------------- Mixed Storage --------------------
1491
1492    /// Looks for a block in cache and finalized storage by digest.
1493    async fn find_block_in_storage(
1494        &self,
1495        digest: <V::Block as Digestible>::Digest,
1496    ) -> Option<V::Block> {
1497        // Check verified / notarized blocks via cache manager.
1498        if let Some(block) = self.cache.find_block(digest).await {
1499            return Some(block.into());
1500        }
1501        // Check finalized blocks.
1502        match self.finalized_blocks.get(ArchiveID::Key(&digest)).await {
1503            Ok(stored) => stored.map(|stored| stored.into()),
1504            Err(e) => panic!("failed to get block: {e}"),
1505        }
1506    }
1507
1508    /// Looks for a block anywhere in local storage using only the digest.
1509    ///
1510    /// This is used when we only have a digest (during gap repair following
1511    /// parent links).
1512    async fn find_block_by_digest<Buf: Buffer<V>>(
1513        &self,
1514        buffer: &mut Buf,
1515        digest: <V::Block as Digestible>::Digest,
1516    ) -> Option<V::Block> {
1517        if let Some(block) = buffer.find_by_digest(digest).await {
1518            return Some(block.into_block());
1519        }
1520        self.find_block_in_storage(digest).await
1521    }
1522
1523    /// Looks for a block anywhere in local storage using the full commitment.
1524    ///
1525    /// This is used when we have a full commitment (from notarizations/finalizations).
1526    /// Having the full commitment may enable additional retrieval mechanisms.
1527    async fn find_block_by_commitment<Buf: Buffer<V>>(
1528        &self,
1529        buffer: &Buf,
1530        commitment: V::Commitment,
1531    ) -> Option<V::Block> {
1532        if let Some(block) = buffer.find_by_commitment(commitment).await {
1533            return Some(block.into_block());
1534        }
1535        self.find_block_in_storage(V::commitment_to_inner(commitment))
1536            .await
1537    }
1538
    /// Attempt to repair any identified gaps in the finalized blocks archive. The total
    /// number of missing heights that can be repaired at once is bounded by `self.max_repair`,
    /// though multiple gaps may be spanned.
    ///
    /// This also handles the "trailing" case where finalizations exist beyond
    /// the last stored block (the block data was lost before a crash). The
    /// trailing block is anchored first so that backward gap repair can fill
    /// inward from it.
    ///
    /// Writes are buffered. Returns `true` if this call wrote repaired blocks and
    /// needs a subsequent [`sync_finalized`](Self::sync_finalized).
    async fn try_repair_gaps<Buf: Buffer<V>>(
        &mut self,
        buffer: &mut Buf,
        resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
    ) -> bool {
        // Tracks whether any repaired block was persisted (caller must sync if so).
        let mut wrote = false;
        // Repair only above the application's processed height floor.
        let start = self.last_processed_height.next();

        // If finalizations extend beyond the last stored block, anchor the
        // trailing block so the gap repair loop below can walk backward from it.
        if let Some(last_finalized) = self.finalizations_by_height.last_index() {
            let have_block = self
                .finalized_blocks
                .last_index()
                .is_some_and(|last| last >= last_finalized);
            if last_finalized > self.last_processed_height && !have_block {
                // Get the finalization for the last finalized block.
                let finalization = self
                    .get_finalization_by_height(last_finalized)
                    .await
                    .expect("finalization missing");
                let commitment = finalization.proposal.payload;
                if let Some(block) = self.find_block_by_commitment(buffer, commitment).await {
                    // If found, persist the block.
                    let digest = block.digest();
                    wrote |= self
                        .store_finalization(
                            last_finalized,
                            digest,
                            block,
                            Some(finalization),
                            application,
                            buffer,
                        )
                        .await;
                } else {
                    // Request the missing block; the anchor will be persisted on a
                    // later call once the resolver delivers it.
                    resolver
                        .fetch(Request::<V::Commitment>::Block(commitment))
                        .await;
                }
            }
        }

        // Fill internal gaps by walking backward from each gap's end block.
        // The label lets the inner walk abandon ALL gap repair once a fetch
        // has been issued for a block we don't have locally.
        'cache_repair: loop {
            let (gap_start, Some(gap_end)) = self.finalized_blocks.next_gap(start) else {
                // No gaps detected; nothing left to repair or request here.
                return wrote;
            };

            // Attempt to repair the gap backwards from the end of the gap, using
            // blocks from our local storage.
            let Some(mut cursor) = self.get_finalized_block(gap_end).await else {
                panic!("gapped block missing that should exist: {gap_end}");
            };

            // Compute the lower bound of the recursive repair. `gap_start` is `Some`
            // if `start` is not in a gap. We add one to it to ensure we don't
            // re-persist it to the database in the repair loop below.
            let gap_start = gap_start.map(Height::next).unwrap_or(start);

            // Iterate backwards, repairing blocks as we go.
            while cursor.height() > gap_start {
                let parent_digest = cursor.parent();
                let parent_commitment = V::parent_commitment(&cursor);
                if let Some(block) = self
                    .find_block_by_commitment(buffer, parent_commitment)
                    .await
                {
                    // The parent may have its own direct finalization in cache;
                    // store it alongside the block if so.
                    let finalization = self.cache.get_finalization_for(parent_digest).await;
                    wrote |= self
                        .store_finalization(
                            block.height(),
                            parent_digest,
                            block.clone(),
                            finalization,
                            application,
                            buffer,
                        )
                        .await;
                    debug!(height = %block.height(), "repaired block");
                    cursor = block;
                } else {
                    // Request the next missing commitment.
                    //
                    // SAFETY: We can rely on this derived parent commitment because
                    // the block is provably a member of the finalized chain due to the end
                    // boundary of the gap being finalized.
                    resolver
                        .fetch(Request::<V::Commitment>::Block(parent_commitment))
                        .await;
                    break 'cache_repair;
                }
            }
        }

        // Request any finalizations for missing items in the archive, up to
        // the `max_repair` quota. This may help shrink the size of the gap
        // closest to the application's processed height if finalizations
        // for the requests' heights exist. If not, we rely on the recursive
        // digest fetches above.
        let missing_items = self
            .finalized_blocks
            .missing_items(start, self.max_repair.get());
        let requests: Vec<_> = missing_items
            .into_iter()
            .map(|height| Request::<V::Commitment>::Finalized { height })
            .collect();
        if !requests.is_empty() {
            resolver.fetch_all(requests).await
        }
        wrote
    }
1665
1666    /// Buffers a processed height update in memory and metrics. Does NOT sync
1667    /// to durable storage. Sync metadata after buffered updates to make them durable.
1668    async fn update_processed_height(
1669        &mut self,
1670        height: Height,
1671        resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
1672    ) {
1673        self.application_metadata.put(LATEST_KEY, height);
1674        self.last_processed_height = height;
1675        let _ = self
1676            .processed_height
1677            .try_set(self.last_processed_height.get());
1678
1679        // Cancel any existing requests below the new floor.
1680        resolver
1681            .retain(Request::<V::Commitment>::Finalized { height }.predicate())
1682            .await;
1683    }
1684
1685    /// Prunes finalized blocks and certificates below the given height.
1686    async fn prune_finalized_archives(&mut self, height: Height) -> Result<(), BoxedError> {
1687        try_join!(
1688            async {
1689                self.finalized_blocks
1690                    .prune(height)
1691                    .await
1692                    .map_err(Box::new)?;
1693                Ok::<_, BoxedError>(())
1694            },
1695            async {
1696                self.finalizations_by_height
1697                    .prune(height)
1698                    .await
1699                    .map_err(Box::new)?;
1700                Ok::<_, BoxedError>(())
1701            }
1702        )?;
1703        Ok(())
1704    }
1705}