1use super::{
2 config::Config,
3 finalizer::Finalizer,
4 ingress::{
5 handler::{self, Handler, Request},
6 mailbox::{Mailbox, Message},
7 orchestrator::{Orchestration, Orchestrator},
8 },
9};
10use crate::{
11 threshold_simplex::types::{Finalization, Notarization},
12 Block, Reporter,
13};
14use commonware_broadcast::{buffered, Broadcaster};
15use commonware_codec::{Codec, Decode, Encode};
16use commonware_cryptography::{bls12381::primitives::variant::Variant, PublicKey};
17use commonware_macros::select;
18use commonware_p2p::{utils::requester, Receiver, Recipients, Sender};
19use commonware_resolver::{
20 p2p::{self, Coordinator},
21 Resolver,
22};
23use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
24use commonware_storage::{
25 archive::{self, immutable, prunable, Archive as _, Identifier},
26 translator::TwoCap,
27};
28use commonware_utils::futures::{AbortablePool, Aborter};
29use futures::{
30 channel::{mpsc, oneshot},
31 try_join, StreamExt,
32};
33use governor::{clock::Clock as GClock, Quota};
34use prometheus_client::metrics::gauge::Gauge;
35use rand::Rng;
36use std::{
37 collections::{btree_map::Entry, BTreeMap},
38 marker::PhantomData,
39 time::{Duration, Instant},
40};
41use tracing::{debug, info, warn};
42
43struct BlockSubscription<B: Block> {
45 subscribers: Vec<oneshot::Sender<B>>,
47 _aborter: Aborter,
49}
50
51pub struct Actor<
64 B: Block,
65 R: Rng + Spawner + Metrics + Clock + GClock + Storage,
66 V: Variant,
67 P: PublicKey,
68 Z: Coordinator<PublicKey = P>,
69> {
70 context: R,
72
73 coordinator: Z,
76 mailbox: mpsc::Receiver<Message<V, B>>,
78
79 public_key: P,
82 identity: V::Public,
84 mailbox_size: usize,
86 backfill_quota: Quota,
88 namespace: Vec<u8>,
90 view_retention_timeout: u64,
92 max_repair: u64,
94 codec_config: B::Cfg,
96 partition_prefix: String,
98
99 last_processed_view: u64,
102
103 block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<B>>,
105
106 verified_blocks: prunable::Archive<TwoCap, R, B::Commitment, B>,
109 notarized_blocks: prunable::Archive<TwoCap, R, B::Commitment, B>,
112 notarizations_by_view:
114 prunable::Archive<TwoCap, R, B::Commitment, Notarization<V, B::Commitment>>,
115 finalizations_by_view:
117 prunable::Archive<TwoCap, R, B::Commitment, Finalization<V, B::Commitment>>,
118
119 finalizations_by_height: immutable::Archive<R, B::Commitment, Finalization<V, B::Commitment>>,
122 finalized_blocks: immutable::Archive<R, B::Commitment, B>,
124
125 finalized_height: Gauge,
128 processed_height: Gauge,
130
131 _variant: PhantomData<V>,
133}
134
135impl<
136 B: Block,
137 R: Rng + Spawner + Metrics + Clock + GClock + Storage,
138 V: Variant,
139 P: PublicKey,
140 Z: Coordinator<PublicKey = P>,
141 > Actor<B, R, V, P, Z>
142{
143 pub async fn init(context: R, config: Config<V, P, Z, B>) -> (Self, Mailbox<V, B>) {
145 let verified_blocks = Self::init_prunable_archive(
147 &context,
148 "verified_blocks",
149 &config,
150 config.codec_config.clone(),
151 )
152 .await;
153 let notarized_blocks = Self::init_prunable_archive(
154 &context,
155 "notarized_blocks",
156 &config,
157 config.codec_config.clone(),
158 )
159 .await;
160 let notarizations_by_view =
161 Self::init_prunable_archive(&context, "notarizations_by_view", &config, ()).await;
162 let finalizations_by_view =
163 Self::init_prunable_archive(&context, "finalizations_by_view", &config, ()).await;
164
165 let start = Instant::now();
167 let finalizations_by_height = immutable::Archive::init(
168 context.with_label("finalizations_by_height"),
169 immutable::Config {
170 metadata_partition: format!(
171 "{}-finalizations-by-height-metadata",
172 config.partition_prefix
173 ),
174 freezer_table_partition: format!(
175 "{}-finalizations-by-height-freezer-table",
176 config.partition_prefix
177 ),
178 freezer_table_initial_size: config.freezer_table_initial_size,
179 freezer_table_resize_frequency: config.freezer_table_resize_frequency,
180 freezer_table_resize_chunk_size: config.freezer_table_resize_chunk_size,
181 freezer_journal_partition: format!(
182 "{}-finalizations-by-height-freezer-journal",
183 config.partition_prefix
184 ),
185 freezer_journal_target_size: config.freezer_journal_target_size,
186 freezer_journal_compression: config.freezer_journal_compression,
187 freezer_journal_buffer_pool: config.freezer_journal_buffer_pool.clone(),
188 ordinal_partition: format!(
189 "{}-finalizations-by-height-ordinal",
190 config.partition_prefix
191 ),
192 items_per_section: config.immutable_items_per_section,
193 codec_config: (),
194 replay_buffer: config.replay_buffer,
195 write_buffer: config.write_buffer,
196 },
197 )
198 .await
199 .expect("failed to initialize finalizations by height archive");
200 info!(elapsed = ?start.elapsed(), "restored finalizations by height archive");
201
202 let start = Instant::now();
204 let finalized_blocks = immutable::Archive::init(
205 context.with_label("finalized_blocks"),
206 immutable::Config {
207 metadata_partition: format!(
208 "{}-finalized_blocks-metadata",
209 config.partition_prefix
210 ),
211 freezer_table_partition: format!(
212 "{}-finalized_blocks-freezer-table",
213 config.partition_prefix
214 ),
215 freezer_table_initial_size: config.freezer_table_initial_size,
216 freezer_table_resize_frequency: config.freezer_table_resize_frequency,
217 freezer_table_resize_chunk_size: config.freezer_table_resize_chunk_size,
218 freezer_journal_partition: format!(
219 "{}-finalized_blocks-freezer-journal",
220 config.partition_prefix
221 ),
222 freezer_journal_target_size: config.freezer_journal_target_size,
223 freezer_journal_compression: config.freezer_journal_compression,
224 freezer_journal_buffer_pool: config.freezer_journal_buffer_pool,
225 ordinal_partition: format!("{}-finalized_blocks-ordinal", config.partition_prefix),
226 items_per_section: config.immutable_items_per_section,
227 codec_config: config.codec_config.clone(),
228 replay_buffer: config.replay_buffer,
229 write_buffer: config.write_buffer,
230 },
231 )
232 .await
233 .expect("failed to initialize finalized blocks archive");
234 info!(elapsed = ?start.elapsed(), "restored finalized blocks archive");
235
236 let finalized_height = Gauge::default();
238 context.register(
239 "finalized_height",
240 "Finalized height of application",
241 finalized_height.clone(),
242 );
243 let processed_height = Gauge::default();
244 context.register(
245 "processed_height",
246 "Processed height of application",
247 processed_height.clone(),
248 );
249
250 let (sender, mailbox) = mpsc::channel(config.mailbox_size);
252 (
253 Self {
254 context,
255
256 coordinator: config.coordinator,
257 mailbox,
258
259 public_key: config.public_key,
260 identity: config.identity,
261 mailbox_size: config.mailbox_size,
262 backfill_quota: config.backfill_quota,
263 namespace: config.namespace.clone(),
264 view_retention_timeout: config.view_retention_timeout,
265 max_repair: config.max_repair,
266 codec_config: config.codec_config.clone(),
267 partition_prefix: config.partition_prefix,
268
269 last_processed_view: 0,
270 block_subscriptions: BTreeMap::new(),
271
272 verified_blocks,
273 notarized_blocks,
274 notarizations_by_view,
275 finalizations_by_view,
276 finalizations_by_height,
277 finalized_blocks,
278
279 finalized_height,
280 processed_height,
281
282 _variant: PhantomData,
283 },
284 Mailbox::new(sender),
285 )
286 }
287
288 pub fn start(
290 mut self,
291 application: impl Reporter<Activity = B>,
292 buffer: buffered::Mailbox<P, B>,
293 backfill: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
294 ) -> Handle<()> {
295 self.context.spawn_ref()(self.run(application, buffer, backfill))
296 }
297
298 async fn run(
300 mut self,
301 application: impl Reporter<Activity = B>,
302 mut buffer: buffered::Mailbox<P, B>,
303 backfill: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
304 ) {
305 let (mut resolver_rx, mut resolver) = self.init_resolver(backfill);
307
308 let (mut notifier_tx, notifier_rx) = mpsc::channel::<()>(1);
310 let (orchestrator_sender, mut orchestrator_receiver) = mpsc::channel(self.mailbox_size);
311 let orchestrator = Orchestrator::new(orchestrator_sender);
312 let finalizer = Finalizer::new(
313 self.context.with_label("finalizer"),
314 self.partition_prefix.clone(),
315 application,
316 orchestrator,
317 notifier_rx,
318 )
319 .await;
320 self.context
321 .with_label("finalizer")
322 .spawn(|_| finalizer.run());
323
324 let mut waiters = AbortablePool::<(B::Commitment, B)>::default();
326
327 loop {
329 self.block_subscriptions.retain(|_, bs| {
331 bs.subscribers.retain(|tx| !tx.is_canceled());
332 !bs.subscribers.is_empty()
333 });
334
335 select! {
337 result = waiters.next_completed() => {
339 let Ok((commitment, block)) = result else {
340 continue; };
342 self.notify_subscribers(commitment, &block).await;
343 },
344 mailbox_message = self.mailbox.next() => {
346 let Some(message) = mailbox_message else {
347 info!("mailbox closed, shutting down");
348 return;
349 };
350 match message {
351 Message::Broadcast { block } => {
352 let ack = buffer.broadcast(Recipients::All, block).await;
353 drop(ack);
354 }
355 Message::Verified { view, block } => {
356 self.put_verified_block(view, block.commitment(), block).await;
357 }
358 Message::Notarization { notarization } => {
359 let view = notarization.proposal.view;
360 let commitment = notarization.proposal.payload;
361
362 self.put_notarization_by_view(view, commitment, notarization.clone()).await;
364
365 if let Some(block) = self.find_block(&mut buffer, commitment).await {
367 self.put_notarized_block(view, commitment, block).await;
369 continue;
370 } else {
371 debug!(view, "notarized block missing");
372 resolver.fetch(Request::<B>::Notarized { view }).await;
373 }
374 }
375 Message::Finalization { finalization } => {
376 let view = finalization.proposal.view;
378 let commitment = finalization.proposal.payload;
379 self.put_finalization_by_view(view, commitment, finalization.clone()).await;
380
381 if let Some(block) = self.find_block(&mut buffer, commitment).await {
383 let height = block.height();
385 self.put_finalized_block(height, commitment, block, &mut notifier_tx).await;
386 debug!(view, height, "finalized block stored");
387 self.finalized_height.set(height as i64);
388
389 resolver.retain(Request::<B>::Notarized { view }.predicate()).await;
391 } else {
392 debug!(view, ?commitment, "finalized block missing");
394 resolver.fetch(Request::<B>::Block(commitment)).await;
395 }
396 }
397 Message::Get { commitment, response } => {
398 let result = self.find_block(&mut buffer, commitment).await;
400 let _ = response.send(result);
401 }
402 Message::Subscribe { view, commitment, response } => {
403 if let Some(block) = self.find_block(&mut buffer, commitment).await {
405 let _ = response.send(block);
406 continue;
407 }
408
409 if let Some(view) = view {
414 debug!(view, ?commitment, "requested block missing");
419 resolver.fetch(Request::<B>::Notarized { view }).await;
420 }
421
422 debug!(view, ?commitment, "registering subscriber");
424 match self.block_subscriptions.entry(commitment) {
425 Entry::Occupied(mut entry) => {
426 entry.get_mut().subscribers.push(response);
427 }
428 Entry::Vacant(entry) => {
429 let (tx, rx) = oneshot::channel();
430 buffer.subscribe_prepared(None, commitment, None, tx).await;
431 let aborter = waiters.push(async move {
432 (commitment, rx.await.expect("buffer subscriber closed"))
433 });
434 entry.insert(BlockSubscription {
435 subscribers: vec![response],
436 _aborter: aborter,
437 });
438 }
439 }
440 }
441 }
442 },
443 message = orchestrator_receiver.next() => {
445 let Some(message) = message else {
446 info!("orchestrator closed, shutting down");
447 return;
448 };
449 match message {
450 Orchestration::Get { height, result } => {
451 let block = self.get_finalized_block(Identifier::Index(height)).await;
453 result.send(block).unwrap_or_else(|_| warn!(?height, "Failed to send block to orchestrator"));
454 }
455 Orchestration::Processed { height, digest } => {
456 self.processed_height.set(height as i64);
458
459 resolver.cancel(Request::<B>::Block(digest)).await;
461 resolver.retain(Request::<B>::Finalized { height }.predicate()).await;
462
463 if let Some(finalization) = self.get_finalization_by_height(Identifier::Index(height)).await {
465 let min_view = self.last_processed_view.saturating_sub(self.view_retention_timeout);
467
468 match try_join!(
470 self.verified_blocks.prune(min_view),
471 self.notarized_blocks.prune(min_view),
472 self.notarizations_by_view.prune(min_view),
473 self.finalizations_by_view.prune(min_view),
474 ) {
475 Ok(_) => debug!(min_view, "pruned archives"),
476 Err(e) => panic!("failed to prune archives: {e}"),
477 }
478
479 self.last_processed_view = finalization.proposal.view;
481 }
482 }
483 Orchestration::Repair { height } => {
484 let (_, Some(gap_end)) = self.finalized_blocks.next_gap(height) else {
486 continue;
488 };
489 assert!(gap_end > height, "gap end must be greater than height");
490
491 let Some(mut cursor) = self.get_finalized_block(Identifier::Index(gap_end)).await else {
494 panic!("gapped block missing that should exist: {gap_end}");
495 };
496
497 while cursor.height() > height {
499 let commitment = cursor.parent();
500 if let Some(block) = self.find_block(&mut buffer, commitment).await {
501 self.put_finalized_block(block.height(), commitment, block.clone(), &mut notifier_tx).await;
502 debug!(height = block.height(), "repaired block");
503 cursor = block;
504 } else {
505 resolver.fetch(Request::<B>::Block(commitment)).await;
507 break;
508 }
509 }
510
511 let gap_start = height;
516 let gap_end = std::cmp::min(cursor.height(), gap_start.saturating_add(self.max_repair));
517 debug!(gap_start, gap_end, "requesting any finalized blocks");
518 for height in gap_start..gap_end {
519 resolver.fetch(Request::<B>::Finalized { height }).await;
520 }
521 }
522 }
523 },
524 message = resolver_rx.next() => {
526 let Some(message) = message else {
527 info!("handler closed, shutting down");
528 return;
529 };
530 match message {
531 handler::Message::Produce { key, response } => {
532 match key {
533 Request::Block(commitment) => {
534 let Some(block) = self.find_block(&mut buffer, commitment).await else {
536 debug!(?commitment, "block missing on request");
537 continue;
538 };
539 let _ = response.send(block.encode().into());
540 }
541 Request::Finalized { height } => {
542 let Some(finalization) = self.get_finalization_by_height(Identifier::Index(height)).await else {
544 debug!(height, "finalization missing on request");
545 continue;
546 };
547
548 let Some(block) = self.get_finalized_block(Identifier::Index(height)).await else {
550 debug!(height, "finalized block missing on request");
551 continue;
552 };
553
554 let _ = response.send((finalization, block).encode().into());
556 }
557 Request::Notarized { view } => {
558 let Some(notarization) = self.get_notarization_by_view(Identifier::Index(view)).await else {
560 debug!(view, "notarization missing on request");
561 continue;
562 };
563
564 let commitment = notarization.proposal.payload;
566 let Some(block) = self.find_block(&mut buffer, commitment).await else {
567 debug!(?commitment, "block missing on request");
568 continue;
569 };
570 let _ = response.send((notarization, block).encode().into());
571 }
572 }
573 },
574 handler::Message::Deliver { key, value, response } => {
575 match key {
576 Request::Block(commitment) => {
577 let Ok(block) = B::decode_cfg(value.as_ref(), &self.codec_config) else {
579 let _ = response.send(false);
580 continue;
581 };
582
583 if block.commitment() != commitment {
585 let _ = response.send(false);
586 continue;
587 }
588
589 let height = block.height();
591 if let Some(finalization) = self.get_finalization_from_view(Identifier::Key(&commitment)).await {
592 self.put_finalization_and_finalized_block(height, commitment, finalization, block, &mut notifier_tx).await;
593 } else {
594 self.put_finalized_block(height, commitment, block, &mut notifier_tx).await;
595 }
596 debug!(?commitment, height, "received block");
597 let _ = response.send(true);
598 },
599 Request::Finalized { height } => {
600 let Ok((finalization, block)) = <(Finalization<V, B::Commitment>, B)>::decode_cfg(value, &((), self.codec_config.clone())) else {
602 let _ = response.send(false);
603 continue;
604 };
605
606 if block.height() != height
608 || finalization.proposal.payload != block.commitment()
609 || !finalization.verify(&self.namespace, &self.identity)
610 {
611 let _ = response.send(false);
612 continue;
613 }
614
615 debug!(height, "received finalization");
617 let _ = response.send(true);
618 self.put_finalization_and_finalized_block(height, block.commitment(), finalization, block, &mut notifier_tx).await;
619 },
620 Request::Notarized { view } => {
621 let Ok((notarization, block)) = <(Notarization<V, B::Commitment>, B)>::decode_cfg(value, &((), self.codec_config.clone())) else {
623 let _ = response.send(false);
624 continue;
625 };
626
627 if notarization.proposal.view != view
629 || notarization.proposal.payload != block.commitment()
630 || !notarization.verify(&self.namespace, &self.identity)
631 {
632 let _ = response.send(false);
633 continue;
634 }
635
636 let commitment = block.commitment();
638 debug!(view, ?commitment, "received notarization");
639 self.put_notarized_block(view, commitment, block).await;
640 self.put_notarization_by_view(view, commitment, notarization).await;
641 let _ = response.send(true);
642 },
643 }
644 },
645 }
646 },
647 }
648 }
649 }
650
651 async fn init_prunable_archive<T: Codec>(
655 context: &R,
656 name: &str,
657 config: &Config<V, P, Z, B>,
658 codec_config: T::Cfg,
659 ) -> prunable::Archive<TwoCap, R, B::Commitment, T> {
660 let start = Instant::now();
661 let prunable_config = prunable::Config {
662 partition: format!("{}-{name}", config.partition_prefix),
663 translator: TwoCap,
664 items_per_section: config.prunable_items_per_section,
665 compression: None,
666 codec_config,
667 buffer_pool: config.freezer_journal_buffer_pool.clone(),
668 replay_buffer: config.replay_buffer,
669 write_buffer: config.write_buffer,
670 };
671 let archive = prunable::Archive::init(context.with_label(name), prunable_config)
672 .await
673 .unwrap_or_else(|_| panic!("failed to initialize {name} archive"));
674 info!(elapsed = ?start.elapsed(), "restored {name} archive");
675 archive
676 }
677
678 fn init_resolver(
680 &self,
681 backfill: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
682 ) -> (
683 mpsc::Receiver<handler::Message<B>>,
684 p2p::Mailbox<Request<B>>,
685 ) {
686 let (handler, receiver) = mpsc::channel(self.mailbox_size);
687 let handler = Handler::new(handler);
688 let (resolver_engine, resolver) = p2p::Engine::new(
689 self.context.with_label("resolver"),
690 p2p::Config {
691 coordinator: self.coordinator.clone(),
692 consumer: handler.clone(),
693 producer: handler,
694 mailbox_size: self.mailbox_size,
695 requester_config: requester::Config {
696 public_key: self.public_key.clone(),
697 rate_limit: self.backfill_quota,
698 initial: Duration::from_secs(1),
699 timeout: Duration::from_secs(2),
700 },
701 fetch_retry_timeout: Duration::from_millis(100),
702 priority_requests: false,
703 priority_responses: false,
704 },
705 );
706 resolver_engine.start(backfill);
707 (receiver, resolver)
708 }
709
710 async fn notify_subscribers(&mut self, commitment: B::Commitment, block: &B) {
714 if let Some(mut bs) = self.block_subscriptions.remove(&commitment) {
715 for subscriber in bs.subscribers.drain(..) {
716 let _ = subscriber.send(block.clone());
717 }
718 }
719 }
720
721 async fn put_verified_block(&mut self, view: u64, commitment: B::Commitment, block: B) {
725 self.notify_subscribers(commitment, &block).await;
726
727 match self.verified_blocks.put_sync(view, commitment, block).await {
728 Ok(_) => {
729 debug!(view, "verified stored");
730 }
731 Err(archive::Error::AlreadyPrunedTo(_)) => {
732 debug!(view, "verified already pruned");
733 }
734 Err(e) => {
735 panic!("failed to insert verified block: {e}");
736 }
737 }
738 }
739
740 async fn put_notarization_by_view(
742 &mut self,
743 view: u64,
744 commitment: B::Commitment,
745 notarization: Notarization<V, B::Commitment>,
746 ) {
747 match self
748 .notarizations_by_view
749 .put_sync(view, commitment, notarization)
750 .await
751 {
752 Ok(_) => {
753 debug!(view, "notarization by view stored");
754 }
755 Err(archive::Error::AlreadyPrunedTo(_)) => {
756 debug!(view, "notarization by view already pruned");
757 }
758 Err(e) => {
759 panic!("failed to insert notarization by view: {e}");
760 }
761 }
762 }
763
764 async fn put_finalization_by_view(
766 &mut self,
767 view: u64,
768 commitment: B::Commitment,
769 finalization: Finalization<V, B::Commitment>,
770 ) {
771 match self
772 .finalizations_by_view
773 .put_sync(view, commitment, finalization)
774 .await
775 {
776 Ok(_) => {
777 debug!(view, "finalization by view stored");
778 }
779 Err(archive::Error::AlreadyPrunedTo(_)) => {
780 debug!(view, "finalization by view already pruned");
781 }
782 Err(e) => {
783 panic!("failed to insert finalization by view: {e}");
784 }
785 }
786 }
787
788 async fn put_notarized_block(&mut self, view: u64, commitment: B::Commitment, block: B) {
790 self.notify_subscribers(commitment, &block).await;
791
792 match self
793 .notarized_blocks
794 .put_sync(view, commitment, block)
795 .await
796 {
797 Ok(_) => {
798 debug!(view, "notarized stored");
799 }
800 Err(archive::Error::AlreadyPrunedTo(_)) => {
801 debug!(view, "notarized already pruned");
802 }
803 Err(e) => {
804 panic!("failed to insert notarization: {e}");
805 }
806 }
807 }
808
809 async fn put_finalized_block(
814 &mut self,
815 height: u64,
816 commitment: B::Commitment,
817 block: B,
818 notifier: &mut mpsc::Sender<()>,
819 ) {
820 self.notify_subscribers(commitment, &block).await;
821
822 if let Err(e) = self
823 .finalized_blocks
824 .put_sync(height, commitment, block)
825 .await
826 {
827 panic!("failed to insert block: {e}");
828 }
829 let _ = notifier.try_send(());
830 }
831
832 async fn put_finalization_and_finalized_block(
834 &mut self,
835 height: u64,
836 commitment: B::Commitment,
837 finalization: Finalization<V, B::Commitment>,
838 block: B,
839 notifier: &mut mpsc::Sender<()>,
840 ) {
841 self.notify_subscribers(commitment, &block).await;
842
843 if let Err(e) = try_join!(
844 self.finalizations_by_height
845 .put_sync(height, commitment, finalization),
846 self.finalized_blocks.put_sync(height, commitment, block),
847 ) {
848 panic!("failed to insert finalization: {e}");
849 }
850 let _ = notifier.try_send(());
851 }
852
853 async fn find_block(
855 &mut self,
856 buffer: &mut buffered::Mailbox<P, B>,
857 commitment: B::Commitment,
858 ) -> Option<B> {
859 if let Some(block) = buffer.get(None, commitment, None).await.into_iter().next() {
861 return Some(block);
862 }
863 if let Some(block) = self.get_verified_block(Identifier::Key(&commitment)).await {
865 return Some(block);
866 }
867 if let Some(block) = self.get_notarized_block(Identifier::Key(&commitment)).await {
869 return Some(block);
870 }
871 if let Some(block) = self.get_finalized_block(Identifier::Key(&commitment)).await {
873 return Some(block);
874 }
875 None
876 }
877
878 async fn get_finalized_block(&self, id: Identifier<'_, B::Commitment>) -> Option<B> {
880 match self.finalized_blocks.get(id).await {
881 Ok(block) => block,
882 Err(e) => panic!("failed to get block: {e}"),
883 }
884 }
885
886 async fn get_finalization_by_height(
888 &self,
889 id: Identifier<'_, B::Commitment>,
890 ) -> Option<Finalization<V, B::Commitment>> {
891 match self.finalizations_by_height.get(id).await {
892 Ok(finalization) => finalization,
893 Err(e) => panic!("failed to get finalization: {e}"),
894 }
895 }
896
897 async fn get_notarization_by_view(
899 &self,
900 id: Identifier<'_, B::Commitment>,
901 ) -> Option<Notarization<V, B::Commitment>> {
902 match self.notarizations_by_view.get(id).await {
903 Ok(notarization) => notarization,
904 Err(e) => panic!("failed to get notarization by view: {e}"),
905 }
906 }
907
908 async fn get_finalization_from_view(
910 &self,
911 id: Identifier<'_, B::Commitment>,
912 ) -> Option<Finalization<V, B::Commitment>> {
913 match self.finalizations_by_view.get(id).await {
914 Ok(finalization) => finalization,
915 Err(e) => panic!("failed to get finalization by view: {e}"),
916 }
917 }
918
919 async fn get_verified_block(&self, id: Identifier<'_, B::Commitment>) -> Option<B> {
921 match self.verified_blocks.get(id).await {
922 Ok(verified) => verified,
923 Err(e) => panic!("failed to get verified block: {e}"),
924 }
925 }
926
927 async fn get_notarized_block(&self, id: Identifier<'_, B::Commitment>) -> Option<B> {
929 match self.notarized_blocks.get(id).await {
930 Ok(block) => block,
931 Err(e) => panic!("failed to get notarized block: {e}"),
932 }
933 }
934}