commonware_consensus/marshal/
actor.rs

1use super::{
2    config::Config,
3    finalizer::Finalizer,
4    ingress::{
5        handler::{self, Handler, Request},
6        mailbox::{Mailbox, Message},
7        orchestrator::{Orchestration, Orchestrator},
8    },
9};
10use crate::{
11    threshold_simplex::types::{Finalization, Notarization},
12    Block, Reporter,
13};
14use commonware_broadcast::{buffered, Broadcaster};
15use commonware_codec::{Codec, Decode, Encode};
16use commonware_cryptography::{bls12381::primitives::variant::Variant, PublicKey};
17use commonware_macros::select;
18use commonware_p2p::{utils::requester, Receiver, Recipients, Sender};
19use commonware_resolver::{
20    p2p::{self, Coordinator},
21    Resolver,
22};
23use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
24use commonware_storage::{
25    archive::{self, immutable, prunable, Archive as _, Identifier},
26    translator::TwoCap,
27};
28use commonware_utils::futures::{AbortablePool, Aborter};
29use futures::{
30    channel::{mpsc, oneshot},
31    try_join, StreamExt,
32};
33use governor::{clock::Clock as GClock, Quota};
34use prometheus_client::metrics::gauge::Gauge;
35use rand::Rng;
36use std::{
37    collections::{btree_map::Entry, BTreeMap},
38    marker::PhantomData,
39    time::{Duration, Instant},
40};
41use tracing::{debug, info, warn};
42
43/// A struct that holds multiple subscriptions for a block.
44struct BlockSubscription<B: Block> {
45    // The subscribers that are waiting for the block
46    subscribers: Vec<oneshot::Sender<B>>,
47    // Aborter that aborts the waiter future when dropped
48    _aborter: Aborter,
49}
50
51/// The [Actor] is responsible for receiving uncertified blocks from the broadcast mechanism,
52/// receiving notarizations and finalizations from consensus, and reconstructing a total order
53/// of blocks.
54///
55/// The actor is designed to be used in a view-based model. Each view corresponds to a
56/// potential block in the chain. The actor will only finalize a block if it has a
57/// corresponding finalization.
58///
59/// The actor also provides a backfill mechanism for missing blocks. If the actor receives a
60/// finalization for a block that is ahead of its current view, it will request the missing blocks
61/// from its peers. This ensures that the actor can catch up to the rest of the network if it falls
62/// behind.
63pub struct Actor<
64    B: Block,
65    R: Rng + Spawner + Metrics + Clock + GClock + Storage,
66    V: Variant,
67    P: PublicKey,
68    Z: Coordinator<PublicKey = P>,
69> {
70    // ---------- Context ----------
71    context: R,
72
73    // ---------- Message Passing ----------
74    // Coordinator
75    coordinator: Z,
76    // Mailbox
77    mailbox: mpsc::Receiver<Message<V, B>>,
78
79    // ---------- Configuration ----------
80    // Public key
81    public_key: P,
82    // Identity
83    identity: V::Public,
84    // Mailbox size
85    mailbox_size: usize,
86    // Backfill quota
87    backfill_quota: Quota,
88    // Unique application namespace
89    namespace: Vec<u8>,
90    /// Minimum number of views to retain temporary data after the application processes a block
91    view_retention_timeout: u64,
92    // Maximum number of blocks to repair at once
93    max_repair: u64,
94    // Codec configuration
95    codec_config: B::Cfg,
96    // Partition prefix
97    partition_prefix: String,
98
99    // ---------- State ----------
100    // Last view processed
101    last_processed_view: u64,
102
103    // Outstanding subscriptions for blocks
104    block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<B>>,
105
106    // ---------- Prunable Storage ----------
107    // Verified blocks stored by view
108    verified_blocks: prunable::Archive<TwoCap, R, B::Commitment, B>,
109    // Notarized blocks stored by view. Stored separately from the verified blocks since they may
110    // be different (e.g. from an equivocation).
111    notarized_blocks: prunable::Archive<TwoCap, R, B::Commitment, B>,
112    // Notarizations stored by view
113    notarizations_by_view:
114        prunable::Archive<TwoCap, R, B::Commitment, Notarization<V, B::Commitment>>,
115    // Finalizations stored by view
116    finalizations_by_view:
117        prunable::Archive<TwoCap, R, B::Commitment, Finalization<V, B::Commitment>>,
118
119    // ---------- Immutable Storage ----------
120    // Finalizations stored by height
121    finalizations_by_height: immutable::Archive<R, B::Commitment, Finalization<V, B::Commitment>>,
122    // Finalized blocks stored by height
123    finalized_blocks: immutable::Archive<R, B::Commitment, B>,
124
125    // ---------- Metrics ----------
126    // Latest height metric
127    finalized_height: Gauge,
128    // Latest processed height
129    processed_height: Gauge,
130
131    // ---------- Phantom data ----------
132    _variant: PhantomData<V>,
133}
134
135impl<
136        B: Block,
137        R: Rng + Spawner + Metrics + Clock + GClock + Storage,
138        V: Variant,
139        P: PublicKey,
140        Z: Coordinator<PublicKey = P>,
141    > Actor<B, R, V, P, Z>
142{
143    /// Create a new application actor.
144    pub async fn init(context: R, config: Config<V, P, Z, B>) -> (Self, Mailbox<V, B>) {
145        // Initialize prunable
146        let verified_blocks = Self::init_prunable_archive(
147            &context,
148            "verified_blocks",
149            &config,
150            config.codec_config.clone(),
151        )
152        .await;
153        let notarized_blocks = Self::init_prunable_archive(
154            &context,
155            "notarized_blocks",
156            &config,
157            config.codec_config.clone(),
158        )
159        .await;
160        let notarizations_by_view =
161            Self::init_prunable_archive(&context, "notarizations_by_view", &config, ()).await;
162        let finalizations_by_view =
163            Self::init_prunable_archive(&context, "finalizations_by_view", &config, ()).await;
164
165        // Initialize finalizations by height
166        let start = Instant::now();
167        let finalizations_by_height = immutable::Archive::init(
168            context.with_label("finalizations_by_height"),
169            immutable::Config {
170                metadata_partition: format!(
171                    "{}-finalizations-by-height-metadata",
172                    config.partition_prefix
173                ),
174                freezer_table_partition: format!(
175                    "{}-finalizations-by-height-freezer-table",
176                    config.partition_prefix
177                ),
178                freezer_table_initial_size: config.freezer_table_initial_size,
179                freezer_table_resize_frequency: config.freezer_table_resize_frequency,
180                freezer_table_resize_chunk_size: config.freezer_table_resize_chunk_size,
181                freezer_journal_partition: format!(
182                    "{}-finalizations-by-height-freezer-journal",
183                    config.partition_prefix
184                ),
185                freezer_journal_target_size: config.freezer_journal_target_size,
186                freezer_journal_compression: config.freezer_journal_compression,
187                freezer_journal_buffer_pool: config.freezer_journal_buffer_pool.clone(),
188                ordinal_partition: format!(
189                    "{}-finalizations-by-height-ordinal",
190                    config.partition_prefix
191                ),
192                items_per_section: config.immutable_items_per_section,
193                codec_config: (),
194                replay_buffer: config.replay_buffer,
195                write_buffer: config.write_buffer,
196            },
197        )
198        .await
199        .expect("failed to initialize finalizations by height archive");
200        info!(elapsed = ?start.elapsed(), "restored finalizations by height archive");
201
202        // Initialize finalized blocks
203        let start = Instant::now();
204        let finalized_blocks = immutable::Archive::init(
205            context.with_label("finalized_blocks"),
206            immutable::Config {
207                metadata_partition: format!(
208                    "{}-finalized_blocks-metadata",
209                    config.partition_prefix
210                ),
211                freezer_table_partition: format!(
212                    "{}-finalized_blocks-freezer-table",
213                    config.partition_prefix
214                ),
215                freezer_table_initial_size: config.freezer_table_initial_size,
216                freezer_table_resize_frequency: config.freezer_table_resize_frequency,
217                freezer_table_resize_chunk_size: config.freezer_table_resize_chunk_size,
218                freezer_journal_partition: format!(
219                    "{}-finalized_blocks-freezer-journal",
220                    config.partition_prefix
221                ),
222                freezer_journal_target_size: config.freezer_journal_target_size,
223                freezer_journal_compression: config.freezer_journal_compression,
224                freezer_journal_buffer_pool: config.freezer_journal_buffer_pool,
225                ordinal_partition: format!("{}-finalized_blocks-ordinal", config.partition_prefix),
226                items_per_section: config.immutable_items_per_section,
227                codec_config: config.codec_config.clone(),
228                replay_buffer: config.replay_buffer,
229                write_buffer: config.write_buffer,
230            },
231        )
232        .await
233        .expect("failed to initialize finalized blocks archive");
234        info!(elapsed = ?start.elapsed(), "restored finalized blocks archive");
235
236        // Create metrics
237        let finalized_height = Gauge::default();
238        context.register(
239            "finalized_height",
240            "Finalized height of application",
241            finalized_height.clone(),
242        );
243        let processed_height = Gauge::default();
244        context.register(
245            "processed_height",
246            "Processed height of application",
247            processed_height.clone(),
248        );
249
250        // Initialize mailbox
251        let (sender, mailbox) = mpsc::channel(config.mailbox_size);
252        (
253            Self {
254                context,
255
256                coordinator: config.coordinator,
257                mailbox,
258
259                public_key: config.public_key,
260                identity: config.identity,
261                mailbox_size: config.mailbox_size,
262                backfill_quota: config.backfill_quota,
263                namespace: config.namespace.clone(),
264                view_retention_timeout: config.view_retention_timeout,
265                max_repair: config.max_repair,
266                codec_config: config.codec_config.clone(),
267                partition_prefix: config.partition_prefix,
268
269                last_processed_view: 0,
270                block_subscriptions: BTreeMap::new(),
271
272                verified_blocks,
273                notarized_blocks,
274                notarizations_by_view,
275                finalizations_by_view,
276                finalizations_by_height,
277                finalized_blocks,
278
279                finalized_height,
280                processed_height,
281
282                _variant: PhantomData,
283            },
284            Mailbox::new(sender),
285        )
286    }
287
288    /// Start the actor.
289    pub fn start(
290        mut self,
291        application: impl Reporter<Activity = B>,
292        buffer: buffered::Mailbox<P, B>,
293        backfill: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
294    ) -> Handle<()> {
295        self.context.spawn_ref()(self.run(application, buffer, backfill))
296    }
297
298    /// Run the application actor.
299    async fn run(
300        mut self,
301        application: impl Reporter<Activity = B>,
302        mut buffer: buffered::Mailbox<P, B>,
303        backfill: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
304    ) {
305        // Initialize resolvers
306        let (mut resolver_rx, mut resolver) = self.init_resolver(backfill);
307
308        // Process all finalized blocks in order (fetching any that are missing)
309        let (mut notifier_tx, notifier_rx) = mpsc::channel::<()>(1);
310        let (orchestrator_sender, mut orchestrator_receiver) = mpsc::channel(self.mailbox_size);
311        let orchestrator = Orchestrator::new(orchestrator_sender);
312        let finalizer = Finalizer::new(
313            self.context.with_label("finalizer"),
314            self.partition_prefix.clone(),
315            application,
316            orchestrator,
317            notifier_rx,
318        )
319        .await;
320        self.context
321            .with_label("finalizer")
322            .spawn(|_| finalizer.run());
323
324        // Create a local pool for waiter futures
325        let mut waiters = AbortablePool::<(B::Commitment, B)>::default();
326
327        // Handle messages
328        loop {
329            // Remove any dropped subscribers. If all subscribers dropped, abort the waiter.
330            self.block_subscriptions.retain(|_, bs| {
331                bs.subscribers.retain(|tx| !tx.is_canceled());
332                !bs.subscribers.is_empty()
333            });
334
335            // Select messages
336            select! {
337                // Handle waiter completions first
338                result = waiters.next_completed() => {
339                    let Ok((commitment, block)) = result else {
340                        continue; // Aborted future
341                    };
342                    self.notify_subscribers(commitment, &block).await;
343                },
344                // Handle consensus before finalizer or backfiller
345                mailbox_message = self.mailbox.next() => {
346                    let Some(message) = mailbox_message else {
347                        info!("mailbox closed, shutting down");
348                        return;
349                    };
350                    match message {
351                        Message::Broadcast { block } => {
352                            let ack = buffer.broadcast(Recipients::All, block).await;
353                            drop(ack);
354                        }
355                        Message::Verified { view, block } => {
356                            self.put_verified_block(view, block.commitment(), block).await;
357                        }
358                        Message::Notarization { notarization } => {
359                            let view = notarization.proposal.view;
360                            let commitment = notarization.proposal.payload;
361
362                            // Store notarization by view
363                            self.put_notarization_by_view(view, commitment, notarization.clone()).await;
364
365                            // Search for block locally, otherwise fetch it remotely
366                            if let Some(block) = self.find_block(&mut buffer, commitment).await {
367                                // If found, persist the block
368                                self.put_notarized_block(view, commitment, block).await;
369                                continue;
370                            } else {
371                                debug!(view, "notarized block missing");
372                                resolver.fetch(Request::<B>::Notarized { view }).await;
373                            }
374                        }
375                        Message::Finalization { finalization } => {
376                            // Store finalization by view
377                            let view = finalization.proposal.view;
378                            let commitment = finalization.proposal.payload;
379                            self.put_finalization_by_view(view, commitment, finalization.clone()).await;
380
381                            // Search for block locally, otherwise fetch it remotely
382                            if let Some(block) = self.find_block(&mut buffer, commitment).await {
383                                // If found, persist the block
384                                let height = block.height();
385                                self.put_finalized_block(height, commitment, block, &mut notifier_tx).await;
386                                debug!(view, height, "finalized block stored");
387                                self.finalized_height.set(height as i64);
388
389                                // Cancel useless requests
390                                resolver.retain(Request::<B>::Notarized { view }.predicate()).await;
391                            } else {
392                                // Otherwise, fetch the block from the network
393                                debug!(view, ?commitment, "finalized block missing");
394                                resolver.fetch(Request::<B>::Block(commitment)).await;
395                            }
396                        }
397                        Message::Get { commitment, response } => {
398                            // Check for block locally
399                            let result = self.find_block(&mut buffer, commitment).await;
400                            let _ = response.send(result);
401                        }
402                        Message::Subscribe { view, commitment, response } => {
403                            // Check for block locally
404                            if let Some(block) = self.find_block(&mut buffer, commitment).await {
405                                let _ = response.send(block);
406                                continue;
407                            }
408
409                            // We don't have the block locally, so fetch the block from the network
410                            // if we have an associated view. If we only have the digest, don't make
411                            // the request as we wouldn't know when to drop it, and the request may
412                            // never complete if the block is not finalized.
413                            if let Some(view) = view {
414                                // Fetch from network
415                                //
416                                // If this is a valid view, this request should be fine to "keep
417                                // open" even if the oneshot is cancelled.
418                                debug!(view, ?commitment, "requested block missing");
419                                resolver.fetch(Request::<B>::Notarized { view }).await;
420                            }
421
422                            // Register subscriber
423                            debug!(view, ?commitment, "registering subscriber");
424                            match self.block_subscriptions.entry(commitment) {
425                                Entry::Occupied(mut entry) => {
426                                    entry.get_mut().subscribers.push(response);
427                                }
428                                Entry::Vacant(entry) => {
429                                    let (tx, rx) = oneshot::channel();
430                                    buffer.subscribe_prepared(None, commitment, None, tx).await;
431                                    let aborter = waiters.push(async move {
432                                        (commitment, rx.await.expect("buffer subscriber closed"))
433                                    });
434                                    entry.insert(BlockSubscription {
435                                        subscribers: vec![response],
436                                        _aborter: aborter,
437                                    });
438                                }
439                            }
440                        }
441                    }
442                },
443                // Handle finalizer messages next
444                message = orchestrator_receiver.next() => {
445                    let Some(message) = message else {
446                        info!("orchestrator closed, shutting down");
447                        return;
448                    };
449                    match message {
450                        Orchestration::Get { height, result } => {
451                            // Check if in blocks
452                            let block = self.get_finalized_block(Identifier::Index(height)).await;
453                            result.send(block).unwrap_or_else(|_| warn!(?height, "Failed to send block to orchestrator"));
454                        }
455                        Orchestration::Processed { height, digest } => {
456                            // Update metrics
457                            self.processed_height.set(height as i64);
458
459                            // Cancel any outstanding requests (by height and by digest)
460                            resolver.cancel(Request::<B>::Block(digest)).await;
461                            resolver.retain(Request::<B>::Finalized { height }.predicate()).await;
462
463                            // If finalization exists, prune the archives
464                            if let Some(finalization) = self.get_finalization_by_height(Identifier::Index(height)).await {
465                                // Trail the previous processed finalized block by the timeout
466                                let min_view = self.last_processed_view.saturating_sub(self.view_retention_timeout);
467
468                                // Prune archives
469                                match try_join!(
470                                    self.verified_blocks.prune(min_view),
471                                    self.notarized_blocks.prune(min_view),
472                                    self.notarizations_by_view.prune(min_view),
473                                    self.finalizations_by_view.prune(min_view),
474                                ) {
475                                    Ok(_) => debug!(min_view, "pruned archives"),
476                                    Err(e) => panic!("failed to prune archives: {e}"),
477                                }
478
479                                // Update the last processed height and view
480                                self.last_processed_view = finalization.proposal.view;
481                            }
482                        }
483                        Orchestration::Repair { height } => {
484                            // Find the end of the "gap" of missing blocks, starting at `height`
485                            let (_, Some(gap_end)) = self.finalized_blocks.next_gap(height) else {
486                                // No gap found; height-1 is the last known finalized block
487                                continue;
488                            };
489                            assert!(gap_end > height, "gap end must be greater than height");
490
491                            // Attempt to repair the gap backwards from the end of the gap, using
492                            // blocks from our local storage.
493                            let Some(mut cursor) = self.get_finalized_block(Identifier::Index(gap_end)).await else {
494                                panic!("gapped block missing that should exist: {gap_end}");
495                            };
496
497                            // Iterate backwards, repairing blocks as we go.
498                            while cursor.height() > height {
499                                let commitment = cursor.parent();
500                                if let Some(block) = self.find_block(&mut buffer, commitment).await {
501                                    self.put_finalized_block(block.height(), commitment, block.clone(), &mut notifier_tx).await;
502                                    debug!(height = block.height(), "repaired block");
503                                    cursor = block;
504                                } else {
505                                    // Request the next missing block digest
506                                    resolver.fetch(Request::<B>::Block(commitment)).await;
507                                    break;
508                                }
509                            }
510
511                            // If we haven't fully repaired the gap, then also request any possible
512                            // finalizations for the blocks in the remaining gap. This may help
513                            // shrink the size of the gap if finalizations for the requests heights
514                            // exist. If not, we rely on the recursive digest fetch above.
515                            let gap_start = height;
516                            let gap_end = std::cmp::min(cursor.height(), gap_start.saturating_add(self.max_repair));
517                            debug!(gap_start, gap_end, "requesting any finalized blocks");
518                            for height in gap_start..gap_end {
519                                resolver.fetch(Request::<B>::Finalized { height }).await;
520                            }
521                        }
522                    }
523                },
524                // Handle resolver messages last
525                message = resolver_rx.next() => {
526                    let Some(message) = message else {
527                        info!("handler closed, shutting down");
528                        return;
529                    };
530                    match message {
531                        handler::Message::Produce { key, response } => {
532                            match key {
533                                Request::Block(commitment) => {
534                                    // Check for block locally
535                                    let Some(block) = self.find_block(&mut buffer, commitment).await else {
536                                        debug!(?commitment, "block missing on request");
537                                        continue;
538                                    };
539                                    let _ = response.send(block.encode().into());
540                                }
541                                Request::Finalized { height } => {
542                                    // Get finalization
543                                    let Some(finalization) = self.get_finalization_by_height(Identifier::Index(height)).await else {
544                                        debug!(height, "finalization missing on request");
545                                        continue;
546                                    };
547
548                                    // Get block
549                                    let Some(block) = self.get_finalized_block(Identifier::Index(height)).await else {
550                                        debug!(height, "finalized block missing on request");
551                                        continue;
552                                    };
553
554                                    // Send finalization
555                                    let _ = response.send((finalization, block).encode().into());
556                                }
557                                Request::Notarized { view } => {
558                                    // Get notarization
559                                    let Some(notarization) = self.get_notarization_by_view(Identifier::Index(view)).await else {
560                                        debug!(view, "notarization missing on request");
561                                        continue;
562                                    };
563
564                                    // Get block
565                                    let commitment = notarization.proposal.payload;
566                                    let Some(block) = self.find_block(&mut buffer, commitment).await else {
567                                        debug!(?commitment, "block missing on request");
568                                        continue;
569                                    };
570                                    let _ = response.send((notarization, block).encode().into());
571                                }
572                            }
573                        },
574                        handler::Message::Deliver { key, value, response } => {
575                            match key {
576                                Request::Block(commitment) => {
577                                    // Parse block
578                                    let Ok(block) = B::decode_cfg(value.as_ref(), &self.codec_config) else {
579                                        let _ = response.send(false);
580                                        continue;
581                                    };
582
583                                    // Validation
584                                    if block.commitment() != commitment {
585                                        let _ = response.send(false);
586                                        continue;
587                                    }
588
589                                    // Persist the block, also persisting the finalization if we have it
590                                    let height = block.height();
591                                    if let Some(finalization) = self.get_finalization_from_view(Identifier::Key(&commitment)).await {
592                                        self.put_finalization_and_finalized_block(height, commitment, finalization, block, &mut notifier_tx).await;
593                                    } else {
594                                        self.put_finalized_block(height, commitment, block, &mut notifier_tx).await;
595                                    }
596                                    debug!(?commitment, height, "received block");
597                                    let _ = response.send(true);
598                                },
599                                Request::Finalized { height } => {
600                                    // Parse finalization
601                                    let Ok((finalization, block)) = <(Finalization<V, B::Commitment>, B)>::decode_cfg(value, &((), self.codec_config.clone())) else {
602                                        let _ = response.send(false);
603                                        continue;
604                                    };
605
606                                    // Validation
607                                    if block.height() != height
608                                        || finalization.proposal.payload != block.commitment()
609                                        || !finalization.verify(&self.namespace, &self.identity)
610                                    {
611                                        let _ = response.send(false);
612                                        continue;
613                                    }
614
615                                    // Valid finalization received
616                                    debug!(height, "received finalization");
617                                    let _ = response.send(true);
618                                    self.put_finalization_and_finalized_block(height, block.commitment(), finalization, block, &mut notifier_tx).await;
619                                },
620                                Request::Notarized { view } => {
621                                    // Parse notarization
622                                    let Ok((notarization, block)) = <(Notarization<V, B::Commitment>, B)>::decode_cfg(value, &((), self.codec_config.clone())) else {
623                                        let _ = response.send(false);
624                                        continue;
625                                    };
626
627                                    // Validation
628                                    if notarization.proposal.view != view
629                                        || notarization.proposal.payload != block.commitment()
630                                        || !notarization.verify(&self.namespace, &self.identity)
631                                    {
632                                        let _ = response.send(false);
633                                        continue;
634                                    }
635
636                                    // Valid notarization received
637                                    let commitment = block.commitment();
638                                    debug!(view, ?commitment, "received notarization");
639                                    self.put_notarized_block(view, commitment, block).await;
640                                    self.put_notarization_by_view(view, commitment, notarization).await;
641                                    let _ = response.send(true);
642                                },
643                            }
644                        },
645                    }
646                },
647            }
648        }
649    }
650
651    // -------------------- Initialization --------------------
652
653    /// Helper to initialize an archive.
654    async fn init_prunable_archive<T: Codec>(
655        context: &R,
656        name: &str,
657        config: &Config<V, P, Z, B>,
658        codec_config: T::Cfg,
659    ) -> prunable::Archive<TwoCap, R, B::Commitment, T> {
660        let start = Instant::now();
661        let prunable_config = prunable::Config {
662            partition: format!("{}-{name}", config.partition_prefix),
663            translator: TwoCap,
664            items_per_section: config.prunable_items_per_section,
665            compression: None,
666            codec_config,
667            buffer_pool: config.freezer_journal_buffer_pool.clone(),
668            replay_buffer: config.replay_buffer,
669            write_buffer: config.write_buffer,
670        };
671        let archive = prunable::Archive::init(context.with_label(name), prunable_config)
672            .await
673            .unwrap_or_else(|_| panic!("failed to initialize {name} archive"));
674        info!(elapsed = ?start.elapsed(), "restored {name} archive");
675        archive
676    }
677
678    /// Helper to initialize a resolver.
679    fn init_resolver(
680        &self,
681        backfill: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
682    ) -> (
683        mpsc::Receiver<handler::Message<B>>,
684        p2p::Mailbox<Request<B>>,
685    ) {
686        let (handler, receiver) = mpsc::channel(self.mailbox_size);
687        let handler = Handler::new(handler);
688        let (resolver_engine, resolver) = p2p::Engine::new(
689            self.context.with_label("resolver"),
690            p2p::Config {
691                coordinator: self.coordinator.clone(),
692                consumer: handler.clone(),
693                producer: handler,
694                mailbox_size: self.mailbox_size,
695                requester_config: requester::Config {
696                    public_key: self.public_key.clone(),
697                    rate_limit: self.backfill_quota,
698                    initial: Duration::from_secs(1),
699                    timeout: Duration::from_secs(2),
700                },
701                fetch_retry_timeout: Duration::from_millis(100),
702                priority_requests: false,
703                priority_responses: false,
704            },
705        );
706        resolver_engine.start(backfill);
707        (receiver, resolver)
708    }
709
710    // -------------------- Waiters --------------------
711
712    /// Notify any subscribers for the given commitment with the provided block.
713    async fn notify_subscribers(&mut self, commitment: B::Commitment, block: &B) {
714        if let Some(mut bs) = self.block_subscriptions.remove(&commitment) {
715            for subscriber in bs.subscribers.drain(..) {
716                let _ = subscriber.send(block.clone());
717            }
718        }
719    }
720
721    // -------------------- Storage --------------------
722
723    /// Add a verified block to the archive.
724    async fn put_verified_block(&mut self, view: u64, commitment: B::Commitment, block: B) {
725        self.notify_subscribers(commitment, &block).await;
726
727        match self.verified_blocks.put_sync(view, commitment, block).await {
728            Ok(_) => {
729                debug!(view, "verified stored");
730            }
731            Err(archive::Error::AlreadyPrunedTo(_)) => {
732                debug!(view, "verified already pruned");
733            }
734            Err(e) => {
735                panic!("failed to insert verified block: {e}");
736            }
737        }
738    }
739
740    /// Add a notarization to the archive by view.
741    async fn put_notarization_by_view(
742        &mut self,
743        view: u64,
744        commitment: B::Commitment,
745        notarization: Notarization<V, B::Commitment>,
746    ) {
747        match self
748            .notarizations_by_view
749            .put_sync(view, commitment, notarization)
750            .await
751        {
752            Ok(_) => {
753                debug!(view, "notarization by view stored");
754            }
755            Err(archive::Error::AlreadyPrunedTo(_)) => {
756                debug!(view, "notarization by view already pruned");
757            }
758            Err(e) => {
759                panic!("failed to insert notarization by view: {e}");
760            }
761        }
762    }
763
764    /// Add a finalization to the archive by view.
765    async fn put_finalization_by_view(
766        &mut self,
767        view: u64,
768        commitment: B::Commitment,
769        finalization: Finalization<V, B::Commitment>,
770    ) {
771        match self
772            .finalizations_by_view
773            .put_sync(view, commitment, finalization)
774            .await
775        {
776            Ok(_) => {
777                debug!(view, "finalization by view stored");
778            }
779            Err(archive::Error::AlreadyPrunedTo(_)) => {
780                debug!(view, "finalization by view already pruned");
781            }
782            Err(e) => {
783                panic!("failed to insert finalization by view: {e}");
784            }
785        }
786    }
787
788    /// Add a notarized block to the archive.
789    async fn put_notarized_block(&mut self, view: u64, commitment: B::Commitment, block: B) {
790        self.notify_subscribers(commitment, &block).await;
791
792        match self
793            .notarized_blocks
794            .put_sync(view, commitment, block)
795            .await
796        {
797            Ok(_) => {
798                debug!(view, "notarized stored");
799            }
800            Err(archive::Error::AlreadyPrunedTo(_)) => {
801                debug!(view, "notarized already pruned");
802            }
803            Err(e) => {
804                panic!("failed to insert notarization: {e}");
805            }
806        }
807    }
808
809    /// Add a finalized block to the archive.
810    ///
811    /// At the end of the method, the notifier is notified to indicate that there has been an update
812    /// to the archive of finalized blocks.
813    async fn put_finalized_block(
814        &mut self,
815        height: u64,
816        commitment: B::Commitment,
817        block: B,
818        notifier: &mut mpsc::Sender<()>,
819    ) {
820        self.notify_subscribers(commitment, &block).await;
821
822        if let Err(e) = self
823            .finalized_blocks
824            .put_sync(height, commitment, block)
825            .await
826        {
827            panic!("failed to insert block: {e}");
828        }
829        let _ = notifier.try_send(());
830    }
831
832    /// Add a finalization and finalized block to the archive.
833    async fn put_finalization_and_finalized_block(
834        &mut self,
835        height: u64,
836        commitment: B::Commitment,
837        finalization: Finalization<V, B::Commitment>,
838        block: B,
839        notifier: &mut mpsc::Sender<()>,
840    ) {
841        self.notify_subscribers(commitment, &block).await;
842
843        if let Err(e) = try_join!(
844            self.finalizations_by_height
845                .put_sync(height, commitment, finalization),
846            self.finalized_blocks.put_sync(height, commitment, block),
847        ) {
848            panic!("failed to insert finalization: {e}");
849        }
850        let _ = notifier.try_send(());
851    }
852
853    /// Looks for a block anywhere in local storage.
854    async fn find_block(
855        &mut self,
856        buffer: &mut buffered::Mailbox<P, B>,
857        commitment: B::Commitment,
858    ) -> Option<B> {
859        // Check buffer.
860        if let Some(block) = buffer.get(None, commitment, None).await.into_iter().next() {
861            return Some(block);
862        }
863        // Check verified.
864        if let Some(block) = self.get_verified_block(Identifier::Key(&commitment)).await {
865            return Some(block);
866        }
867        // Check notarized blocks.
868        if let Some(block) = self.get_notarized_block(Identifier::Key(&commitment)).await {
869            return Some(block);
870        }
871        // Check finalized blocks.
872        if let Some(block) = self.get_finalized_block(Identifier::Key(&commitment)).await {
873            return Some(block);
874        }
875        None
876    }
877
878    /// Get a finalized block from the archive.
879    async fn get_finalized_block(&self, id: Identifier<'_, B::Commitment>) -> Option<B> {
880        match self.finalized_blocks.get(id).await {
881            Ok(block) => block,
882            Err(e) => panic!("failed to get block: {e}"),
883        }
884    }
885
886    /// Get a finalization from the archive by height.
887    async fn get_finalization_by_height(
888        &self,
889        id: Identifier<'_, B::Commitment>,
890    ) -> Option<Finalization<V, B::Commitment>> {
891        match self.finalizations_by_height.get(id).await {
892            Ok(finalization) => finalization,
893            Err(e) => panic!("failed to get finalization: {e}"),
894        }
895    }
896
897    /// Get a notarization from the archive by view.
898    async fn get_notarization_by_view(
899        &self,
900        id: Identifier<'_, B::Commitment>,
901    ) -> Option<Notarization<V, B::Commitment>> {
902        match self.notarizations_by_view.get(id).await {
903            Ok(notarization) => notarization,
904            Err(e) => panic!("failed to get notarization by view: {e}"),
905        }
906    }
907
908    /// Get a finalization from the archive by view.
909    async fn get_finalization_from_view(
910        &self,
911        id: Identifier<'_, B::Commitment>,
912    ) -> Option<Finalization<V, B::Commitment>> {
913        match self.finalizations_by_view.get(id).await {
914            Ok(finalization) => finalization,
915            Err(e) => panic!("failed to get finalization by view: {e}"),
916        }
917    }
918
919    /// Get a verified block from the archive.
920    async fn get_verified_block(&self, id: Identifier<'_, B::Commitment>) -> Option<B> {
921        match self.verified_blocks.get(id).await {
922            Ok(verified) => verified,
923            Err(e) => panic!("failed to get verified block: {e}"),
924        }
925    }
926
927    /// Get a notarized block from the archive.
928    async fn get_notarized_block(&self, id: Identifier<'_, B::Commitment>) -> Option<B> {
929        match self.notarized_blocks.get(id).await {
930            Ok(block) => block,
931            Err(e) => panic!("failed to get notarized block: {e}"),
932        }
933    }
934}