commonware_consensus/marshal/
actor.rs

1use super::{
2    cache,
3    config::Config,
4    ingress::{
5        handler::{self, Request},
6        mailbox::{Mailbox, Message},
7    },
8};
9use crate::{
10    marshal::{
11        ingress::mailbox::Identifier as BlockID,
12        store::{Blocks, Certificates},
13        Update,
14    },
15    simplex::{
16        scheme::Scheme,
17        types::{Finalization, Notarization},
18    },
19    types::{Epoch, Epocher, Height, Round, ViewDelta},
20    Block, Reporter,
21};
22use commonware_broadcast::{buffered, Broadcaster};
23use commonware_codec::{Decode, Encode};
24use commonware_cryptography::{
25    certificate::{Provider, Scheme as CertificateScheme},
26    PublicKey,
27};
28use commonware_macros::select;
29use commonware_p2p::Recipients;
30use commonware_parallel::Strategy;
31use commonware_resolver::Resolver;
32use commonware_runtime::{
33    spawn_cell, telemetry::metrics::status::GaugeExt, Clock, ContextCell, Handle, Metrics, Spawner,
34    Storage,
35};
36use commonware_storage::{
37    archive::Identifier as ArchiveID,
38    metadata::{self, Metadata},
39};
40use commonware_utils::{
41    acknowledgement::Exact,
42    channels::fallible::OneshotExt,
43    futures::{AbortablePool, Aborter, OptionFuture},
44    sequence::U64,
45    Acknowledgement, BoxedError,
46};
47use futures::{
48    channel::{mpsc, oneshot},
49    try_join, StreamExt,
50};
51use pin_project::pin_project;
52use prometheus_client::metrics::gauge::Gauge;
53use rand_core::CryptoRngCore;
54use std::{
55    collections::{btree_map::Entry, BTreeMap},
56    future::Future,
57    num::NonZeroUsize,
58    sync::Arc,
59};
60use tracing::{debug, error, info, warn};
61
/// The key used to store the last processed height in the metadata store.
///
/// `Actor::init` reads this key from the application metadata to restore the last processed
/// height, and the `SetFloor` handler consults it before raising the floor.
const LATEST_KEY: U64 = U64::new(0xFF);
64
/// A pending acknowledgement from the application for processing a block at the contained
/// height/commitment.
///
/// The `height` and `commitment` travel with the waiter so that, once the waiter resolves, the
/// actor can tell which block the application acknowledged.
#[pin_project]
struct PendingAck<B: Block, A: Acknowledgement> {
    // Height of the block awaiting acknowledgement
    height: Height,
    // Commitment of the block awaiting acknowledgement
    commitment: B::Commitment,
    // Waiter polled to learn the outcome of the acknowledgement (pinned because it is polled
    // in place)
    #[pin]
    receiver: A::Waiter,
}
73
74impl<B: Block, A: Acknowledgement> Future for PendingAck<B, A> {
75    type Output = <A::Waiter as Future>::Output;
76
77    fn poll(
78        self: std::pin::Pin<&mut Self>,
79        cx: &mut std::task::Context<'_>,
80    ) -> std::task::Poll<Self::Output> {
81        self.project().receiver.poll(cx)
82    }
83}
84
/// A struct that holds multiple subscriptions for a block.
///
/// One entry exists per commitment: the first subscriber creates the entry (and the waiter
/// future tied to `_aborter`), and later subscribers for the same commitment are appended to
/// `subscribers`.
struct BlockSubscription<B: Block> {
    // The subscribers that are waiting for the block
    subscribers: Vec<oneshot::Sender<B>>,
    // Aborter that aborts the waiter future when dropped (held only for its drop behavior,
    // hence the leading underscore)
    _aborter: Aborter,
}
92
/// The [Actor] is responsible for receiving uncertified blocks from the broadcast mechanism,
/// receiving notarizations and finalizations from consensus, and reconstructing a total order
/// of blocks.
///
/// The actor is designed to be used in a view-based model. Each view corresponds to a
/// potential block in the chain. The actor will only finalize a block if it has a
/// corresponding finalization.
///
/// The actor also provides a backfill mechanism for missing blocks. If the actor receives a
/// finalization for a block that is ahead of its current view, it will request the missing blocks
/// from its peers. This ensures that the actor can catch up to the rest of the network if it falls
/// behind.
///
/// # Type Parameters
///
/// - `E`: runtime context (randomness, spawning, metrics, clock, and storage).
/// - `B`: block type.
/// - `P`: provider of epoch-scoped signing schemes over `B::Commitment`.
/// - `FC`: store of finalization certificates.
/// - `FB`: store of finalized blocks.
/// - `ES`: epocher used to map heights to epochs.
/// - `T`: strategy for parallel operations.
/// - `A`: acknowledgement type used by the application (defaults to [Exact]).
pub struct Actor<E, B, P, FC, FB, ES, T, A = Exact>
where
    E: CryptoRngCore + Spawner + Metrics + Clock + Storage,
    B: Block,
    P: Provider<Scope = Epoch, Scheme: Scheme<B::Commitment>>,
    FC: Certificates<Commitment = B::Commitment, Scheme = P::Scheme>,
    FB: Blocks<Block = B>,
    ES: Epocher,
    T: Strategy,
    A: Acknowledgement,
{
    // ---------- Context ----------
    // Runtime context, held in a cell so it can be handed to the spawned task
    context: ContextCell<E>,

    // ---------- Message Passing ----------
    // Mailbox (receiving half; the sending half is wrapped in the `Mailbox` returned by `init`)
    mailbox: mpsc::Receiver<Message<P::Scheme, B>>,

    // ---------- Configuration ----------
    // Provider for epoch-specific signing schemes
    provider: P,
    // Epoch configuration
    epocher: ES,
    // Minimum number of views to retain temporary data after the application processes a block
    view_retention_timeout: ViewDelta,
    // Maximum number of blocks to repair at once
    max_repair: NonZeroUsize,
    // Codec configuration for block type
    block_codec_config: B::Cfg,
    // Strategy for parallel operations
    strategy: T,

    // ---------- State ----------
    // Last round processed (starts at `Round::zero()`; subscriptions for earlier rounds whose
    // block is not found locally are dropped)
    last_processed_round: Round,
    // Last height processed by the application (restored from metadata in `init`)
    last_processed_height: Height,
    // Pending application acknowledgement, if any
    pending_ack: OptionFuture<PendingAck<B, A>>,
    // Highest known finalized height
    tip: Height,
    // Outstanding subscriptions for blocks, keyed by block commitment
    block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<B>>,

    // ---------- Storage ----------
    // Prunable cache
    cache: cache::Manager<E, B, P::Scheme>,
    // Metadata tracking application progress (the last processed height lives under
    // `LATEST_KEY`)
    application_metadata: Metadata<E, U64, Height>,
    // Finalizations stored by height
    finalizations_by_height: FC,
    // Finalized blocks stored by height
    finalized_blocks: FB,

    // ---------- Metrics ----------
    // Latest finalized height metric (registered as `finalized_height`)
    finalized_height: Gauge,
    // Latest processed height metric (registered as `processed_height`)
    processed_height: Gauge,
}
165
166impl<E, B, P, FC, FB, ES, T, A> Actor<E, B, P, FC, FB, ES, T, A>
167where
168    E: CryptoRngCore + Spawner + Metrics + Clock + Storage,
169    B: Block,
170    P: Provider<Scope = Epoch, Scheme: Scheme<B::Commitment>>,
171    FC: Certificates<Commitment = B::Commitment, Scheme = P::Scheme>,
172    FB: Blocks<Block = B>,
173    ES: Epocher,
174    T: Strategy,
175    A: Acknowledgement,
176{
177    /// Create a new application actor.
178    pub async fn init(
179        context: E,
180        finalizations_by_height: FC,
181        finalized_blocks: FB,
182        config: Config<B, P, ES, T>,
183    ) -> (Self, Mailbox<P::Scheme, B>, Height) {
184        // Initialize cache
185        let prunable_config = cache::Config {
186            partition_prefix: format!("{}-cache", config.partition_prefix.clone()),
187            prunable_items_per_section: config.prunable_items_per_section,
188            replay_buffer: config.replay_buffer,
189            key_write_buffer: config.key_write_buffer,
190            value_write_buffer: config.value_write_buffer,
191            key_buffer_pool: config.buffer_pool.clone(),
192        };
193        let cache = cache::Manager::init(
194            context.with_label("cache"),
195            prunable_config,
196            config.block_codec_config.clone(),
197        )
198        .await;
199
200        // Initialize metadata tracking application progress
201        let application_metadata = Metadata::init(
202            context.with_label("application_metadata"),
203            metadata::Config {
204                partition: format!("{}-application-metadata", config.partition_prefix),
205                codec_config: (),
206            },
207        )
208        .await
209        .expect("failed to initialize application metadata");
210        let last_processed_height = application_metadata
211            .get(&LATEST_KEY)
212            .copied()
213            .unwrap_or(Height::zero());
214
215        // Create metrics
216        let finalized_height = Gauge::default();
217        context.register(
218            "finalized_height",
219            "Finalized height of application",
220            finalized_height.clone(),
221        );
222        let processed_height = Gauge::default();
223        context.register(
224            "processed_height",
225            "Processed height of application",
226            processed_height.clone(),
227        );
228        let _ = processed_height.try_set(last_processed_height.get());
229
230        // Initialize mailbox
231        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
232        (
233            Self {
234                context: ContextCell::new(context),
235                mailbox,
236                provider: config.provider,
237                epocher: config.epocher,
238                view_retention_timeout: config.view_retention_timeout,
239                max_repair: config.max_repair,
240                block_codec_config: config.block_codec_config,
241                strategy: config.strategy,
242                last_processed_round: Round::zero(),
243                last_processed_height,
244                pending_ack: None.into(),
245                tip: Height::zero(),
246                block_subscriptions: BTreeMap::new(),
247                cache,
248                application_metadata,
249                finalizations_by_height,
250                finalized_blocks,
251                finalized_height,
252                processed_height,
253            },
254            Mailbox::new(sender),
255            last_processed_height,
256        )
257    }
258
    /// Start the actor.
    ///
    /// Consumes the actor, hands its run loop to `spawn_cell!` together with the actor's
    /// context, and returns the resulting [Handle].
    ///
    /// - `application`: reporter that receives the [Update]s produced by the actor.
    /// - `buffer`: mailbox of the buffered broadcast engine, used to broadcast proposed blocks
    ///   and to look up blocks.
    /// - `resolver`: the receiver of handler messages paired with the resolver used to fetch
    ///   missing data from peers.
    pub fn start<R, K>(
        mut self,
        application: impl Reporter<Activity = Update<B, A>>,
        buffer: buffered::Mailbox<K, B>,
        resolver: (mpsc::Receiver<handler::Message<B>>, R),
    ) -> Handle<()>
    where
        R: Resolver<
            Key = handler::Request<B>,
            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
        >,
        K: PublicKey,
    {
        spawn_cell!(self.context, self.run(application, buffer, resolver).await)
    }
275
276    /// Run the application actor.
277    async fn run<R, K>(
278        mut self,
279        mut application: impl Reporter<Activity = Update<B, A>>,
280        mut buffer: buffered::Mailbox<K, B>,
281        (mut resolver_rx, mut resolver): (mpsc::Receiver<handler::Message<B>>, R),
282    ) where
283        R: Resolver<
284            Key = handler::Request<B>,
285            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
286        >,
287        K: PublicKey,
288    {
289        // Create a local pool for waiter futures.
290        let mut waiters = AbortablePool::<(B::Commitment, B)>::default();
291
292        // Get tip and send to application
293        let tip = self.get_latest().await;
294        if let Some((height, commitment)) = tip {
295            application.report(Update::Tip(height, commitment)).await;
296            self.tip = height;
297            let _ = self.finalized_height.try_set(height.get());
298        }
299
300        // Attempt to dispatch the next finalized block to the application, if it is ready.
301        self.try_dispatch_block(&mut application).await;
302
303        // Attempt to repair any gaps in the finalized blocks archive, if there are any.
304        self.try_repair_gaps(&mut buffer, &mut resolver, &mut application)
305            .await;
306
307        loop {
308            // Remove any dropped subscribers. If all subscribers dropped, abort the waiter.
309            self.block_subscriptions.retain(|_, bs| {
310                bs.subscribers.retain(|tx| !tx.is_canceled());
311                !bs.subscribers.is_empty()
312            });
313
314            // Select messages
315            select! {
316                // Handle waiter completions first
317                result = waiters.next_completed() => {
318                    let Ok((commitment, block)) = result else {
319                        continue; // Aborted future
320                    };
321                    self.notify_subscribers(commitment, &block).await;
322                },
323                // Handle application acknowledgements next
324                ack = &mut self.pending_ack => {
325                    let PendingAck { height, commitment, .. } = self.pending_ack.take().expect("ack state must be present");
326
327                    match ack {
328                        Ok(()) => {
329                            if let Err(e) = self
330                                .handle_block_processed(height, commitment, &mut resolver)
331                                .await
332                            {
333                                error!(?e, %height, "failed to update application progress");
334                                return;
335                            }
336                            self.try_dispatch_block(&mut application).await;
337                        }
338                        Err(e) => {
339                            error!(?e, %height, "application did not acknowledge block");
340                            return;
341                        }
342                    }
343                },
344                // Handle consensus inputs before backfill or resolver traffic
345                mailbox_message = self.mailbox.next() => {
346                    let Some(message) = mailbox_message else {
347                        info!("mailbox closed, shutting down");
348                        return;
349                    };
350                    match message {
351                        Message::GetInfo { identifier, response } => {
352                            let info = match identifier {
353                                // TODO: Instead of pulling out the entire block, determine the
354                                // height directly from the archive by mapping the commitment to
355                                // the index, which is the same as the height.
356                                BlockID::Commitment(commitment) => self
357                                    .finalized_blocks
358                                    .get(ArchiveID::Key(&commitment))
359                                    .await
360                                    .ok()
361                                    .flatten()
362                                    .map(|b| (b.height(), commitment)),
363                                BlockID::Height(height) => self
364                                    .finalizations_by_height
365                                    .get(ArchiveID::Index(height.get()))
366                                    .await
367                                    .ok()
368                                    .flatten()
369                                    .map(|f| (height, f.proposal.payload)),
370                                BlockID::Latest => self.get_latest().await,
371                            };
372                            response.send_lossy(info);
373                        }
374                        Message::Proposed { round, block } => {
375                            self.cache_verified(round, block.commitment(), block.clone()).await;
376                            let _peers = buffer.broadcast(Recipients::All, block).await;
377                        }
378                        Message::Verified { round, block } => {
379                            self.cache_verified(round, block.commitment(), block).await;
380                        }
381                        Message::Notarization { notarization } => {
382                            let round = notarization.round();
383                            let commitment = notarization.proposal.payload;
384
385                            // Store notarization by view
386                            self.cache.put_notarization(round, commitment, notarization.clone()).await;
387
388                            // Search for block locally, otherwise fetch it remotely
389                            if let Some(block) = self.find_block(&mut buffer, commitment).await {
390                                // If found, persist the block
391                                self.cache_block(round, commitment, block).await;
392                            } else {
393                                debug!(?round, "notarized block missing");
394                                resolver.fetch(Request::<B>::Notarized { round }).await;
395                            }
396                        }
397                        Message::Finalization { finalization } => {
398                            // Cache finalization by round
399                            let round = finalization.round();
400                            let commitment = finalization.proposal.payload;
401                            self.cache.put_finalization(round, commitment, finalization.clone()).await;
402
403                            // Search for block locally, otherwise fetch it remotely
404                            if let Some(block) = self.find_block(&mut buffer, commitment).await {
405                                // If found, persist the block
406                                let height = block.height();
407                                self.finalize(
408                                    height,
409                                    commitment,
410                                    block,
411                                    Some(finalization),
412                                    &mut application,
413                                    &mut buffer,
414                                    &mut resolver,
415                                )
416                                .await;
417                                debug!(?round, %height, "finalized block stored");
418                            } else {
419                                // Otherwise, fetch the block from the network.
420                                debug!(?round, ?commitment, "finalized block missing");
421                                resolver.fetch(Request::<B>::Block(commitment)).await;
422                            }
423                        }
424                        Message::GetBlock { identifier, response } => {
425                            match identifier {
426                                BlockID::Commitment(commitment) => {
427                                    let result = self.find_block(&mut buffer, commitment).await;
428                                    response.send_lossy(result);
429                                }
430                                BlockID::Height(height) => {
431                                    let result = self.get_finalized_block(height).await;
432                                    response.send_lossy(result);
433                                }
434                                BlockID::Latest => {
435                                    let block = match self.get_latest().await {
436                                        Some((_, commitment)) => self.find_block(&mut buffer, commitment).await,
437                                        None => None,
438                                    };
439                                    response.send_lossy(block);
440                                }
441                            }
442                        }
443                        Message::GetFinalization { height, response } => {
444                            let finalization = self.get_finalization_by_height(height).await;
445                            response.send_lossy(finalization);
446                        }
447                        Message::HintFinalized { height, targets } => {
448                            // Skip if height is at or below the floor
449                            if height <= self.last_processed_height {
450                                continue;
451                            }
452
453                            // Skip if finalization is already available locally
454                            if self.get_finalization_by_height(height).await.is_some() {
455                                continue;
456                            }
457
458                            // Trigger a targeted fetch via the resolver
459                            let request = Request::<B>::Finalized { height };
460                            resolver.fetch_targeted(request, targets).await;
461                        }
462                        Message::Subscribe { round, commitment, response } => {
463                            // Check for block locally
464                            if let Some(block) = self.find_block(&mut buffer, commitment).await {
465                                response.send_lossy(block);
466                                continue;
467                            }
468
469                            // We don't have the block locally, so fetch the block from the network
470                            // if we have an associated view. If we only have the digest, don't make
471                            // the request as we wouldn't know when to drop it, and the request may
472                            // never complete if the block is not finalized.
473                            if let Some(round) = round {
474                                if round < self.last_processed_round {
475                                    // At this point, we have failed to find the block locally, and
476                                    // we know that its round is less than the last processed round.
477                                    // This means that something else was finalized in that round,
478                                    // so we drop the response to indicate that the block may never
479                                    // be available.
480                                    continue;
481                                }
482                                // Attempt to fetch the block (with notarization) from the resolver.
483                                // If this is a valid view, this request should be fine to keep open
484                                // until resolution or pruning (even if the oneshot is canceled).
485                                debug!(?round, ?commitment, "requested block missing");
486                                resolver.fetch(Request::<B>::Notarized { round }).await;
487                            }
488
489                            // Register subscriber
490                            debug!(?round, ?commitment, "registering subscriber");
491                            match self.block_subscriptions.entry(commitment) {
492                                Entry::Occupied(mut entry) => {
493                                    entry.get_mut().subscribers.push(response);
494                                }
495                                Entry::Vacant(entry) => {
496                                    let (tx, rx) = oneshot::channel();
497                                    buffer.subscribe_prepared(None, commitment, None, tx).await;
498                                    let aborter = waiters.push(async move {
499                                        (commitment, rx.await.expect("buffer subscriber closed"))
500                                    });
501                                    entry.insert(BlockSubscription {
502                                        subscribers: vec![response],
503                                        _aborter: aborter,
504                                    });
505                                }
506                            }
507                        }
508                        Message::SetFloor { height } => {
509                            if let Some(stored_height) = self.application_metadata.get(&LATEST_KEY) {
510                                if *stored_height >= height {
511                                    warn!(%height, existing = %stored_height, "floor not updated, lower than existing");
512                                    continue;
513                                }
514                            }
515
516                            // Update the processed height
517                            if let Err(err) = self.set_processed_height(height, &mut resolver).await {
518                                error!(?err, %height, "failed to update floor");
519                                return;
520                            }
521
522                            // Drop the pending acknowledgement, if one exists. We must do this to prevent
523                            // an in-process block from being processed that is below the new floor
524                            // updating `last_processed_height`.
525                            self.pending_ack = None.into();
526
527                            // Prune the finalized block and finalization certificate archives in parallel.
528                            if let Err(err) = try_join!(
529                                // Prune the finalized blocks archive
530                                async {
531                                    self.finalized_blocks.prune(height).await.map_err(Box::new)?;
532                                    Ok::<_, BoxedError>(())
533                                },
534                                // Prune the finalization certificate archive
535                                async {
536                                    self.finalizations_by_height
537                                        .prune(height)
538                                        .await
539                                        .map_err(Box::new)?;
540                                    Ok::<_, BoxedError>(())
541                                }
542                            ) {
543                                error!(?err, %height, "failed to prune finalized archives");
544                                return;
545                            }
546                        }
547                    }
548                },
549                // Handle resolver messages last
550                message = resolver_rx.next() => {
551                    let Some(message) = message else {
552                        info!("handler closed, shutting down");
553                        return;
554                    };
555                    match message {
556                        handler::Message::Produce { key, response } => {
557                            match key {
558                                Request::Block(commitment) => {
559                                    // Check for block locally
560                                    let Some(block) = self.find_block(&mut buffer, commitment).await else {
561                                        debug!(?commitment, "block missing on request");
562                                        continue;
563                                    };
564                                    response.send_lossy(block.encode());
565                                }
566                                Request::Finalized { height } => {
567                                    // Get finalization
568                                    let Some(finalization) = self.get_finalization_by_height(height).await else {
569                                        debug!(%height, "finalization missing on request");
570                                        continue;
571                                    };
572
573                                    // Get block
574                                    let Some(block) = self.get_finalized_block(height).await else {
575                                        debug!(%height, "finalized block missing on request");
576                                        continue;
577                                    };
578
579                                    // Send finalization
580                                    response.send_lossy((finalization, block).encode());
581                                }
582                                Request::Notarized { round } => {
583                                    // Get notarization
584                                    let Some(notarization) = self.cache.get_notarization(round).await else {
585                                        debug!(?round, "notarization missing on request");
586                                        continue;
587                                    };
588
589                                    // Get block
590                                    let commitment = notarization.proposal.payload;
591                                    let Some(block) = self.find_block(&mut buffer, commitment).await else {
592                                        debug!(?commitment, "block missing on request");
593                                        continue;
594                                    };
595                                    response.send_lossy((notarization, block).encode());
596                                }
597                            }
598                        },
599                        handler::Message::Deliver { key, value, response } => {
600                            match key {
601                                Request::Block(commitment) => {
602                                    // Parse block
603                                    let Ok(block) = B::decode_cfg(value.as_ref(), &self.block_codec_config) else {
604                                        response.send_lossy(false);
605                                        continue;
606                                    };
607
608                                    // Validation
609                                    if block.commitment() != commitment {
610                                        response.send_lossy(false);
611                                        continue;
612                                    }
613
614                                    // Persist the block, also persisting the finalization if we have it
615                                    let height = block.height();
616                                    let finalization = self.cache.get_finalization_for(commitment).await;
617                                    self.finalize(
618                                        height,
619                                        commitment,
620                                        block,
621                                        finalization,
622                                        &mut application,
623                                        &mut buffer,
624                                        &mut resolver,
625                                    )
626                                    .await;
627                                    debug!(?commitment, %height, "received block");
628                                    response.send_lossy(true);
629                                },
630                                Request::Finalized { height } => {
631                                    let Some(bounds) = self.epocher.containing(height) else {
632                                        response.send_lossy(false);
633                                        continue;
634                                    };
635                                    let Some(scheme) = self.get_scheme_certificate_verifier(bounds.epoch()) else {
636                                        response.send_lossy(false);
637                                        continue;
638                                    };
639
640                                    // Parse finalization
641                                    let Ok((finalization, block)) =
642                                        <(Finalization<P::Scheme, B::Commitment>, B)>::decode_cfg(
643                                            value,
644                                            &(scheme.certificate_codec_config(), self.block_codec_config.clone()),
645                                        )
646                                    else {
647                                        response.send_lossy(false);
648                                        continue;
649                                    };
650
651                                    // Validation
652                                    if block.height() != height
653                                        || finalization.proposal.payload != block.commitment()
654                                        || !finalization.verify(&mut self.context, &scheme, &self.strategy)
655                                    {
656                                        response.send_lossy(false);
657                                        continue;
658                                    }
659
660                                    // Valid finalization received
661                                    debug!(%height, "received finalization");
662                                    response.send_lossy(true);
663                                    self.finalize(
664                                        height,
665                                        block.commitment(),
666                                        block,
667                                        Some(finalization),
668                                        &mut application,
669                                        &mut buffer,
670                                        &mut resolver,
671                                    )
672                                    .await;
673                                },
674                                Request::Notarized { round } => {
675                                    let Some(scheme) = self.get_scheme_certificate_verifier(round.epoch()) else {
676                                        response.send_lossy(false);
677                                        continue;
678                                    };
679
680                                    // Parse notarization
681                                    let Ok((notarization, block)) =
682                                        <(Notarization<P::Scheme, B::Commitment>, B)>::decode_cfg(
683                                            value,
684                                            &(scheme.certificate_codec_config(), self.block_codec_config.clone()),
685                                        )
686                                    else {
687                                        response.send_lossy(false);
688                                        continue;
689                                    };
690
691                                    // Validation
692                                    if notarization.round() != round
693                                        || notarization.proposal.payload != block.commitment()
694                                        || !notarization.verify(&mut self.context, &scheme, &self.strategy)
695                                    {
696                                        response.send_lossy(false);
697                                        continue;
698                                    }
699
700                                    // Valid notarization received
701                                    response.send_lossy(true);
702                                    let commitment = block.commitment();
703                                    debug!(?round, ?commitment, "received notarization");
704
705                                    // If there exists a finalization certificate for this block, we
706                                    // should finalize it. While not necessary, this could finalize
707                                    // the block faster in the case where a notarization then a
708                                    // finalization is received via the consensus engine and we
709                                    // resolve the request for the notarization before we resolve
710                                    // the request for the block.
711                                    let height = block.height();
712                                    if let Some(finalization) = self.cache.get_finalization_for(commitment).await {
713                                        self.finalize(
714                                            height,
715                                            commitment,
716                                            block.clone(),
717                                            Some(finalization),
718                                            &mut application,
719                                            &mut buffer,
720                                            &mut resolver,
721                                        )
722                                        .await;
723                                    }
724
725                                    // Cache the notarization and block
726                                    self.cache_block(round, commitment, block).await;
727                                    self.cache.put_notarization(round, commitment, notarization).await;
728                                },
729                            }
730                        },
731                    }
732                },
733            }
734        }
735    }
736
737    /// Returns a scheme suitable for verifying certificates at the given epoch.
738    ///
739    /// Prefers a certificate verifier if available, otherwise falls back
740    /// to the scheme for the given epoch.
741    fn get_scheme_certificate_verifier(&self, epoch: Epoch) -> Option<Arc<P::Scheme>> {
742        self.provider.all().or_else(|| self.provider.scoped(epoch))
743    }
744
745    // -------------------- Waiters --------------------
746
747    /// Notify any subscribers for the given commitment with the provided block.
748    async fn notify_subscribers(&mut self, commitment: B::Commitment, block: &B) {
749        if let Some(mut bs) = self.block_subscriptions.remove(&commitment) {
750            for subscriber in bs.subscribers.drain(..) {
751                subscriber.send_lossy(block.clone());
752            }
753        }
754    }
755
756    // -------------------- Application Dispatch --------------------
757
758    /// Attempt to dispatch the next finalized block to the application if ready.
759    async fn try_dispatch_block(
760        &mut self,
761        application: &mut impl Reporter<Activity = Update<B, A>>,
762    ) {
763        if self.pending_ack.is_some() {
764            return;
765        }
766
767        let next_height = self.last_processed_height.next();
768        let Some(block) = self.get_finalized_block(next_height).await else {
769            return;
770        };
771        assert_eq!(
772            block.height(),
773            next_height,
774            "finalized block height mismatch"
775        );
776
777        let (height, commitment) = (block.height(), block.commitment());
778        let (ack, ack_waiter) = A::handle();
779        application.report(Update::Block(block, ack)).await;
780        self.pending_ack.replace(PendingAck {
781            height,
782            commitment,
783            receiver: ack_waiter,
784        });
785    }
786
787    /// Handle acknowledgement from the application that a block has been processed.
788    async fn handle_block_processed(
789        &mut self,
790        height: Height,
791        commitment: B::Commitment,
792        resolver: &mut impl Resolver<Key = Request<B>>,
793    ) -> Result<(), metadata::Error> {
794        // Update the processed height
795        self.set_processed_height(height, resolver).await?;
796
797        // Cancel any useless requests
798        resolver.cancel(Request::<B>::Block(commitment)).await;
799
800        if let Some(finalization) = self.get_finalization_by_height(height).await {
801            // Trail the previous processed finalized block by the timeout
802            let lpr = self.last_processed_round;
803            let prune_round = Round::new(
804                lpr.epoch(),
805                lpr.view().saturating_sub(self.view_retention_timeout),
806            );
807
808            // Prune archives
809            self.cache.prune(prune_round).await;
810
811            // Update the last processed round
812            let round = finalization.round();
813            self.last_processed_round = round;
814
815            // Cancel useless requests
816            resolver
817                .retain(Request::<B>::Notarized { round }.predicate())
818                .await;
819        }
820
821        Ok(())
822    }
823
824    // -------------------- Prunable Storage --------------------
825
826    /// Add a verified block to the prunable archive.
827    async fn cache_verified(&mut self, round: Round, commitment: B::Commitment, block: B) {
828        self.notify_subscribers(commitment, &block).await;
829        self.cache.put_verified(round, commitment, block).await;
830    }
831
832    /// Add a notarized block to the prunable archive.
833    async fn cache_block(&mut self, round: Round, commitment: B::Commitment, block: B) {
834        self.notify_subscribers(commitment, &block).await;
835        self.cache.put_block(round, commitment, block).await;
836    }
837
838    // -------------------- Immutable Storage --------------------
839
840    /// Get a finalized block from the immutable archive.
841    async fn get_finalized_block(&self, height: Height) -> Option<B> {
842        match self
843            .finalized_blocks
844            .get(ArchiveID::Index(height.get()))
845            .await
846        {
847            Ok(block) => block,
848            Err(e) => panic!("failed to get block: {e}"),
849        }
850    }
851
852    /// Get a finalization from the archive by height.
853    async fn get_finalization_by_height(
854        &self,
855        height: Height,
856    ) -> Option<Finalization<P::Scheme, B::Commitment>> {
857        match self
858            .finalizations_by_height
859            .get(ArchiveID::Index(height.get()))
860            .await
861        {
862            Ok(finalization) => finalization,
863            Err(e) => panic!("failed to get finalization: {e}"),
864        }
865    }
866
867    /// Add a finalized block, and optionally a finalization, to the archive, and
868    /// attempt to identify + repair any gaps in the archive.
869    #[allow(clippy::too_many_arguments)]
870    async fn finalize(
871        &mut self,
872        height: Height,
873        commitment: B::Commitment,
874        block: B,
875        finalization: Option<Finalization<P::Scheme, B::Commitment>>,
876        application: &mut impl Reporter<Activity = Update<B, A>>,
877        buffer: &mut buffered::Mailbox<impl PublicKey, B>,
878        resolver: &mut impl Resolver<Key = Request<B>>,
879    ) {
880        self.store_finalization(height, commitment, block, finalization, application)
881            .await;
882
883        self.try_repair_gaps(buffer, resolver, application).await;
884    }
885
886    /// Add a finalized block, and optionally a finalization, to the archive.
887    ///
888    /// After persisting the block, attempt to dispatch the next contiguous block to the
889    /// application.
890    async fn store_finalization(
891        &mut self,
892        height: Height,
893        commitment: B::Commitment,
894        block: B,
895        finalization: Option<Finalization<P::Scheme, B::Commitment>>,
896        application: &mut impl Reporter<Activity = Update<B, A>>,
897    ) {
898        self.notify_subscribers(commitment, &block).await;
899
900        // In parallel, update the finalized blocks and finalizations archives
901        if let Err(e) = try_join!(
902            // Update the finalized blocks archive
903            async {
904                self.finalized_blocks.put(block).await.map_err(Box::new)?;
905                Ok::<_, BoxedError>(())
906            },
907            // Update the finalizations archive (if provided)
908            async {
909                if let Some(finalization) = finalization {
910                    self.finalizations_by_height
911                        .put(height, commitment, finalization)
912                        .await
913                        .map_err(Box::new)?;
914                }
915                Ok::<_, BoxedError>(())
916            }
917        ) {
918            panic!("failed to finalize: {e}");
919        }
920
921        // Update metrics and send tip update to application
922        if height > self.tip {
923            application.report(Update::Tip(height, commitment)).await;
924            self.tip = height;
925            let _ = self.finalized_height.try_set(height.get());
926        }
927
928        self.try_dispatch_block(application).await;
929    }
930
931    /// Get the latest finalized block information (height and commitment tuple).
932    ///
933    /// Blocks are only finalized directly with a finalization or indirectly via a descendant
934    /// block's finalization. Thus, the highest known finalized block must itself have a direct
935    /// finalization.
936    ///
937    /// We return the height and commitment using the highest known finalization that we know the
938    /// block height for. While it's possible that we have a later finalization, if we do not have
939    /// the full block for that finalization, we do not know it's height and therefore it would not
940    /// yet be found in the `finalizations_by_height` archive. While not checked explicitly, we
941    /// should have the associated block (in the `finalized_blocks` archive) for the information
942    /// returned.
943    async fn get_latest(&mut self) -> Option<(Height, B::Commitment)> {
944        let height = self.finalizations_by_height.last_index()?;
945        let finalization = self
946            .get_finalization_by_height(height)
947            .await
948            .expect("finalization missing");
949        Some((height, finalization.proposal.payload))
950    }
951
952    // -------------------- Mixed Storage --------------------
953
954    /// Looks for a block anywhere in local storage.
955    async fn find_block<K: PublicKey>(
956        &mut self,
957        buffer: &mut buffered::Mailbox<K, B>,
958        commitment: B::Commitment,
959    ) -> Option<B> {
960        // Check buffer.
961        if let Some(block) = buffer.get(None, commitment, None).await.into_iter().next() {
962            return Some(block);
963        }
964        // Check verified / notarized blocks via cache manager.
965        if let Some(block) = self.cache.find_block(commitment).await {
966            return Some(block);
967        }
968        // Check finalized blocks.
969        match self.finalized_blocks.get(ArchiveID::Key(&commitment)).await {
970            Ok(block) => block, // may be None
971            Err(e) => panic!("failed to get block: {e}"),
972        }
973    }
974
975    /// Attempt to repair any identified gaps in the finalized blocks archive. The total
976    /// number of missing heights that can be repaired at once is bounded by `self.max_repair`,
977    /// though multiple gaps may be spanned.
978    async fn try_repair_gaps<K: PublicKey>(
979        &mut self,
980        buffer: &mut buffered::Mailbox<K, B>,
981        resolver: &mut impl Resolver<Key = Request<B>>,
982        application: &mut impl Reporter<Activity = Update<B, A>>,
983    ) {
984        let start = self.last_processed_height.next();
985        'cache_repair: loop {
986            let (gap_start, Some(gap_end)) = self.finalized_blocks.next_gap(start) else {
987                // No gaps detected
988                return;
989            };
990
991            // Attempt to repair the gap backwards from the end of the gap, using
992            // blocks from our local storage.
993            let Some(mut cursor) = self.get_finalized_block(gap_end).await else {
994                panic!("gapped block missing that should exist: {gap_end}");
995            };
996
997            // Compute the lower bound of the recursive repair. `gap_start` is `Some`
998            // if `start` is not in a gap. We add one to it to ensure we don't
999            // re-persist it to the database in the repair loop below.
1000            let gap_start = gap_start.map(|s| s.next()).unwrap_or(start);
1001
1002            // Iterate backwards, repairing blocks as we go.
1003            while cursor.height() > gap_start {
1004                let commitment = cursor.parent();
1005                if let Some(block) = self.find_block(buffer, commitment).await {
1006                    let finalization = self.cache.get_finalization_for(commitment).await;
1007                    self.store_finalization(
1008                        block.height(),
1009                        commitment,
1010                        block.clone(),
1011                        finalization,
1012                        application,
1013                    )
1014                    .await;
1015                    debug!(height = %block.height(), "repaired block");
1016                    cursor = block;
1017                } else {
1018                    // Request the next missing block digest
1019                    resolver.fetch(Request::<B>::Block(commitment)).await;
1020                    break 'cache_repair;
1021                }
1022            }
1023        }
1024
1025        // Request any finalizations for missing items in the archive, up to
1026        // the `max_repair` quota. This may help shrink the size of the gap
1027        // closest to the application's processed height if finalizations
1028        // for the requests' heights exist. If not, we rely on the recursive
1029        // digest fetches above.
1030        let missing_items = self
1031            .finalized_blocks
1032            .missing_items(start, self.max_repair.get());
1033        let requests = missing_items
1034            .into_iter()
1035            .map(|height| Request::<B>::Finalized { height })
1036            .collect::<Vec<_>>();
1037        if !requests.is_empty() {
1038            resolver.fetch_all(requests).await
1039        }
1040    }
1041
1042    /// Sets the processed height in storage, metrics, and in-memory state. Also cancels any
1043    /// outstanding requests below the new processed height.
1044    async fn set_processed_height(
1045        &mut self,
1046        height: Height,
1047        resolver: &mut impl Resolver<Key = Request<B>>,
1048    ) -> Result<(), metadata::Error> {
1049        self.application_metadata
1050            .put_sync(LATEST_KEY.clone(), height)
1051            .await?;
1052        self.last_processed_height = height;
1053        let _ = self
1054            .processed_height
1055            .try_set(self.last_processed_height.get());
1056
1057        // Cancel any existing requests below the new floor.
1058        resolver
1059            .retain(Request::<B>::Finalized { height }.predicate())
1060            .await;
1061
1062        Ok(())
1063    }
1064}