1use super::{
2 cache,
3 mailbox::{Mailbox, Message},
4 Buffer, IntoBlock, Variant,
5};
6use crate::{
7 marshal::{
8 resolver::handler::{self, Request},
9 store::{Blocks, Certificates},
10 Config, Identifier as BlockID, Update,
11 },
12 simplex::{
13 scheme::Scheme,
14 types::{verify_certificates, Finalization, Notarization, Subject},
15 },
16 types::{Epoch, Epocher, Height, Round, ViewDelta},
17 Block, Epochable, Heightable, Reporter,
18};
19use bytes::Bytes;
20use commonware_codec::{Decode, Encode, Read};
21use commonware_cryptography::{
22 certificate::{Provider, Scheme as CertificateScheme},
23 Digestible,
24};
25use commonware_macros::select_loop;
26use commonware_parallel::Strategy;
27use commonware_resolver::Resolver;
28use commonware_runtime::{
29 spawn_cell, telemetry::metrics::status::GaugeExt, BufferPooler, Clock, ContextCell, Handle,
30 Metrics, Spawner, Storage,
31};
32use commonware_storage::{
33 archive::Identifier as ArchiveID,
34 metadata::{self, Metadata},
35};
36use commonware_utils::{
37 acknowledgement::Exact,
38 channel::{fallible::OneshotExt, mpsc, oneshot},
39 futures::{AbortablePool, Aborter, OptionFuture},
40 sequence::U64,
41 Acknowledgement, BoxedError,
42};
43use futures::{future::join_all, try_join, FutureExt};
44use pin_project::pin_project;
45use prometheus_client::metrics::gauge::Gauge;
46use rand_core::CryptoRngCore;
47use std::{
48 collections::{btree_map::Entry, BTreeMap, VecDeque},
49 future::Future,
50 num::NonZeroUsize,
51 pin::Pin,
52 sync::Arc,
53};
54use tracing::{debug, error, info, warn};
55
56const LATEST_KEY: U64 = U64::new(0xFF);
58
59enum PendingVerification<S: CertificateScheme, V: Variant> {
61 Notarized {
62 notarization: Notarization<S, V::Commitment>,
63 block: V::Block,
64 response: oneshot::Sender<bool>,
65 },
66 Finalized {
67 finalization: Finalization<S, V::Commitment>,
68 block: V::Block,
69 response: oneshot::Sender<bool>,
70 },
71}
72
73#[pin_project]
75struct PendingAck<V: Variant, A: Acknowledgement> {
76 height: Height,
77 commitment: V::Commitment,
78 #[pin]
79 receiver: A::Waiter,
80}
81
82impl<V: Variant, A: Acknowledgement> Future for PendingAck<V, A> {
83 type Output = <A::Waiter as Future>::Output;
84
85 fn poll(
86 self: std::pin::Pin<&mut Self>,
87 cx: &mut std::task::Context<'_>,
88 ) -> std::task::Poll<Self::Output> {
89 self.project().receiver.poll(cx)
90 }
91}
92
93struct PendingAcks<V: Variant, A: Acknowledgement> {
95 current: OptionFuture<PendingAck<V, A>>,
96 queue: VecDeque<PendingAck<V, A>>,
97 max: usize,
98}
99
100impl<V: Variant, A: Acknowledgement> PendingAcks<V, A> {
101 fn new(max: usize) -> Self {
103 Self {
104 current: None.into(),
105 queue: VecDeque::with_capacity(max),
106 max,
107 }
108 }
109
110 fn clear(&mut self) {
112 self.current = None.into();
113 self.queue.clear();
114 }
115
116 const fn current(&mut self) -> &mut OptionFuture<PendingAck<V, A>> {
118 &mut self.current
119 }
120
121 fn has_capacity(&self) -> bool {
123 let reserved = usize::from(self.current.is_some());
124 self.queue.len() < self.max - reserved
125 }
126
127 fn next_dispatch_height(&self, last_processed_height: Height) -> Height {
129 self.queue
130 .back()
131 .map(|ack| ack.height.next())
132 .or_else(|| self.current.as_ref().map(|ack| ack.height.next()))
133 .unwrap_or_else(|| last_processed_height.next())
134 }
135
136 fn enqueue(&mut self, ack: PendingAck<V, A>) {
138 if self.current.is_none() {
139 self.current.replace(ack);
140 return;
141 }
142 self.queue.push_back(ack);
143 }
144
145 fn complete_current(
147 &mut self,
148 result: <A::Waiter as Future>::Output,
149 ) -> (Height, V::Commitment, <A::Waiter as Future>::Output) {
150 let PendingAck {
151 height, commitment, ..
152 } = self.current.take().expect("ack state must be present");
153 if let Some(next) = self.queue.pop_front() {
154 self.current.replace(next);
155 }
156 (height, commitment, result)
157 }
158
159 fn pop_ready(&mut self) -> Option<(Height, V::Commitment, <A::Waiter as Future>::Output)> {
161 let pending = self.current.as_mut()?;
162 let result = Pin::new(&mut pending.receiver).now_or_never()?;
163 Some(self.complete_current(result))
164 }
165}
166
167struct BlockSubscription<V: Variant> {
169 subscribers: Vec<oneshot::Sender<V::Block>>,
171 _aborter: Aborter,
173}
174
175#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)]
180enum BlockSubscriptionKey<C, D> {
181 Digest(D),
182 Commitment(C),
183}
184
185type BlockSubscriptionKeyFor<V> =
186 BlockSubscriptionKey<<V as Variant>::Commitment, <<V as Variant>::Block as Digestible>::Digest>;
187
188pub struct Actor<E, V, P, FC, FB, ES, T, A = Exact>
201where
202 E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage,
203 V: Variant,
204 P: Provider<Scope = Epoch, Scheme: Scheme<V::Commitment>>,
205 FC: Certificates<
206 BlockDigest = <V::Block as Digestible>::Digest,
207 Commitment = V::Commitment,
208 Scheme = P::Scheme,
209 >,
210 FB: Blocks<Block = V::StoredBlock>,
211 ES: Epocher,
212 T: Strategy,
213 A: Acknowledgement,
214{
215 context: ContextCell<E>,
217
218 mailbox: mpsc::Receiver<Message<P::Scheme, V>>,
221
222 provider: P,
225 epocher: ES,
227 view_retention_timeout: ViewDelta,
229 max_repair: NonZeroUsize,
231 block_codec_config: <V::Block as Read>::Cfg,
233 strategy: T,
235
236 last_processed_round: Round,
239 last_processed_height: Height,
241 pending_acks: PendingAcks<V, A>,
243 tip: Height,
245 block_subscriptions: BTreeMap<BlockSubscriptionKeyFor<V>, BlockSubscription<V>>,
247
248 cache: cache::Manager<E, V, P::Scheme>,
251 application_metadata: Metadata<E, U64, Height>,
253 finalizations_by_height: FC,
255 finalized_blocks: FB,
257
258 finalized_height: Gauge,
261 processed_height: Gauge,
263}
264
265impl<E, V, P, FC, FB, ES, T, A> Actor<E, V, P, FC, FB, ES, T, A>
266where
267 E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage,
268 V: Variant,
269 P: Provider<Scope = Epoch, Scheme: Scheme<V::Commitment>>,
270 FC: Certificates<
271 BlockDigest = <V::Block as Digestible>::Digest,
272 Commitment = V::Commitment,
273 Scheme = P::Scheme,
274 >,
275 FB: Blocks<Block = V::StoredBlock>,
276 ES: Epocher,
277 T: Strategy,
278 A: Acknowledgement,
279{
280 pub async fn init(
282 context: E,
283 finalizations_by_height: FC,
284 finalized_blocks: FB,
285 config: Config<V::Block, P, ES, T>,
286 ) -> (Self, Mailbox<P::Scheme, V>, Height) {
287 let prunable_config = cache::Config {
289 partition_prefix: format!("{}-cache", config.partition_prefix),
290 prunable_items_per_section: config.prunable_items_per_section,
291 replay_buffer: config.replay_buffer,
292 key_write_buffer: config.key_write_buffer,
293 value_write_buffer: config.value_write_buffer,
294 key_page_cache: config.page_cache.clone(),
295 };
296 let cache = cache::Manager::init(
297 context.with_label("cache"),
298 prunable_config,
299 config.block_codec_config.clone(),
300 )
301 .await;
302
303 let application_metadata = Metadata::init(
305 context.with_label("application_metadata"),
306 metadata::Config {
307 partition: format!("{}-application-metadata", config.partition_prefix),
308 codec_config: (),
309 },
310 )
311 .await
312 .expect("failed to initialize application metadata");
313 let last_processed_height = application_metadata
314 .get(&LATEST_KEY)
315 .copied()
316 .unwrap_or(Height::zero());
317
318 let finalized_height = Gauge::default();
320 context.register(
321 "finalized_height",
322 "Finalized height of application",
323 finalized_height.clone(),
324 );
325 let processed_height = Gauge::default();
326 context.register(
327 "processed_height",
328 "Processed height of application",
329 processed_height.clone(),
330 );
331 let _ = processed_height.try_set(last_processed_height.get());
332
333 let (sender, mailbox) = mpsc::channel(config.mailbox_size);
335 (
336 Self {
337 context: ContextCell::new(context),
338 mailbox,
339 provider: config.provider,
340 epocher: config.epocher,
341 view_retention_timeout: config.view_retention_timeout,
342 max_repair: config.max_repair,
343 block_codec_config: config.block_codec_config,
344 strategy: config.strategy,
345 last_processed_round: Round::zero(),
346 last_processed_height,
347 pending_acks: PendingAcks::new(config.max_pending_acks.get()),
348 tip: Height::zero(),
349 block_subscriptions: BTreeMap::new(),
350 cache,
351 application_metadata,
352 finalizations_by_height,
353 finalized_blocks,
354 finalized_height,
355 processed_height,
356 },
357 Mailbox::new(sender),
358 last_processed_height,
359 )
360 }
361
362 pub fn start<R, Buf>(
364 mut self,
365 application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
366 buffer: Buf,
367 resolver: (mpsc::Receiver<handler::Message<V::Commitment>>, R),
368 ) -> Handle<()>
369 where
370 R: Resolver<
371 Key = handler::Request<V::Commitment>,
372 PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
373 >,
374 Buf: Buffer<V>,
375 {
376 spawn_cell!(self.context, self.run(application, buffer, resolver).await)
377 }
378
379 async fn run<R, Buf>(
381 mut self,
382 mut application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
383 mut buffer: Buf,
384 (mut resolver_rx, mut resolver): (mpsc::Receiver<handler::Message<V::Commitment>>, R),
385 ) where
386 R: Resolver<
387 Key = handler::Request<V::Commitment>,
388 PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
389 >,
390 Buf: Buffer<V>,
391 {
392 let mut waiters = AbortablePool::<Result<V::Block, BlockSubscriptionKeyFor<V>>>::default();
394
395 let tip = self.get_latest().await;
397 if let Some((height, digest, round)) = tip {
398 application.report(Update::Tip(round, height, digest)).await;
399 self.tip = height;
400 let _ = self.finalized_height.try_set(height.get());
401 }
402
403 self.try_dispatch_blocks(&mut application).await;
405
406 if self
408 .try_repair_gaps(&mut buffer, &mut resolver, &mut application)
409 .await
410 {
411 self.sync_finalized().await;
412 }
413
414 select_loop! {
415 self.context,
416 on_start => {
417 self.block_subscriptions.retain(|_, bs| {
419 bs.subscribers.retain(|tx| !tx.is_closed());
420 !bs.subscribers.is_empty()
421 });
422 },
423 on_stopped => {
424 debug!("context shutdown, stopping marshal");
425 },
426 Ok(completion) = waiters.next_completed() else continue => match completion {
428 Ok(block) => self.notify_subscribers(&block),
429 Err(key) => {
430 match key {
431 BlockSubscriptionKey::Digest(digest) => {
432 debug!(
433 ?digest,
434 "buffer subscription closed, canceling local subscribers"
435 );
436 }
437 BlockSubscriptionKey::Commitment(commitment) => {
438 debug!(
439 ?commitment,
440 "buffer subscription closed, canceling local subscribers"
441 );
442 }
443 }
444 self.block_subscriptions.remove(&key);
445 }
446 },
447 result = self.pending_acks.current() => {
449 let mut pending = Some(self.pending_acks.complete_current(result));
451 loop {
452 let (height, commitment, result) =
453 pending.take().expect("pending ack must exist");
454 match result {
455 Ok(()) => {
456 self.handle_block_processed(height, commitment, &mut resolver)
458 .await;
459 }
460 Err(e) => {
461 error!(e = ?e, height = %height, "application did not acknowledge block");
463 return;
464 }
465 }
466
467 let Some(next) = self.pending_acks.pop_ready() else {
470 break;
471 };
472 pending = Some(next);
473 }
474
475 if let Err(e) = self.application_metadata.sync().await {
477 error!(?e, "failed to sync application progress");
478 return;
479 }
480
481 self.try_dispatch_blocks(&mut application).await;
483 },
484 Some(message) = self.mailbox.recv() else {
486 info!("mailbox closed, shutting down");
487 break;
488 } => {
489 match message {
490 Message::GetInfo {
491 identifier,
492 response,
493 } => {
494 let info = match identifier {
495 BlockID::Digest(digest) => self
499 .finalized_blocks
500 .get(ArchiveID::Key(&digest))
501 .await
502 .ok()
503 .flatten()
504 .map(|b| (b.height(), digest)),
505 BlockID::Height(height) => self
506 .finalizations_by_height
507 .get(ArchiveID::Index(height.get()))
508 .await
509 .ok()
510 .flatten()
511 .map(|f| (height, V::commitment_to_inner(f.proposal.payload))),
512 BlockID::Latest => self.get_latest().await.map(|(h, d, _)| (h, d)),
513 };
514 response.send_lossy(info);
515 }
516 Message::Proposed { round, block } => {
517 self.cache_verified(round, block.digest(), block.clone())
518 .await;
519 buffer.proposed(round, block).await;
520 }
521 Message::Verified { round, block } => {
522 self.cache_verified(round, block.digest(), block).await;
523 }
524 Message::Notarization { notarization } => {
525 let round = notarization.round();
526 let commitment = notarization.proposal.payload;
527 let digest = V::commitment_to_inner(commitment);
528
529 self.cache
531 .put_notarization(round, digest, notarization.clone())
532 .await;
533
534 if let Some(block) =
536 self.find_block_by_commitment(&buffer, commitment).await
537 {
538 self.cache_block(round, digest, block).await;
540 } else {
541 debug!(?round, "notarized block missing");
542 resolver
543 .fetch(Request::<V::Commitment>::Notarized { round })
544 .await;
545 }
546 }
547 Message::Finalization { finalization } => {
548 let round = finalization.round();
550 let commitment = finalization.proposal.payload;
551 let digest = V::commitment_to_inner(commitment);
552 self.cache
553 .put_finalization(round, digest, finalization.clone())
554 .await;
555
556 if let Some(block) =
558 self.find_block_by_commitment(&buffer, commitment).await
559 {
560 let height = block.height();
562 if self
563 .store_finalization(
564 height,
565 digest,
566 block,
567 Some(finalization),
568 &mut application,
569 &mut buffer,
570 )
571 .await
572 {
573 self.try_repair_gaps(&mut buffer, &mut resolver, &mut application)
574 .await;
575 self.sync_finalized().await;
576 debug!(?round, %height, "finalized block stored");
577 }
578 } else {
579 debug!(?round, ?commitment, "finalized block missing");
581 resolver
582 .fetch(Request::<V::Commitment>::Block(commitment))
583 .await;
584 }
585 }
586 Message::GetBlock {
587 identifier,
588 response,
589 } => match identifier {
590 BlockID::Digest(digest) => {
591 let result = self.find_block_by_digest(&mut buffer, digest).await;
592 response.send_lossy(result);
593 }
594 BlockID::Height(height) => {
595 let result = self.get_finalized_block(height).await;
596 response.send_lossy(result);
597 }
598 BlockID::Latest => {
599 let block = match self.get_latest().await {
600 Some((_, digest, _)) => {
601 self.find_block_by_digest(&mut buffer, digest).await
602 }
603 None => None,
604 };
605 response.send_lossy(block);
606 }
607 },
608 Message::GetFinalization { height, response } => {
609 let finalization = self.get_finalization_by_height(height).await;
610 response.send_lossy(finalization);
611 }
612 Message::HintFinalized { height, targets } => {
613 if height <= self.last_processed_height {
615 continue;
616 }
617
618 if self.get_finalization_by_height(height).await.is_some() {
620 continue;
621 }
622
623 let request = Request::<V::Commitment>::Finalized { height };
625 resolver.fetch_targeted(request, targets).await;
626 }
627 Message::SubscribeByDigest {
628 round,
629 digest,
630 response,
631 } => {
632 self.handle_subscribe(
633 round,
634 BlockSubscriptionKey::Digest(digest),
635 response,
636 &mut resolver,
637 &mut waiters,
638 &mut buffer,
639 )
640 .await;
641 }
642 Message::SubscribeByCommitment {
643 round,
644 commitment,
645 response,
646 } => {
647 self.handle_subscribe(
648 round,
649 BlockSubscriptionKey::Commitment(commitment),
650 response,
651 &mut resolver,
652 &mut waiters,
653 &mut buffer,
654 )
655 .await;
656 }
657 Message::SetFloor { height } => {
658 if self.last_processed_height >= height {
659 warn!(
660 %height,
661 existing = %self.last_processed_height,
662 "floor not updated, lower than existing"
663 );
664 continue;
665 }
666
667 self.update_processed_height(height, &mut resolver).await;
669 if let Err(err) = self.application_metadata.sync().await {
670 error!(?err, %height, "failed to update floor");
671 return;
672 }
673
674 self.pending_acks.clear();
678
679 if let Err(err) = self.prune_finalized_archives(height).await {
681 error!(?err, %height, "failed to prune finalized archives");
682 return;
683 }
684
685 }
689 Message::Prune { height } => {
690 if height > self.last_processed_height {
692 warn!(%height, floor = %self.last_processed_height, "prune height above floor, ignoring");
693 continue;
694 }
695
696 if let Err(err) = self.prune_finalized_archives(height).await {
698 error!(?err, %height, "failed to prune finalized archives");
699 return;
700 }
701
702 }
706 }
707 },
708 Some(message) = resolver_rx.recv() else {
710 info!("handler closed, shutting down");
711 return;
712 } => {
713 let mut needs_sync = false;
716 let mut produces = Vec::new();
717 let mut delivers = Vec::new();
718 for msg in std::iter::once(message)
719 .chain(std::iter::from_fn(|| resolver_rx.try_recv().ok()))
720 .take(self.max_repair.get())
721 {
722 match msg {
723 handler::Message::Produce { key, response } => {
724 produces.push((key, response));
725 }
726 handler::Message::Deliver {
727 key,
728 value,
729 response,
730 } => {
731 needs_sync |= self
732 .handle_deliver(
733 key,
734 value,
735 response,
736 &mut delivers,
737 &mut application,
738 &mut buffer,
739 )
740 .await;
741 }
742 }
743 }
744
745 needs_sync |= self
747 .verify_delivered(delivers, &mut application, &mut buffer)
748 .await;
749
750 needs_sync |= self
753 .try_repair_gaps(&mut buffer, &mut resolver, &mut application)
754 .await;
755
756 if needs_sync {
759 self.sync_finalized().await;
760 }
761
762 join_all(
764 produces
765 .into_iter()
766 .map(|(key, response)| self.handle_produce(key, response, &buffer)),
767 )
768 .await;
769 },
770 }
771 }
772
773 async fn handle_produce<Buf: Buffer<V>>(
775 &self,
776 key: Request<V::Commitment>,
777 response: oneshot::Sender<Bytes>,
778 buffer: &Buf,
779 ) {
780 match key {
781 Request::Block(commitment) => {
782 let Some(block) = self.find_block_by_commitment(buffer, commitment).await else {
783 debug!(?commitment, "block missing on request");
784 return;
785 };
786 response.send_lossy(block.encode());
787 }
788 Request::Finalized { height } => {
789 let Some(finalization) = self.get_finalization_by_height(height).await else {
790 debug!(%height, "finalization missing on request");
791 return;
792 };
793 let Some(block) = self.get_finalized_block(height).await else {
794 debug!(%height, "finalized block missing on request");
795 return;
796 };
797 response.send_lossy((finalization, block).encode());
798 }
799 Request::Notarized { round } => {
800 let Some(notarization) = self.cache.get_notarization(round).await else {
801 debug!(?round, "notarization missing on request");
802 return;
803 };
804 let commitment = notarization.proposal.payload;
805 let Some(block) = self.find_block_by_commitment(buffer, commitment).await else {
806 debug!(?commitment, "block missing on request");
807 return;
808 };
809 response.send_lossy((notarization, block).encode());
810 }
811 }
812 }
813
814 async fn handle_subscribe<Buf: Buffer<V>>(
816 &mut self,
817 round: Option<Round>,
818 key: BlockSubscriptionKeyFor<V>,
819 response: oneshot::Sender<V::Block>,
820 resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
821 waiters: &mut AbortablePool<Result<V::Block, BlockSubscriptionKeyFor<V>>>,
822 buffer: &mut Buf,
823 ) {
824 let digest = match key {
825 BlockSubscriptionKey::Digest(digest) => digest,
826 BlockSubscriptionKey::Commitment(commitment) => V::commitment_to_inner(commitment),
827 };
828
829 let block = match key {
831 BlockSubscriptionKey::Digest(digest) => self.find_block_by_digest(buffer, digest).await,
832 BlockSubscriptionKey::Commitment(commitment) => {
833 self.find_block_by_commitment(buffer, commitment).await
834 }
835 };
836 if let Some(block) = block {
837 response.send_lossy(block);
838 return;
839 }
840
841 if let Some(round) = round {
846 if round < self.last_processed_round {
847 return;
853 }
854 debug!(?round, ?digest, "requested block missing");
858 resolver
859 .fetch(Request::<V::Commitment>::Notarized { round })
860 .await;
861 }
862
863 match key {
865 BlockSubscriptionKey::Digest(digest) => {
866 debug!(?round, ?digest, "registering subscriber");
867 }
868 BlockSubscriptionKey::Commitment(commitment) => {
869 debug!(?round, ?commitment, ?digest, "registering subscriber");
870 }
871 }
872 match self.block_subscriptions.entry(key) {
873 Entry::Occupied(mut entry) => {
874 entry.get_mut().subscribers.push(response);
875 }
876 Entry::Vacant(entry) => {
877 let rx = match key {
878 BlockSubscriptionKey::Digest(digest) => {
879 buffer.subscribe_by_digest(digest).await
880 }
881 BlockSubscriptionKey::Commitment(commitment) => {
882 buffer.subscribe_by_commitment(commitment).await
883 }
884 };
885 let waiter_key = key;
886 let aborter = waiters.push(async move {
887 rx.await
888 .map_or_else(|_| Err(waiter_key), |block| Ok(block.into_block()))
889 });
890 entry.insert(BlockSubscription {
891 subscribers: vec![response],
892 _aborter: aborter,
893 });
894 }
895 }
896 }
897
898 async fn handle_deliver<Buf: Buffer<V>>(
903 &mut self,
904 key: Request<V::Commitment>,
905 value: Bytes,
906 response: oneshot::Sender<bool>,
907 delivers: &mut Vec<PendingVerification<P::Scheme, V>>,
908 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
909 buffer: &mut Buf,
910 ) -> bool {
911 match key {
912 Request::Block(commitment) => {
913 let Ok(block) = V::Block::decode_cfg(value.as_ref(), &self.block_codec_config)
914 else {
915 response.send_lossy(false);
916 return false;
917 };
918 if V::commitment(&block) != commitment {
919 response.send_lossy(false);
920 return false;
921 }
922
923 let height = block.height();
925 let digest = block.digest();
926 let finalization = self.cache.get_finalization_for(digest).await;
927 let wrote = self
928 .store_finalization(height, digest, block, finalization, application, buffer)
929 .await;
930 debug!(?digest, %height, "received block");
931 response.send_lossy(true); wrote
933 }
934 Request::Finalized { height } => {
935 let Some(bounds) = self.epocher.containing(height) else {
936 response.send_lossy(false);
937 return false;
938 };
939 let Some(scheme) = self.get_scheme_certificate_verifier(bounds.epoch()) else {
940 response.send_lossy(false);
941 return false;
942 };
943
944 let Ok((finalization, block)) =
945 <(Finalization<P::Scheme, V::Commitment>, V::Block)>::decode_cfg(
946 value,
947 &(
948 scheme.certificate_codec_config(),
949 self.block_codec_config.clone(),
950 ),
951 )
952 else {
953 response.send_lossy(false);
954 return false;
955 };
956
957 let commitment = finalization.proposal.payload;
958 if block.height() != height
959 || V::commitment(&block) != commitment
960 || finalization.epoch() != bounds.epoch()
961 {
962 response.send_lossy(false);
963 return false;
964 }
965 delivers.push(PendingVerification::Finalized {
966 finalization,
967 block,
968 response,
969 });
970 false
971 }
972 Request::Notarized { round } => {
973 let Some(scheme) = self.get_scheme_certificate_verifier(round.epoch()) else {
974 response.send_lossy(false);
975 return false;
976 };
977
978 let Ok((notarization, block)) =
979 <(Notarization<P::Scheme, V::Commitment>, V::Block)>::decode_cfg(
980 value,
981 &(
982 scheme.certificate_codec_config(),
983 self.block_codec_config.clone(),
984 ),
985 )
986 else {
987 response.send_lossy(false);
988 return false;
989 };
990
991 if notarization.round() != round
992 || V::commitment(&block) != notarization.proposal.payload
993 {
994 response.send_lossy(false);
995 return false;
996 }
997 delivers.push(PendingVerification::Notarized {
998 notarization,
999 block,
1000 response,
1001 });
1002 false
1003 }
1004 }
1005 }
1006
1007 async fn verify_delivered<Buf: Buffer<V>>(
1010 &mut self,
1011 mut delivers: Vec<PendingVerification<P::Scheme, V>>,
1012 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1013 buffer: &mut Buf,
1014 ) -> bool {
1015 if delivers.is_empty() {
1016 return false;
1017 }
1018
1019 let certs: Vec<_> = delivers
1021 .iter()
1022 .map(|item| match item {
1023 PendingVerification::Finalized { finalization, .. } => (
1024 Subject::Finalize {
1025 proposal: &finalization.proposal,
1026 },
1027 &finalization.certificate,
1028 ),
1029 PendingVerification::Notarized { notarization, .. } => (
1030 Subject::Notarize {
1031 proposal: ¬arization.proposal,
1032 },
1033 ¬arization.certificate,
1034 ),
1035 })
1036 .collect();
1037
1038 let verified = if let Some(scheme) = self.provider.all() {
1041 verify_certificates(&mut self.context, scheme.as_ref(), &certs, &self.strategy)
1042 } else {
1043 let mut verified = vec![false; delivers.len()];
1044
1045 let mut by_epoch: BTreeMap<Epoch, Vec<usize>> = BTreeMap::new();
1047 for (i, item) in delivers.iter().enumerate() {
1048 let epoch = match item {
1049 PendingVerification::Notarized { notarization, .. } => notarization.epoch(),
1050 PendingVerification::Finalized { finalization, .. } => finalization.epoch(),
1051 };
1052 by_epoch.entry(epoch).or_default().push(i);
1053 }
1054
1055 for (epoch, indices) in &by_epoch {
1057 let Some(scheme) = self.provider.scoped(*epoch) else {
1058 continue;
1059 };
1060 let group: Vec<_> = indices.iter().map(|&i| certs[i]).collect();
1061 let results =
1062 verify_certificates(&mut self.context, scheme.as_ref(), &group, &self.strategy);
1063 for (j, &idx) in indices.iter().enumerate() {
1064 verified[idx] = results[j];
1065 }
1066 }
1067 verified
1068 };
1069
1070 let mut wrote = false;
1072 for (index, item) in delivers.drain(..).enumerate() {
1073 if !verified[index] {
1074 match item {
1075 PendingVerification::Finalized { response, .. }
1076 | PendingVerification::Notarized { response, .. } => {
1077 response.send_lossy(false);
1078 }
1079 }
1080 continue;
1081 }
1082 match item {
1083 PendingVerification::Finalized {
1084 finalization,
1085 block,
1086 response,
1087 } => {
1088 response.send_lossy(true);
1090 let round = finalization.round();
1091 let height = block.height();
1092 let digest = block.digest();
1093 debug!(?round, %height, "received finalization");
1094
1095 wrote |= self
1096 .store_finalization(
1097 height,
1098 digest,
1099 block,
1100 Some(finalization),
1101 application,
1102 buffer,
1103 )
1104 .await;
1105 }
1106 PendingVerification::Notarized {
1107 notarization,
1108 block,
1109 response,
1110 } => {
1111 response.send_lossy(true);
1113 let round = notarization.round();
1114 let commitment = notarization.proposal.payload;
1115 let digest = V::commitment_to_inner(commitment);
1116 debug!(?round, ?digest, "received notarization");
1117
1118 let height = block.height();
1123 if let Some(finalization) = self.cache.get_finalization_for(digest).await {
1124 wrote |= self
1127 .store_finalization(
1128 height,
1129 digest,
1130 block.clone(),
1131 Some(finalization),
1132 application,
1133 buffer,
1134 )
1135 .await;
1136 }
1137
1138 self.cache_block(round, digest, block).await;
1140 self.cache
1141 .put_notarization(round, digest, notarization)
1142 .await;
1143 }
1144 }
1145 }
1146
1147 wrote
1148 }
1149
1150 fn get_scheme_certificate_verifier(&self, epoch: Epoch) -> Option<Arc<P::Scheme>> {
1155 self.provider.all().or_else(|| self.provider.scoped(epoch))
1156 }
1157
1158 fn notify_subscribers(&mut self, block: &V::Block) {
1162 if let Some(mut bs) = self
1163 .block_subscriptions
1164 .remove(&BlockSubscriptionKey::Digest(block.digest()))
1165 {
1166 for subscriber in bs.subscribers.drain(..) {
1167 subscriber.send_lossy(block.clone());
1168 }
1169 }
1170 if let Some(mut bs) = self
1171 .block_subscriptions
1172 .remove(&BlockSubscriptionKey::Commitment(V::commitment(block)))
1173 {
1174 for subscriber in bs.subscribers.drain(..) {
1175 subscriber.send_lossy(block.clone());
1176 }
1177 }
1178 }
1179
1180 async fn try_dispatch_blocks(
1214 &mut self,
1215 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1216 ) {
1217 while self.pending_acks.has_capacity() {
1218 let next_height = self
1219 .pending_acks
1220 .next_dispatch_height(self.last_processed_height);
1221 let Some(block) = self.get_finalized_block(next_height).await else {
1222 return;
1223 };
1224 assert_eq!(
1225 block.height(),
1226 next_height,
1227 "finalized block height mismatch"
1228 );
1229
1230 let (height, commitment) = (block.height(), V::commitment(&block));
1231 let (ack, ack_waiter) = A::handle();
1232 application
1233 .report(Update::Block(V::into_inner(block), ack))
1234 .await;
1235 self.pending_acks.enqueue(PendingAck {
1236 height,
1237 commitment,
1238 receiver: ack_waiter,
1239 });
1240 }
1241 }
1242
1243 async fn handle_block_processed(
1248 &mut self,
1249 height: Height,
1250 commitment: V::Commitment,
1251 resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
1252 ) {
1253 self.update_processed_height(height, resolver).await;
1255
1256 resolver
1258 .cancel(Request::<V::Commitment>::Block(commitment))
1259 .await;
1260
1261 if let Some(finalization) = self.get_finalization_by_height(height).await {
1262 let lpr = self.last_processed_round;
1264 let prune_round = Round::new(
1265 lpr.epoch(),
1266 lpr.view().saturating_sub(self.view_retention_timeout),
1267 );
1268
1269 self.cache.prune(prune_round).await;
1271
1272 let round = finalization.round();
1274 self.last_processed_round = round;
1275
1276 resolver
1278 .retain(Request::<V::Commitment>::Notarized { round }.predicate())
1279 .await;
1280 }
1281 }
1282
1283 async fn cache_verified(
1287 &mut self,
1288 round: Round,
1289 digest: <V::Block as Digestible>::Digest,
1290 block: V::Block,
1291 ) {
1292 self.notify_subscribers(&block);
1293 self.cache.put_verified(round, digest, block.into()).await;
1294 }
1295
1296 async fn cache_block(
1298 &mut self,
1299 round: Round,
1300 digest: <V::Block as Digestible>::Digest,
1301 block: V::Block,
1302 ) {
1303 self.notify_subscribers(&block);
1304 self.cache.put_block(round, digest, block.into()).await;
1305 }
1306
1307 async fn sync_finalized(&mut self) {
1314 if let Err(e) = try_join!(
1315 async {
1316 self.finalized_blocks.sync().await.map_err(Box::new)?;
1317 Ok::<_, BoxedError>(())
1318 },
1319 async {
1320 self.finalizations_by_height
1321 .sync()
1322 .await
1323 .map_err(Box::new)?;
1324 Ok::<_, BoxedError>(())
1325 },
1326 ) {
1327 panic!("failed to sync finalization archives: {e}");
1328 }
1329 }
1330
1331 async fn get_finalized_block(&self, height: Height) -> Option<V::Block> {
1335 match self
1336 .finalized_blocks
1337 .get(ArchiveID::Index(height.get()))
1338 .await
1339 {
1340 Ok(stored) => stored.map(|stored| stored.into()),
1341 Err(e) => panic!("failed to get block: {e}"),
1342 }
1343 }
1344
1345 async fn get_finalization_by_height(
1347 &self,
1348 height: Height,
1349 ) -> Option<Finalization<P::Scheme, V::Commitment>> {
1350 match self
1351 .finalizations_by_height
1352 .get(ArchiveID::Index(height.get()))
1353 .await
1354 {
1355 Ok(finalization) => finalization,
1356 Err(e) => panic!("failed to get finalization: {e}"),
1357 }
1358 }
1359
1360 async fn store_finalization<Buf: Buffer<V>>(
1370 &mut self,
1371 height: Height,
1372 digest: <V::Block as Digestible>::Digest,
1373 block: V::Block,
1374 finalization: Option<Finalization<P::Scheme, V::Commitment>>,
1375 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1376 buffer: &mut Buf,
1377 ) -> bool {
1378 if height <= self.last_processed_height {
1382 debug!(
1383 %height,
1384 floor = %self.last_processed_height,
1385 ?digest,
1386 "dropping finalization at or below processed height floor"
1387 );
1388 return false;
1389 }
1390 self.notify_subscribers(&block);
1391
1392 let commitment = V::commitment(&block);
1394 let stored: V::StoredBlock = block.into();
1395 let round = finalization.as_ref().map(|f| f.round());
1396
1397 if let Err(e) = try_join!(
1399 async {
1401 self.finalized_blocks.put(stored).await.map_err(Box::new)?;
1402 Ok::<_, BoxedError>(())
1403 },
1404 async {
1406 if let Some(finalization) = finalization {
1407 self.finalizations_by_height
1408 .put(height, digest, finalization)
1409 .await
1410 .map_err(Box::new)?;
1411 }
1412 Ok::<_, BoxedError>(())
1413 }
1414 ) {
1415 panic!("failed to finalize: {e}");
1416 }
1417
1418 if let Some(round) = round.filter(|_| height > self.tip) {
1420 application.report(Update::Tip(round, height, digest)).await;
1421 self.tip = height;
1422 let _ = self.finalized_height.try_set(height.get());
1423 }
1424 buffer.finalized(commitment).await;
1425 self.try_dispatch_blocks(application).await;
1426
1427 true
1428 }
1429
1430 async fn get_latest(&mut self) -> Option<(Height, <V::Block as Digestible>::Digest, Round)> {
1443 let height = self.finalizations_by_height.last_index()?;
1444 let finalization = self
1445 .get_finalization_by_height(height)
1446 .await
1447 .expect("finalization missing");
1448 Some((
1449 height,
1450 V::commitment_to_inner(finalization.proposal.payload),
1451 finalization.round(),
1452 ))
1453 }
1454
1455 async fn find_block_in_storage(
1459 &self,
1460 digest: <V::Block as Digestible>::Digest,
1461 ) -> Option<V::Block> {
1462 if let Some(block) = self.cache.find_block(digest).await {
1464 return Some(block.into());
1465 }
1466 match self.finalized_blocks.get(ArchiveID::Key(&digest)).await {
1468 Ok(stored) => stored.map(|stored| stored.into()),
1469 Err(e) => panic!("failed to get block: {e}"),
1470 }
1471 }
1472
1473 async fn find_block_by_digest<Buf: Buffer<V>>(
1478 &self,
1479 buffer: &mut Buf,
1480 digest: <V::Block as Digestible>::Digest,
1481 ) -> Option<V::Block> {
1482 if let Some(block) = buffer.find_by_digest(digest).await {
1483 return Some(block.into_block());
1484 }
1485 self.find_block_in_storage(digest).await
1486 }
1487
1488 async fn find_block_by_commitment<Buf: Buffer<V>>(
1493 &self,
1494 buffer: &Buf,
1495 commitment: V::Commitment,
1496 ) -> Option<V::Block> {
1497 if let Some(block) = buffer.find_by_commitment(commitment).await {
1498 return Some(block.into_block());
1499 }
1500 self.find_block_in_storage(V::commitment_to_inner(commitment))
1501 .await
1502 }
1503
1504 async fn try_repair_gaps<Buf: Buffer<V>>(
1511 &mut self,
1512 buffer: &mut Buf,
1513 resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
1514 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1515 ) -> bool {
1516 let mut wrote = false;
1517 let start = self.last_processed_height.next();
1518 'cache_repair: loop {
1519 let (gap_start, Some(gap_end)) = self.finalized_blocks.next_gap(start) else {
1520 return wrote;
1522 };
1523
1524 let Some(mut cursor) = self.get_finalized_block(gap_end).await else {
1527 panic!("gapped block missing that should exist: {gap_end}");
1528 };
1529
1530 let gap_start = gap_start.map(Height::next).unwrap_or(start);
1534
1535 while cursor.height() > gap_start {
1537 let parent_digest = cursor.parent();
1538 let parent_commitment = V::parent_commitment(&cursor);
1539 if let Some(block) = self
1540 .find_block_by_commitment(buffer, parent_commitment)
1541 .await
1542 {
1543 let finalization = self.cache.get_finalization_for(parent_digest).await;
1544 wrote |= self
1545 .store_finalization(
1546 block.height(),
1547 parent_digest,
1548 block.clone(),
1549 finalization,
1550 application,
1551 buffer,
1552 )
1553 .await;
1554 debug!(height = %block.height(), "repaired block");
1555 cursor = block;
1556 } else {
1557 resolver
1563 .fetch(Request::<V::Commitment>::Block(parent_commitment))
1564 .await;
1565 break 'cache_repair;
1566 }
1567 }
1568 }
1569
1570 let missing_items = self
1576 .finalized_blocks
1577 .missing_items(start, self.max_repair.get());
1578 let requests = missing_items
1579 .into_iter()
1580 .map(|height| Request::<V::Commitment>::Finalized { height })
1581 .collect::<Vec<_>>();
1582 if !requests.is_empty() {
1583 resolver.fetch_all(requests).await
1584 }
1585 wrote
1586 }
1587
1588 async fn update_processed_height(
1591 &mut self,
1592 height: Height,
1593 resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
1594 ) {
1595 self.application_metadata.put(LATEST_KEY, height);
1596 self.last_processed_height = height;
1597 let _ = self
1598 .processed_height
1599 .try_set(self.last_processed_height.get());
1600
1601 resolver
1603 .retain(Request::<V::Commitment>::Finalized { height }.predicate())
1604 .await;
1605 }
1606
1607 async fn prune_finalized_archives(&mut self, height: Height) -> Result<(), BoxedError> {
1609 try_join!(
1610 async {
1611 self.finalized_blocks
1612 .prune(height)
1613 .await
1614 .map_err(Box::new)?;
1615 Ok::<_, BoxedError>(())
1616 },
1617 async {
1618 self.finalizations_by_height
1619 .prune(height)
1620 .await
1621 .map_err(Box::new)?;
1622 Ok::<_, BoxedError>(())
1623 }
1624 )?;
1625 Ok(())
1626 }
1627}