Skip to main content

commonware_consensus/marshal/core/
actor.rs

1use super::{
2    acks::{PendingAck, PendingAcks},
3    cache,
4    delivery::PendingVerification,
5    floor::Floor,
6    mailbox::{CommitmentFallback, Mailbox, Message},
7    stream::Stream,
8    subscriptions::{Key as SubscriptionKey, KeyFor as SubscriptionKeyFor, Subscriptions},
9    variant::NoBuffer,
10    Buffer, Variant,
11};
12use crate::{
13    marshal::{
14        resolver::handler::{self, Annotation, Key, Request},
15        store::{Blocks, Certificates},
16        Config, Identifier as BlockID, Start, Update,
17    },
18    simplex::{
19        scheme::Scheme,
20        types::{verify_certificates, Finalization, Notarization, Subject},
21    },
22    types::{Epoch, Epocher, Height, Round, ViewDelta},
23    Block, Epochable, Heightable, Reporter,
24};
25use bytes::Bytes;
26use commonware_actor::mailbox;
27use commonware_codec::{Decode, Encode, Read};
28use commonware_cryptography::{
29    certificate::{Provider, Scheme as CertificateScheme},
30    Digestible,
31};
32use commonware_macros::select_loop;
33use commonware_p2p::Recipients;
34use commonware_parallel::Strategy;
35use commonware_resolver::{Delivery, Resolver, TargetedResolver};
36use commonware_runtime::{
37    spawn_cell,
38    telemetry::metrics::{Gauge, GaugeExt, MetricsExt as _},
39    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
40};
41use commonware_storage::archive::Identifier as ArchiveID;
42use commonware_utils::{
43    acknowledgement::Exact,
44    channel::{fallible::OneshotExt, oneshot},
45    futures::AbortablePool,
46    Acknowledgement, BoxedError,
47};
48use futures::{future::join_all, try_join};
49use rand_core::CryptoRngCore;
50use std::{collections::BTreeMap, future::Future, num::NonZeroUsize, sync::Arc};
51use tracing::{debug, warn};
52
53// Resolver request key for a marshal variant `V`: the resolver handler's
54// `Key` parameterized by `V::Commitment`. The commitment type is used because it
// may differ from the block digest for coded variants.
55type ResolverRequestFor<V> = Key<<V as Variant>::Commitment>;
56
57// A resolver delivery plus the peer-validity response channel. Local
58// annotations on the delivery decide how accepted data is used.
59struct ResolverDelivery<V: Variant> {
    // Delivery record from the resolver: keyed by the variant's request key
    // and tagged with the local `Annotation` subscriber type.
60    delivery: Delivery<ResolverRequestFor<V>, Annotation>,
    // Raw bytes that arrived with the delivery.
61    value: Bytes,
    // One-shot sender for the peer-validity verdict.
    // NOTE(review): presumably `true` means the delivered data was valid;
    // confirm against the resolver handler.
62    response: oneshot::Sender<bool>,
63}
64
65/// The [Actor] is responsible for receiving uncertified blocks from the broadcast mechanism,
66/// receiving notarizations and finalizations from consensus, and reconstructing a total order
67/// of blocks.
68///
69/// The actor is designed to be used in a view-based model. Each view corresponds to a
70/// potential block in the chain. The actor will only finalize a block if it has a
71/// corresponding finalization.
72///
73/// The actor also provides a backfill mechanism for missing blocks. If the actor receives a
74/// finalization for a block that is ahead of its current view, it will request the missing blocks
75/// from its peers. This ensures that the actor can catch up to the rest of the network if it falls
76/// behind.
///
/// # Type Parameters
///
/// - `E`: runtime context (buffer pooling, randomness, spawning, metrics, clock, storage).
/// - `V`: the marshal [Variant], which fixes the block and commitment types.
/// - `P`: [Provider] of epoch-scoped signing schemes.
/// - `FC`: [Certificates] store holding finalizations.
/// - `FB`: [Blocks] store holding finalized blocks.
/// - `ES`: [Epocher] supplying the epoch configuration.
/// - `T`: [Strategy] used for parallel operations.
/// - `A`: [Acknowledgement] type used for application acks (defaults to [Exact]).
77pub struct Actor<E, V, P, FC, FB, ES, T, A = Exact>
78where
79    E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage,
80    V: Variant,
81    P: Provider<Scope = Epoch, Scheme: Scheme<V::Commitment>>,
82    FC: Certificates<
83        BlockDigest = <V::Block as Digestible>::Digest,
84        Commitment = V::Commitment,
85        Scheme = P::Scheme,
86    >,
87    FB: Blocks<Block = V::StoredBlock>,
88    ES: Epocher,
89    T: Strategy,
90    A: Acknowledgement,
91{
92    // ---------- Context ----------
    // Runtime context, held in a cell so it can be handed to the spawned task.
93    context: ContextCell<E>,
94
95    // ---------- Message Passing ----------
96    // Receiving half of the actor's mailbox
97    mailbox: mailbox::Receiver<Message<P::Scheme, V>>,
98
99    // ---------- Configuration ----------
100    // Provider for epoch-specific signing schemes
101    provider: P,
102    // Epoch configuration
103    epocher: ES,
104    // Minimum number of views to retain temporary data after the application processes a block
105    view_retention_timeout: ViewDelta,
106    // Maximum number of blocks to repair at once
107    max_repair: NonZeroUsize,
108    // Codec configuration for block type
109    block_codec_config: <V::ApplicationBlock as Read>::Cfg,
110    // Strategy for parallel operations
111    strategy: T,
112
113    // ---------- State ----------
114    // Last proposed block as (round, commitment, block), kept in memory for a later `Forward`
115    last_proposed_block: Option<(Round, V::Commitment, V::Block)>,
116    // Current processed floor and any pending floor update
117    floor: Floor<P::Scheme, V::Commitment>,
118    // Application delivery cursor
119    stream: Stream<E>,
120    // Pending application acknowledgements
121    pending_acks: PendingAcks<V, A>,
122    // Highest known finalized height
123    tip: Height,
124    // Outstanding subscriptions for blocks
125    block_subscriptions: Subscriptions<V>,
126
127    // ---------- Storage ----------
128    // Prunable cache
129    cache: cache::Manager<E, V, P::Scheme>,
130    // Finalizations stored by height
131    finalizations_by_height: FC,
132    // Finalized blocks stored by height
133    finalized_blocks: FB,
134
135    // ---------- Metrics ----------
136    // Gauge for the latest finalized height
137    finalized_height: Gauge,
138    // Gauge for the latest height processed by the application
139    processed_height: Gauge,
140}
141
142impl<E, V, P, FC, FB, ES, T, A> Actor<E, V, P, FC, FB, ES, T, A>
143where
144    E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage,
145    V: Variant,
146    P: Provider<Scope = Epoch, Scheme: Scheme<V::Commitment>>,
147    FC: Certificates<
148        BlockDigest = <V::Block as Digestible>::Digest,
149        Commitment = V::Commitment,
150        Scheme = P::Scheme,
151    >,
152    FB: Blocks<Block = V::StoredBlock>,
153    ES: Epocher,
154    T: Strategy,
155    A: Acknowledgement,
156{
157    /// Create a new application actor.
158    pub async fn init(
159        context: E,
160        finalizations_by_height: FC,
161        mut finalized_blocks: FB,
162        config: Config<P, ES, T, V::ApplicationBlock, V::Block, V::Commitment>,
163    ) -> (Self, Mailbox<P::Scheme, V>, Option<Height>) {
164        // Initialize cache
165        let prunable_config = cache::Config {
166            partition_prefix: format!("{}-cache", config.partition_prefix),
167            prunable_items_per_section: config.prunable_items_per_section,
168            replay_buffer: config.replay_buffer,
169            key_write_buffer: config.key_write_buffer,
170            value_write_buffer: config.value_write_buffer,
171            key_page_cache: config.page_cache.clone(),
172        };
173        let cache = cache::Manager::init(
174            context.child("cache"),
175            prunable_config,
176            config.block_codec_config.clone(),
177        )
178        .await;
179
180        // The application metadata name is retained for legacy support.
181        let application_metadata_partition =
182            format!("{}-application-metadata", config.partition_prefix);
183        let stream = Stream::new(context.child("stream"), &application_metadata_partition).await;
184        let last_processed_height = stream.processed_height();
185
186        // Genesis is a local anchor. A floor finalization is verified and
187        // resolved after `run` receives the resolver and buffer.
188        let pending_floor_anchor = match config.start {
189            Start::Genesis(anchor) => {
190                assert_eq!(
191                    anchor.height(),
192                    Height::zero(),
193                    "genesis anchor must be at height zero"
194                );
195                Self::ensure_genesis_anchor(&mut finalized_blocks, anchor, last_processed_height)
196                    .await;
197                None
198            }
199            Start::Floor(finalization) => Some(finalization),
200        };
201        let last_processed_round =
202            Self::latest_processed_round(&finalizations_by_height, last_processed_height).await;
203
204        // Create metrics
205        let finalized_height = context.gauge("finalized_height", "Finalized height of application");
206        let processed_height = context.gauge("processed_height", "Processed height of application");
207        if let Some(last_processed_height) = last_processed_height {
208            let _ = processed_height.try_set(last_processed_height.get());
209        }
210        let floor = pending_floor_anchor.map_or_else(
211            || Floor::resolved(last_processed_height, last_processed_round),
212            |finalization| {
213                Floor::awaiting_anchor(last_processed_height, last_processed_round, finalization)
214            },
215        );
216
217        // Initialize mailbox
218        let (sender, mailbox) = mailbox::new(context.child("mailbox"), config.mailbox_size);
219        (
220            Self {
221                context: ContextCell::new(context),
222                mailbox,
223                provider: config.provider,
224                epocher: config.epocher,
225                view_retention_timeout: config.view_retention_timeout,
226                max_repair: config.max_repair,
227                block_codec_config: config.block_codec_config,
228                strategy: config.strategy,
229                last_proposed_block: None,
230                floor,
231                stream,
232                pending_acks: PendingAcks::new(config.max_pending_acks.get()),
233                tip: Height::zero(),
234                block_subscriptions: Subscriptions::new(),
235                cache,
236                finalizations_by_height,
237                finalized_blocks,
238                finalized_height,
239                processed_height,
240            },
241            Mailbox::new(sender),
242            last_processed_height,
243        )
244    }
245
246    async fn ensure_genesis_anchor(
247        finalized_blocks: &mut FB,
248        anchor: V::Block,
249        last_processed_height: Option<Height>,
250    ) {
251        let anchor_height = anchor.height();
252        let anchor_commitment = V::commitment(&anchor);
253        match finalized_blocks
254            .get(ArchiveID::Index(anchor_height.get()))
255            .await
256        {
257            Ok(Some(stored)) => {
258                let stored: V::Block = stored.into();
259                assert_eq!(
260                    stored.height(),
261                    anchor_height,
262                    "stored genesis block height mismatch"
263                );
264                assert!(
265                    V::commitment(&stored) == anchor_commitment,
266                    "stored genesis block does not match configured anchor"
267                );
268            }
269            Ok(None) => {
270                if let Some(existing) =
271                    last_processed_height.filter(|height| anchor_height < *height)
272                {
273                    warn!(
274                        height = %anchor_height,
275                        %existing,
276                        "ignoring stale anchor"
277                    );
278                    return;
279                }
280
281                finalized_blocks
282                    .put(anchor.into())
283                    .await
284                    .expect("failed to store startup anchor");
285                finalized_blocks
286                    .sync()
287                    .await
288                    .expect("failed to sync startup anchor");
289                debug!(height = %anchor_height, "stored genesis block");
290            }
291            Err(err) => panic!("failed to check startup anchor: {err}"),
292        }
293    }
294
295    /// Start the actor.
    ///
    /// Consumes the actor and hands its `run` future to `spawn_cell!` together
    /// with the actor's context, returning the resulting [Handle].
    ///
    /// - `application`: reporter that receives [Update]s from the actor.
    /// - `buffer`: broadcast buffer for the variant's blocks.
    /// - `resolver`: the resolver handler's receiver paired with the resolver itself.
296    pub fn start<R, Buf>(
297        mut self,
298        application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
299        buffer: Buf,
300        resolver: (handler::Receiver<V::Commitment>, R),
301    ) -> Handle<()>
302    where
303        R: TargetedResolver<
304            Key = ResolverRequestFor<V>,
305            Subscriber = Annotation,
306            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
307        >,
308        Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
309    {
310        spawn_cell!(self.context, self.run(application, buffer, resolver))
311    }
312
313    /// Start the actor without a broadcast buffer.
314    pub fn start_unbuffered<R>(
315        self,
316        application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
317        resolver: (handler::Receiver<V::Commitment>, R),
318    ) -> Handle<()>
319    where
320        R: TargetedResolver<
321            Key = ResolverRequestFor<V>,
322            Subscriber = Annotation,
323            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
324        >,
325    {
326        self.start(
327            application,
328            NoBuffer::<<P::Scheme as CertificateScheme>::PublicKey>::new(),
329            resolver,
330        )
331    }
332
333    /// Run the application actor.
    ///
    /// Startup sequence: report the latest known tip (if any) to the
    /// application, load persisted cache epochs, install a configured floor
    /// (if one is pending), attempt gap repair, and attempt to dispatch
    /// blocks to the application.
    ///
    /// The actor then enters a `select_loop!` whose arms are written in this
    /// order: waiter completions, application acknowledgements, mailbox
    /// messages, and resolver messages. The function returns when the context
    /// is stopped, the mailbox closes, or the resolver handler closes.
334    async fn run<R, Buf>(
335        mut self,
336        mut application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
337        mut buffer: Buf,
338        (mut resolver_rx, mut resolver): (handler::Receiver<V::Commitment>, R),
339    ) where
340        R: TargetedResolver<
341            Key = ResolverRequestFor<V>,
342            Subscriber = Annotation,
343            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
344        >,
345        Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
346    {
347        // Create a local pool for waiter futures. Each waiter resolves to the
        // block it was waiting for, or to the subscription key it was serving.
348        let mut waiters = AbortablePool::<Result<V::Block, SubscriptionKeyFor<V>>>::default();
349
350        // Get tip and send to application
351        let tip = self.get_latest().await;
352        if let Some((height, digest, round)) = tip {
353            application.report(Update::Tip(round, height, digest));
354            self.tip = height;
355            let _ = self.finalized_height.try_set(height.get());
356        }
357
358        // Load persisted cache epochs so find_block can discover blocks
359        // written before the last shutdown.
360        self.cache.load_persisted_epochs().await;
361
362        // A configured floor follows the same path as `SetFloor`: verify it,
363        // then apply a local anchor or fetch the anchor block.
        // NOTE(review): the meaning of the `false` flag is defined by
        // `install_floor` (not shown here); the `SetFloor` mailbox path
        // passes `true`.
364        if let Some(finalization) = self.floor.take_pending_anchor() {
365            self.install_floor(
366                finalization,
367                false,
368                &mut resolver,
369                &mut buffer,
370                &mut application,
371            )
372            .await;
373        }
374
375        // Attempt to repair any gaps in the finalized blocks archive, if there are any.
        // The finalized archives are only synced when `try_repair_gaps` returns true.
376        if self
377            .try_repair_gaps(&mut buffer, &mut resolver, &mut application)
378            .await
379        {
380            self.sync_finalized().await;
381        }
382
383        // Attempt to dispatch the next finalized block to the application, if it is ready.
384        self.try_dispatch_blocks(&mut application).await;
385
386        select_loop! {
387            self.context,
388            on_start => {
389                // Remove any dropped subscribers. If all subscribers dropped, abort the waiter.
390                self.block_subscriptions.retain_open();
391            },
392            on_stopped => {
393                debug!("context shutdown, stopping marshal");
394            },
395            // Handle waiter completions first
396            Ok(completion) = waiters.next_completed() else continue => match completion {
                // The awaited block arrived: notify its subscribers.
397                Ok(block) => self.block_subscriptions.notify(&block),
                // The waiter ended without a block: log and drop the subscription for its key.
398                Err(key) => {
399                    match key {
400                        SubscriptionKey::Digest(digest) => {
401                            debug!(
402                                ?digest,
403                                "buffer subscription closed, canceling local subscribers"
404                            );
405                        }
406                        SubscriptionKey::Commitment(commitment) => {
407                            debug!(
408                                ?commitment,
409                                "buffer subscription closed, canceling local subscribers"
410                            );
411                        }
412                    }
413                    self.block_subscriptions.remove(&key);
414                }
415            },
416            // Handle application acknowledgements (drain all ready acks, sync once)
417            result = self.pending_acks.current() => {
418                self.handle_ack(result, &mut application, &mut buffer, &mut resolver)
419                    .await;
420            },
421            // Handle consensus inputs before backfill or resolver traffic
            // NOTE(review): this arm exits with `break` while the handler arm
            // below uses `return`; no statements follow the loop in this function.
422            Some(message) = self.mailbox.recv() else {
423                debug!("mailbox closed, shutting down");
424                break;
425            } => {
426                self.handle_mailbox_message(
427                    message,
428                    &mut resolver,
429                    &mut waiters,
430                    &mut buffer,
431                    &mut application,
432                )
433                .await;
434            },
435            // Handle resolver messages last (batched up to max_repair, sync once)
436            Some(message) = resolver_rx.recv() else {
437                debug!("handler closed, shutting down");
438                return;
439            } => {
440                self.handle_resolver_message(
441                    message,
442                    &mut resolver_rx,
443                    &mut resolver,
444                    &mut buffer,
445                    &mut application,
446                )
447                .await;
448            },
449        }
450    }
451
452    /// Handles one ready application acknowledgement and drains any queued acks
453    /// that are already complete.
454    async fn handle_ack<Buf, R>(
455        &mut self,
456        result: <A::Waiter as Future>::Output,
457        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
458        buffer: &mut Buf,
459        resolver: &mut R,
460    ) where
461        Buf: Buffer<V>,
462        R: Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
463    {
464        // Start with the ack that woke this `select_loop!` arm.
465        let mut pending = Some(self.pending_acks.complete_current(result));
466        let last_acked_commitment = loop {
467            let (height, commitment, result) = pending.take().expect("pending ack must exist");
468            match result {
469                Ok(()) => {
470                    // Apply in-memory progress updates for this acknowledged
471                    // block. The metadata sync below makes drained updates durable.
472                    self.update_processed_height(height, resolver);
473                    self.update_processed_round(height, resolver).await;
474                }
475                Err(e) => {
476                    // Ack failures are fatal for marshal/application coordination.
477                    panic!("application did not acknowledge block at height {height}: {e:?}");
478                }
479            }
480
481            // Opportunistically drain any additional already-ready acks so we
482            // can persist one metadata sync for the whole batch below.
483            match self.pending_acks.pop_ready() {
484                Some(next) => pending = Some(next),
485                None => break commitment,
486            }
487        };
488
489        // Persist buffered progress updates once after draining all ready acks.
490        self.stream
491            .sync()
492            .await
493            .expect("failed to sync application progress");
494
495        // Anything below the last acknowledged commitment is safe for the
496        // buffer to prune.
497        buffer.finalized(last_acked_commitment);
498
499        // Refill the application dispatch pipeline.
500        self.try_dispatch_blocks(application).await;
501    }
502
503    /// Handles a single mailbox message from local consensus/application callers.
    ///
    /// Dispatches on the [Message] variant. Query variants answer on their
    /// `response` channel; `Proposed`/`Verified`/`Certified` cache the block and
    /// then send on `ack`; `Notarization`/`Finalization` cache the certificate and
    /// act on the block if it is available; the remaining variants request data
    /// from the resolver, register subscriptions, install a floor, or prune.
    ///
    /// # Panics
    ///
    /// Panics if `Proposed`, `Verified`, or `Certified` arrives without an `ack`,
    /// or if pruning the finalized archives fails.
504    async fn handle_mailbox_message<Buf, R>(
505        &mut self,
506        message: Message<P::Scheme, V>,
507        resolver: &mut R,
508        waiters: &mut AbortablePool<Result<V::Block, SubscriptionKeyFor<V>>>,
509        buffer: &mut Buf,
510        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
511    ) where
512        Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
513        R: TargetedResolver<
514            Key = ResolverRequestFor<V>,
515            Subscriber = Annotation,
516            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
517        >,
518    {
        // Skip the message entirely when `response_closed()` reports true.
        // NOTE(review): presumably the requester dropped its response channel,
        // so the work would be wasted; confirm in `Message`.
519        if message.response_closed() {
520            return;
521        }
522
523        match message {
            // Report (height, digest) for the requested block identifier.
524            Message::GetInfo {
525                identifier,
526                response,
527            } => {
528                let info = match identifier {
529                    // TODO: Instead of pulling out the entire block, determine the
530                    // height directly from the archive by mapping the digest to
531                    // the index, which is the same as the height.
532                    BlockID::Digest(digest) => self
533                        .finalized_blocks
534                        .get(ArchiveID::Key(&digest))
535                        .await
                        // An archive read error is reported the same as "not found".
536                        .ok()
537                        .flatten()
538                        .map(|b| (b.height(), digest)),
539                    BlockID::Height(height) => self.get_info_by_height(height).await,
540                    BlockID::Latest => self.get_latest().await.map(|(h, d, _)| (h, d)),
541                };
542                response.send_lossy(info);
543            }
            // Return the block cached as verified for this round, if any.
544            Message::GetVerified { round, response } => {
545                let block = self.cache.get_verified(round).await.map(Into::into);
546                response.send_lossy(block);
547            }
            // Send a block to peers through the buffer.
548            Message::Forward {
549                round,
550                commitment,
551                recipients,
552            } => {
                // An explicit but empty recipient list means there is nobody to send to.
553                if matches!(&recipients, Recipients::Some(peers) if peers.is_empty()) {
554                    return;
555                }
                // Prefer the proposal retained in memory; otherwise look the
                // block up by commitment.
556                let block = match self.take_proposed(round, commitment) {
557                    Some(block) => block,
558                    None => {
559                        let Some(block) = self.find_block_by_commitment(buffer, commitment).await
560                        else {
561                            debug!(?commitment, "block not found for forwarding");
562                            return;
563                        };
564                        block
565                    }
566                };
567                buffer.send(round, block, recipients);
568            }
569            Message::Proposed { round, block, ack } => {
570                // If the round has already been pruned by tip advancement,
571                // `cache_verified` is a no-op because the round is below
572                // the retention floor (and no longer is required by consensus
573                // to make progress).
574                self.cache_verified(round, block.digest(), block.clone())
575                    .await;
576                self.apply_floor_anchor(&block, buffer, application, resolver)
577                    .await;
578
579                // Retain the block in memory so the subsequent `Forward` can
580                // broadcast it without reloading from storage. An older retained
581                // proposal (if any) is overwritten.
582                let commitment = V::commitment(&block);
583                self.last_proposed_block = Some((round, commitment, block));
                // The ack is sent only after the caching step above has completed.
584                ack.expect("durable ack present").send_lossy(());
585            }
586            Message::Verified { round, block, ack } => {
587                // If the round has already been pruned by tip advancement,
588                // `cache_verified` is a no-op because the round is below
589                // the retention floor (and no longer is required by consensus
590                // to make progress).
591                self.cache_verified(round, block.digest(), block.clone())
592                    .await;
593                self.apply_floor_anchor(&block, buffer, application, resolver)
594                    .await;
595                ack.expect("durable ack present").send_lossy(());
596            }
597            Message::Certified { round, block, ack } => {
598                // If the round has already been pruned by tip advancement,
599                // `cache_block` is a no-op because the round is below
600                // the retention floor (and no longer is required by consensus
601                // to make progress).
602                self.cache_block(round, block.digest(), block.clone()).await;
603                self.apply_floor_anchor(&block, buffer, application, resolver)
604                    .await;
605                ack.expect("durable ack present").send_lossy(());
606            }
607            Message::Notarization { notarization } => {
608                let round = notarization.round();
609                let commitment = notarization.proposal.payload;
610                let digest = V::commitment_to_inner(commitment);
611
612                // Cache notarization by round.
613                self.cache
614                    .put_notarization(round, digest, notarization.clone())
615                    .await;
616
617                // A notarization alone is not enough to fetch missing proposal
618                // data. If the block is not locally available, remember the
619                // certificate and wait for a later finalization/repair path.
620                if let Some(block) = self.find_block_by_commitment(buffer, commitment).await {
621                    self.cache_block(round, digest, block.clone()).await;
622                    self.apply_floor_anchor(&block, buffer, application, resolver)
623                        .await;
624                } else {
625                    debug!(?round, "notarized block unavailable locally");
626                }
627            }
628            Message::Finalization { finalization } => {
629                let round = finalization.round();
630                let commitment = finalization.proposal.payload;
631                let digest = V::commitment_to_inner(commitment);
632
633                // Cache finalization by round.
634                self.cache
635                    .put_finalization(round, digest, finalization.clone())
636                    .await;
637
638                // Search for the finalized block locally, otherwise fetch it remotely.
639                if let Some(block) = self.find_block_by_commitment(buffer, commitment).await {
640                    // The anchor path stores the floor block and finalization,
641                    // advances floors, prunes below them, and resumes dispatch.
642                    if self
643                        .apply_floor_anchor(&block, buffer, application, resolver)
644                        .await
645                    {
646                        return;
647                    }
648
649                    let height = block.height();
650                    self.update_processed_round_floor(height, round, resolver)
651                        .await;
652                    if self
653                        .store_finalization(height, digest, block, Some(finalization), application)
654                        .await
655                    {
656                        // If a floor anchor is pending, repair and dispatch are
657                        // no-ops until the anchor block is stored.
658                        self.try_repair_gaps(buffer, resolver, application).await;
659                        self.sync_finalized().await;
660                        self.try_dispatch_blocks(application).await;
661                        debug!(?round, %height, "finalized block stored");
662                    }
663                } else {
664                    // The finalization carries a round and commitment, but not a
665                    // height. Keep the request round-bound until the block is decoded.
666                    debug!(?round, ?commitment, "finalized block missing");
667                    self.floor
668                        .fetch_if_permitted(
669                            resolver,
670                            Request::finalized_block_by_round(commitment, round),
671                        )
672                        .ignore();
673                }
674            }
            // Return a block looked up by digest, by finalized height, or as the latest.
675            Message::GetBlock {
676                identifier,
677                response,
678            } => match identifier {
679                BlockID::Digest(digest) => {
680                    let result = self.find_block_by_digest(buffer, digest).await;
681                    response.send_lossy(result);
682                }
683                BlockID::Height(height) => {
684                    let result = self.get_finalized_block(height).await;
685                    response.send_lossy(result);
686                }
687                BlockID::Latest => {
                    // Resolve the latest digest first, then look the block up by that digest.
688                    let block = match self.get_latest().await {
689                        Some((_, digest, _)) => self.find_block_by_digest(buffer, digest).await,
690                        None => None,
691                    };
692                    response.send_lossy(block);
693                }
694            },
            // Return the finalization stored for a height, if any.
695            Message::GetFinalization { height, response } => {
696                let finalization = self.get_finalization_by_height(height).await;
697                response.send_lossy(finalization);
698            }
            // Return the processed height reported by the application progress stream.
699            Message::GetProcessedHeight { response } => {
700                response.send_lossy(self.stream.processed_height());
701            }
702            Message::HintFinalized { height, targets } => {
703                // Skip if finalization is already available locally.
704                if self.get_finalization_by_height(height).await.is_some() {
705                    return;
706                }
707
                // Ask the given targets for the finalization, subject to the floor's permission check.
708                self.floor
709                    .fetch_targeted_if_permitted(resolver, Request::finalized(height), targets)
710                    .ignore();
711            }
            // Subscribe to a block by digest; the fallback is converted with
            // `.into()` before being handed to `handle_subscribe`.
712            Message::SubscribeByDigest {
713                digest,
714                fallback,
715                response,
716            } => {
717                self.handle_subscribe(
718                    fallback.into(),
719                    SubscriptionKey::Digest(digest),
720                    response,
721                    resolver,
722                    waiters,
723                    buffer,
724                )
725                .await;
726            }
            // Subscribe to a block by commitment.
727            Message::SubscribeByCommitment {
728                commitment,
729                fallback,
730                response,
731            } => {
732                self.handle_subscribe(
733                    fallback,
734                    SubscriptionKey::Commitment(commitment),
735                    response,
736                    resolver,
737                    waiters,
738                    buffer,
739                )
740                .await;
741            }
            // Request the notarized data for `round` only when the block
            // cannot be found locally by commitment.
742            Message::HintNotarized { round, commitment } => {
743                if self
744                    .find_block_by_commitment(buffer, commitment)
745                    .await
746                    .is_none()
747                {
748                    self.floor
749                        .fetch_if_permitted(resolver, Request::notarized(round))
750                        .ignore();
751                }
752            }
            // Install a new floor from the supplied finalization.
            // NOTE(review): the meaning of the `true` flag is defined by
            // `install_floor` (not shown here); the startup path in `run`
            // passes `false`.
753            Message::SetFloor { finalization } => {
754                self.install_floor(finalization, true, resolver, buffer, application)
755                    .await;
756            }
757            Message::Prune { height } => {
758                // Only allow pruning at or below the current floor.
759                if height > self.floor.processed_height() {
760                    warn!(%height, floor = %self.floor.processed_height(), "prune height above floor, ignoring");
761                    return;
762                }
763
764                self.prune_finalized_archives(height)
765                    .await
766                    .expect("failed to prune finalized archives");
767
768                // Intentionally keep existing block subscriptions alive. Canceling
769                // waiters can have catastrophic consequences because actors do not
770                // retry subscriptions on failed channels.
771            }
772        }
773    }
774
775    /// Handles a batch of resolver messages, syncing finalized archives once if
776    /// any accepted delivery buffered a write.
777    async fn handle_resolver_message<Buf, R>(
778        &mut self,
779        message: handler::Message<V::Commitment>,
780        resolver_rx: &mut handler::Receiver<V::Commitment>,
781        resolver: &mut R,
782        buffer: &mut Buf,
783        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
784    ) where
785        Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
786        R: Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
787    {
788        let mut needs_sync = false;
789        let mut handled = false;
790        let mut produces = Vec::new();
791        let mut delivers = Vec::new();
792
793        // Drain up to max_repair resolver messages. Block deliveries are handled
794        // immediately, certificate-bearing deliveries are batched for verification,
795        // and produce responses wait until repair has had a chance to fill gaps.
796        for msg in std::iter::once(message)
797            .chain(std::iter::from_fn(|| resolver_rx.try_recv().ok()))
798            .take(self.max_repair.get())
799        {
800            if msg.response_closed() {
801                continue;
802            }
803            handled = true;
804
805            match msg {
806                handler::Message::Produce { key, response } => {
807                    produces.push((key, response));
808                }
809                handler::Message::Deliver {
810                    delivery,
811                    value,
812                    response,
813                } => {
814                    needs_sync |= self
815                        .handle_deliver(
816                            ResolverDelivery {
817                                delivery,
818                                value,
819                                response,
820                            },
821                            &mut delivers,
822                            buffer,
823                            application,
824                            resolver,
825                        )
826                        .await;
827                }
828            }
829        }
830        if !handled {
831            return;
832        }
833
834        // Batch verify and process all certificate-bearing deliveries.
835        needs_sync |= self
836            .verify_delivered(delivers, buffer, application, resolver)
837            .await;
838
839        // Attempt to fill gaps before handling produce requests so we can serve
840        // data received earlier in the same batch.
841        needs_sync |= self.try_repair_gaps(buffer, resolver, application).await;
842
843        if needs_sync {
844            // Sync archives before responding to peers so accepted repair data is
845            // durable before this node serves it.
846            self.sync_finalized().await;
847            self.try_dispatch_blocks(application).await;
848        }
849
850        // Handle produce requests in parallel.
851        join_all(
852            produces
853                .into_iter()
854                .map(|(key, response)| self.handle_produce(key, response, buffer)),
855        )
856        .await;
857    }
858
859    /// Handle a produce request from a remote peer.
860    async fn handle_produce<Buf: Buffer<V>>(
861        &self,
862        key: ResolverRequestFor<V>,
863        response: oneshot::Sender<Bytes>,
864        buffer: &Buf,
865    ) {
866        match key {
867            Key::Block(commitment) => {
868                let Some(block) = self.find_block_by_commitment(buffer, commitment).await else {
869                    debug!(?commitment, "block missing on request");
870                    return;
871                };
872                response.send_lossy(block.encode());
873            }
874            Key::Finalized { height } => {
875                let Some(finalization) = self.get_finalization_by_height(height).await else {
876                    debug!(%height, "finalization missing on request");
877                    return;
878                };
879                let Some(block) = self.get_finalized_block(height).await else {
880                    debug!(%height, "finalized block missing on request");
881                    return;
882                };
883                response.send_lossy((finalization, block).encode());
884            }
885            Key::Notarized { round } => {
886                let Some(notarization) = self.cache.get_notarization(round).await else {
887                    debug!(?round, "notarization missing on request");
888                    return;
889                };
890                let commitment = notarization.proposal.payload;
891                let Some(block) = self.find_block_by_commitment(buffer, commitment).await else {
892                    debug!(?commitment, "block missing on request");
893                    return;
894                };
895                response.send_lossy((notarization, block).encode());
896            }
897        }
898    }
899
900    /// Handle a local subscription request for a block.
901    async fn handle_subscribe<Buf: Buffer<V>>(
902        &mut self,
903        fallback: CommitmentFallback,
904        key: SubscriptionKeyFor<V>,
905        response: oneshot::Sender<V::Block>,
906        resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
907        waiters: &mut AbortablePool<Result<V::Block, SubscriptionKeyFor<V>>>,
908        buffer: &mut Buf,
909    ) {
910        let digest = match key {
911            SubscriptionKey::Digest(digest) => digest,
912            SubscriptionKey::Commitment(commitment) => V::commitment_to_inner(commitment),
913        };
914
915        // Check for block locally.
916        let block = match key {
917            SubscriptionKey::Digest(digest) => self.find_block_by_digest(buffer, digest).await,
918            SubscriptionKey::Commitment(commitment) => {
919                self.find_block_by_commitment(buffer, commitment).await
920            }
921        };
922        if let Some(block) = block {
923            response.send_lossy(block);
924            return;
925        }
926
927        // We don't have the block locally. Local-only waits reach this point
928        // without a round or height, so they only register a subscriber below.
929        //
930        // Round-based fetching is for notarized proposal lookups whose height is
931        // not known before the request. Height-based fetching is only for callers
932        // that already have a validated pruning height.
933        match fallback {
934            CommitmentFallback::FetchByRound { round } => {
935                // Fetch the notarized proposal for this round. The response
936                // must include a certificate so the commitment is tied to the
937                // certified round context. The decoded block is heightable, but
938                // that height is not known soon enough to key, coalesce, or prune
939                // the in-flight resolver request.
940                if self
941                    .floor
942                    .fetch_if_permitted(resolver, Request::notarized(round))
943                    .denied()
944                {
945                    return;
946                }
947                debug!(?round, ?digest, "requested block missing");
948            }
949            CommitmentFallback::FetchByCommitment { height } => {
950                let commitment = match key {
951                    SubscriptionKey::Commitment(commitment) => commitment,
952                    SubscriptionKey::Digest(_) => {
953                        unreachable!("digest subscriptions cannot request commitment fallback")
954                    }
955                };
956
957                // This path is only for accepted ancestry or finalized repair,
958                // never for a candidate block's immediate parent.
959                if self
960                    .floor
961                    .fetch_if_permitted(resolver, Request::certified_block(commitment, height))
962                    .denied()
963                {
964                    return;
965                }
966                debug!(%height, ?commitment, ?digest, "requested certified ancestry block missing");
967            }
968            CommitmentFallback::Wait => {}
969        }
970
971        let round = match fallback {
972            CommitmentFallback::FetchByRound { round } => Some(round),
973            CommitmentFallback::Wait | CommitmentFallback::FetchByCommitment { .. } => None,
974        };
975
976        // Register subscriber.
977        match key {
978            SubscriptionKey::Digest(digest) => {
979                debug!(?round, ?digest, "registering subscriber");
980            }
981            SubscriptionKey::Commitment(commitment) => {
982                debug!(?round, ?commitment, ?digest, "registering subscriber");
983            }
984        }
985        self.block_subscriptions
986            .insert(key, response, waiters, buffer);
987    }
988
989    /// Verifies and installs a floor, fetching the anchor block if needed.
990    async fn install_floor<Buf, R>(
991        &mut self,
992        finalization: Finalization<P::Scheme, V::Commitment>,
993        skip_if_superseded: bool,
994        resolver: &mut R,
995        buffer: &mut Buf,
996        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
997    ) where
998        Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
999        R: Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
1000    {
1001        let round = finalization.round();
1002        if round <= self.floor.processed_round() {
1003            warn!(
1004                ?round,
1005                floor = ?self.floor.processed_round(),
1006                "floor not updated, below existing round floor"
1007            );
1008            return;
1009        }
1010
1011        let Some(scheme) = self.get_scheme_certificate_verifier(finalization.epoch()) else {
1012            panic!("floor finalization epoch unavailable");
1013        };
1014        assert!(
1015            finalization.verify(self.context.as_mut(), scheme.as_ref(), &self.strategy),
1016            "floor finalization must verify"
1017        );
1018
1019        let commitment = finalization.proposal.payload;
1020        let digest = V::commitment_to_inner(commitment);
1021        self.cache
1022            .put_finalization(round, digest, finalization.clone())
1023            .await;
1024
1025        // A pending anchor at the same or a newer floor already blocks
1026        // progress. Keep waiting for it instead of replacing it.
1027        if skip_if_superseded && self.floor.has_pending_anchor_at_or_after(round) {
1028            return;
1029        }
1030
1031        if let Some(block) = self.find_block_by_commitment(buffer, commitment).await {
1032            self.floor.await_anchor(finalization);
1033            assert!(
1034                self.apply_floor_anchor(&block, buffer, application, resolver)
1035                    .await
1036            );
1037            return;
1038        }
1039
1040        // The pending floor owns the next application sync point. Drop any
1041        // in-flight acks before they can advance the processed height past it.
1042        self.pending_acks.clear();
1043
1044        debug!(?round, ?commitment, "starting fetch for floor block");
1045        self.floor.await_anchor(finalization);
1046        self.floor
1047            .fetch_if_permitted(
1048                resolver,
1049                Request::finalized_block_by_round(commitment, round),
1050            )
1051            .ignore();
1052    }
1053
1054    /// Applies a block if it satisfies the current floor transition.
1055    async fn apply_floor_anchor<Buf: Buffer<V>>(
1056        &mut self,
1057        block: &V::Block,
1058        buffer: &mut Buf,
1059        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1060        resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
1061    ) -> bool {
1062        let commitment = V::commitment(block);
1063        if !self.floor.matches_pending_anchor(commitment) {
1064            return false;
1065        }
1066        let block = (*block).clone();
1067
1068        // Floor anchors can bypass the local proposal-verification path. Check
1069        // the parent relationship before using a non-genesis anchor for walkback.
1070        let height = block.height();
1071        if height > Height::zero() {
1072            let parent_commitment = V::parent_commitment(&block);
1073            assert!(
1074                block.parent() == V::commitment_to_inner(parent_commitment),
1075                "floor block parent commitment mismatch"
1076            );
1077        }
1078
1079        // This anchor cannot move the application sync point, but its
1080        // finalization round can still prune round-bound resolver work.
1081        // Keep pending acks intact because processed_height is unchanged.
1082        if height <= self.floor.processed_height() {
1083            warn!(
1084                %height,
1085                existing = %self.floor.processed_height(),
1086                "floor not updated, at or below existing"
1087            );
1088            let finalization = self
1089                .floor
1090                .take_pending_anchor()
1091                .expect("pending floor anchor missing");
1092            self.update_processed_round_floor(height, finalization.round(), resolver)
1093                .await;
1094            if self.try_repair_gaps(buffer, resolver, application).await {
1095                self.sync_finalized().await;
1096            }
1097            self.try_dispatch_blocks(application).await;
1098            return true;
1099        }
1100
1101        let digest = block.digest();
1102        let finalization = self
1103            .floor
1104            .take_pending_anchor()
1105            .expect("pending floor anchor missing");
1106        let round = finalization.round();
1107        try_join!(
1108            async {
1109                self.finalized_blocks
1110                    .put(block.clone().into())
1111                    .await
1112                    .map_err(Box::new)?;
1113                Ok::<_, BoxedError>(())
1114            },
1115            async {
1116                self.finalizations_by_height
1117                    .put(height, digest, finalization)
1118                    .await
1119                    .map_err(Box::new)?;
1120                Ok::<_, BoxedError>(())
1121            }
1122        )
1123        .expect("failed to store floor anchor");
1124        self.sync_finalized().await;
1125        self.block_subscriptions.notify(&block);
1126
1127        if height > self.tip {
1128            application.report(Update::Tip(round, height, digest));
1129            self.tip = height;
1130            let _ = self.finalized_height.try_set(height.get());
1131        }
1132
1133        // The anchor is durable, but the application still needs to process it.
1134        // Record the previous height so dispatch resumes at the anchor itself.
1135        let dispatch_floor = height
1136            .previous()
1137            .expect("floor anchor above processed height must have predecessor");
1138        self.update_processed_height(dispatch_floor, resolver);
1139        self.update_processed_round_floor(dispatch_floor, round, resolver)
1140            .await;
1141        self.stream
1142            .sync()
1143            .await
1144            .expect("failed to sync floor metadata");
1145
1146        // Drop all pending acknowledgement waiters so any in-flight application
1147        // acks for blocks below the new floor cannot rewrite the processed floor.
1148        self.pending_acks.clear();
1149
1150        // The floor is durable, so cache/finalized data below it can be pruned.
1151        self.prune_after_floor(height)
1152            .await
1153            .expect("failed to prune data below floor");
1154
1155        // Intentionally keep existing block subscriptions alive. Canceling
1156        // waiters can have catastrophic consequences (nodes can get stuck in
1157        // different views) as actors do not retry subscriptions on failed channels.
1158        if self.try_repair_gaps(buffer, resolver, application).await {
1159            self.sync_finalized().await;
1160        }
1161        self.try_dispatch_blocks(application).await;
1162        true
1163    }
1164
1165    /// Handle a deliver message from the resolver. Block delivers are handled
1166    /// immediately. Finalized/Notarized delivers are parsed and structurally
1167    /// validated, then collected into `delivers` for batch certificate verification.
1168    /// Returns true if finalization archives were written and need syncing.
1169    async fn handle_deliver<Buf: Buffer<V>>(
1170        &mut self,
1171        message: ResolverDelivery<V>,
1172        delivers: &mut Vec<PendingVerification<P::Scheme, V>>,
1173        buffer: &mut Buf,
1174        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1175        resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
1176    ) -> bool {
1177        let ResolverDelivery {
1178            delivery,
1179            mut value,
1180            response,
1181        } = message;
1182        let Delivery { key, subscribers } = delivery;
1183        match key {
1184            Key::Block(commitment) => {
1185                let block_cfg = V::block_cfg(&self.block_codec_config, commitment);
1186                let Ok(block) = V::Block::decode_cfg(value.as_ref(), &block_cfg) else {
1187                    response.send_lossy(false);
1188                    return false;
1189                };
1190                if V::commitment(&block) != commitment {
1191                    response.send_lossy(false);
1192                    return false;
1193                }
1194
1195                // This block may match the pending floor request. Whether it
1196                // installs or is rejected as the floor anchor, do not also
1197                // process it as an ordinary block delivery.
1198                if self
1199                    .apply_floor_anchor(&block, buffer, application, resolver)
1200                    .await
1201                {
1202                    response.send_lossy(true);
1203                    return false;
1204                }
1205
1206                // The commitment validates the peer response. Annotations are
1207                // local context attached to the request and do not affect peer
1208                // validity.
1209                self.block_subscriptions.notify(&block);
1210
1211                // The peer-visible request only says "give me this block".
1212                // Local annotations explain why the block was requested and
1213                // therefore where, if anywhere, it should be stored.
1214                let height = block.height();
1215                let digest = block.digest();
1216                let annotations = subscribers.into_vec();
1217
1218                // Round-bound proposal-parent fetches are `Key::Notarized`
1219                // deliveries and are handled below. In this block-keyed path,
1220                // `Finalized` means the block belongs in the finalized chain.
1221                let finalization = self.cache.get_finalization_for(digest).await;
1222                if let Some(finalization) = &finalization {
1223                    self.update_processed_round_floor(height, finalization.round(), resolver)
1224                        .await;
1225                }
1226                let wrote = if finalization.is_some()
1227                    || annotations
1228                        .iter()
1229                        .any(|annotation| matches!(annotation, Annotation::Finalized(_)))
1230                {
1231                    self.store_finalization(height, digest, block, finalization, application)
1232                        .await
1233                } else {
1234                    if annotations
1235                        .iter()
1236                        .any(|annotation| matches!(annotation, Annotation::Certified { .. }))
1237                        && height > self.floor.processed_height()
1238                    {
1239                        if let Some(bounds) = self.epocher.containing(height) {
1240                            self.cache
1241                                .put_certified(bounds.epoch(), height, digest, block.clone().into())
1242                                .await;
1243                        }
1244                    }
1245                    false
1246                };
1247                debug!(?digest, %height, "received block");
1248                response.send_lossy(true);
1249                wrote
1250            }
1251            Key::Finalized { height } => {
1252                let Some(bounds) = self.epocher.containing(height) else {
1253                    debug!(
1254                        %height,
1255                        floor = %self.floor.processed_height(),
1256                        "ignoring stale delivery"
1257                    );
1258                    response.send_lossy(true);
1259                    return false;
1260                };
1261                let Some(scheme) = self.get_scheme_certificate_verifier(bounds.epoch()) else {
1262                    debug!(
1263                        %height,
1264                        floor = %self.floor.processed_height(),
1265                        "ignoring stale delivery"
1266                    );
1267                    response.send_lossy(true);
1268                    return false;
1269                };
1270
1271                let certificate_codec_config = scheme.certificate_codec_config();
1272                let Ok(finalization) =
1273                    Finalization::read_cfg(&mut value, &certificate_codec_config)
1274                else {
1275                    response.send_lossy(false);
1276                    return false;
1277                };
1278
1279                let commitment = finalization.proposal.payload;
1280                let block_cfg = V::block_cfg(&self.block_codec_config, commitment);
1281                let Ok(block) = V::Block::decode_cfg(value, &block_cfg) else {
1282                    response.send_lossy(false);
1283                    return false;
1284                };
1285
1286                if block.height() != height
1287                    || V::commitment(&block) != commitment
1288                    || finalization.epoch() != bounds.epoch()
1289                {
1290                    response.send_lossy(false);
1291                    return false;
1292                }
1293                delivers.push(PendingVerification::Finalized {
1294                    finalization,
1295                    block,
1296                    response,
1297                });
1298                false
1299            }
1300            Key::Notarized { round } => {
1301                let Some(scheme) = self.get_scheme_certificate_verifier(round.epoch()) else {
1302                    debug!(
1303                        ?round,
1304                        floor = %self.floor.processed_height(),
1305                        "ignoring stale delivery"
1306                    );
1307                    response.send_lossy(true);
1308                    return false;
1309                };
1310
1311                let certificate_codec_config = scheme.certificate_codec_config();
1312                let Ok(notarization) =
1313                    Notarization::read_cfg(&mut value, &certificate_codec_config)
1314                else {
1315                    response.send_lossy(false);
1316                    return false;
1317                };
1318
1319                let commitment = notarization.proposal.payload;
1320                let block_cfg = V::block_cfg(&self.block_codec_config, commitment);
1321                let Ok(block) = V::Block::decode_cfg(value, &block_cfg) else {
1322                    response.send_lossy(false);
1323                    return false;
1324                };
1325
1326                if notarization.round() != round
1327                    || V::commitment(&block) != notarization.proposal.payload
1328                {
1329                    response.send_lossy(false);
1330                    return false;
1331                }
1332                delivers.push(PendingVerification::Notarized {
1333                    notarization,
1334                    block,
1335                    response,
1336                });
1337                false
1338            }
1339        }
1340    }
1341
1342    /// Batch verify pending certificates and process valid items. Returns true
1343    /// if finalization archives were written and need syncing.
1344    async fn verify_delivered<Buf: Buffer<V>>(
1345        &mut self,
1346        mut delivers: Vec<PendingVerification<P::Scheme, V>>,
1347        buffer: &mut Buf,
1348        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1349        resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
1350    ) -> bool {
1351        delivers.retain(|item| !item.response_closed());
1352        if delivers.is_empty() {
1353            return false;
1354        }
1355
1356        // Extract (subject, certificate) pairs for batch verification.
1357        let certs: Vec<_> = delivers
1358            .iter()
1359            .map(|item| match item {
1360                PendingVerification::Finalized { finalization, .. } => (
1361                    Subject::Finalize {
1362                        proposal: &finalization.proposal,
1363                    },
1364                    &finalization.certificate,
1365                ),
1366                PendingVerification::Notarized { notarization, .. } => (
1367                    Subject::Notarize {
1368                        proposal: &notarization.proposal,
1369                    },
1370                    &notarization.certificate,
1371                ),
1372            })
1373            .collect();
1374
1375        // Batch verify using the all-epoch verifier if available, otherwise
1376        // batch verify per epoch using scoped verifiers.
1377        let verified = if let Some(scheme) = self.provider.all() {
1378            verify_certificates(
1379                self.context.as_mut(),
1380                scheme.as_ref(),
1381                &certs,
1382                &self.strategy,
1383            )
1384        } else {
1385            let mut verified = vec![false; delivers.len()];
1386
1387            // Group indices by epoch.
1388            let mut by_epoch: BTreeMap<Epoch, Vec<usize>> = BTreeMap::new();
1389            for (i, item) in delivers.iter().enumerate() {
1390                let epoch = match item {
1391                    PendingVerification::Notarized { notarization, .. } => notarization.epoch(),
1392                    PendingVerification::Finalized { finalization, .. } => finalization.epoch(),
1393                };
1394                by_epoch.entry(epoch).or_default().push(i);
1395            }
1396
1397            // Batch verify each epoch group.
1398            for (epoch, indices) in &by_epoch {
1399                let Some(scheme) = self.provider.scoped(*epoch) else {
1400                    continue;
1401                };
1402                let group: Vec<_> = indices.iter().map(|&i| certs[i]).collect();
1403                let results = verify_certificates(
1404                    self.context.as_mut(),
1405                    scheme.as_ref(),
1406                    &group,
1407                    &self.strategy,
1408                );
1409                for (j, &idx) in indices.iter().enumerate() {
1410                    verified[idx] = results[j];
1411                }
1412            }
1413            verified
1414        };
1415
1416        // Process each verified item, rejecting unverified ones.
1417        let mut wrote = false;
1418        for (index, item) in delivers.drain(..).enumerate() {
1419            if !verified[index] {
1420                match item {
1421                    PendingVerification::Finalized { response, .. }
1422                    | PendingVerification::Notarized { response, .. } => {
1423                        response.send_lossy(false);
1424                    }
1425                }
1426                continue;
1427            }
1428            match item {
1429                PendingVerification::Finalized {
1430                    finalization,
1431                    block,
1432                    response,
1433                } => {
1434                    // Valid finalization received.
1435                    response.send_lossy(true);
1436                    let round = finalization.round();
1437                    let height = block.height();
1438                    let digest = block.digest();
1439                    debug!(?round, %height, "received finalization");
1440
1441                    // The floor-anchor path fully handles this finalization
1442                    // and moves the lower bound past it.
1443                    if self
1444                        .apply_floor_anchor(&block, buffer, application, resolver)
1445                        .await
1446                    {
1447                        continue;
1448                    }
1449
1450                    self.update_processed_round_floor(height, round, resolver)
1451                        .await;
1452
1453                    wrote |= self
1454                        .store_finalization(height, digest, block, Some(finalization), application)
1455                        .await;
1456                }
1457                PendingVerification::Notarized {
1458                    notarization,
1459                    block,
1460                    response,
1461                } => {
1462                    // Valid notarization received.
1463                    response.send_lossy(true);
1464                    let round = notarization.round();
1465                    let commitment = notarization.proposal.payload;
1466                    let digest = V::commitment_to_inner(commitment);
1467                    debug!(?round, ?digest, "received notarization");
1468
1469                    // Cache the notarization and block.
1470                    let height = block.height();
1471                    self.cache_block(round, digest, block.clone()).await;
1472                    self.cache
1473                        .put_notarization(round, digest, notarization)
1474                        .await;
1475
1476                    // A notarized delivery can carry the pending floor block
1477                    // after the finalization is cached.
1478                    if self
1479                        .apply_floor_anchor(&block, buffer, application, resolver)
1480                        .await
1481                    {
1482                        continue;
1483                    }
1484
1485                    // If there exists a finalization certificate for this block, we
1486                    // should finalize it. This could finalize the block faster when
1487                    // a notarization then a finalization are received via consensus
1488                    // and we resolve the notarization request before the block request.
1489                    if let Some(finalization) = self.cache.get_finalization_for(digest).await {
1490                        self.update_processed_round_floor(height, finalization.round(), resolver)
1491                            .await;
1492
1493                        // SAFETY: `digest` identifies a unique `commitment`, so this
1494                        // cached finalization payload must match `V::commitment(&block)`.
1495                        wrote |= self
1496                            .store_finalization(
1497                                height,
1498                                digest,
1499                                block.clone(),
1500                                Some(finalization),
1501                                application,
1502                            )
1503                            .await;
1504                    }
1505                }
1506            }
1507        }
1508
1509        wrote
1510    }
1511
1512    /// Returns a scheme suitable for verifying certificates at the given epoch.
1513    ///
1514    /// Prefers a certificate verifier if available, otherwise falls back
1515    /// to the scheme for the given epoch.
1516    fn get_scheme_certificate_verifier(&self, epoch: Epoch) -> Option<Arc<P::Scheme>> {
1517        self.provider.all().or_else(|| self.provider.scoped(epoch))
1518    }
1519
1520    // -------------------- Application Dispatch --------------------
1521
1522    /// Attempt to dispatch the next finalized block to the application if ready.
1523    ///
1524    /// Dispatch finalized blocks to the application until the pipeline is full
1525    /// or no more blocks are available.
1526    ///
1527    /// This does NOT advance the processed floor height or sync metadata. It only
1528    /// sends blocks to the application and enqueues pending acks. Metadata is
1529    /// updated later, in a subsequent `select_loop!` iteration, when the ack
1530    /// handler updates the processed height.
1531    ///
1532    /// Callers must only invoke this after [`Self::sync_finalized`] has made any
1533    /// preceding finalized-archive writes durable. In other words, anything fed
1534    /// to the application from this method is already durably persisted in marshal.
1535    ///
1536    /// Acks are processed in FIFO order so the processed floor height always
1537    /// advances sequentially.
1538    ///
1539    /// # Crash safety
1540    ///
1541    /// Because `select_loop!` arms run to completion, the caller's
1542    /// [`Self::sync_finalized`] always executes before the ack handler runs. This
1543    /// guarantees archive data is durable before the processed floor height
1544    /// advances:
1545    ///
1546    /// ```text
1547    /// Iteration N (caller):
1548    ///   store_finalization  ->  Archive::put (buffered)
1549    ///   sync_finalized      ->  archive durable
1550    ///   try_dispatch_blocks  ->  sends blocks to app, enqueues pending acks
1551    ///
1552    /// Iteration M (ack handler, M > N):
1553    ///   ack handler       ->  update_processed_height  ->  metadata buffered
1554    ///   stream.sync       ->  metadata durable
1555    /// ```
1556    async fn try_dispatch_blocks(
1557        &mut self,
1558        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1559    ) {
1560        // Dispatch resumes after the floor anchor is durably stored.
1561        if self.floor.blocks_progress() {
1562            return;
1563        }
1564
1565        while self.pending_acks.has_capacity() {
1566            let next_height = self
1567                .pending_acks
1568                .next_dispatch_height(self.stream.next_height());
1569            let Some(block) = self.get_finalized_block(next_height).await else {
1570                return;
1571            };
1572            assert_eq!(
1573                block.height(),
1574                next_height,
1575                "finalized block height mismatch"
1576            );
1577
1578            let (height, commitment) = (block.height(), V::commitment(&block));
1579            let (ack, ack_waiter) = A::handle();
1580            application.report(Update::Block(V::into_inner(block), ack));
1581            self.pending_acks.enqueue(PendingAck {
1582                height,
1583                commitment,
1584                receiver: ack_waiter,
1585            });
1586        }
1587    }
1588
1589    // -------------------- Prunable Storage --------------------
1590
1591    /// Add a verified block to the prunable archive.
1592    async fn cache_verified(
1593        &mut self,
1594        round: Round,
1595        digest: <V::Block as Digestible>::Digest,
1596        block: V::Block,
1597    ) {
1598        self.block_subscriptions.notify(&block);
1599        self.cache.put_verified(round, digest, block.into()).await;
1600    }
1601
1602    /// If a block previously accepted via [`Message::Proposed`] matches the
1603    /// supplied `(round, commitment)`, remove and return it.
1604    fn take_proposed(&mut self, round: Round, commitment: V::Commitment) -> Option<V::Block> {
1605        let (cached_round, cached_commitment, _) = self.last_proposed_block.as_ref()?;
1606        if *cached_round != round || *cached_commitment != commitment {
1607            return None;
1608        }
1609        self.last_proposed_block.take().map(|(_, _, block)| block)
1610    }
1611
1612    /// Add a notarized block to the prunable archive.
1613    async fn cache_block(
1614        &mut self,
1615        round: Round,
1616        digest: <V::Block as Digestible>::Digest,
1617        block: V::Block,
1618    ) {
1619        self.block_subscriptions.notify(&block);
1620        self.cache.put_block(round, digest, block.into()).await;
1621    }
1622
1623    /// Sync both finalization archives to durable storage.
1624    ///
1625    /// Must be called within the same `select_loop!` arm as any preceding
1626    /// [`Self::store_finalization`] / [`Self::try_repair_gaps`] writes, before yielding back
1627    /// to the loop. This is the durability barrier for application delivery:
1628    /// [`Self::try_dispatch_blocks`] must run only after this sync completes.
1629    /// It also ensures archives are durable before the ack handler advances
1630    /// the processed floor height. See [`Self::try_dispatch_blocks`] for details.
1631    async fn sync_finalized(&mut self) {
1632        if let Err(e) = try_join!(
1633            async {
1634                self.finalized_blocks.sync().await.map_err(Box::new)?;
1635                Ok::<_, BoxedError>(())
1636            },
1637            async {
1638                self.finalizations_by_height
1639                    .sync()
1640                    .await
1641                    .map_err(Box::new)?;
1642                Ok::<_, BoxedError>(())
1643            },
1644        ) {
1645            panic!("failed to sync finalization archives: {e}");
1646        }
1647    }
1648
1649    // -------------------- Immutable Storage --------------------
1650
1651    /// Get a finalized block from the immutable archive.
1652    async fn get_finalized_block(&self, height: Height) -> Option<V::Block> {
1653        match self
1654            .finalized_blocks
1655            .get(ArchiveID::Index(height.get()))
1656            .await
1657        {
1658            Ok(stored) => stored.map(|stored| stored.into()),
1659            Err(e) => panic!("failed to get block: {e}"),
1660        }
1661    }
1662
1663    /// Get a finalization from the archive by height.
1664    async fn get_finalization_by_height(
1665        &self,
1666        height: Height,
1667    ) -> Option<Finalization<P::Scheme, V::Commitment>> {
1668        match self
1669            .finalizations_by_height
1670            .get(ArchiveID::Index(height.get()))
1671            .await
1672        {
1673            Ok(finalization) => finalization,
1674            Err(e) => panic!("failed to get finalization: {e}"),
1675        }
1676    }
1677
1678    /// Get finalized block information from either the finalization archive or
1679    /// the finalized-block archive.
1680    async fn get_info_by_height(
1681        &self,
1682        height: Height,
1683    ) -> Option<(Height, <V::Block as Digestible>::Digest)> {
1684        if let Some(finalization) = self.get_finalization_by_height(height).await {
1685            return Some((
1686                height,
1687                V::commitment_to_inner(finalization.proposal.payload),
1688            ));
1689        }
1690
1691        self.get_finalized_block(height)
1692            .await
1693            .map(|block| (block.height(), block.digest()))
1694    }
1695
1696    /// Add a finalized block, and optionally a finalization, to the archive.
1697    ///
1698    /// After persisting the block, the caller must sync finalized archives
1699    /// before dispatching the next contiguous block to the application. The
1700    /// buffered archive writes from this method are not a sufficient durability
1701    /// guarantee for downstream application state transitions on their own.
1702    ///
1703    /// Writes are buffered and not synced. The caller must call
1704    /// [sync_finalized](Self::sync_finalized) before yielding to the
1705    /// `select_loop!` so that archive data is durable before the ack handler
1706    /// advances the processed floor height. See [`Self::try_dispatch_blocks`] for the
1707    /// crash safety invariant.
1708    async fn store_finalization(
1709        &mut self,
1710        height: Height,
1711        digest: <V::Block as Digestible>::Digest,
1712        block: V::Block,
1713        finalization: Option<Finalization<P::Scheme, V::Commitment>>,
1714        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1715    ) -> bool {
1716        // Blocks below the last processed height are not useful to us, so we ignore them (this
1717        // has the nice byproduct of ensuring we don't call a backing store with a block below the
1718        // pruning boundary)
1719        if height <= self.floor.processed_height() {
1720            debug!(
1721                %height,
1722                floor = %self.floor.processed_height(),
1723                ?digest,
1724                "dropping finalization at or below processed height floor"
1725            );
1726            return false;
1727        }
1728        self.block_subscriptions.notify(&block);
1729
1730        // Convert block to storage format
1731        let stored: V::StoredBlock = block.into();
1732        let round = finalization.as_ref().map(|f| f.round());
1733
1734        // In parallel, update the finalized blocks and finalizations archives
1735        if let Err(e) = try_join!(
1736            // Update the finalized blocks archive
1737            async {
1738                self.finalized_blocks.put(stored).await.map_err(Box::new)?;
1739                Ok::<_, BoxedError>(())
1740            },
1741            // Update the finalizations archive (if provided)
1742            async {
1743                if let Some(finalization) = finalization {
1744                    self.finalizations_by_height
1745                        .put(height, digest, finalization)
1746                        .await
1747                        .map_err(Box::new)?;
1748                }
1749                Ok::<_, BoxedError>(())
1750            }
1751        ) {
1752            panic!("failed to finalize: {e}");
1753        }
1754
1755        // Update metrics and application
1756        if let Some(round) = round.filter(|_| height > self.tip) {
1757            application.report(Update::Tip(round, height, digest));
1758            self.tip = height;
1759            let _ = self.finalized_height.try_set(height.get());
1760        }
1761
1762        true
1763    }
1764
1765    /// Get the latest finalized block information (height and digest tuple).
1766    ///
1767    /// Blocks are only finalized directly with a finalization or indirectly via a descendant
1768    /// block's finalization. Thus, the highest known finalized block must itself have a direct
1769    /// finalization.
1770    ///
1771    /// We return the height and digest using the highest known finalization that we know the
1772    /// block height for. While it's possible that we have a later finalization, if we do not have
1773    /// the full block for that finalization, we do not know its height and therefore it would not
1774    /// yet be found in the `finalizations_by_height` archive. While not checked explicitly, we
1775    /// should have the associated block (in the `finalized_blocks` archive) for the information
1776    /// returned.
1777    async fn get_latest(&mut self) -> Option<(Height, <V::Block as Digestible>::Digest, Round)> {
1778        let height = self.finalizations_by_height.last_index()?;
1779        let finalization = self
1780            .get_finalization_by_height(height)
1781            .await
1782            .expect("finalization missing");
1783        Some((
1784            height,
1785            V::commitment_to_inner(finalization.proposal.payload),
1786            finalization.round(),
1787        ))
1788    }
1789
1790    // -------------------- Mixed Storage --------------------
1791
1792    /// Looks for a block in cache and finalized storage by digest.
1793    async fn find_block_in_storage(
1794        &self,
1795        digest: <V::Block as Digestible>::Digest,
1796    ) -> Option<V::Block> {
1797        // Check verified / notarized blocks via cache manager.
1798        if let Some(block) = self.cache.find_block(digest).await {
1799            return Some(block.into());
1800        }
1801        // Check finalized blocks.
1802        match self.finalized_blocks.get(ArchiveID::Key(&digest)).await {
1803            Ok(stored) => stored.map(|stored| stored.into()),
1804            Err(e) => panic!("failed to get block: {e}"),
1805        }
1806    }
1807
1808    /// Looks for a block in cache and finalized storage by inner digest, returning
1809    /// only blocks that match `predicate`.
1810    async fn find_block_in_storage_matching(
1811        &self,
1812        digest: <V::Block as Digestible>::Digest,
1813        mut predicate: impl FnMut(&V::Block) -> bool,
1814    ) -> Option<V::Block> {
1815        if let Some(block) = self
1816            .cache
1817            .find_block_matching(digest, |stored| {
1818                let block = stored.clone().into();
1819                predicate(&block)
1820            })
1821            .await
1822        {
1823            return Some(block.into());
1824        }
1825
1826        match self.finalized_blocks.get(ArchiveID::Key(&digest)).await {
1827            Ok(Some(stored)) => {
1828                let block = stored.into();
1829                predicate(&block).then_some(block)
1830            }
1831            Ok(None) => None,
1832            Err(e) => panic!("failed to get block: {e}"),
1833        }
1834    }
1835
1836    /// Looks for a block anywhere in local storage using only the digest.
1837    ///
1838    /// This is used when we only have a digest (during gap repair following
1839    /// parent links).
1840    async fn find_block_by_digest<Buf: Buffer<V>>(
1841        &self,
1842        buffer: &Buf,
1843        digest: <V::Block as Digestible>::Digest,
1844    ) -> Option<V::Block> {
1845        if let Some(block) = buffer.find_by_digest(digest).await {
1846            return Some(block);
1847        }
1848        self.find_block_in_storage(digest).await
1849    }
1850
1851    /// Looks for a block anywhere in local storage using the full commitment.
1852    ///
1853    /// This is used when we have a full commitment (from notarizations/finalizations).
1854    /// Having the full commitment may enable additional retrieval mechanisms.
1855    async fn find_block_by_commitment<Buf: Buffer<V>>(
1856        &self,
1857        buffer: &Buf,
1858        commitment: V::Commitment,
1859    ) -> Option<V::Block> {
1860        if let Some(block) = buffer.find_by_commitment(commitment).await {
1861            return Some(block);
1862        }
1863        self.find_block_in_storage_matching(V::commitment_to_inner(commitment), |block| {
1864            V::commitment(block) == commitment
1865        })
1866        .await
1867    }
1868
1869    /// Attempt to repair any identified gaps in the finalized blocks archive. The total
1870    /// number of missing heights that can be repaired at once is bounded by `self.max_repair`,
1871    /// though multiple gaps may be spanned.
1872    ///
1873    /// This also handles the "trailing" case where finalizations exist beyond
1874    /// the last stored block (the block data was lost before a crash). The
1875    /// trailing block is anchored first so that backward gap repair can fill
1876    /// inward from it.
1877    ///
1878    /// Writes are buffered. Returns `true` if this call wrote repaired blocks and
1879    /// needs a subsequent [`sync_finalized`](Self::sync_finalized).
1880    async fn try_repair_gaps<Buf: Buffer<V>>(
1881        &mut self,
1882        buffer: &mut Buf,
1883        resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
1884        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1885    ) -> bool {
1886        // Gap repair needs a known processed floor. A floor transition may
1887        // jump the lower bound once its anchor block arrives.
1888        if self.floor.blocks_progress() {
1889            return false;
1890        }
1891
1892        let mut wrote = false;
1893        let start = self.floor.processed_height().next();
1894
1895        // If finalizations extend beyond the last stored block, anchor the
1896        // trailing block so the gap repair loop below can walk backward from it.
1897        if let Some(last_finalized) = self.finalizations_by_height.last_index() {
1898            let have_block = self
1899                .finalized_blocks
1900                .last_index()
1901                .is_some_and(|last| last >= last_finalized);
1902            if last_finalized > self.floor.processed_height() && !have_block {
1903                // Get the finalization for the last finalized block.
1904                let finalization = self
1905                    .get_finalization_by_height(last_finalized)
1906                    .await
1907                    .expect("finalization missing");
1908                let commitment = finalization.proposal.payload;
1909                if let Some(block) = self.find_block_by_commitment(buffer, commitment).await {
1910                    // If found, persist the block.
1911                    let digest = block.digest();
1912                    wrote |= self
1913                        .store_finalization(
1914                            last_finalized,
1915                            digest,
1916                            block,
1917                            Some(finalization),
1918                            application,
1919                        )
1920                        .await;
1921                } else {
1922                    // Request the missing block.
1923                    self.floor
1924                        .fetch_if_permitted(
1925                            resolver,
1926                            Request::finalized_block_by_height(commitment, last_finalized),
1927                        )
1928                        .ignore();
1929                }
1930            }
1931        }
1932
1933        // Fill internal gaps by walking backward from each gap's end block.
1934        'cache_repair: loop {
1935            let (gap_start, Some(gap_end)) = self.finalized_blocks.next_gap(start) else {
1936                // No gaps detected
1937                return wrote;
1938            };
1939
1940            // Attempt to repair the gap backwards from the end of the gap, using
1941            // blocks from our local storage.
1942            let Some(mut cursor) = self.get_finalized_block(gap_end).await else {
1943                panic!("gapped block missing that should exist: {gap_end}");
1944            };
1945
1946            // Compute the lower bound of the recursive repair. `gap_start` is `Some`
1947            // if `start` is not in a gap. We add one to it to ensure we don't
1948            // re-persist it to the database in the repair loop below.
1949            let gap_start = gap_start.map(Height::next).unwrap_or(start);
1950
1951            // Iterate backwards, repairing blocks as we go.
1952            while cursor.height() > gap_start {
1953                let parent_digest = cursor.parent();
1954                let parent_commitment = V::parent_commitment(&cursor);
1955                if let Some(block) = self
1956                    .find_block_by_commitment(buffer, parent_commitment)
1957                    .await
1958                {
1959                    let finalization = self.cache.get_finalization_for(parent_digest).await;
1960                    wrote |= self
1961                        .store_finalization(
1962                            block.height(),
1963                            parent_digest,
1964                            block.clone(),
1965                            finalization,
1966                            application,
1967                        )
1968                        .await;
1969                    debug!(height = %block.height(), "repaired block");
1970                    cursor = block;
1971                } else {
1972                    // Request the next missing commitment.
1973                    //
1974                    // SAFETY: Finalized blocks are archived only after the
1975                    // parent relationship needed for walkback has been
1976                    // validated by marshal.
1977                    let parent_height = cursor
1978                        .height()
1979                        .previous()
1980                        .expect("cursor above gap start has a parent");
1981                    self.floor
1982                        .fetch_if_permitted(
1983                            resolver,
1984                            Request::finalized_block_by_height(parent_commitment, parent_height),
1985                        )
1986                        .ignore();
1987                    break 'cache_repair;
1988                }
1989            }
1990        }
1991
1992        // Request any finalizations for missing items in the archive, up to
1993        // the `max_repair` quota. This may help shrink the size of the gap
1994        // closest to the application's processed height if finalizations
1995        // for the requests' heights exist. If not, we rely on the recursive
1996        // digest fetches above.
1997        let missing_items = self
1998            .finalized_blocks
1999            .missing_items(start, self.max_repair.get());
2000        let requests: Vec<_> = missing_items.into_iter().map(Request::finalized).collect();
2001        if !requests.is_empty() {
2002            self.floor
2003                .fetch_all_if_permitted(resolver, requests)
2004                .ignore();
2005        }
2006        wrote
2007    }
2008
2009    /// Buffers a processed height update in memory and metrics. Does NOT sync
2010    /// to durable storage. Sync metadata after buffered updates to make them durable.
2011    fn update_processed_height(
2012        &mut self,
2013        height: Height,
2014        resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
2015    ) {
2016        self.stream.acknowledge(height);
2017        self.floor.set_processed_height(height);
2018        let _ = self
2019            .processed_height
2020            .try_set(self.floor.processed_height().get());
2021
2022        // Prune any existing requests below the new floor.
2023        resolver.retain(handler::above_height_floor::<V::Commitment>(height));
2024    }
2025
2026    /// Returns the latest known finalization round at or below the processed height.
2027    async fn latest_processed_round(finalizations_by_height: &FC, height: Option<Height>) -> Round {
2028        let Some(height) = height else {
2029            return Round::zero();
2030        };
2031        let Some(finalization_height) = finalizations_by_height
2032            .ranges_from(Height::zero())
2033            .filter_map(|(start, end)| (start <= height).then_some(end.min(height)))
2034            .max()
2035        else {
2036            return Round::zero();
2037        };
2038
2039        match finalizations_by_height
2040            .get(ArchiveID::Index(finalization_height.get()))
2041            .await
2042        {
2043            Ok(Some(finalization)) => finalization.round(),
2044            Ok(None) => panic!("processed finalization missing from stored range"),
2045            Err(err) => panic!("failed to get processed finalization: {err}"),
2046        }
2047    }
2048
2049    /// Buffers a processed round update in memory and prunes round-bound requests.
2050    async fn update_processed_round(
2051        &mut self,
2052        height: Height,
2053        resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
2054    ) {
2055        let Some(finalization) = self.get_finalization_by_height(height).await else {
2056            return;
2057        };
2058        self.update_processed_round_floor(height, finalization.round(), resolver)
2059            .await;
2060    }
2061
2062    /// Buffers a processed round floor update in memory and prunes round-bound requests.
2063    async fn update_processed_round_floor(
2064        &mut self,
2065        height: Height,
2066        round: Round,
2067        resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
2068    ) {
2069        if height > self.floor.processed_height() || round <= self.floor.processed_round() {
2070            return;
2071        }
2072
2073        let previous = self.floor.processed_round();
2074        self.floor.set_processed_round(round);
2075
2076        // Retain view-indexed cache data for a window behind the previously
2077        // processed finalized block.
2078        let prune_round = Round::new(
2079            previous.epoch(),
2080            previous.view().saturating_sub(self.view_retention_timeout),
2081        );
2082        self.cache.prune_by_view(prune_round).await;
2083
2084        // Prune round-bound requests at or below the processed round.
2085        resolver.retain(handler::above_round_floor::<V::Commitment>(
2086            self.floor.processed_round(),
2087        ));
2088    }
2089
2090    /// Prunes finalized blocks and certificates below the given height.
2091    async fn prune_finalized_archives(&mut self, height: Height) -> Result<(), BoxedError> {
2092        // Prune the finalized block and finalization certificate archives in parallel.
2093        try_join!(
2094            async {
2095                self.finalized_blocks
2096                    .prune(height)
2097                    .await
2098                    .map_err(Box::new)?;
2099                Ok::<_, BoxedError>(())
2100            },
2101            async {
2102                self.finalizations_by_height
2103                    .prune(height)
2104                    .await
2105                    .map_err(Box::new)?;
2106                Ok::<_, BoxedError>(())
2107            }
2108        )?;
2109        Ok(())
2110    }
2111
2112    /// Prunes finalized archives and height-indexed certified cache data below the durable floor.
2113    async fn prune_after_floor(&mut self, height: Height) -> Result<(), BoxedError> {
2114        let cache = &mut self.cache;
2115        let finalized_blocks = &mut self.finalized_blocks;
2116        let finalizations_by_height = &mut self.finalizations_by_height;
2117        try_join!(
2118            async {
2119                cache.prune_by_height(height).await;
2120                Ok::<_, BoxedError>(())
2121            },
2122            async {
2123                finalized_blocks.prune(height).await.map_err(Box::new)?;
2124                Ok::<_, BoxedError>(())
2125            },
2126            async {
2127                finalizations_by_height
2128                    .prune(height)
2129                    .await
2130                    .map_err(Box::new)?;
2131                Ok::<_, BoxedError>(())
2132            }
2133        )?;
2134        Ok(())
2135    }
2136}