Skip to main content

commonware_consensus/marshal/core/
actor.rs

1use super::{
2    cache,
3    mailbox::{Mailbox, Message},
4    Buffer, IntoBlock, Variant,
5};
6use crate::{
7    marshal::{
8        resolver::handler::{self, Request},
9        store::{Blocks, Certificates},
10        Config, Identifier as BlockID, Update,
11    },
12    simplex::{
13        scheme::Scheme,
14        types::{verify_certificates, Finalization, Notarization, Subject},
15    },
16    types::{Epoch, Epocher, Height, Round, ViewDelta},
17    Block, Epochable, Heightable, Reporter,
18};
19use bytes::Bytes;
20use commonware_codec::{Decode, Encode, Read};
21use commonware_cryptography::{
22    certificate::{Provider, Scheme as CertificateScheme},
23    Digestible,
24};
25use commonware_macros::select_loop;
26use commonware_parallel::Strategy;
27use commonware_resolver::Resolver;
28use commonware_runtime::{
29    spawn_cell, telemetry::metrics::status::GaugeExt, BufferPooler, Clock, ContextCell, Handle,
30    Metrics, Spawner, Storage,
31};
32use commonware_storage::{
33    archive::Identifier as ArchiveID,
34    metadata::{self, Metadata},
35};
36use commonware_utils::{
37    acknowledgement::Exact,
38    channel::{fallible::OneshotExt, mpsc, oneshot},
39    futures::{AbortablePool, Aborter, OptionFuture},
40    sequence::U64,
41    Acknowledgement, BoxedError,
42};
43use futures::{future::join_all, try_join, FutureExt};
44use pin_project::pin_project;
45use prometheus_client::metrics::gauge::Gauge;
46use rand_core::CryptoRngCore;
47use std::{
48    collections::{btree_map::Entry, BTreeMap, VecDeque},
49    future::Future,
50    num::NonZeroUsize,
51    pin::Pin,
52    sync::Arc,
53};
54use tracing::{debug, error, info, warn};
55
/// The key used to store the last processed height in the metadata store.
///
/// Progress is kept under this single fixed key (`0xFF`); the stored value is the
/// height of the last block the application acknowledged.
const LATEST_KEY: U64 = U64::new(0xFF);
58
/// A parsed-but-unverified resolver delivery awaiting batch certificate verification.
///
/// Deliveries are decoded as they arrive, but certificate verification is deferred so
/// several deliveries can be verified together in a single batch.
enum PendingVerification<S: CertificateScheme, V: Variant> {
    /// A notarization certificate together with the block it covers.
    Notarized {
        notarization: Notarization<S, V::Commitment>,
        block: V::Block,
        // Reports the delivery's validity back to the resolver — NOTE(review):
        // confirm exact bool semantics against the resolver handler.
        response: oneshot::Sender<bool>,
    },
    /// A finalization certificate together with the block it covers.
    Finalized {
        finalization: Finalization<S, V::Commitment>,
        block: V::Block,
        // Reports the delivery's validity back to the resolver — NOTE(review):
        // confirm exact bool semantics against the resolver handler.
        response: oneshot::Sender<bool>,
    },
}
72
/// A pending acknowledgement from the application for a block at the contained height/commitment.
#[pin_project]
struct PendingAck<V: Variant, A: Acknowledgement> {
    // Height of the dispatched block awaiting acknowledgement.
    height: Height,
    // Commitment of the dispatched block awaiting acknowledgement.
    commitment: V::Commitment,
    // Future that resolves once the application acknowledges (or fails) the block.
    #[pin]
    receiver: A::Waiter,
}
81
82impl<V: Variant, A: Acknowledgement> Future for PendingAck<V, A> {
83    type Output = <A::Waiter as Future>::Output;
84
85    fn poll(
86        self: std::pin::Pin<&mut Self>,
87        cx: &mut std::task::Context<'_>,
88    ) -> std::task::Poll<Self::Output> {
89        self.project().receiver.poll(cx)
90    }
91}
92
/// Tracks in-flight application acknowledgements with FIFO semantics.
struct PendingAcks<V: Variant, A: Acknowledgement> {
    // The ack currently armed for polling (if any); the actor selects on this.
    current: OptionFuture<PendingAck<V, A>>,
    // Acks dispatched behind `current`, in dispatch order.
    queue: VecDeque<PendingAck<V, A>>,
    // Maximum number of in-flight acks (`current` plus `queue`).
    max: usize,
}
99
100impl<V: Variant, A: Acknowledgement> PendingAcks<V, A> {
101    /// Creates a new pending-ack tracker with a maximum in-flight capacity.
102    fn new(max: usize) -> Self {
103        Self {
104            current: None.into(),
105            queue: VecDeque::with_capacity(max),
106            max,
107        }
108    }
109
110    /// Drops the current ack and all queued acks.
111    fn clear(&mut self) {
112        self.current = None.into();
113        self.queue.clear();
114    }
115
116    /// Returns the currently armed ack future (if any) for `select_loop!`.
117    const fn current(&mut self) -> &mut OptionFuture<PendingAck<V, A>> {
118        &mut self.current
119    }
120
121    /// Returns whether we can dispatch another block without exceeding capacity.
122    fn has_capacity(&self) -> bool {
123        let reserved = usize::from(self.current.is_some());
124        self.queue.len() < self.max - reserved
125    }
126
127    /// Returns the next height to dispatch while preserving sequential order.
128    fn next_dispatch_height(&self, last_processed_height: Height) -> Height {
129        self.queue
130            .back()
131            .map(|ack| ack.height.next())
132            .or_else(|| self.current.as_ref().map(|ack| ack.height.next()))
133            .unwrap_or_else(|| last_processed_height.next())
134    }
135
136    /// Enqueues a newly dispatched ack, arming it immediately when idle.
137    fn enqueue(&mut self, ack: PendingAck<V, A>) {
138        if self.current.is_none() {
139            self.current.replace(ack);
140            return;
141        }
142        self.queue.push_back(ack);
143    }
144
145    /// Returns metadata for a completed current ack and arms the next queued ack.
146    fn complete_current(
147        &mut self,
148        result: <A::Waiter as Future>::Output,
149    ) -> (Height, V::Commitment, <A::Waiter as Future>::Output) {
150        let PendingAck {
151            height, commitment, ..
152        } = self.current.take().expect("ack state must be present");
153        if let Some(next) = self.queue.pop_front() {
154            self.current.replace(next);
155        }
156        (height, commitment, result)
157    }
158
159    /// If the current ack is already resolved, takes it and arms the next ack.
160    fn pop_ready(&mut self) -> Option<(Height, V::Commitment, <A::Waiter as Future>::Output)> {
161        let pending = self.current.as_mut()?;
162        let result = Pin::new(&mut pending.receiver).now_or_never()?;
163        Some(self.complete_current(result))
164    }
165}
166
/// A struct that holds multiple subscriptions for a block.
///
/// All subscribers waiting on the same key share one entry; dropping this struct
/// aborts the associated waiter future via `_aborter`.
struct BlockSubscription<V: Variant> {
    // The subscribers that are waiting for the block
    subscribers: Vec<oneshot::Sender<V::Block>>,
    // Aborter that aborts the waiter future when dropped
    _aborter: Aborter,
}
174
/// The key used to track block subscriptions.
///
/// Digest-scoped and commitment-scoped subscriptions are intentionally distinct
/// so a block that aliases on digest cannot satisfy a different commitment wait.
///
/// NOTE: variant order matters — the derived `Ord` sorts all `Digest` keys before
/// all `Commitment` keys; reordering variants would change map iteration order.
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)]
enum BlockSubscriptionKey<C, D> {
    /// Subscription keyed by the block's digest.
    Digest(D),
    /// Subscription keyed by the consensus commitment.
    Commitment(C),
}

/// [BlockSubscriptionKey] specialized to a [Variant]'s commitment and block digest types.
type BlockSubscriptionKeyFor<V> =
    BlockSubscriptionKey<<V as Variant>::Commitment, <<V as Variant>::Block as Digestible>::Digest>;
187
/// The [Actor] is responsible for receiving uncertified blocks from the broadcast mechanism,
/// receiving notarizations and finalizations from consensus, and reconstructing a total order
/// of blocks.
///
/// The actor is designed to be used in a view-based model. Each view corresponds to a
/// potential block in the chain. The actor will only finalize a block if it has a
/// corresponding finalization.
///
/// The actor also provides a backfill mechanism for missing blocks. If the actor receives a
/// finalization for a block that is ahead of its current view, it will request the missing blocks
/// from its peers. This ensures that the actor can catch up to the rest of the network if it falls
/// behind.
pub struct Actor<E, V, P, FC, FB, ES, T, A = Exact>
where
    E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage,
    V: Variant,
    P: Provider<Scope = Epoch, Scheme: Scheme<V::Commitment>>,
    FC: Certificates<
        BlockDigest = <V::Block as Digestible>::Digest,
        Commitment = V::Commitment,
        Scheme = P::Scheme,
    >,
    FB: Blocks<Block = V::StoredBlock>,
    ES: Epocher,
    T: Strategy,
    A: Acknowledgement,
{
    // ---------- Context ----------
    // Runtime context (the cell allows moving it into the spawned task)
    context: ContextCell<E>,

    // ---------- Message Passing ----------
    // Mailbox
    mailbox: mpsc::Receiver<Message<P::Scheme, V>>,

    // ---------- Configuration ----------
    // Provider for epoch-specific signing schemes
    provider: P,
    // Epoch configuration
    epocher: ES,
    // Minimum number of views to retain temporary data after the application processes a block
    view_retention_timeout: ViewDelta,
    // Maximum number of blocks to repair at once
    max_repair: NonZeroUsize,
    // Codec configuration for block type
    block_codec_config: <V::Block as Read>::Cfg,
    // Strategy for parallel operations
    strategy: T,

    // ---------- State ----------
    // Last view processed
    last_processed_round: Round,
    // Last height processed by the application
    last_processed_height: Height,
    // Pending application acknowledgements (FIFO, bounded)
    pending_acks: PendingAcks<V, A>,
    // Highest known finalized height
    tip: Height,
    // Outstanding subscriptions for blocks
    block_subscriptions: BTreeMap<BlockSubscriptionKeyFor<V>, BlockSubscription<V>>,

    // ---------- Storage ----------
    // Prunable cache
    cache: cache::Manager<E, V, P::Scheme>,
    // Metadata tracking application progress
    application_metadata: Metadata<E, U64, Height>,
    // Finalizations stored by height
    finalizations_by_height: FC,
    // Finalized blocks stored by height
    finalized_blocks: FB,

    // ---------- Metrics ----------
    // Latest height metric
    finalized_height: Gauge,
    // Latest processed height
    processed_height: Gauge,
}
264
265impl<E, V, P, FC, FB, ES, T, A> Actor<E, V, P, FC, FB, ES, T, A>
266where
267    E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage,
268    V: Variant,
269    P: Provider<Scope = Epoch, Scheme: Scheme<V::Commitment>>,
270    FC: Certificates<
271        BlockDigest = <V::Block as Digestible>::Digest,
272        Commitment = V::Commitment,
273        Scheme = P::Scheme,
274    >,
275    FB: Blocks<Block = V::StoredBlock>,
276    ES: Epocher,
277    T: Strategy,
278    A: Acknowledgement,
279{
280    /// Create a new application actor.
281    pub async fn init(
282        context: E,
283        finalizations_by_height: FC,
284        finalized_blocks: FB,
285        config: Config<V::Block, P, ES, T>,
286    ) -> (Self, Mailbox<P::Scheme, V>, Height) {
287        // Initialize cache
288        let prunable_config = cache::Config {
289            partition_prefix: format!("{}-cache", config.partition_prefix),
290            prunable_items_per_section: config.prunable_items_per_section,
291            replay_buffer: config.replay_buffer,
292            key_write_buffer: config.key_write_buffer,
293            value_write_buffer: config.value_write_buffer,
294            key_page_cache: config.page_cache.clone(),
295        };
296        let cache = cache::Manager::init(
297            context.with_label("cache"),
298            prunable_config,
299            config.block_codec_config.clone(),
300        )
301        .await;
302
303        // Initialize metadata tracking application progress
304        let application_metadata = Metadata::init(
305            context.with_label("application_metadata"),
306            metadata::Config {
307                partition: format!("{}-application-metadata", config.partition_prefix),
308                codec_config: (),
309            },
310        )
311        .await
312        .expect("failed to initialize application metadata");
313        let last_processed_height = application_metadata
314            .get(&LATEST_KEY)
315            .copied()
316            .unwrap_or(Height::zero());
317
318        // Create metrics
319        let finalized_height = Gauge::default();
320        context.register(
321            "finalized_height",
322            "Finalized height of application",
323            finalized_height.clone(),
324        );
325        let processed_height = Gauge::default();
326        context.register(
327            "processed_height",
328            "Processed height of application",
329            processed_height.clone(),
330        );
331        let _ = processed_height.try_set(last_processed_height.get());
332
333        // Initialize mailbox
334        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
335        (
336            Self {
337                context: ContextCell::new(context),
338                mailbox,
339                provider: config.provider,
340                epocher: config.epocher,
341                view_retention_timeout: config.view_retention_timeout,
342                max_repair: config.max_repair,
343                block_codec_config: config.block_codec_config,
344                strategy: config.strategy,
345                last_processed_round: Round::zero(),
346                last_processed_height,
347                pending_acks: PendingAcks::new(config.max_pending_acks.get()),
348                tip: Height::zero(),
349                block_subscriptions: BTreeMap::new(),
350                cache,
351                application_metadata,
352                finalizations_by_height,
353                finalized_blocks,
354                finalized_height,
355                processed_height,
356            },
357            Mailbox::new(sender),
358            last_processed_height,
359        )
360    }
361
    /// Start the actor.
    ///
    /// Spawns [Self::run] on the actor's context and returns a handle that resolves
    /// when the actor stops.
    pub fn start<R, Buf>(
        mut self,
        application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
        buffer: Buf,
        resolver: (mpsc::Receiver<handler::Message<V::Commitment>>, R),
    ) -> Handle<()>
    where
        R: Resolver<
            Key = handler::Request<V::Commitment>,
            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
        >,
        Buf: Buffer<V>,
    {
        spawn_cell!(self.context, self.run(application, buffer, resolver).await)
    }
378
    /// Run the application actor.
    ///
    /// Drives the main event loop: consensus inputs from the mailbox, application
    /// acknowledgements, block-subscription waiters, and resolver traffic. Returns
    /// on runtime shutdown, mailbox/handler closure, or a fatal storage/ack error.
    async fn run<R, Buf>(
        mut self,
        mut application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
        mut buffer: Buf,
        (mut resolver_rx, mut resolver): (mpsc::Receiver<handler::Message<V::Commitment>>, R),
    ) where
        R: Resolver<
            Key = handler::Request<V::Commitment>,
            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
        >,
        Buf: Buffer<V>,
    {
        // Create a local pool for waiter futures.
        let mut waiters = AbortablePool::<Result<V::Block, BlockSubscriptionKeyFor<V>>>::default();

        // Get tip and send to application
        let tip = self.get_latest().await;
        if let Some((height, digest, round)) = tip {
            application.report(Update::Tip(round, height, digest)).await;
            self.tip = height;
            let _ = self.finalized_height.try_set(height.get());
        }

        // Attempt to dispatch the next finalized block to the application, if it is ready.
        self.try_dispatch_blocks(&mut application).await;

        // Attempt to repair any gaps in the finalized blocks archive, if there are any.
        if self
            .try_repair_gaps(&mut buffer, &mut resolver, &mut application)
            .await
        {
            self.sync_finalized().await;
        }

        select_loop! {
            self.context,
            on_start => {
                // Remove any dropped subscribers. If all subscribers dropped, abort the waiter.
                self.block_subscriptions.retain(|_, bs| {
                    bs.subscribers.retain(|tx| !tx.is_closed());
                    !bs.subscribers.is_empty()
                });
            },
            on_stopped => {
                debug!("context shutdown, stopping marshal");
            },
            // Handle waiter completions first
            Ok(completion) = waiters.next_completed() else continue => match completion {
                Ok(block) => self.notify_subscribers(&block),
                Err(key) => {
                    // The underlying buffer subscription closed; drop all local
                    // subscribers registered under this key.
                    match key {
                        BlockSubscriptionKey::Digest(digest) => {
                            debug!(
                                ?digest,
                                "buffer subscription closed, canceling local subscribers"
                            );
                        }
                        BlockSubscriptionKey::Commitment(commitment) => {
                            debug!(
                                ?commitment,
                                "buffer subscription closed, canceling local subscribers"
                            );
                        }
                    }
                    self.block_subscriptions.remove(&key);
                }
            },
            // Handle application acknowledgements (drain all ready acks, sync once)
            result = self.pending_acks.current() => {
                // Start with the ack that woke this `select_loop!` arm.
                let mut pending = Some(self.pending_acks.complete_current(result));
                loop {
                    let (height, commitment, result) =
                        pending.take().expect("pending ack must exist");
                    match result {
                        Ok(()) => {
                            // Apply in-memory progress updates for this acknowledged block.
                            self.handle_block_processed(height, commitment, &mut resolver)
                                .await;
                        }
                        Err(e) => {
                            // Ack failures are fatal for marshal/application coordination.
                            error!(e = ?e, height = %height, "application did not acknowledge block");
                            return;
                        }
                    }

                    // Opportunistically drain any additional already-ready acks so we
                    // can persist one metadata sync for the whole batch below.
                    let Some(next) = self.pending_acks.pop_ready() else {
                        break;
                    };
                    pending = Some(next);
                }

                // Persist buffered processed-height updates once after draining all ready acks.
                if let Err(e) = self.application_metadata.sync().await {
                    error!(?e, "failed to sync application progress");
                    return;
                }

                // Fill the pipeline
                self.try_dispatch_blocks(&mut application).await;
            },
            // Handle consensus inputs before backfill or resolver traffic
            Some(message) = self.mailbox.recv() else {
                info!("mailbox closed, shutting down");
                break;
            } => {
                match message {
                    Message::GetInfo {
                        identifier,
                        response,
                    } => {
                        // Resolve (height, digest) info for the requested identifier.
                        let info = match identifier {
                            // TODO: Instead of pulling out the entire block, determine the
                            // height directly from the archive by mapping the digest to
                            // the index, which is the same as the height.
                            BlockID::Digest(digest) => self
                                .finalized_blocks
                                .get(ArchiveID::Key(&digest))
                                .await
                                .ok()
                                .flatten()
                                .map(|b| (b.height(), digest)),
                            BlockID::Height(height) => self
                                .finalizations_by_height
                                .get(ArchiveID::Index(height.get()))
                                .await
                                .ok()
                                .flatten()
                                .map(|f| (height, V::commitment_to_inner(f.proposal.payload))),
                            BlockID::Latest => self.get_latest().await.map(|(h, d, _)| (h, d)),
                        };
                        response.send_lossy(info);
                    }
                    Message::Proposed { round, block } => {
                        // Cache the proposed block and hand it to the broadcast buffer.
                        self.cache_verified(round, block.digest(), block.clone())
                            .await;
                        buffer.proposed(round, block).await;
                    }
                    Message::Verified { round, block } => {
                        // Cache a verified block so a later certificate can match it.
                        self.cache_verified(round, block.digest(), block).await;
                    }
                    Message::Notarization { notarization } => {
                        let round = notarization.round();
                        let commitment = notarization.proposal.payload;
                        let digest = V::commitment_to_inner(commitment);

                        // Store notarization by view
                        self.cache
                            .put_notarization(round, digest, notarization.clone())
                            .await;

                        // Search for block locally, otherwise fetch it remotely.
                        if let Some(block) =
                            self.find_block_by_commitment(&buffer, commitment).await
                        {
                            // If found, persist the block
                            self.cache_block(round, digest, block).await;
                        } else {
                            debug!(?round, "notarized block missing");
                            resolver
                                .fetch(Request::<V::Commitment>::Notarized { round })
                                .await;
                        }
                    }
                    Message::Finalization { finalization } => {
                        // Cache finalization by round
                        let round = finalization.round();
                        let commitment = finalization.proposal.payload;
                        let digest = V::commitment_to_inner(commitment);
                        self.cache
                            .put_finalization(round, digest, finalization.clone())
                            .await;

                        // Search for block locally, otherwise fetch it remotely.
                        if let Some(block) =
                            self.find_block_by_commitment(&buffer, commitment).await
                        {
                            // If found, persist the block
                            let height = block.height();
                            if self
                                .store_finalization(
                                    height,
                                    digest,
                                    block,
                                    Some(finalization),
                                    &mut application,
                                    &mut buffer,
                                )
                                .await
                            {
                                self.try_repair_gaps(&mut buffer, &mut resolver, &mut application)
                                    .await;
                                self.sync_finalized().await;
                                debug!(?round, %height, "finalized block stored");
                            }
                        } else {
                            // Otherwise, fetch the block from the network.
                            debug!(?round, ?commitment, "finalized block missing");
                            resolver
                                .fetch(Request::<V::Commitment>::Block(commitment))
                                .await;
                        }
                    }
                    Message::GetBlock {
                        identifier,
                        response,
                    } => match identifier {
                        BlockID::Digest(digest) => {
                            let result = self.find_block_by_digest(&mut buffer, digest).await;
                            response.send_lossy(result);
                        }
                        BlockID::Height(height) => {
                            let result = self.get_finalized_block(height).await;
                            response.send_lossy(result);
                        }
                        BlockID::Latest => {
                            // Resolve the latest digest first, then look the block up.
                            let block = match self.get_latest().await {
                                Some((_, digest, _)) => {
                                    self.find_block_by_digest(&mut buffer, digest).await
                                }
                                None => None,
                            };
                            response.send_lossy(block);
                        }
                    },
                    Message::GetFinalization { height, response } => {
                        // Serve a finalization-certificate lookup by height.
                        let finalization = self.get_finalization_by_height(height).await;
                        response.send_lossy(finalization);
                    }
                    Message::HintFinalized { height, targets } => {
                        // Skip if height is at or below the floor
                        if height <= self.last_processed_height {
                            continue;
                        }

                        // Skip if finalization is already available locally
                        if self.get_finalization_by_height(height).await.is_some() {
                            continue;
                        }

                        // Trigger a targeted fetch via the resolver
                        let request = Request::<V::Commitment>::Finalized { height };
                        resolver.fetch_targeted(request, targets).await;
                    }
                    Message::SubscribeByDigest {
                        round,
                        digest,
                        response,
                    } => {
                        self.handle_subscribe(
                            round,
                            BlockSubscriptionKey::Digest(digest),
                            response,
                            &mut resolver,
                            &mut waiters,
                            &mut buffer,
                        )
                        .await;
                    }
                    Message::SubscribeByCommitment {
                        round,
                        commitment,
                        response,
                    } => {
                        self.handle_subscribe(
                            round,
                            BlockSubscriptionKey::Commitment(commitment),
                            response,
                            &mut resolver,
                            &mut waiters,
                            &mut buffer,
                        )
                        .await;
                    }
                    Message::SetFloor { height } => {
                        // The floor only ever moves forward.
                        if self.last_processed_height >= height {
                            warn!(
                                %height,
                                existing = %self.last_processed_height,
                                "floor not updated, lower than existing"
                            );
                            continue;
                        }

                        // Update the processed height
                        self.update_processed_height(height, &mut resolver).await;
                        if let Err(err) = self.application_metadata.sync().await {
                            error!(?err, %height, "failed to update floor");
                            return;
                        }

                        // Drop all pending acknowledgements. We must do this to prevent
                        // an in-process block from being processed that is below the new floor
                        // updating `last_processed_height`.
                        self.pending_acks.clear();

                        // Prune data in the finalized archives below the new floor.
                        if let Err(err) = self.prune_finalized_archives(height).await {
                            error!(?err, %height, "failed to prune finalized archives");
                            return;
                        }

                        // Intentionally keep existing block subscriptions alive. Canceling
                        // waiters can have catastrophic consequences (nodes can get stuck in
                        // different views) as actors do not retry subscriptions on failed channels.
                    }
                    Message::Prune { height } => {
                        // Only allow pruning at or below the current floor
                        if height > self.last_processed_height {
                            warn!(%height, floor = %self.last_processed_height, "prune height above floor, ignoring");
                            continue;
                        }

                        // Prune the finalized block and finalization certificate archives in parallel.
                        if let Err(err) = self.prune_finalized_archives(height).await {
                            error!(?err, %height, "failed to prune finalized archives");
                            return;
                        }

                        // Intentionally keep existing block subscriptions alive. Canceling
                        // waiters can have catastrophic consequences (nodes can get stuck in
                        // different views) as actors do not retry subscriptions on failed channels.
                    }
                }
            },
            // Handle resolver messages last (batched up to max_repair, sync once)
            Some(message) = resolver_rx.recv() else {
                info!("handler closed, shutting down");
                return;
            } => {
                // Drain up to max_repair messages: blocks handled immediately,
                // certificates batched for verification, produces deferred.
                let mut needs_sync = false;
                let mut produces = Vec::new();
                let mut delivers = Vec::new();
                for msg in std::iter::once(message)
                    .chain(std::iter::from_fn(|| resolver_rx.try_recv().ok()))
                    .take(self.max_repair.get())
                {
                    match msg {
                        handler::Message::Produce { key, response } => {
                            produces.push((key, response));
                        }
                        handler::Message::Deliver {
                            key,
                            value,
                            response,
                        } => {
                            needs_sync |= self
                                .handle_deliver(
                                    key,
                                    value,
                                    response,
                                    &mut delivers,
                                    &mut application,
                                    &mut buffer,
                                )
                                .await;
                        }
                    }
                }

                // Batch verify and process all delivers.
                needs_sync |= self
                    .verify_delivered(delivers, &mut application, &mut buffer)
                    .await;

                // Attempt to fill gaps before handling produce requests (so we
                // can serve data we just received).
                needs_sync |= self
                    .try_repair_gaps(&mut buffer, &mut resolver, &mut application)
                    .await;

                // Sync archives before responding to peers (prioritize our own
                // durability).
                if needs_sync {
                    self.sync_finalized().await;
                }

                // Handle produce requests in parallel.
                join_all(
                    produces
                        .into_iter()
                        .map(|(key, response)| self.handle_produce(key, response, &buffer)),
                )
                .await;
            },
        }
    }
772
773    /// Handle a produce request from a remote peer.
774    async fn handle_produce<Buf: Buffer<V>>(
775        &self,
776        key: Request<V::Commitment>,
777        response: oneshot::Sender<Bytes>,
778        buffer: &Buf,
779    ) {
780        match key {
781            Request::Block(commitment) => {
782                let Some(block) = self.find_block_by_commitment(buffer, commitment).await else {
783                    debug!(?commitment, "block missing on request");
784                    return;
785                };
786                response.send_lossy(block.encode());
787            }
788            Request::Finalized { height } => {
789                let Some(finalization) = self.get_finalization_by_height(height).await else {
790                    debug!(%height, "finalization missing on request");
791                    return;
792                };
793                let Some(block) = self.get_finalized_block(height).await else {
794                    debug!(%height, "finalized block missing on request");
795                    return;
796                };
797                response.send_lossy((finalization, block).encode());
798            }
799            Request::Notarized { round } => {
800                let Some(notarization) = self.cache.get_notarization(round).await else {
801                    debug!(?round, "notarization missing on request");
802                    return;
803                };
804                let commitment = notarization.proposal.payload;
805                let Some(block) = self.find_block_by_commitment(buffer, commitment).await else {
806                    debug!(?commitment, "block missing on request");
807                    return;
808                };
809                response.send_lossy((notarization, block).encode());
810            }
811        }
812    }
813
    /// Handle a local subscription request for a block.
    ///
    /// Resolution proceeds in three steps:
    /// 1. Look for the block locally (buffer, cache, finalized storage) and
    ///    answer immediately if found.
    /// 2. Otherwise, if a `round` is known and not already below the last
    ///    processed round, request the notarized block from the resolver.
    /// 3. Register `response` as a subscriber so it is fulfilled whenever the
    ///    block eventually arrives.
    async fn handle_subscribe<Buf: Buffer<V>>(
        &mut self,
        round: Option<Round>,
        key: BlockSubscriptionKeyFor<V>,
        response: oneshot::Sender<V::Block>,
        resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
        waiters: &mut AbortablePool<Result<V::Block, BlockSubscriptionKeyFor<V>>>,
        buffer: &mut Buf,
    ) {
        // Derive the digest (used for logging and resolver requests below);
        // a commitment always maps onto an inner digest.
        let digest = match key {
            BlockSubscriptionKey::Digest(digest) => digest,
            BlockSubscriptionKey::Commitment(commitment) => V::commitment_to_inner(commitment),
        };

        // Check for block locally.
        let block = match key {
            BlockSubscriptionKey::Digest(digest) => self.find_block_by_digest(buffer, digest).await,
            BlockSubscriptionKey::Commitment(commitment) => {
                self.find_block_by_commitment(buffer, commitment).await
            }
        };
        if let Some(block) = block {
            response.send_lossy(block);
            return;
        }

        // We don't have the block locally, so fetch by round if we have one.
        // If we only have a digest (no round), do not issue a resolver request:
        // we would not know when to drop it, and it could stay pending forever
        // for a block that never finalizes.
        if let Some(round) = round {
            if round < self.last_processed_round {
                // At this point, we have failed to find the block locally, and
                // we know that its round is less than the last processed round.
                // This means that something else was finalized in that round,
                // so we drop the response to indicate that the block may never
                // be available.
                return;
            }
            // Attempt to fetch the block (with notarization) from the resolver.
            // If this is a valid view, this request should be fine to keep open
            // until resolution or pruning (even if the oneshot is canceled).
            debug!(?round, ?digest, "requested block missing");
            resolver
                .fetch(Request::<V::Commitment>::Notarized { round })
                .await;
        }

        // Register subscriber.
        match key {
            BlockSubscriptionKey::Digest(digest) => {
                debug!(?round, ?digest, "registering subscriber");
            }
            BlockSubscriptionKey::Commitment(commitment) => {
                debug!(?round, ?commitment, ?digest, "registering subscriber");
            }
        }
        match self.block_subscriptions.entry(key) {
            Entry::Occupied(mut entry) => {
                // An earlier subscriber already installed a buffer waiter for
                // this key; just join the list of pending responders.
                entry.get_mut().subscribers.push(response);
            }
            Entry::Vacant(entry) => {
                // First subscriber for this key: subscribe to the buffer and
                // park a waiter that yields the block (or the key on failure,
                // so the caller can clean up the subscription entry).
                let rx = match key {
                    BlockSubscriptionKey::Digest(digest) => {
                        buffer.subscribe_by_digest(digest).await
                    }
                    BlockSubscriptionKey::Commitment(commitment) => {
                        buffer.subscribe_by_commitment(commitment).await
                    }
                };
                let waiter_key = key;
                let aborter = waiters.push(async move {
                    rx.await
                        .map_or_else(|_| Err(waiter_key), |block| Ok(block.into_block()))
                });
                // Dropping `_aborter` (when the subscription is removed) aborts
                // the pooled waiter.
                entry.insert(BlockSubscription {
                    subscribers: vec![response],
                    _aborter: aborter,
                });
            }
        }
    }
897
    /// Handle a deliver message from the resolver. Block delivers are handled
    /// immediately. Finalized/Notarized delivers are parsed and structurally
    /// validated, then collected into `delivers` for batch certificate verification.
    /// Returns true if finalization archives were written and need syncing.
    ///
    /// In every arm, `response` signals back to the resolver whether the
    /// delivered bytes were acceptable: `false` is sent immediately on any
    /// decode or structural-validation failure; for Finalized/Notarized the
    /// final verdict is deferred until batch certificate verification.
    async fn handle_deliver<Buf: Buffer<V>>(
        &mut self,
        key: Request<V::Commitment>,
        value: Bytes,
        response: oneshot::Sender<bool>,
        delivers: &mut Vec<PendingVerification<P::Scheme, V>>,
        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
        buffer: &mut Buf,
    ) -> bool {
        match key {
            Request::Block(commitment) => {
                // Decode the raw block and check it matches the requested
                // commitment; no certificate is involved for plain blocks.
                let Ok(block) = V::Block::decode_cfg(value.as_ref(), &self.block_codec_config)
                else {
                    response.send_lossy(false);
                    return false;
                };
                if V::commitment(&block) != commitment {
                    response.send_lossy(false);
                    return false;
                }

                // Persist the block, also storing the finalization if we have it.
                let height = block.height();
                let digest = block.digest();
                let finalization = self.cache.get_finalization_for(digest).await;
                let wrote = self
                    .store_finalization(height, digest, block, finalization, application, buffer)
                    .await;
                debug!(?digest, %height, "received block");
                response.send_lossy(true); // if a valid block is received, we should still send true (even if it was stale)
                wrote
            }
            Request::Finalized { height } => {
                // Map the height onto an epoch so we can pick the right scheme
                // for decoding the certificate.
                let Some(bounds) = self.epocher.containing(height) else {
                    response.send_lossy(false);
                    return false;
                };
                let Some(scheme) = self.get_scheme_certificate_verifier(bounds.epoch()) else {
                    response.send_lossy(false);
                    return false;
                };

                let Ok((finalization, block)) =
                    <(Finalization<P::Scheme, V::Commitment>, V::Block)>::decode_cfg(
                        value,
                        &(
                            scheme.certificate_codec_config(),
                            self.block_codec_config.clone(),
                        ),
                    )
                else {
                    response.send_lossy(false);
                    return false;
                };

                // Structural checks: the block must be at the requested height,
                // match the finalized payload, and the certificate's epoch must
                // agree with the epoch derived from the height.
                let commitment = finalization.proposal.payload;
                if block.height() != height
                    || V::commitment(&block) != commitment
                    || finalization.epoch() != bounds.epoch()
                {
                    response.send_lossy(false);
                    return false;
                }
                // Certificate verification (and the `true` response) happens
                // later, in batch, via `verify_delivered`.
                delivers.push(PendingVerification::Finalized {
                    finalization,
                    block,
                    response,
                });
                false
            }
            Request::Notarized { round } => {
                let Some(scheme) = self.get_scheme_certificate_verifier(round.epoch()) else {
                    response.send_lossy(false);
                    return false;
                };

                let Ok((notarization, block)) =
                    <(Notarization<P::Scheme, V::Commitment>, V::Block)>::decode_cfg(
                        value,
                        &(
                            scheme.certificate_codec_config(),
                            self.block_codec_config.clone(),
                        ),
                    )
                else {
                    response.send_lossy(false);
                    return false;
                };

                // Structural checks: the notarization must be for the requested
                // round and the block must match the notarized payload.
                if notarization.round() != round
                    || V::commitment(&block) != notarization.proposal.payload
                {
                    response.send_lossy(false);
                    return false;
                }
                // Deferred to batch certificate verification.
                delivers.push(PendingVerification::Notarized {
                    notarization,
                    block,
                    response,
                });
                false
            }
        }
    }
1006
    /// Batch verify pending certificates and process valid items. Returns true
    /// if finalization archives were written and need syncing.
    ///
    /// Each entry in `delivers` holds an already-decoded, structurally-valid
    /// item plus the resolver's response channel; here we perform the (more
    /// expensive) certificate verification in batch, then answer each response
    /// with the verdict and persist/cache the verified data.
    async fn verify_delivered<Buf: Buffer<V>>(
        &mut self,
        mut delivers: Vec<PendingVerification<P::Scheme, V>>,
        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
        buffer: &mut Buf,
    ) -> bool {
        if delivers.is_empty() {
            return false;
        }

        // Extract (subject, certificate) pairs for batch verification.
        // NOTE: `certs[i]` corresponds to `delivers[i]`; this index alignment
        // is relied upon below when mapping verification results back.
        let certs: Vec<_> = delivers
            .iter()
            .map(|item| match item {
                PendingVerification::Finalized { finalization, .. } => (
                    Subject::Finalize {
                        proposal: &finalization.proposal,
                    },
                    &finalization.certificate,
                ),
                PendingVerification::Notarized { notarization, .. } => (
                    Subject::Notarize {
                        proposal: &notarization.proposal,
                    },
                    &notarization.certificate,
                ),
            })
            .collect();

        // Batch verify using the all-epoch verifier if available, otherwise
        // batch verify per epoch using scoped verifiers.
        let verified = if let Some(scheme) = self.provider.all() {
            verify_certificates(&mut self.context, scheme.as_ref(), &certs, &self.strategy)
        } else {
            // Items in epochs for which we have no scoped scheme stay `false`
            // and are rejected below.
            let mut verified = vec![false; delivers.len()];

            // Group indices by epoch.
            let mut by_epoch: BTreeMap<Epoch, Vec<usize>> = BTreeMap::new();
            for (i, item) in delivers.iter().enumerate() {
                let epoch = match item {
                    PendingVerification::Notarized { notarization, .. } => notarization.epoch(),
                    PendingVerification::Finalized { finalization, .. } => finalization.epoch(),
                };
                by_epoch.entry(epoch).or_default().push(i);
            }

            // Batch verify each epoch group.
            for (epoch, indices) in &by_epoch {
                let Some(scheme) = self.provider.scoped(*epoch) else {
                    continue;
                };
                let group: Vec<_> = indices.iter().map(|&i| certs[i]).collect();
                let results =
                    verify_certificates(&mut self.context, scheme.as_ref(), &group, &self.strategy);
                // Scatter the group results back into the aligned vector.
                for (j, &idx) in indices.iter().enumerate() {
                    verified[idx] = results[j];
                }
            }
            verified
        };

        // Process each verified item, rejecting unverified ones.
        let mut wrote = false;
        for (index, item) in delivers.drain(..).enumerate() {
            if !verified[index] {
                // Invalid certificate: tell the resolver the delivery failed.
                match item {
                    PendingVerification::Finalized { response, .. }
                    | PendingVerification::Notarized { response, .. } => {
                        response.send_lossy(false);
                    }
                }
                continue;
            }
            match item {
                PendingVerification::Finalized {
                    finalization,
                    block,
                    response,
                } => {
                    // Valid finalization received.
                    response.send_lossy(true);
                    let round = finalization.round();
                    let height = block.height();
                    let digest = block.digest();
                    debug!(?round, %height, "received finalization");

                    wrote |= self
                        .store_finalization(
                            height,
                            digest,
                            block,
                            Some(finalization),
                            application,
                            buffer,
                        )
                        .await;
                }
                PendingVerification::Notarized {
                    notarization,
                    block,
                    response,
                } => {
                    // Valid notarization received.
                    response.send_lossy(true);
                    let round = notarization.round();
                    let commitment = notarization.proposal.payload;
                    let digest = V::commitment_to_inner(commitment);
                    debug!(?round, ?digest, "received notarization");

                    // If there exists a finalization certificate for this block, we
                    // should finalize it. This could finalize the block faster when
                    // a notarization then a finalization are received via consensus
                    // and we resolve the notarization request before the block request.
                    let height = block.height();
                    if let Some(finalization) = self.cache.get_finalization_for(digest).await {
                        // SAFETY: `digest` identifies a unique `commitment`, so this
                        // cached finalization payload must match `V::commitment(&block)`.
                        wrote |= self
                            .store_finalization(
                                height,
                                digest,
                                block.clone(),
                                Some(finalization),
                                application,
                                buffer,
                            )
                            .await;
                    }

                    // Cache the notarization and block.
                    self.cache_block(round, digest, block).await;
                    self.cache
                        .put_notarization(round, digest, notarization)
                        .await;
                }
            }
        }

        wrote
    }
1149
1150    /// Returns a scheme suitable for verifying certificates at the given epoch.
1151    ///
1152    /// Prefers a certificate verifier if available, otherwise falls back
1153    /// to the scheme for the given epoch.
1154    fn get_scheme_certificate_verifier(&self, epoch: Epoch) -> Option<Arc<P::Scheme>> {
1155        self.provider.all().or_else(|| self.provider.scoped(epoch))
1156    }
1157
1158    // -------------------- Waiters --------------------
1159
1160    /// Notify any subscribers for the given digest with the provided block.
1161    fn notify_subscribers(&mut self, block: &V::Block) {
1162        if let Some(mut bs) = self
1163            .block_subscriptions
1164            .remove(&BlockSubscriptionKey::Digest(block.digest()))
1165        {
1166            for subscriber in bs.subscribers.drain(..) {
1167                subscriber.send_lossy(block.clone());
1168            }
1169        }
1170        if let Some(mut bs) = self
1171            .block_subscriptions
1172            .remove(&BlockSubscriptionKey::Commitment(V::commitment(block)))
1173        {
1174            for subscriber in bs.subscribers.drain(..) {
1175                subscriber.send_lossy(block.clone());
1176            }
1177        }
1178    }
1179
1180    // -------------------- Application Dispatch --------------------
1181
1182    /// Attempt to dispatch the next finalized block to the application if ready.
1183    ///
1184    /// Dispatch finalized blocks to the application until the pipeline is full
1185    /// or no more blocks are available.
1186    ///
1187    /// This does NOT advance `last_processed_height` or sync metadata. It only
1188    /// sends blocks to the application and enqueues pending acks. Metadata is
1189    /// updated later, in a subsequent `select_loop!` iteration, when acks
1190    /// arrive and [`Self::handle_block_processed`] calls
1191    /// [`Self::update_processed_height`].
1192    ///
1193    /// Acks are processed in FIFO order so `last_processed_height` always
1194    /// advances sequentially.
1195    ///
1196    /// # Crash safety
1197    ///
1198    /// Because `select_loop!` arms run to completion, the caller's
1199    /// [`Self::sync_finalized`] always executes before the ack handler runs. This
1200    /// guarantees archive data is durable before `last_processed_height`
1201    /// advances:
1202    ///
1203    /// ```text
1204    /// Iteration N (caller):
1205    ///   store_finalization  ->  Archive::put (buffered)
1206    ///   try_dispatch_blocks  ->  sends blocks to app, enqueues pending acks
1207    ///   sync_finalized      ->  archive durable
1208    ///
1209    /// Iteration M (ack handler, M > N):
1210    ///   handle_block_processed   ->  update_processed_height  ->  metadata buffered
1211    ///   application_metadata.sync ->  metadata durable
1212    /// ```
1213    async fn try_dispatch_blocks(
1214        &mut self,
1215        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1216    ) {
1217        while self.pending_acks.has_capacity() {
1218            let next_height = self
1219                .pending_acks
1220                .next_dispatch_height(self.last_processed_height);
1221            let Some(block) = self.get_finalized_block(next_height).await else {
1222                return;
1223            };
1224            assert_eq!(
1225                block.height(),
1226                next_height,
1227                "finalized block height mismatch"
1228            );
1229
1230            let (height, commitment) = (block.height(), V::commitment(&block));
1231            let (ack, ack_waiter) = A::handle();
1232            application
1233                .report(Update::Block(V::into_inner(block), ack))
1234                .await;
1235            self.pending_acks.enqueue(PendingAck {
1236                height,
1237                commitment,
1238                receiver: ack_waiter,
1239            });
1240        }
1241    }
1242
    /// Handle acknowledgement from the application that a block has been processed.
    ///
    /// Buffers the processed height update but does NOT sync to durable storage.
    /// The caller must sync metadata after processing all ready acks.
    async fn handle_block_processed(
        &mut self,
        height: Height,
        commitment: V::Commitment,
        resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
    ) {
        // Update the processed height (buffered, not synced)
        self.update_processed_height(height, resolver).await;

        // Cancel any useless requests
        resolver
            .cancel(Request::<V::Commitment>::Block(commitment))
            .await;

        if let Some(finalization) = self.get_finalization_by_height(height).await {
            // Trail the previous processed finalized block by the timeout.
            // NOTE: the prune round is deliberately derived from the round
            // held BEFORE `last_processed_round` is advanced below, so cache
            // pruning always lags the newly processed round by
            // `view_retention_timeout` views.
            let lpr = self.last_processed_round;
            let prune_round = Round::new(
                lpr.epoch(),
                lpr.view().saturating_sub(self.view_retention_timeout),
            );

            // Prune archives
            self.cache.prune(prune_round).await;

            // Update the last processed round
            let round = finalization.round();
            self.last_processed_round = round;

            // Cancel useless requests
            resolver
                .retain(Request::<V::Commitment>::Notarized { round }.predicate())
                .await;
        }
    }
1282
1283    // -------------------- Prunable Storage --------------------
1284
1285    /// Add a verified block to the prunable archive.
1286    async fn cache_verified(
1287        &mut self,
1288        round: Round,
1289        digest: <V::Block as Digestible>::Digest,
1290        block: V::Block,
1291    ) {
1292        self.notify_subscribers(&block);
1293        self.cache.put_verified(round, digest, block.into()).await;
1294    }
1295
1296    /// Add a notarized block to the prunable archive.
1297    async fn cache_block(
1298        &mut self,
1299        round: Round,
1300        digest: <V::Block as Digestible>::Digest,
1301        block: V::Block,
1302    ) {
1303        self.notify_subscribers(&block);
1304        self.cache.put_block(round, digest, block.into()).await;
1305    }
1306
1307    /// Sync both finalization archives to durable storage.
1308    ///
1309    /// Must be called within the same `select_loop!` arm as any preceding
1310    /// [`Self::store_finalization`] / [`Self::try_repair_gaps`] writes, before yielding back
1311    /// to the loop. This ensures archives are durable before the ack handler
1312    /// advances `last_processed_height`. See [`Self::try_dispatch_blocks`] for details.
1313    async fn sync_finalized(&mut self) {
1314        if let Err(e) = try_join!(
1315            async {
1316                self.finalized_blocks.sync().await.map_err(Box::new)?;
1317                Ok::<_, BoxedError>(())
1318            },
1319            async {
1320                self.finalizations_by_height
1321                    .sync()
1322                    .await
1323                    .map_err(Box::new)?;
1324                Ok::<_, BoxedError>(())
1325            },
1326        ) {
1327            panic!("failed to sync finalization archives: {e}");
1328        }
1329    }
1330
1331    // -------------------- Immutable Storage --------------------
1332
1333    /// Get a finalized block from the immutable archive.
1334    async fn get_finalized_block(&self, height: Height) -> Option<V::Block> {
1335        match self
1336            .finalized_blocks
1337            .get(ArchiveID::Index(height.get()))
1338            .await
1339        {
1340            Ok(stored) => stored.map(|stored| stored.into()),
1341            Err(e) => panic!("failed to get block: {e}"),
1342        }
1343    }
1344
1345    /// Get a finalization from the archive by height.
1346    async fn get_finalization_by_height(
1347        &self,
1348        height: Height,
1349    ) -> Option<Finalization<P::Scheme, V::Commitment>> {
1350        match self
1351            .finalizations_by_height
1352            .get(ArchiveID::Index(height.get()))
1353            .await
1354        {
1355            Ok(finalization) => finalization,
1356            Err(e) => panic!("failed to get finalization: {e}"),
1357        }
1358    }
1359
    /// Add a finalized block, and optionally a finalization, to the archive.
    ///
    /// After persisting the block, attempt to dispatch the next contiguous block to the application.
    ///
    /// Writes are buffered and not synced. The caller must call
    /// [sync_finalized](Self::sync_finalized) before yielding to the
    /// `select_loop!` so that archive data is durable before the ack handler
    /// advances `last_processed_height`. See [`Self::try_dispatch_blocks`] for the
    /// crash safety invariant.
    ///
    /// Returns true iff something was written (i.e. the caller must sync).
    async fn store_finalization<Buf: Buffer<V>>(
        &mut self,
        height: Height,
        digest: <V::Block as Digestible>::Digest,
        block: V::Block,
        finalization: Option<Finalization<P::Scheme, V::Commitment>>,
        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
        buffer: &mut Buf,
    ) -> bool {
        // Blocks below the last processed height are not useful to us, so we ignore them (this
        // has the nice byproduct of ensuring we don't call a backing store with a block below the
        // pruning boundary)
        if height <= self.last_processed_height {
            debug!(
                %height,
                floor = %self.last_processed_height,
                ?digest,
                "dropping finalization at or below processed height floor"
            );
            return false;
        }
        // Fulfill any subscribers waiting on this block before persisting it.
        self.notify_subscribers(&block);

        // Convert block to storage format
        let commitment = V::commitment(&block);
        let stored: V::StoredBlock = block.into();
        let round = finalization.as_ref().map(|f| f.round());

        // In parallel, update the finalized blocks and finalizations archives
        if let Err(e) = try_join!(
            // Update the finalized blocks archive
            async {
                self.finalized_blocks.put(stored).await.map_err(Box::new)?;
                Ok::<_, BoxedError>(())
            },
            // Update the finalizations archive (if provided)
            async {
                if let Some(finalization) = finalization {
                    self.finalizations_by_height
                        .put(height, digest, finalization)
                        .await
                        .map_err(Box::new)?;
                }
                Ok::<_, BoxedError>(())
            }
        ) {
            panic!("failed to finalize: {e}");
        }

        // Update metrics, buffer, and application
        // Only report a new tip when we have a finalization round AND this
        // height actually advances the known tip.
        if let Some(round) = round.filter(|_| height > self.tip) {
            application.report(Update::Tip(round, height, digest)).await;
            self.tip = height;
            let _ = self.finalized_height.try_set(height.get());
        }
        buffer.finalized(commitment).await;
        self.try_dispatch_blocks(application).await;

        true
    }
1429
1430    /// Get the latest finalized block information (height and digest tuple).
1431    ///
1432    /// Blocks are only finalized directly with a finalization or indirectly via a descendant
1433    /// block's finalization. Thus, the highest known finalized block must itself have a direct
1434    /// finalization.
1435    ///
1436    /// We return the height and digest using the highest known finalization that we know the
1437    /// block height for. While it's possible that we have a later finalization, if we do not have
1438    /// the full block for that finalization, we do not know its height and therefore it would not
1439    /// yet be found in the `finalizations_by_height` archive. While not checked explicitly, we
1440    /// should have the associated block (in the `finalized_blocks` archive) for the information
1441    /// returned.
1442    async fn get_latest(&mut self) -> Option<(Height, <V::Block as Digestible>::Digest, Round)> {
1443        let height = self.finalizations_by_height.last_index()?;
1444        let finalization = self
1445            .get_finalization_by_height(height)
1446            .await
1447            .expect("finalization missing");
1448        Some((
1449            height,
1450            V::commitment_to_inner(finalization.proposal.payload),
1451            finalization.round(),
1452        ))
1453    }
1454
1455    // -------------------- Mixed Storage --------------------
1456
1457    /// Looks for a block in cache and finalized storage by digest.
1458    async fn find_block_in_storage(
1459        &self,
1460        digest: <V::Block as Digestible>::Digest,
1461    ) -> Option<V::Block> {
1462        // Check verified / notarized blocks via cache manager.
1463        if let Some(block) = self.cache.find_block(digest).await {
1464            return Some(block.into());
1465        }
1466        // Check finalized blocks.
1467        match self.finalized_blocks.get(ArchiveID::Key(&digest)).await {
1468            Ok(stored) => stored.map(|stored| stored.into()),
1469            Err(e) => panic!("failed to get block: {e}"),
1470        }
1471    }
1472
1473    /// Looks for a block anywhere in local storage using only the digest.
1474    ///
1475    /// This is used when we only have a digest (e.g., during gap repair following
1476    /// parent links).
1477    async fn find_block_by_digest<Buf: Buffer<V>>(
1478        &self,
1479        buffer: &mut Buf,
1480        digest: <V::Block as Digestible>::Digest,
1481    ) -> Option<V::Block> {
1482        if let Some(block) = buffer.find_by_digest(digest).await {
1483            return Some(block.into_block());
1484        }
1485        self.find_block_in_storage(digest).await
1486    }
1487
1488    /// Looks for a block anywhere in local storage using the full commitment.
1489    ///
1490    /// This is used when we have a full commitment (e.g., from notarizations/finalizations).
1491    /// Having the full commitment may enable additional retrieval mechanisms.
1492    async fn find_block_by_commitment<Buf: Buffer<V>>(
1493        &self,
1494        buffer: &Buf,
1495        commitment: V::Commitment,
1496    ) -> Option<V::Block> {
1497        if let Some(block) = buffer.find_by_commitment(commitment).await {
1498            return Some(block.into_block());
1499        }
1500        self.find_block_in_storage(V::commitment_to_inner(commitment))
1501            .await
1502    }
1503
    /// Attempt to repair any identified gaps in the finalized blocks archive. The total
    /// number of missing heights that can be repaired at once is bounded by `self.max_repair`,
    /// though multiple gaps may be spanned.
    ///
    /// Writes are buffered. Returns `true` if this call wrote repaired blocks and
    /// needs a subsequent [`sync_finalized`](Self::sync_finalized).
    async fn try_repair_gaps<Buf: Buffer<V>>(
        &mut self,
        buffer: &mut Buf,
        resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
    ) -> bool {
        // Tracks whether any repaired block was buffered during this call.
        let mut wrote = false;
        // Only repair heights above what the application has already processed.
        let start = self.last_processed_height.next();
        // Outer loop: handle one gap per iteration. Exits via `return wrote`
        // when no gap remains, or via `break 'cache_repair` once a missing
        // block must be fetched from the network.
        'cache_repair: loop {
            let (gap_start, Some(gap_end)) = self.finalized_blocks.next_gap(start) else {
                // No gaps detected
                return wrote;
            };

            // Attempt to repair the gap backwards from the end of the gap, using
            // blocks from our local storage.
            let Some(mut cursor) = self.get_finalized_block(gap_end).await else {
                panic!("gapped block missing that should exist: {gap_end}");
            };

            // Compute the lower bound of the recursive repair. `gap_start` is `Some`
            // if `start` is not in a gap. We add one to it to ensure we don't
            // re-persist it to the database in the repair loop below.
            let gap_start = gap_start.map(Height::next).unwrap_or(start);

            // Iterate backwards, repairing blocks as we go.
            while cursor.height() > gap_start {
                let parent_digest = cursor.parent();
                let parent_commitment = V::parent_commitment(&cursor);
                if let Some(block) = self
                    .find_block_by_commitment(buffer, parent_commitment)
                    .await
                {
                    // Store whatever finalization the cache has for the parent
                    // (may be absent) alongside the repaired block.
                    let finalization = self.cache.get_finalization_for(parent_digest).await;
                    wrote |= self
                        .store_finalization(
                            block.height(),
                            parent_digest,
                            block.clone(),
                            finalization,
                            application,
                            buffer,
                        )
                        .await;
                    debug!(height = %block.height(), "repaired block");
                    // Continue walking backwards from the newly repaired block.
                    cursor = block;
                } else {
                    // Request the next missing commitment.
                    //
                    // SAFETY: We can rely on this derived parent commitment because
                    // the block is provably a member of the finalized chain due to the end
                    // boundary of the gap being finalized.
                    resolver
                        .fetch(Request::<V::Commitment>::Block(parent_commitment))
                        .await;
                    break 'cache_repair;
                }
            }
        }

        // Request any finalizations for missing items in the archive, up to
        // the `max_repair` quota. This may help shrink the size of the gap
        // closest to the application's processed height if finalizations
        // for the requests' heights exist. If not, we rely on the recursive
        // digest fetches above.
        let missing_items = self
            .finalized_blocks
            .missing_items(start, self.max_repair.get());
        let requests = missing_items
            .into_iter()
            .map(|height| Request::<V::Commitment>::Finalized { height })
            .collect::<Vec<_>>();
        if !requests.is_empty() {
            resolver.fetch_all(requests).await
        }
        wrote
    }
1587
1588    /// Buffers a processed height update in memory and metrics. Does NOT sync
1589    /// to durable storage. Sync metadata after buffered updates to make them durable.
1590    async fn update_processed_height(
1591        &mut self,
1592        height: Height,
1593        resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
1594    ) {
1595        self.application_metadata.put(LATEST_KEY, height);
1596        self.last_processed_height = height;
1597        let _ = self
1598            .processed_height
1599            .try_set(self.last_processed_height.get());
1600
1601        // Cancel any existing requests below the new floor.
1602        resolver
1603            .retain(Request::<V::Commitment>::Finalized { height }.predicate())
1604            .await;
1605    }
1606
1607    /// Prunes finalized blocks and certificates below the given height.
1608    async fn prune_finalized_archives(&mut self, height: Height) -> Result<(), BoxedError> {
1609        try_join!(
1610            async {
1611                self.finalized_blocks
1612                    .prune(height)
1613                    .await
1614                    .map_err(Box::new)?;
1615                Ok::<_, BoxedError>(())
1616            },
1617            async {
1618                self.finalizations_by_height
1619                    .prune(height)
1620                    .await
1621                    .map_err(Box::new)?;
1622                Ok::<_, BoxedError>(())
1623            }
1624        )?;
1625        Ok(())
1626    }
1627}