1use super::{
2 cache,
3 mailbox::{Mailbox, Message},
4 Buffer, IntoBlock, Variant,
5};
6use crate::{
7 marshal::{
8 resolver::handler::{self, Request},
9 store::{Blocks, Certificates},
10 Config, Identifier as BlockID, Update,
11 },
12 simplex::{
13 scheme::Scheme,
14 types::{verify_certificates, Finalization, Notarization, Subject},
15 },
16 types::{Epoch, Epocher, Height, Round, ViewDelta},
17 Block, Epochable, Heightable, Reporter,
18};
19use bytes::Bytes;
20use commonware_codec::{Decode, Encode, Read};
21use commonware_cryptography::{
22 certificate::{Provider, Scheme as CertificateScheme},
23 Digestible,
24};
25use commonware_macros::select_loop;
26use commonware_p2p::Recipients;
27use commonware_parallel::Strategy;
28use commonware_resolver::Resolver;
29use commonware_runtime::{
30 spawn_cell, telemetry::metrics::status::GaugeExt, BufferPooler, Clock, ContextCell, Handle,
31 Metrics, Spawner, Storage,
32};
33use commonware_storage::{
34 archive::Identifier as ArchiveID,
35 metadata::{self, Metadata},
36};
37use commonware_utils::{
38 acknowledgement::Exact,
39 channel::{fallible::OneshotExt, mpsc, oneshot},
40 futures::{AbortablePool, Aborter, OptionFuture},
41 sequence::U64,
42 Acknowledgement, BoxedError,
43};
44use futures::{future::join_all, try_join, FutureExt};
45use pin_project::pin_project;
46use prometheus_client::metrics::gauge::Gauge;
47use rand_core::CryptoRngCore;
48use std::{
49 collections::{btree_map::Entry, BTreeMap, VecDeque},
50 future::Future,
51 num::NonZeroUsize,
52 pin::Pin,
53 sync::Arc,
54};
55use tracing::{debug, error, info, warn};
56
/// Metadata key under which the last processed height is looked up in the application
/// metadata store (read in `Actor::init` to restore `last_processed_height`).
const LATEST_KEY: U64 = U64::new(0xFF);
59
/// A certificate/block pair delivered by the resolver that has been decoded and checked for
/// internal consistency in `handle_deliver`, and is now waiting for its certificate to be
/// verified in `verify_delivered`.
enum PendingVerification<S: CertificateScheme, V: Variant> {
    /// A notarization delivered together with its block.
    Notarized {
        /// The notarization whose certificate still has to be verified.
        notarization: Notarization<S, V::Commitment>,
        /// The block delivered alongside the notarization.
        block: V::Block,
        /// Channel on which the verification outcome (`true`/`false`) is reported.
        response: oneshot::Sender<bool>,
    },
    /// A finalization delivered together with its block.
    Finalized {
        /// The finalization whose certificate still has to be verified.
        finalization: Finalization<S, V::Commitment>,
        /// The block delivered alongside the finalization.
        block: V::Block,
        /// Channel on which the verification outcome (`true`/`false`) is reported.
        response: oneshot::Sender<bool>,
    },
}
73
/// An outstanding acknowledgement for a block that was dispatched to the application.
///
/// Polling a `PendingAck` polls the wrapped acknowledgement waiter.
#[pin_project]
struct PendingAck<V: Variant, A: Acknowledgement> {
    /// Height of the dispatched block.
    height: Height,
    /// Commitment of the dispatched block.
    commitment: V::Commitment,
    /// Waiter half of the acknowledgement handed to the application.
    #[pin]
    receiver: A::Waiter,
}
82
83impl<V: Variant, A: Acknowledgement> Future for PendingAck<V, A> {
84 type Output = <A::Waiter as Future>::Output;
85
86 fn poll(
87 self: std::pin::Pin<&mut Self>,
88 cx: &mut std::task::Context<'_>,
89 ) -> std::task::Poll<Self::Output> {
90 self.project().receiver.poll(cx)
91 }
92}
93
/// Tracks acknowledgements for blocks dispatched to the application, in dispatch order.
struct PendingAcks<V: Variant, A: Acknowledgement> {
    /// The oldest outstanding acknowledgement (the one awaited by the actor loop), if any.
    current: OptionFuture<PendingAck<V, A>>,
    /// Acknowledgements dispatched after `current`, oldest first.
    queue: VecDeque<PendingAck<V, A>>,
    /// Maximum number of outstanding acknowledgements (`current` plus `queue`).
    max: usize,
}
100
101impl<V: Variant, A: Acknowledgement> PendingAcks<V, A> {
102 fn new(max: usize) -> Self {
104 Self {
105 current: None.into(),
106 queue: VecDeque::with_capacity(max),
107 max,
108 }
109 }
110
111 fn clear(&mut self) {
113 self.current = None.into();
114 self.queue.clear();
115 }
116
117 const fn current(&mut self) -> &mut OptionFuture<PendingAck<V, A>> {
119 &mut self.current
120 }
121
122 fn has_capacity(&self) -> bool {
124 let reserved = usize::from(self.current.is_some());
125 self.queue.len() < self.max - reserved
126 }
127
128 fn next_dispatch_height(&self, last_processed_height: Height) -> Height {
130 self.queue
131 .back()
132 .map(|ack| ack.height.next())
133 .or_else(|| self.current.as_ref().map(|ack| ack.height.next()))
134 .unwrap_or_else(|| last_processed_height.next())
135 }
136
137 fn enqueue(&mut self, ack: PendingAck<V, A>) {
139 if self.current.is_none() {
140 self.current.replace(ack);
141 return;
142 }
143 self.queue.push_back(ack);
144 }
145
146 fn complete_current(
148 &mut self,
149 result: <A::Waiter as Future>::Output,
150 ) -> (Height, V::Commitment, <A::Waiter as Future>::Output) {
151 let PendingAck {
152 height, commitment, ..
153 } = self.current.take().expect("ack state must be present");
154 if let Some(next) = self.queue.pop_front() {
155 self.current.replace(next);
156 }
157 (height, commitment, result)
158 }
159
160 fn pop_ready(&mut self) -> Option<(Height, V::Commitment, <A::Waiter as Future>::Output)> {
162 let pending = self.current.as_mut()?;
163 let result = Pin::new(&mut pending.receiver).now_or_never()?;
164 Some(self.complete_current(result))
165 }
166}
167
/// The set of local subscribers waiting for one block.
struct BlockSubscription<V: Variant> {
    /// Channels to send the block on once it becomes available.
    subscribers: Vec<oneshot::Sender<V::Block>>,
    /// Handle returned when the buffer-subscription waiter was pushed onto the
    /// `AbortablePool`; kept alive for as long as the subscription exists.
    // NOTE(review): presumably dropping this aborts the waiter — confirm against `Aborter`.
    _aborter: Aborter,
}
175
/// Key identifying a block subscription: the block is requested either by its digest or by
/// its commitment.
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)]
enum BlockSubscriptionKey<C, D> {
    /// Subscription keyed by block digest.
    Digest(D),
    /// Subscription keyed by block commitment.
    Commitment(C),
}
185
/// [`BlockSubscriptionKey`] instantiated with the commitment and digest types of variant `V`.
type BlockSubscriptionKeyFor<V> =
    BlockSubscriptionKey<<V as Variant>::Commitment, <<V as Variant>::Block as Digestible>::Digest>;
188
/// The marshal actor.
///
/// It receives blocks, notarizations and finalizations through its mailbox and from the
/// resolver, stores finalized blocks and finalizations in the provided archives, and
/// dispatches finalized blocks to the application in height order, tracking the
/// application's acknowledgements.
pub struct Actor<E, V, P, FC, FB, ES, T, A = Exact>
where
    E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage,
    V: Variant,
    P: Provider<Scope = Epoch, Scheme: Scheme<V::Commitment>>,
    FC: Certificates<
        BlockDigest = <V::Block as Digestible>::Digest,
        Commitment = V::Commitment,
        Scheme = P::Scheme,
    >,
    FB: Blocks<Block = V::StoredBlock>,
    ES: Epocher,
    T: Strategy,
    A: Acknowledgement,
{
    /// Runtime context the actor runs in.
    context: ContextCell<E>,

    /// Receiving end of the actor's mailbox.
    mailbox: mpsc::Receiver<Message<P::Scheme, V>>,

    /// Source of the certificate schemes used to decode and verify certificates.
    provider: P,
    /// Maps a height to the bounds of the epoch containing it.
    epocher: ES,
    /// Number of views behind the last processed round that are retained when pruning the
    /// cache.
    view_retention_timeout: ViewDelta,
    /// Upper bound on the number of resolver messages handled per loop iteration.
    // NOTE(review): may also bound work in `try_repair_gaps`, which is not visible here.
    max_repair: NonZeroUsize,
    /// Codec configuration used when decoding blocks.
    block_codec_config: <V::Block as Read>::Cfg,
    /// Strategy handed to `verify_certificates`.
    strategy: T,

    /// Round of the finalization of the most recently processed block
    /// (`Round::zero()` until a processed block's finalization is found).
    last_processed_round: Round,
    /// Height of the most recently processed block (restored from metadata on startup).
    last_processed_height: Height,
    /// Acknowledgements outstanding for blocks dispatched to the application.
    pending_acks: PendingAcks<V, A>,
    /// Highest finalized height reported to the application as a tip.
    tip: Height,
    /// Local subscribers waiting for blocks, keyed by digest or commitment.
    block_subscriptions: BTreeMap<BlockSubscriptionKeyFor<V>, BlockSubscription<V>>,

    /// Cache of verified/notarized blocks and of notarizations and finalizations by round.
    cache: cache::Manager<E, V, P::Scheme>,
    /// Metadata store holding the last processed height under `LATEST_KEY`.
    application_metadata: Metadata<E, U64, Height>,
    /// Archive of finalizations indexed by height.
    finalizations_by_height: FC,
    /// Archive of finalized blocks.
    finalized_blocks: FB,

    /// Gauge registered as `finalized_height`.
    finalized_height: Gauge,
    /// Gauge registered as `processed_height`.
    processed_height: Gauge,
}
265
266impl<E, V, P, FC, FB, ES, T, A> Actor<E, V, P, FC, FB, ES, T, A>
267where
268 E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage,
269 V: Variant,
270 P: Provider<Scope = Epoch, Scheme: Scheme<V::Commitment>>,
271 FC: Certificates<
272 BlockDigest = <V::Block as Digestible>::Digest,
273 Commitment = V::Commitment,
274 Scheme = P::Scheme,
275 >,
276 FB: Blocks<Block = V::StoredBlock>,
277 ES: Epocher,
278 T: Strategy,
279 A: Acknowledgement,
280{
281 pub async fn init(
283 context: E,
284 finalizations_by_height: FC,
285 finalized_blocks: FB,
286 config: Config<V::Block, P, ES, T>,
287 ) -> (Self, Mailbox<P::Scheme, V>, Height) {
288 let prunable_config = cache::Config {
290 partition_prefix: format!("{}-cache", config.partition_prefix),
291 prunable_items_per_section: config.prunable_items_per_section,
292 replay_buffer: config.replay_buffer,
293 key_write_buffer: config.key_write_buffer,
294 value_write_buffer: config.value_write_buffer,
295 key_page_cache: config.page_cache.clone(),
296 };
297 let cache = cache::Manager::init(
298 context.with_label("cache"),
299 prunable_config,
300 config.block_codec_config.clone(),
301 )
302 .await;
303
304 let application_metadata = Metadata::init(
306 context.with_label("application_metadata"),
307 metadata::Config {
308 partition: format!("{}-application-metadata", config.partition_prefix),
309 codec_config: (),
310 },
311 )
312 .await
313 .expect("failed to initialize application metadata");
314 let last_processed_height = application_metadata
315 .get(&LATEST_KEY)
316 .copied()
317 .unwrap_or(Height::zero());
318
319 let finalized_height = Gauge::default();
321 context.register(
322 "finalized_height",
323 "Finalized height of application",
324 finalized_height.clone(),
325 );
326 let processed_height = Gauge::default();
327 context.register(
328 "processed_height",
329 "Processed height of application",
330 processed_height.clone(),
331 );
332 let _ = processed_height.try_set(last_processed_height.get());
333
334 let (sender, mailbox) = mpsc::channel(config.mailbox_size);
336 (
337 Self {
338 context: ContextCell::new(context),
339 mailbox,
340 provider: config.provider,
341 epocher: config.epocher,
342 view_retention_timeout: config.view_retention_timeout,
343 max_repair: config.max_repair,
344 block_codec_config: config.block_codec_config,
345 strategy: config.strategy,
346 last_processed_round: Round::zero(),
347 last_processed_height,
348 pending_acks: PendingAcks::new(config.max_pending_acks.get()),
349 tip: Height::zero(),
350 block_subscriptions: BTreeMap::new(),
351 cache,
352 application_metadata,
353 finalizations_by_height,
354 finalized_blocks,
355 finalized_height,
356 processed_height,
357 },
358 Mailbox::new(sender),
359 last_processed_height,
360 )
361 }
362
    /// Spawns the actor's event loop (`run`) on its runtime context and returns the handle
    /// of the spawned task.
    ///
    /// * `application` - receives tip and block updates.
    /// * `buffer` - used to broadcast blocks and to look up / subscribe to blocks.
    /// * `resolver` - the receiver of resolver handler messages paired with the resolver
    ///   used to fetch missing data.
    pub fn start<R, Buf>(
        mut self,
        application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
        buffer: Buf,
        resolver: (mpsc::Receiver<handler::Message<V::Commitment>>, R),
    ) -> Handle<()>
    where
        R: Resolver<
            Key = handler::Request<V::Commitment>,
            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
        >,
        Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
    {
        spawn_cell!(self.context, self.run(application, buffer, resolver).await)
    }
379
    /// The actor's event loop.
    ///
    /// On startup it reports the latest stored finalization to the application as the tip,
    /// loads persisted cache epochs, attempts to repair gaps in the finalized archives and
    /// dispatches any ready finalized blocks. It then loops, handling:
    ///
    /// * completed buffer-subscription waiters,
    /// * application acknowledgements of dispatched blocks,
    /// * mailbox messages, and
    /// * resolver handler messages (produce/deliver).
    ///
    /// The loop ends when the mailbox or the resolver handler channel closes, when the
    /// application fails to acknowledge a block, or when persisting progress or pruning
    /// fails.
    async fn run<R, Buf>(
        mut self,
        mut application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
        mut buffer: Buf,
        (mut resolver_rx, mut resolver): (mpsc::Receiver<handler::Message<V::Commitment>>, R),
    ) where
        R: Resolver<
            Key = handler::Request<V::Commitment>,
            PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
        >,
        Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
    {
        // Futures waiting on buffer subscriptions. Each resolves to the delivered block or,
        // if the buffer subscription was closed, to the key it was registered under.
        let mut waiters = AbortablePool::<Result<V::Block, BlockSubscriptionKeyFor<V>>>::default();

        // Report the latest stored finalization (if any) to the application as the tip.
        let tip = self.get_latest().await;
        if let Some((height, digest, round)) = tip {
            application.report(Update::Tip(round, height, digest)).await;
            self.tip = height;
            let _ = self.finalized_height.try_set(height.get());
        }

        // Load the cache state persisted for previous epochs.
        self.cache.load_persisted_epochs().await;

        // Attempt to repair gaps in the finalized archives; sync only if repair asks for it.
        if self
            .try_repair_gaps(&mut buffer, &mut resolver, &mut application)
            .await
        {
            self.sync_finalized().await;
        }

        // Dispatch any finalized blocks that are already available to the application.
        self.try_dispatch_blocks(&mut application).await;

        select_loop! {
            self.context,
            on_start => {
                // Remove subscribers whose receiving side is closed, and subscriptions that
                // have no subscribers left.
                self.block_subscriptions.retain(|_, bs| {
                    bs.subscribers.retain(|tx| !tx.is_closed());
                    !bs.subscribers.is_empty()
                });
            },
            on_stopped => {
                debug!("context shutdown, stopping marshal");
            },
            // A buffer-subscription waiter completed.
            Ok(completion) = waiters.next_completed() else continue => match completion {
                Ok(block) => self.notify_subscribers(&block),
                Err(key) => {
                    // The buffer closed the subscription: drop the local subscribers for it.
                    match key {
                        BlockSubscriptionKey::Digest(digest) => {
                            debug!(
                                ?digest,
                                "buffer subscription closed, canceling local subscribers"
                            );
                        }
                        BlockSubscriptionKey::Commitment(commitment) => {
                            debug!(
                                ?commitment,
                                "buffer subscription closed, canceling local subscribers"
                            );
                        }
                    }
                    self.block_subscriptions.remove(&key);
                }
            },
            // The oldest outstanding acknowledgement resolved.
            result = self.pending_acks.current() => {
                // Start with the acknowledgement that woke this arm.
                let mut pending = Some(self.pending_acks.complete_current(result));
                loop {
                    let (height, commitment, result) =
                        pending.take().expect("pending ack must exist");
                    match result {
                        Ok(()) => {
                            // Apply the in-memory progress updates for this block.
                            self.handle_block_processed(height, commitment, &mut resolver)
                                .await;
                        }
                        Err(e) => {
                            // The application did not acknowledge the block: stop the actor.
                            error!(e = ?e, height = %height, "application did not acknowledge block");
                            return;
                        }
                    }

                    // Drain any further acknowledgements that are already ready.
                    let Some(next) = self.pending_acks.pop_ready() else {
                        break;
                    };
                    pending = Some(next);
                }

                // Sync the application metadata once after draining all ready acks.
                if let Err(e) = self.application_metadata.sync().await {
                    error!(?e, "failed to sync application progress");
                    return;
                }

                // Refill the dispatch pipeline.
                self.try_dispatch_blocks(&mut application).await;
            },
            // A message arrived on the mailbox.
            Some(message) = self.mailbox.recv() else {
                info!("mailbox closed, shutting down");
                break;
            } => {
                match message {
                    Message::GetInfo {
                        identifier,
                        response,
                    } => {
                        // Resolve the identifier to a (height, digest) pair, if known.
                        // Archive read errors are treated as "not found" here.
                        let info = match identifier {
                            BlockID::Digest(digest) => self
                                .finalized_blocks
                                .get(ArchiveID::Key(&digest))
                                .await
                                .ok()
                                .flatten()
                                .map(|b| (b.height(), digest)),
                            BlockID::Height(height) => self
                                .finalizations_by_height
                                .get(ArchiveID::Index(height.get()))
                                .await
                                .ok()
                                .flatten()
                                .map(|f| (height, V::commitment_to_inner(f.proposal.payload))),
                            BlockID::Latest => self.get_latest().await.map(|(h, d, _)| (h, d)),
                        };
                        response.send_lossy(info);
                    }
                    Message::Proposed { round, block } => {
                        // Cache our own proposal as verified, then broadcast it to everyone.
                        self.cache_verified(round, block.digest(), block.clone())
                            .await;
                        buffer.send(round, block, Recipients::All).await;
                    }
                    Message::Forward {
                        round,
                        commitment,
                        peers,
                    } => {
                        // Nothing to do without recipients.
                        if peers.is_empty() {
                            continue;
                        }
                        let Some(block) = self.find_block_by_commitment(&buffer, commitment).await
                        else {
                            debug!(?commitment, "block not found for forwarding");
                            continue;
                        };
                        buffer.send(round, block, Recipients::Some(peers)).await;
                    }
                    Message::Verified { round, block } => {
                        self.cache_verified(round, block.digest(), block).await;
                    }
                    Message::Notarization { notarization } => {
                        let round = notarization.round();
                        let commitment = notarization.proposal.payload;
                        let digest = V::commitment_to_inner(commitment);

                        // Store the notarization in the cache.
                        self.cache
                            .put_notarization(round, digest, notarization.clone())
                            .await;

                        // Cache the block if we have it, otherwise fetch it by round.
                        if let Some(block) =
                            self.find_block_by_commitment(&buffer, commitment).await
                        {
                            self.cache_block(round, digest, block).await;
                        } else {
                            debug!(?round, "notarized block missing");
                            resolver
                                .fetch(Request::<V::Commitment>::Notarized { round })
                                .await;
                        }
                    }
                    Message::Finalization { finalization } => {
                        // Store the finalization in the cache.
                        let round = finalization.round();
                        let commitment = finalization.proposal.payload;
                        let digest = V::commitment_to_inner(commitment);
                        self.cache
                            .put_finalization(round, digest, finalization.clone())
                            .await;

                        // Store the block as finalized if we have it, otherwise fetch it.
                        if let Some(block) =
                            self.find_block_by_commitment(&buffer, commitment).await
                        {
                            let height = block.height();
                            if self
                                .store_finalization(
                                    height,
                                    digest,
                                    block,
                                    Some(finalization),
                                    &mut application,
                                    &mut buffer,
                                )
                                .await
                            {
                                // Something was written: try to repair gaps, then sync.
                                self.try_repair_gaps(&mut buffer, &mut resolver, &mut application)
                                    .await;
                                self.sync_finalized().await;
                                debug!(?round, %height, "finalized block stored");
                            }
                        } else {
                            debug!(?round, ?commitment, "finalized block missing");
                            resolver
                                .fetch(Request::<V::Commitment>::Block(commitment))
                                .await;
                        }
                    }
                    Message::GetBlock {
                        identifier,
                        response,
                    } => match identifier {
                        BlockID::Digest(digest) => {
                            let result = self.find_block_by_digest(&mut buffer, digest).await;
                            response.send_lossy(result);
                        }
                        BlockID::Height(height) => {
                            let result = self.get_finalized_block(height).await;
                            response.send_lossy(result);
                        }
                        BlockID::Latest => {
                            let block = match self.get_latest().await {
                                Some((_, digest, _)) => {
                                    self.find_block_by_digest(&mut buffer, digest).await
                                }
                                None => None,
                            };
                            response.send_lossy(block);
                        }
                    },
                    Message::GetFinalization { height, response } => {
                        let finalization = self.get_finalization_by_height(height).await;
                        response.send_lossy(finalization);
                    }
                    Message::HintFinalized { height, targets } => {
                        // Ignore hints at or below the processed height.
                        if height <= self.last_processed_height {
                            continue;
                        }

                        // Ignore hints for finalizations we already have.
                        if self.get_finalization_by_height(height).await.is_some() {
                            continue;
                        }

                        // Fetch the finalization from the hinted targets.
                        let request = Request::<V::Commitment>::Finalized { height };
                        resolver.fetch_targeted(request, targets).await;
                    }
                    Message::SubscribeByDigest {
                        round,
                        digest,
                        response,
                    } => {
                        self.handle_subscribe(
                            round,
                            BlockSubscriptionKey::Digest(digest),
                            response,
                            &mut resolver,
                            &mut waiters,
                            &mut buffer,
                        )
                        .await;
                    }
                    Message::SubscribeByCommitment {
                        round,
                        commitment,
                        response,
                    } => {
                        self.handle_subscribe(
                            round,
                            BlockSubscriptionKey::Commitment(commitment),
                            response,
                            &mut resolver,
                            &mut waiters,
                            &mut buffer,
                        )
                        .await;
                    }
                    Message::SetFloor { height } => {
                        // The floor only moves forward.
                        if self.last_processed_height >= height {
                            warn!(
                                %height,
                                existing = %self.last_processed_height,
                                "floor not updated, lower than existing"
                            );
                            continue;
                        }

                        // Record the new processed height and sync the metadata.
                        self.update_processed_height(height, &mut resolver).await;
                        if let Err(err) = self.application_metadata.sync().await {
                            error!(?err, %height, "failed to update floor");
                            return;
                        }

                        // Forget all outstanding acknowledgements.
                        self.pending_acks.clear();

                        // Prune the finalized archives up to the new floor.
                        if let Err(err) = self.prune_finalized_archives(height).await {
                            error!(?err, %height, "failed to prune finalized archives");
                            return;
                        }

                    }
                    Message::Prune { height } => {
                        // Never prune above the processed height.
                        if height > self.last_processed_height {
                            warn!(%height, floor = %self.last_processed_height, "prune height above floor, ignoring");
                            continue;
                        }

                        if let Err(err) = self.prune_finalized_archives(height).await {
                            error!(?err, %height, "failed to prune finalized archives");
                            return;
                        }

                    }
                }
            },
            // A message arrived from the resolver handler.
            Some(message) = resolver_rx.recv() else {
                info!("handler closed, shutting down");
                return;
            } => {
                // Handle this message plus any others already queued, up to `max_repair`
                // in total. Deliveries are processed immediately; produce requests are
                // answered after the batch has been applied.
                let mut needs_sync = false;
                let mut produces = Vec::new();
                let mut delivers = Vec::new();
                for msg in std::iter::once(message)
                    .chain(std::iter::from_fn(|| resolver_rx.try_recv().ok()))
                    .take(self.max_repair.get())
                {
                    match msg {
                        handler::Message::Produce { key, response } => {
                            produces.push((key, response));
                        }
                        handler::Message::Deliver {
                            key,
                            value,
                            response,
                        } => {
                            needs_sync |= self
                                .handle_deliver(
                                    key,
                                    value,
                                    response,
                                    &mut delivers,
                                    &mut application,
                                    &mut buffer,
                                )
                                .await;
                        }
                    }
                }

                // Verify the certificates collected from the deliveries and store the
                // verified items.
                needs_sync |= self
                    .verify_delivered(delivers, &mut application, &mut buffer)
                    .await;

                // Attempt to repair any gaps that remain.
                needs_sync |= self
                    .try_repair_gaps(&mut buffer, &mut resolver, &mut application)
                    .await;

                // Sync the finalized archives if anything asked for it.
                if needs_sync {
                    self.sync_finalized().await;
                }

                // Answer the produce requests concurrently.
                join_all(
                    produces
                        .into_iter()
                        .map(|(key, response)| self.handle_produce(key, response, &buffer)),
                )
                .await;
            },
        }
    }
792
793 async fn handle_produce<Buf: Buffer<V>>(
795 &self,
796 key: Request<V::Commitment>,
797 response: oneshot::Sender<Bytes>,
798 buffer: &Buf,
799 ) {
800 match key {
801 Request::Block(commitment) => {
802 let Some(block) = self.find_block_by_commitment(buffer, commitment).await else {
803 debug!(?commitment, "block missing on request");
804 return;
805 };
806 response.send_lossy(block.encode());
807 }
808 Request::Finalized { height } => {
809 let Some(finalization) = self.get_finalization_by_height(height).await else {
810 debug!(%height, "finalization missing on request");
811 return;
812 };
813 let Some(block) = self.get_finalized_block(height).await else {
814 debug!(%height, "finalized block missing on request");
815 return;
816 };
817 response.send_lossy((finalization, block).encode());
818 }
819 Request::Notarized { round } => {
820 let Some(notarization) = self.cache.get_notarization(round).await else {
821 debug!(?round, "notarization missing on request");
822 return;
823 };
824 let commitment = notarization.proposal.payload;
825 let Some(block) = self.find_block_by_commitment(buffer, commitment).await else {
826 debug!(?commitment, "block missing on request");
827 return;
828 };
829 response.send_lossy((notarization, block).encode());
830 }
831 }
832 }
833
834 async fn handle_subscribe<Buf: Buffer<V>>(
836 &mut self,
837 round: Option<Round>,
838 key: BlockSubscriptionKeyFor<V>,
839 response: oneshot::Sender<V::Block>,
840 resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
841 waiters: &mut AbortablePool<Result<V::Block, BlockSubscriptionKeyFor<V>>>,
842 buffer: &mut Buf,
843 ) {
844 let digest = match key {
845 BlockSubscriptionKey::Digest(digest) => digest,
846 BlockSubscriptionKey::Commitment(commitment) => V::commitment_to_inner(commitment),
847 };
848
849 let block = match key {
851 BlockSubscriptionKey::Digest(digest) => self.find_block_by_digest(buffer, digest).await,
852 BlockSubscriptionKey::Commitment(commitment) => {
853 self.find_block_by_commitment(buffer, commitment).await
854 }
855 };
856 if let Some(block) = block {
857 response.send_lossy(block);
858 return;
859 }
860
861 if let Some(round) = round {
866 if round < self.last_processed_round {
867 return;
873 }
874 debug!(?round, ?digest, "requested block missing");
878 resolver
879 .fetch(Request::<V::Commitment>::Notarized { round })
880 .await;
881 }
882
883 match key {
885 BlockSubscriptionKey::Digest(digest) => {
886 debug!(?round, ?digest, "registering subscriber");
887 }
888 BlockSubscriptionKey::Commitment(commitment) => {
889 debug!(?round, ?commitment, ?digest, "registering subscriber");
890 }
891 }
892 match self.block_subscriptions.entry(key) {
893 Entry::Occupied(mut entry) => {
894 entry.get_mut().subscribers.push(response);
895 }
896 Entry::Vacant(entry) => {
897 let rx = match key {
898 BlockSubscriptionKey::Digest(digest) => {
899 buffer.subscribe_by_digest(digest).await
900 }
901 BlockSubscriptionKey::Commitment(commitment) => {
902 buffer.subscribe_by_commitment(commitment).await
903 }
904 };
905 let waiter_key = key;
906 let aborter = waiters.push(async move {
907 rx.await
908 .map_or_else(|_| Err(waiter_key), |block| Ok(block.into_block()))
909 });
910 entry.insert(BlockSubscription {
911 subscribers: vec![response],
912 _aborter: aborter,
913 });
914 }
915 }
916 }
917
918 async fn handle_deliver<Buf: Buffer<V>>(
923 &mut self,
924 key: Request<V::Commitment>,
925 value: Bytes,
926 response: oneshot::Sender<bool>,
927 delivers: &mut Vec<PendingVerification<P::Scheme, V>>,
928 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
929 buffer: &mut Buf,
930 ) -> bool {
931 match key {
932 Request::Block(commitment) => {
933 let Ok(block) = V::Block::decode_cfg(value.as_ref(), &self.block_codec_config)
934 else {
935 response.send_lossy(false);
936 return false;
937 };
938 if V::commitment(&block) != commitment {
939 response.send_lossy(false);
940 return false;
941 }
942
943 let height = block.height();
945 let digest = block.digest();
946 let finalization = self.cache.get_finalization_for(digest).await;
947 let wrote = self
948 .store_finalization(height, digest, block, finalization, application, buffer)
949 .await;
950 debug!(?digest, %height, "received block");
951 response.send_lossy(true); wrote
953 }
954 Request::Finalized { height } => {
955 let Some(bounds) = self.epocher.containing(height) else {
956 debug!(
957 %height,
958 floor = %self.last_processed_height,
959 "ignoring stale delivery"
960 );
961 response.send_lossy(true);
962 return false;
963 };
964 let Some(scheme) = self.get_scheme_certificate_verifier(bounds.epoch()) else {
965 debug!(
966 %height,
967 floor = %self.last_processed_height,
968 "ignoring stale delivery"
969 );
970 response.send_lossy(true);
971 return false;
972 };
973
974 let Ok((finalization, block)) =
975 <(Finalization<P::Scheme, V::Commitment>, V::Block)>::decode_cfg(
976 value,
977 &(
978 scheme.certificate_codec_config(),
979 self.block_codec_config.clone(),
980 ),
981 )
982 else {
983 response.send_lossy(false);
984 return false;
985 };
986
987 let commitment = finalization.proposal.payload;
988 if block.height() != height
989 || V::commitment(&block) != commitment
990 || finalization.epoch() != bounds.epoch()
991 {
992 response.send_lossy(false);
993 return false;
994 }
995 delivers.push(PendingVerification::Finalized {
996 finalization,
997 block,
998 response,
999 });
1000 false
1001 }
1002 Request::Notarized { round } => {
1003 let Some(scheme) = self.get_scheme_certificate_verifier(round.epoch()) else {
1004 debug!(
1005 ?round,
1006 floor = %self.last_processed_height,
1007 "ignoring stale delivery"
1008 );
1009 response.send_lossy(true);
1010 return false;
1011 };
1012
1013 let Ok((notarization, block)) =
1014 <(Notarization<P::Scheme, V::Commitment>, V::Block)>::decode_cfg(
1015 value,
1016 &(
1017 scheme.certificate_codec_config(),
1018 self.block_codec_config.clone(),
1019 ),
1020 )
1021 else {
1022 response.send_lossy(false);
1023 return false;
1024 };
1025
1026 if notarization.round() != round
1027 || V::commitment(&block) != notarization.proposal.payload
1028 {
1029 response.send_lossy(false);
1030 return false;
1031 }
1032 delivers.push(PendingVerification::Notarized {
1033 notarization,
1034 block,
1035 response,
1036 });
1037 false
1038 }
1039 }
1040 }
1041
1042 async fn verify_delivered<Buf: Buffer<V>>(
1045 &mut self,
1046 mut delivers: Vec<PendingVerification<P::Scheme, V>>,
1047 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1048 buffer: &mut Buf,
1049 ) -> bool {
1050 if delivers.is_empty() {
1051 return false;
1052 }
1053
1054 let certs: Vec<_> = delivers
1056 .iter()
1057 .map(|item| match item {
1058 PendingVerification::Finalized { finalization, .. } => (
1059 Subject::Finalize {
1060 proposal: &finalization.proposal,
1061 },
1062 &finalization.certificate,
1063 ),
1064 PendingVerification::Notarized { notarization, .. } => (
1065 Subject::Notarize {
1066 proposal: ¬arization.proposal,
1067 },
1068 ¬arization.certificate,
1069 ),
1070 })
1071 .collect();
1072
1073 let verified = if let Some(scheme) = self.provider.all() {
1076 verify_certificates(&mut self.context, scheme.as_ref(), &certs, &self.strategy)
1077 } else {
1078 let mut verified = vec![false; delivers.len()];
1079
1080 let mut by_epoch: BTreeMap<Epoch, Vec<usize>> = BTreeMap::new();
1082 for (i, item) in delivers.iter().enumerate() {
1083 let epoch = match item {
1084 PendingVerification::Notarized { notarization, .. } => notarization.epoch(),
1085 PendingVerification::Finalized { finalization, .. } => finalization.epoch(),
1086 };
1087 by_epoch.entry(epoch).or_default().push(i);
1088 }
1089
1090 for (epoch, indices) in &by_epoch {
1092 let Some(scheme) = self.provider.scoped(*epoch) else {
1093 continue;
1094 };
1095 let group: Vec<_> = indices.iter().map(|&i| certs[i]).collect();
1096 let results =
1097 verify_certificates(&mut self.context, scheme.as_ref(), &group, &self.strategy);
1098 for (j, &idx) in indices.iter().enumerate() {
1099 verified[idx] = results[j];
1100 }
1101 }
1102 verified
1103 };
1104
1105 let mut wrote = false;
1107 for (index, item) in delivers.drain(..).enumerate() {
1108 if !verified[index] {
1109 match item {
1110 PendingVerification::Finalized { response, .. }
1111 | PendingVerification::Notarized { response, .. } => {
1112 response.send_lossy(false);
1113 }
1114 }
1115 continue;
1116 }
1117 match item {
1118 PendingVerification::Finalized {
1119 finalization,
1120 block,
1121 response,
1122 } => {
1123 response.send_lossy(true);
1125 let round = finalization.round();
1126 let height = block.height();
1127 let digest = block.digest();
1128 debug!(?round, %height, "received finalization");
1129
1130 wrote |= self
1131 .store_finalization(
1132 height,
1133 digest,
1134 block,
1135 Some(finalization),
1136 application,
1137 buffer,
1138 )
1139 .await;
1140 }
1141 PendingVerification::Notarized {
1142 notarization,
1143 block,
1144 response,
1145 } => {
1146 response.send_lossy(true);
1148 let round = notarization.round();
1149 let commitment = notarization.proposal.payload;
1150 let digest = V::commitment_to_inner(commitment);
1151 debug!(?round, ?digest, "received notarization");
1152
1153 let height = block.height();
1158 if let Some(finalization) = self.cache.get_finalization_for(digest).await {
1159 wrote |= self
1162 .store_finalization(
1163 height,
1164 digest,
1165 block.clone(),
1166 Some(finalization),
1167 application,
1168 buffer,
1169 )
1170 .await;
1171 }
1172
1173 self.cache_block(round, digest, block).await;
1175 self.cache
1176 .put_notarization(round, digest, notarization)
1177 .await;
1178 }
1179 }
1180 }
1181
1182 wrote
1183 }
1184
1185 fn get_scheme_certificate_verifier(&self, epoch: Epoch) -> Option<Arc<P::Scheme>> {
1190 self.provider.all().or_else(|| self.provider.scoped(epoch))
1191 }
1192
1193 fn notify_subscribers(&mut self, block: &V::Block) {
1197 if let Some(mut bs) = self
1198 .block_subscriptions
1199 .remove(&BlockSubscriptionKey::Digest(block.digest()))
1200 {
1201 for subscriber in bs.subscribers.drain(..) {
1202 subscriber.send_lossy(block.clone());
1203 }
1204 }
1205 if let Some(mut bs) = self
1206 .block_subscriptions
1207 .remove(&BlockSubscriptionKey::Commitment(V::commitment(block)))
1208 {
1209 for subscriber in bs.subscribers.drain(..) {
1210 subscriber.send_lossy(block.clone());
1211 }
1212 }
1213 }
1214
1215 async fn try_dispatch_blocks(
1249 &mut self,
1250 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1251 ) {
1252 while self.pending_acks.has_capacity() {
1253 let next_height = self
1254 .pending_acks
1255 .next_dispatch_height(self.last_processed_height);
1256 let Some(block) = self.get_finalized_block(next_height).await else {
1257 return;
1258 };
1259 assert_eq!(
1260 block.height(),
1261 next_height,
1262 "finalized block height mismatch"
1263 );
1264
1265 let (height, commitment) = (block.height(), V::commitment(&block));
1266 let (ack, ack_waiter) = A::handle();
1267 application
1268 .report(Update::Block(V::into_inner(block), ack))
1269 .await;
1270 self.pending_acks.enqueue(PendingAck {
1271 height,
1272 commitment,
1273 receiver: ack_waiter,
1274 });
1275 }
1276 }
1277
1278 async fn handle_block_processed(
1283 &mut self,
1284 height: Height,
1285 commitment: V::Commitment,
1286 resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
1287 ) {
1288 self.update_processed_height(height, resolver).await;
1290
1291 resolver
1293 .cancel(Request::<V::Commitment>::Block(commitment))
1294 .await;
1295
1296 if let Some(finalization) = self.get_finalization_by_height(height).await {
1297 let lpr = self.last_processed_round;
1299 let prune_round = Round::new(
1300 lpr.epoch(),
1301 lpr.view().saturating_sub(self.view_retention_timeout),
1302 );
1303
1304 self.cache.prune(prune_round).await;
1306
1307 let round = finalization.round();
1309 self.last_processed_round = round;
1310
1311 resolver
1313 .retain(Request::<V::Commitment>::Notarized { round }.predicate())
1314 .await;
1315 }
1316 }
1317
    /// Notifies any subscribers waiting for `block`, then stores it in the cache as a
    /// verified block for `round`.
    async fn cache_verified(
        &mut self,
        round: Round,
        digest: <V::Block as Digestible>::Digest,
        block: V::Block,
    ) {
        self.notify_subscribers(&block);
        self.cache.put_verified(round, digest, block.into()).await;
    }
1330
    /// Notifies any subscribers waiting for `block`, then stores it in the cache as a block
    /// for `round`.
    async fn cache_block(
        &mut self,
        round: Round,
        digest: <V::Block as Digestible>::Digest,
        block: V::Block,
    ) {
        self.notify_subscribers(&block);
        self.cache.put_block(round, digest, block.into()).await;
    }
1341
1342 async fn sync_finalized(&mut self) {
1349 if let Err(e) = try_join!(
1350 async {
1351 self.finalized_blocks.sync().await.map_err(Box::new)?;
1352 Ok::<_, BoxedError>(())
1353 },
1354 async {
1355 self.finalizations_by_height
1356 .sync()
1357 .await
1358 .map_err(Box::new)?;
1359 Ok::<_, BoxedError>(())
1360 },
1361 ) {
1362 panic!("failed to sync finalization archives: {e}");
1363 }
1364 }
1365
1366 async fn get_finalized_block(&self, height: Height) -> Option<V::Block> {
1370 match self
1371 .finalized_blocks
1372 .get(ArchiveID::Index(height.get()))
1373 .await
1374 {
1375 Ok(stored) => stored.map(|stored| stored.into()),
1376 Err(e) => panic!("failed to get block: {e}"),
1377 }
1378 }
1379
1380 async fn get_finalization_by_height(
1382 &self,
1383 height: Height,
1384 ) -> Option<Finalization<P::Scheme, V::Commitment>> {
1385 match self
1386 .finalizations_by_height
1387 .get(ArchiveID::Index(height.get()))
1388 .await
1389 {
1390 Ok(finalization) => finalization,
1391 Err(e) => panic!("failed to get finalization: {e}"),
1392 }
1393 }
1394
1395 async fn store_finalization<Buf: Buffer<V>>(
1405 &mut self,
1406 height: Height,
1407 digest: <V::Block as Digestible>::Digest,
1408 block: V::Block,
1409 finalization: Option<Finalization<P::Scheme, V::Commitment>>,
1410 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1411 buffer: &mut Buf,
1412 ) -> bool {
1413 if height <= self.last_processed_height {
1417 debug!(
1418 %height,
1419 floor = %self.last_processed_height,
1420 ?digest,
1421 "dropping finalization at or below processed height floor"
1422 );
1423 return false;
1424 }
1425 self.notify_subscribers(&block);
1426
1427 let commitment = V::commitment(&block);
1429 let stored: V::StoredBlock = block.into();
1430 let round = finalization.as_ref().map(|f| f.round());
1431
1432 if let Err(e) = try_join!(
1434 async {
1436 self.finalized_blocks.put(stored).await.map_err(Box::new)?;
1437 Ok::<_, BoxedError>(())
1438 },
1439 async {
1441 if let Some(finalization) = finalization {
1442 self.finalizations_by_height
1443 .put(height, digest, finalization)
1444 .await
1445 .map_err(Box::new)?;
1446 }
1447 Ok::<_, BoxedError>(())
1448 }
1449 ) {
1450 panic!("failed to finalize: {e}");
1451 }
1452
1453 if let Some(round) = round.filter(|_| height > self.tip) {
1455 application.report(Update::Tip(round, height, digest)).await;
1456 self.tip = height;
1457 let _ = self.finalized_height.try_set(height.get());
1458 }
1459 buffer.finalized(commitment).await;
1460 self.try_dispatch_blocks(application).await;
1461
1462 true
1463 }
1464
1465 async fn get_latest(&mut self) -> Option<(Height, <V::Block as Digestible>::Digest, Round)> {
1478 let height = self.finalizations_by_height.last_index()?;
1479 let finalization = self
1480 .get_finalization_by_height(height)
1481 .await
1482 .expect("finalization missing");
1483 Some((
1484 height,
1485 V::commitment_to_inner(finalization.proposal.payload),
1486 finalization.round(),
1487 ))
1488 }
1489
1490 async fn find_block_in_storage(
1494 &self,
1495 digest: <V::Block as Digestible>::Digest,
1496 ) -> Option<V::Block> {
1497 if let Some(block) = self.cache.find_block(digest).await {
1499 return Some(block.into());
1500 }
1501 match self.finalized_blocks.get(ArchiveID::Key(&digest)).await {
1503 Ok(stored) => stored.map(|stored| stored.into()),
1504 Err(e) => panic!("failed to get block: {e}"),
1505 }
1506 }
1507
1508 async fn find_block_by_digest<Buf: Buffer<V>>(
1513 &self,
1514 buffer: &mut Buf,
1515 digest: <V::Block as Digestible>::Digest,
1516 ) -> Option<V::Block> {
1517 if let Some(block) = buffer.find_by_digest(digest).await {
1518 return Some(block.into_block());
1519 }
1520 self.find_block_in_storage(digest).await
1521 }
1522
1523 async fn find_block_by_commitment<Buf: Buffer<V>>(
1528 &self,
1529 buffer: &Buf,
1530 commitment: V::Commitment,
1531 ) -> Option<V::Block> {
1532 if let Some(block) = buffer.find_by_commitment(commitment).await {
1533 return Some(block.into_block());
1534 }
1535 self.find_block_in_storage(V::commitment_to_inner(commitment))
1536 .await
1537 }
1538
    /// Attempts to fill holes in `finalized_blocks` from locally available blocks, and asks
    /// `resolver` for whatever cannot be found locally.
    ///
    /// All work is relative to `start`, the height after `last_processed_height`:
    ///
    /// 1. If the last index of `finalizations_by_height` is above `last_processed_height` and
    ///    `finalized_blocks` has no last index at or above it, the block named by that
    ///    finalization's payload commitment is looked up locally. It is stored if found and
    ///    fetched through `resolver` otherwise.
    /// 2. Gaps reported by `finalized_blocks.next_gap(start)` are repaired one at a time by
    ///    walking parent links backwards from the block at `gap_end` and storing each parent
    ///    that is available locally. The method returns as soon as `next_gap` reports no gap
    ///    end. At the first parent that is not available locally, that parent is fetched by
    ///    commitment and the walk stops.
    /// 3. Only reached after a failed parent lookup in step 2: finalizations are requested for
    ///    the heights returned by `missing_items(start, max_repair)`.
    ///
    /// Returns `true` if any `store_finalization` call made here returned `true`.
    ///
    /// # Panics
    ///
    /// Panics if the finalization at the last index of `finalizations_by_height` is missing,
    /// or if the block at `gap_end` cannot be read from `finalized_blocks`.
    async fn try_repair_gaps<Buf: Buffer<V>>(
        &mut self,
        buffer: &mut Buf,
        resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
        application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
    ) -> bool {
        // Tracks whether any store_finalization call below reported a write.
        let mut wrote = false;
        let start = self.last_processed_height.next();

        // Step 1: the latest finalization may not have its block in the archive yet.
        if let Some(last_finalized) = self.finalizations_by_height.last_index() {
            let have_block = self
                .finalized_blocks
                .last_index()
                .is_some_and(|last| last >= last_finalized);
            if last_finalized > self.last_processed_height && !have_block {
                let finalization = self
                    .get_finalization_by_height(last_finalized)
                    .await
                    .expect("finalization missing");
                let commitment = finalization.proposal.payload;
                if let Some(block) = self.find_block_by_commitment(buffer, commitment).await {
                    // Block is available locally: persist it alongside its finalization.
                    let digest = block.digest();
                    wrote |= self
                        .store_finalization(
                            last_finalized,
                            digest,
                            block,
                            Some(finalization),
                            application,
                            buffer,
                        )
                        .await;
                } else {
                    // Block is not available locally: request it by commitment.
                    resolver
                        .fetch(Request::<V::Commitment>::Block(commitment))
                        .await;
                }
            }
        }

        // Step 2: repair gaps from local data until none remain or a parent is missing.
        'cache_repair: loop {
            // NOTE(review): `next_gap` presumably returns (end of the range containing or
            // preceding `start`, start of the following range) — confirm against the archive
            // API. A `None` second element ends the repair.
            let (gap_start, Some(gap_end)) = self.finalized_blocks.next_gap(start) else {
                return wrote;
            };

            // The walk begins at the block stored at `gap_end`.
            let Some(mut cursor) = self.get_finalized_block(gap_end).await else {
                panic!("gapped block missing that should exist: {gap_end}");
            };

            // Lower bound of the walk: one past the reported `gap_start`, or `start` when no
            // `gap_start` was reported.
            let gap_start = gap_start.map(Height::next).unwrap_or(start);

            // Follow parent links downwards until the lower bound is reached.
            while cursor.height() > gap_start {
                let parent_digest = cursor.parent();
                let parent_commitment = V::parent_commitment(&cursor);
                if let Some(block) = self
                    .find_block_by_commitment(buffer, parent_commitment)
                    .await
                {
                    // The cache may or may not hold a finalization for this parent; it is
                    // passed through as an `Option`.
                    let finalization = self.cache.get_finalization_for(parent_digest).await;
                    wrote |= self
                        .store_finalization(
                            block.height(),
                            parent_digest,
                            block.clone(),
                            finalization,
                            application,
                            buffer,
                        )
                        .await;
                    debug!(height = %block.height(), "repaired block");
                    cursor = block;
                } else {
                    // Parent is not available locally: request it and stop repairing.
                    resolver
                        .fetch(Request::<V::Commitment>::Block(parent_commitment))
                        .await;
                    break 'cache_repair;
                }
            }
        }

        // Step 3: request finalizations for items still missing from the archive, bounded by
        // `max_repair`.
        let missing_items = self
            .finalized_blocks
            .missing_items(start, self.max_repair.get());
        let requests: Vec<_> = missing_items
            .into_iter()
            .map(|height| Request::<V::Commitment>::Finalized { height })
            .collect();
        if !requests.is_empty() {
            resolver.fetch_all(requests).await
        }
        wrote
    }
1665
1666 async fn update_processed_height(
1669 &mut self,
1670 height: Height,
1671 resolver: &mut impl Resolver<Key = Request<V::Commitment>>,
1672 ) {
1673 self.application_metadata.put(LATEST_KEY, height);
1674 self.last_processed_height = height;
1675 let _ = self
1676 .processed_height
1677 .try_set(self.last_processed_height.get());
1678
1679 resolver
1681 .retain(Request::<V::Commitment>::Finalized { height }.predicate())
1682 .await;
1683 }
1684
1685 async fn prune_finalized_archives(&mut self, height: Height) -> Result<(), BoxedError> {
1687 try_join!(
1688 async {
1689 self.finalized_blocks
1690 .prune(height)
1691 .await
1692 .map_err(Box::new)?;
1693 Ok::<_, BoxedError>(())
1694 },
1695 async {
1696 self.finalizations_by_height
1697 .prune(height)
1698 .await
1699 .map_err(Box::new)?;
1700 Ok::<_, BoxedError>(())
1701 }
1702 )?;
1703 Ok(())
1704 }
1705}