1use super::{
2 acks::{PendingAck, PendingAcks},
3 cache,
4 delivery::PendingVerification,
5 floor::Floor,
6 mailbox::{CommitmentFallback, Mailbox, Message},
7 stream::Stream,
8 subscriptions::{Key as SubscriptionKey, KeyFor as SubscriptionKeyFor, Subscriptions},
9 variant::NoBuffer,
10 Buffer, Variant,
11};
12use crate::{
13 marshal::{
14 resolver::handler::{self, Annotation, Key, Request},
15 store::{Blocks, Certificates},
16 Config, Identifier as BlockID, Start, Update,
17 },
18 simplex::{
19 scheme::Scheme,
20 types::{verify_certificates, Finalization, Notarization, Subject},
21 },
22 types::{Epoch, Epocher, Height, Round, ViewDelta},
23 Block, Epochable, Heightable, Reporter,
24};
25use bytes::Bytes;
26use commonware_actor::mailbox;
27use commonware_codec::{Decode, Encode, Read};
28use commonware_cryptography::{
29 certificate::{Provider, Scheme as CertificateScheme},
30 Digestible,
31};
32use commonware_macros::select_loop;
33use commonware_p2p::Recipients;
34use commonware_parallel::Strategy;
35use commonware_resolver::{Delivery, Resolver, TargetedResolver};
36use commonware_runtime::{
37 spawn_cell,
38 telemetry::metrics::{Gauge, GaugeExt, MetricsExt as _},
39 BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
40};
41use commonware_storage::archive::Identifier as ArchiveID;
42use commonware_utils::{
43 acknowledgement::Exact,
44 channel::{fallible::OneshotExt, oneshot},
45 futures::AbortablePool,
46 Acknowledgement, BoxedError,
47};
48use futures::{future::join_all, try_join};
49use rand_core::CryptoRngCore;
50use std::{collections::BTreeMap, future::Future, num::NonZeroUsize, sync::Arc};
51use tracing::{debug, warn};
52
53type ResolverRequestFor<V> = Key<<V as Variant>::Commitment>;
56
57struct ResolverDelivery<V: Variant> {
60 delivery: Delivery<ResolverRequestFor<V>, Annotation>,
61 value: Bytes,
62 response: oneshot::Sender<bool>,
63}
64
65pub struct Actor<E, V, P, FC, FB, ES, T, A = Exact>
78where
79 E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage,
80 V: Variant,
81 P: Provider<Scope = Epoch, Scheme: Scheme<V::Commitment>>,
82 FC: Certificates<
83 BlockDigest = <V::Block as Digestible>::Digest,
84 Commitment = V::Commitment,
85 Scheme = P::Scheme,
86 >,
87 FB: Blocks<Block = V::StoredBlock>,
88 ES: Epocher,
89 T: Strategy,
90 A: Acknowledgement,
91{
92 context: ContextCell<E>,
94
95 mailbox: mailbox::Receiver<Message<P::Scheme, V>>,
98
99 provider: P,
102 epocher: ES,
104 view_retention_timeout: ViewDelta,
106 max_repair: NonZeroUsize,
108 block_codec_config: <V::ApplicationBlock as Read>::Cfg,
110 strategy: T,
112
113 last_proposed_block: Option<(Round, V::Commitment, V::Block)>,
116 floor: Floor<P::Scheme, V::Commitment>,
118 stream: Stream<E>,
120 pending_acks: PendingAcks<V, A>,
122 tip: Height,
124 block_subscriptions: Subscriptions<V>,
126
127 cache: cache::Manager<E, V, P::Scheme>,
130 finalizations_by_height: FC,
132 finalized_blocks: FB,
134
135 finalized_height: Gauge,
138 processed_height: Gauge,
140}
141
142impl<E, V, P, FC, FB, ES, T, A> Actor<E, V, P, FC, FB, ES, T, A>
143where
144 E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage,
145 V: Variant,
146 P: Provider<Scope = Epoch, Scheme: Scheme<V::Commitment>>,
147 FC: Certificates<
148 BlockDigest = <V::Block as Digestible>::Digest,
149 Commitment = V::Commitment,
150 Scheme = P::Scheme,
151 >,
152 FB: Blocks<Block = V::StoredBlock>,
153 ES: Epocher,
154 T: Strategy,
155 A: Acknowledgement,
156{
157 pub async fn init(
159 context: E,
160 finalizations_by_height: FC,
161 mut finalized_blocks: FB,
162 config: Config<P, ES, T, V::ApplicationBlock, V::Block, V::Commitment>,
163 ) -> (Self, Mailbox<P::Scheme, V>, Option<Height>) {
164 let prunable_config = cache::Config {
166 partition_prefix: format!("{}-cache", config.partition_prefix),
167 prunable_items_per_section: config.prunable_items_per_section,
168 replay_buffer: config.replay_buffer,
169 key_write_buffer: config.key_write_buffer,
170 value_write_buffer: config.value_write_buffer,
171 key_page_cache: config.page_cache.clone(),
172 };
173 let cache = cache::Manager::init(
174 context.child("cache"),
175 prunable_config,
176 config.block_codec_config.clone(),
177 )
178 .await;
179
180 let application_metadata_partition =
182 format!("{}-application-metadata", config.partition_prefix);
183 let stream = Stream::new(context.child("stream"), &application_metadata_partition).await;
184 let last_processed_height = stream.processed_height();
185
186 let pending_floor_anchor = match config.start {
189 Start::Genesis(anchor) => {
190 assert_eq!(
191 anchor.height(),
192 Height::zero(),
193 "genesis anchor must be at height zero"
194 );
195 Self::ensure_genesis_anchor(&mut finalized_blocks, anchor, last_processed_height)
196 .await;
197 None
198 }
199 Start::Floor(finalization) => Some(finalization),
200 };
201 let last_processed_round =
202 Self::latest_processed_round(&finalizations_by_height, last_processed_height).await;
203
204 let finalized_height = context.gauge("finalized_height", "Finalized height of application");
206 let processed_height = context.gauge("processed_height", "Processed height of application");
207 if let Some(last_processed_height) = last_processed_height {
208 let _ = processed_height.try_set(last_processed_height.get());
209 }
210 let floor = pending_floor_anchor.map_or_else(
211 || Floor::resolved(last_processed_height, last_processed_round),
212 |finalization| {
213 Floor::awaiting_anchor(last_processed_height, last_processed_round, finalization)
214 },
215 );
216
217 let (sender, mailbox) = mailbox::new(context.child("mailbox"), config.mailbox_size);
219 (
220 Self {
221 context: ContextCell::new(context),
222 mailbox,
223 provider: config.provider,
224 epocher: config.epocher,
225 view_retention_timeout: config.view_retention_timeout,
226 max_repair: config.max_repair,
227 block_codec_config: config.block_codec_config,
228 strategy: config.strategy,
229 last_proposed_block: None,
230 floor,
231 stream,
232 pending_acks: PendingAcks::new(config.max_pending_acks.get()),
233 tip: Height::zero(),
234 block_subscriptions: Subscriptions::new(),
235 cache,
236 finalizations_by_height,
237 finalized_blocks,
238 finalized_height,
239 processed_height,
240 },
241 Mailbox::new(sender),
242 last_processed_height,
243 )
244 }
245
246 async fn ensure_genesis_anchor(
247 finalized_blocks: &mut FB,
248 anchor: V::Block,
249 last_processed_height: Option<Height>,
250 ) {
251 let anchor_height = anchor.height();
252 let anchor_commitment = V::commitment(&anchor);
253 match finalized_blocks
254 .get(ArchiveID::Index(anchor_height.get()))
255 .await
256 {
257 Ok(Some(stored)) => {
258 let stored: V::Block = stored.into();
259 assert_eq!(
260 stored.height(),
261 anchor_height,
262 "stored genesis block height mismatch"
263 );
264 assert!(
265 V::commitment(&stored) == anchor_commitment,
266 "stored genesis block does not match configured anchor"
267 );
268 }
269 Ok(None) => {
270 if let Some(existing) =
271 last_processed_height.filter(|height| anchor_height < *height)
272 {
273 warn!(
274 height = %anchor_height,
275 %existing,
276 "ignoring stale anchor"
277 );
278 return;
279 }
280
281 finalized_blocks
282 .put(anchor.into())
283 .await
284 .expect("failed to store startup anchor");
285 finalized_blocks
286 .sync()
287 .await
288 .expect("failed to sync startup anchor");
289 debug!(height = %anchor_height, "stored genesis block");
290 }
291 Err(err) => panic!("failed to check startup anchor: {err}"),
292 }
293 }
294
295 pub fn start<R, Buf>(
297 mut self,
298 application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
299 buffer: Buf,
300 resolver: (handler::Receiver<V::Commitment>, R),
301 ) -> Handle<()>
302 where
303 R: TargetedResolver<
304 Key = ResolverRequestFor<V>,
305 Subscriber = Annotation,
306 PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
307 >,
308 Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
309 {
310 spawn_cell!(self.context, self.run(application, buffer, resolver))
311 }
312
313 pub fn start_unbuffered<R>(
315 self,
316 application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
317 resolver: (handler::Receiver<V::Commitment>, R),
318 ) -> Handle<()>
319 where
320 R: TargetedResolver<
321 Key = ResolverRequestFor<V>,
322 Subscriber = Annotation,
323 PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
324 >,
325 {
326 self.start(
327 application,
328 NoBuffer::<<P::Scheme as CertificateScheme>::PublicKey>::new(),
329 resolver,
330 )
331 }
332
333 async fn run<R, Buf>(
335 mut self,
336 mut application: impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
337 mut buffer: Buf,
338 (mut resolver_rx, mut resolver): (handler::Receiver<V::Commitment>, R),
339 ) where
340 R: TargetedResolver<
341 Key = ResolverRequestFor<V>,
342 Subscriber = Annotation,
343 PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
344 >,
345 Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
346 {
347 let mut waiters = AbortablePool::<Result<V::Block, SubscriptionKeyFor<V>>>::default();
349
350 let tip = self.get_latest().await;
352 if let Some((height, digest, round)) = tip {
353 application.report(Update::Tip(round, height, digest));
354 self.tip = height;
355 let _ = self.finalized_height.try_set(height.get());
356 }
357
358 self.cache.load_persisted_epochs().await;
361
362 if let Some(finalization) = self.floor.take_pending_anchor() {
365 self.install_floor(
366 finalization,
367 false,
368 &mut resolver,
369 &mut buffer,
370 &mut application,
371 )
372 .await;
373 }
374
375 if self
377 .try_repair_gaps(&mut buffer, &mut resolver, &mut application)
378 .await
379 {
380 self.sync_finalized().await;
381 }
382
383 self.try_dispatch_blocks(&mut application).await;
385
386 select_loop! {
387 self.context,
388 on_start => {
389 self.block_subscriptions.retain_open();
391 },
392 on_stopped => {
393 debug!("context shutdown, stopping marshal");
394 },
395 Ok(completion) = waiters.next_completed() else continue => match completion {
397 Ok(block) => self.block_subscriptions.notify(&block),
398 Err(key) => {
399 match key {
400 SubscriptionKey::Digest(digest) => {
401 debug!(
402 ?digest,
403 "buffer subscription closed, canceling local subscribers"
404 );
405 }
406 SubscriptionKey::Commitment(commitment) => {
407 debug!(
408 ?commitment,
409 "buffer subscription closed, canceling local subscribers"
410 );
411 }
412 }
413 self.block_subscriptions.remove(&key);
414 }
415 },
416 result = self.pending_acks.current() => {
418 self.handle_ack(result, &mut application, &mut buffer, &mut resolver)
419 .await;
420 },
421 Some(message) = self.mailbox.recv() else {
423 debug!("mailbox closed, shutting down");
424 break;
425 } => {
426 self.handle_mailbox_message(
427 message,
428 &mut resolver,
429 &mut waiters,
430 &mut buffer,
431 &mut application,
432 )
433 .await;
434 },
435 Some(message) = resolver_rx.recv() else {
437 debug!("handler closed, shutting down");
438 return;
439 } => {
440 self.handle_resolver_message(
441 message,
442 &mut resolver_rx,
443 &mut resolver,
444 &mut buffer,
445 &mut application,
446 )
447 .await;
448 },
449 }
450 }
451
452 async fn handle_ack<Buf, R>(
455 &mut self,
456 result: <A::Waiter as Future>::Output,
457 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
458 buffer: &mut Buf,
459 resolver: &mut R,
460 ) where
461 Buf: Buffer<V>,
462 R: Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
463 {
464 let mut pending = Some(self.pending_acks.complete_current(result));
466 let last_acked_commitment = loop {
467 let (height, commitment, result) = pending.take().expect("pending ack must exist");
468 match result {
469 Ok(()) => {
470 self.update_processed_height(height, resolver);
473 self.update_processed_round(height, resolver).await;
474 }
475 Err(e) => {
476 panic!("application did not acknowledge block at height {height}: {e:?}");
478 }
479 }
480
481 match self.pending_acks.pop_ready() {
484 Some(next) => pending = Some(next),
485 None => break commitment,
486 }
487 };
488
489 self.stream
491 .sync()
492 .await
493 .expect("failed to sync application progress");
494
495 buffer.finalized(last_acked_commitment);
498
499 self.try_dispatch_blocks(application).await;
501 }
502
503 async fn handle_mailbox_message<Buf, R>(
505 &mut self,
506 message: Message<P::Scheme, V>,
507 resolver: &mut R,
508 waiters: &mut AbortablePool<Result<V::Block, SubscriptionKeyFor<V>>>,
509 buffer: &mut Buf,
510 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
511 ) where
512 Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
513 R: TargetedResolver<
514 Key = ResolverRequestFor<V>,
515 Subscriber = Annotation,
516 PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
517 >,
518 {
519 if message.response_closed() {
520 return;
521 }
522
523 match message {
524 Message::GetInfo {
525 identifier,
526 response,
527 } => {
528 let info = match identifier {
529 BlockID::Digest(digest) => self
533 .finalized_blocks
534 .get(ArchiveID::Key(&digest))
535 .await
536 .ok()
537 .flatten()
538 .map(|b| (b.height(), digest)),
539 BlockID::Height(height) => self.get_info_by_height(height).await,
540 BlockID::Latest => self.get_latest().await.map(|(h, d, _)| (h, d)),
541 };
542 response.send_lossy(info);
543 }
544 Message::GetVerified { round, response } => {
545 let block = self.cache.get_verified(round).await.map(Into::into);
546 response.send_lossy(block);
547 }
548 Message::Forward {
549 round,
550 commitment,
551 recipients,
552 } => {
553 if matches!(&recipients, Recipients::Some(peers) if peers.is_empty()) {
554 return;
555 }
556 let block = match self.take_proposed(round, commitment) {
557 Some(block) => block,
558 None => {
559 let Some(block) = self.find_block_by_commitment(buffer, commitment).await
560 else {
561 debug!(?commitment, "block not found for forwarding");
562 return;
563 };
564 block
565 }
566 };
567 buffer.send(round, block, recipients);
568 }
569 Message::Proposed { round, block, ack } => {
570 self.cache_verified(round, block.digest(), block.clone())
575 .await;
576 self.apply_floor_anchor(&block, buffer, application, resolver)
577 .await;
578
579 let commitment = V::commitment(&block);
583 self.last_proposed_block = Some((round, commitment, block));
584 ack.expect("durable ack present").send_lossy(());
585 }
586 Message::Verified { round, block, ack } => {
587 self.cache_verified(round, block.digest(), block.clone())
592 .await;
593 self.apply_floor_anchor(&block, buffer, application, resolver)
594 .await;
595 ack.expect("durable ack present").send_lossy(());
596 }
597 Message::Certified { round, block, ack } => {
598 self.cache_block(round, block.digest(), block.clone()).await;
603 self.apply_floor_anchor(&block, buffer, application, resolver)
604 .await;
605 ack.expect("durable ack present").send_lossy(());
606 }
607 Message::Notarization { notarization } => {
608 let round = notarization.round();
609 let commitment = notarization.proposal.payload;
610 let digest = V::commitment_to_inner(commitment);
611
612 self.cache
614 .put_notarization(round, digest, notarization.clone())
615 .await;
616
617 if let Some(block) = self.find_block_by_commitment(buffer, commitment).await {
621 self.cache_block(round, digest, block.clone()).await;
622 self.apply_floor_anchor(&block, buffer, application, resolver)
623 .await;
624 } else {
625 debug!(?round, "notarized block unavailable locally");
626 }
627 }
628 Message::Finalization { finalization } => {
629 let round = finalization.round();
630 let commitment = finalization.proposal.payload;
631 let digest = V::commitment_to_inner(commitment);
632
633 self.cache
635 .put_finalization(round, digest, finalization.clone())
636 .await;
637
638 if let Some(block) = self.find_block_by_commitment(buffer, commitment).await {
640 if self
643 .apply_floor_anchor(&block, buffer, application, resolver)
644 .await
645 {
646 return;
647 }
648
649 let height = block.height();
650 self.update_processed_round_floor(height, round, resolver)
651 .await;
652 if self
653 .store_finalization(height, digest, block, Some(finalization), application)
654 .await
655 {
656 self.try_repair_gaps(buffer, resolver, application).await;
659 self.sync_finalized().await;
660 self.try_dispatch_blocks(application).await;
661 debug!(?round, %height, "finalized block stored");
662 }
663 } else {
664 debug!(?round, ?commitment, "finalized block missing");
667 self.floor
668 .fetch_if_permitted(
669 resolver,
670 Request::finalized_block_by_round(commitment, round),
671 )
672 .ignore();
673 }
674 }
675 Message::GetBlock {
676 identifier,
677 response,
678 } => match identifier {
679 BlockID::Digest(digest) => {
680 let result = self.find_block_by_digest(buffer, digest).await;
681 response.send_lossy(result);
682 }
683 BlockID::Height(height) => {
684 let result = self.get_finalized_block(height).await;
685 response.send_lossy(result);
686 }
687 BlockID::Latest => {
688 let block = match self.get_latest().await {
689 Some((_, digest, _)) => self.find_block_by_digest(buffer, digest).await,
690 None => None,
691 };
692 response.send_lossy(block);
693 }
694 },
695 Message::GetFinalization { height, response } => {
696 let finalization = self.get_finalization_by_height(height).await;
697 response.send_lossy(finalization);
698 }
699 Message::GetProcessedHeight { response } => {
700 response.send_lossy(self.stream.processed_height());
701 }
702 Message::HintFinalized { height, targets } => {
703 if self.get_finalization_by_height(height).await.is_some() {
705 return;
706 }
707
708 self.floor
709 .fetch_targeted_if_permitted(resolver, Request::finalized(height), targets)
710 .ignore();
711 }
712 Message::SubscribeByDigest {
713 digest,
714 fallback,
715 response,
716 } => {
717 self.handle_subscribe(
718 fallback.into(),
719 SubscriptionKey::Digest(digest),
720 response,
721 resolver,
722 waiters,
723 buffer,
724 )
725 .await;
726 }
727 Message::SubscribeByCommitment {
728 commitment,
729 fallback,
730 response,
731 } => {
732 self.handle_subscribe(
733 fallback,
734 SubscriptionKey::Commitment(commitment),
735 response,
736 resolver,
737 waiters,
738 buffer,
739 )
740 .await;
741 }
742 Message::HintNotarized { round, commitment } => {
743 if self
744 .find_block_by_commitment(buffer, commitment)
745 .await
746 .is_none()
747 {
748 self.floor
749 .fetch_if_permitted(resolver, Request::notarized(round))
750 .ignore();
751 }
752 }
753 Message::SetFloor { finalization } => {
754 self.install_floor(finalization, true, resolver, buffer, application)
755 .await;
756 }
757 Message::Prune { height } => {
758 if height > self.floor.processed_height() {
760 warn!(%height, floor = %self.floor.processed_height(), "prune height above floor, ignoring");
761 return;
762 }
763
764 self.prune_finalized_archives(height)
765 .await
766 .expect("failed to prune finalized archives");
767
768 }
772 }
773 }
774
775 async fn handle_resolver_message<Buf, R>(
778 &mut self,
779 message: handler::Message<V::Commitment>,
780 resolver_rx: &mut handler::Receiver<V::Commitment>,
781 resolver: &mut R,
782 buffer: &mut Buf,
783 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
784 ) where
785 Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
786 R: Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
787 {
788 let mut needs_sync = false;
789 let mut handled = false;
790 let mut produces = Vec::new();
791 let mut delivers = Vec::new();
792
793 for msg in std::iter::once(message)
797 .chain(std::iter::from_fn(|| resolver_rx.try_recv().ok()))
798 .take(self.max_repair.get())
799 {
800 if msg.response_closed() {
801 continue;
802 }
803 handled = true;
804
805 match msg {
806 handler::Message::Produce { key, response } => {
807 produces.push((key, response));
808 }
809 handler::Message::Deliver {
810 delivery,
811 value,
812 response,
813 } => {
814 needs_sync |= self
815 .handle_deliver(
816 ResolverDelivery {
817 delivery,
818 value,
819 response,
820 },
821 &mut delivers,
822 buffer,
823 application,
824 resolver,
825 )
826 .await;
827 }
828 }
829 }
830 if !handled {
831 return;
832 }
833
834 needs_sync |= self
836 .verify_delivered(delivers, buffer, application, resolver)
837 .await;
838
839 needs_sync |= self.try_repair_gaps(buffer, resolver, application).await;
842
843 if needs_sync {
844 self.sync_finalized().await;
847 self.try_dispatch_blocks(application).await;
848 }
849
850 join_all(
852 produces
853 .into_iter()
854 .map(|(key, response)| self.handle_produce(key, response, buffer)),
855 )
856 .await;
857 }
858
859 async fn handle_produce<Buf: Buffer<V>>(
861 &self,
862 key: ResolverRequestFor<V>,
863 response: oneshot::Sender<Bytes>,
864 buffer: &Buf,
865 ) {
866 match key {
867 Key::Block(commitment) => {
868 let Some(block) = self.find_block_by_commitment(buffer, commitment).await else {
869 debug!(?commitment, "block missing on request");
870 return;
871 };
872 response.send_lossy(block.encode());
873 }
874 Key::Finalized { height } => {
875 let Some(finalization) = self.get_finalization_by_height(height).await else {
876 debug!(%height, "finalization missing on request");
877 return;
878 };
879 let Some(block) = self.get_finalized_block(height).await else {
880 debug!(%height, "finalized block missing on request");
881 return;
882 };
883 response.send_lossy((finalization, block).encode());
884 }
885 Key::Notarized { round } => {
886 let Some(notarization) = self.cache.get_notarization(round).await else {
887 debug!(?round, "notarization missing on request");
888 return;
889 };
890 let commitment = notarization.proposal.payload;
891 let Some(block) = self.find_block_by_commitment(buffer, commitment).await else {
892 debug!(?commitment, "block missing on request");
893 return;
894 };
895 response.send_lossy((notarization, block).encode());
896 }
897 }
898 }
899
900 async fn handle_subscribe<Buf: Buffer<V>>(
902 &mut self,
903 fallback: CommitmentFallback,
904 key: SubscriptionKeyFor<V>,
905 response: oneshot::Sender<V::Block>,
906 resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
907 waiters: &mut AbortablePool<Result<V::Block, SubscriptionKeyFor<V>>>,
908 buffer: &mut Buf,
909 ) {
910 let digest = match key {
911 SubscriptionKey::Digest(digest) => digest,
912 SubscriptionKey::Commitment(commitment) => V::commitment_to_inner(commitment),
913 };
914
915 let block = match key {
917 SubscriptionKey::Digest(digest) => self.find_block_by_digest(buffer, digest).await,
918 SubscriptionKey::Commitment(commitment) => {
919 self.find_block_by_commitment(buffer, commitment).await
920 }
921 };
922 if let Some(block) = block {
923 response.send_lossy(block);
924 return;
925 }
926
927 match fallback {
934 CommitmentFallback::FetchByRound { round } => {
935 if self
941 .floor
942 .fetch_if_permitted(resolver, Request::notarized(round))
943 .denied()
944 {
945 return;
946 }
947 debug!(?round, ?digest, "requested block missing");
948 }
949 CommitmentFallback::FetchByCommitment { height } => {
950 let commitment = match key {
951 SubscriptionKey::Commitment(commitment) => commitment,
952 SubscriptionKey::Digest(_) => {
953 unreachable!("digest subscriptions cannot request commitment fallback")
954 }
955 };
956
957 if self
960 .floor
961 .fetch_if_permitted(resolver, Request::certified_block(commitment, height))
962 .denied()
963 {
964 return;
965 }
966 debug!(%height, ?commitment, ?digest, "requested certified ancestry block missing");
967 }
968 CommitmentFallback::Wait => {}
969 }
970
971 let round = match fallback {
972 CommitmentFallback::FetchByRound { round } => Some(round),
973 CommitmentFallback::Wait | CommitmentFallback::FetchByCommitment { .. } => None,
974 };
975
976 match key {
978 SubscriptionKey::Digest(digest) => {
979 debug!(?round, ?digest, "registering subscriber");
980 }
981 SubscriptionKey::Commitment(commitment) => {
982 debug!(?round, ?commitment, ?digest, "registering subscriber");
983 }
984 }
985 self.block_subscriptions
986 .insert(key, response, waiters, buffer);
987 }
988
989 async fn install_floor<Buf, R>(
991 &mut self,
992 finalization: Finalization<P::Scheme, V::Commitment>,
993 skip_if_superseded: bool,
994 resolver: &mut R,
995 buffer: &mut Buf,
996 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
997 ) where
998 Buf: Buffer<V, PublicKey = <P::Scheme as CertificateScheme>::PublicKey>,
999 R: Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
1000 {
1001 let round = finalization.round();
1002 if round <= self.floor.processed_round() {
1003 warn!(
1004 ?round,
1005 floor = ?self.floor.processed_round(),
1006 "floor not updated, below existing round floor"
1007 );
1008 return;
1009 }
1010
1011 let Some(scheme) = self.get_scheme_certificate_verifier(finalization.epoch()) else {
1012 panic!("floor finalization epoch unavailable");
1013 };
1014 assert!(
1015 finalization.verify(self.context.as_mut(), scheme.as_ref(), &self.strategy),
1016 "floor finalization must verify"
1017 );
1018
1019 let commitment = finalization.proposal.payload;
1020 let digest = V::commitment_to_inner(commitment);
1021 self.cache
1022 .put_finalization(round, digest, finalization.clone())
1023 .await;
1024
1025 if skip_if_superseded && self.floor.has_pending_anchor_at_or_after(round) {
1028 return;
1029 }
1030
1031 if let Some(block) = self.find_block_by_commitment(buffer, commitment).await {
1032 self.floor.await_anchor(finalization);
1033 assert!(
1034 self.apply_floor_anchor(&block, buffer, application, resolver)
1035 .await
1036 );
1037 return;
1038 }
1039
1040 self.pending_acks.clear();
1043
1044 debug!(?round, ?commitment, "starting fetch for floor block");
1045 self.floor.await_anchor(finalization);
1046 self.floor
1047 .fetch_if_permitted(
1048 resolver,
1049 Request::finalized_block_by_round(commitment, round),
1050 )
1051 .ignore();
1052 }
1053
1054 async fn apply_floor_anchor<Buf: Buffer<V>>(
1056 &mut self,
1057 block: &V::Block,
1058 buffer: &mut Buf,
1059 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1060 resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
1061 ) -> bool {
1062 let commitment = V::commitment(block);
1063 if !self.floor.matches_pending_anchor(commitment) {
1064 return false;
1065 }
1066 let block = (*block).clone();
1067
1068 let height = block.height();
1071 if height > Height::zero() {
1072 let parent_commitment = V::parent_commitment(&block);
1073 assert!(
1074 block.parent() == V::commitment_to_inner(parent_commitment),
1075 "floor block parent commitment mismatch"
1076 );
1077 }
1078
1079 if height <= self.floor.processed_height() {
1083 warn!(
1084 %height,
1085 existing = %self.floor.processed_height(),
1086 "floor not updated, at or below existing"
1087 );
1088 let finalization = self
1089 .floor
1090 .take_pending_anchor()
1091 .expect("pending floor anchor missing");
1092 self.update_processed_round_floor(height, finalization.round(), resolver)
1093 .await;
1094 if self.try_repair_gaps(buffer, resolver, application).await {
1095 self.sync_finalized().await;
1096 }
1097 self.try_dispatch_blocks(application).await;
1098 return true;
1099 }
1100
1101 let digest = block.digest();
1102 let finalization = self
1103 .floor
1104 .take_pending_anchor()
1105 .expect("pending floor anchor missing");
1106 let round = finalization.round();
1107 try_join!(
1108 async {
1109 self.finalized_blocks
1110 .put(block.clone().into())
1111 .await
1112 .map_err(Box::new)?;
1113 Ok::<_, BoxedError>(())
1114 },
1115 async {
1116 self.finalizations_by_height
1117 .put(height, digest, finalization)
1118 .await
1119 .map_err(Box::new)?;
1120 Ok::<_, BoxedError>(())
1121 }
1122 )
1123 .expect("failed to store floor anchor");
1124 self.sync_finalized().await;
1125 self.block_subscriptions.notify(&block);
1126
1127 if height > self.tip {
1128 application.report(Update::Tip(round, height, digest));
1129 self.tip = height;
1130 let _ = self.finalized_height.try_set(height.get());
1131 }
1132
1133 let dispatch_floor = height
1136 .previous()
1137 .expect("floor anchor above processed height must have predecessor");
1138 self.update_processed_height(dispatch_floor, resolver);
1139 self.update_processed_round_floor(dispatch_floor, round, resolver)
1140 .await;
1141 self.stream
1142 .sync()
1143 .await
1144 .expect("failed to sync floor metadata");
1145
1146 self.pending_acks.clear();
1149
1150 self.prune_after_floor(height)
1152 .await
1153 .expect("failed to prune data below floor");
1154
1155 if self.try_repair_gaps(buffer, resolver, application).await {
1159 self.sync_finalized().await;
1160 }
1161 self.try_dispatch_blocks(application).await;
1162 true
1163 }
1164
1165 async fn handle_deliver<Buf: Buffer<V>>(
1170 &mut self,
1171 message: ResolverDelivery<V>,
1172 delivers: &mut Vec<PendingVerification<P::Scheme, V>>,
1173 buffer: &mut Buf,
1174 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1175 resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
1176 ) -> bool {
1177 let ResolverDelivery {
1178 delivery,
1179 mut value,
1180 response,
1181 } = message;
1182 let Delivery { key, subscribers } = delivery;
1183 match key {
1184 Key::Block(commitment) => {
1185 let block_cfg = V::block_cfg(&self.block_codec_config, commitment);
1186 let Ok(block) = V::Block::decode_cfg(value.as_ref(), &block_cfg) else {
1187 response.send_lossy(false);
1188 return false;
1189 };
1190 if V::commitment(&block) != commitment {
1191 response.send_lossy(false);
1192 return false;
1193 }
1194
1195 if self
1199 .apply_floor_anchor(&block, buffer, application, resolver)
1200 .await
1201 {
1202 response.send_lossy(true);
1203 return false;
1204 }
1205
1206 self.block_subscriptions.notify(&block);
1210
1211 let height = block.height();
1215 let digest = block.digest();
1216 let annotations = subscribers.into_vec();
1217
1218 let finalization = self.cache.get_finalization_for(digest).await;
1222 if let Some(finalization) = &finalization {
1223 self.update_processed_round_floor(height, finalization.round(), resolver)
1224 .await;
1225 }
1226 let wrote = if finalization.is_some()
1227 || annotations
1228 .iter()
1229 .any(|annotation| matches!(annotation, Annotation::Finalized(_)))
1230 {
1231 self.store_finalization(height, digest, block, finalization, application)
1232 .await
1233 } else {
1234 if annotations
1235 .iter()
1236 .any(|annotation| matches!(annotation, Annotation::Certified { .. }))
1237 && height > self.floor.processed_height()
1238 {
1239 if let Some(bounds) = self.epocher.containing(height) {
1240 self.cache
1241 .put_certified(bounds.epoch(), height, digest, block.clone().into())
1242 .await;
1243 }
1244 }
1245 false
1246 };
1247 debug!(?digest, %height, "received block");
1248 response.send_lossy(true);
1249 wrote
1250 }
1251 Key::Finalized { height } => {
1252 let Some(bounds) = self.epocher.containing(height) else {
1253 debug!(
1254 %height,
1255 floor = %self.floor.processed_height(),
1256 "ignoring stale delivery"
1257 );
1258 response.send_lossy(true);
1259 return false;
1260 };
1261 let Some(scheme) = self.get_scheme_certificate_verifier(bounds.epoch()) else {
1262 debug!(
1263 %height,
1264 floor = %self.floor.processed_height(),
1265 "ignoring stale delivery"
1266 );
1267 response.send_lossy(true);
1268 return false;
1269 };
1270
1271 let certificate_codec_config = scheme.certificate_codec_config();
1272 let Ok(finalization) =
1273 Finalization::read_cfg(&mut value, &certificate_codec_config)
1274 else {
1275 response.send_lossy(false);
1276 return false;
1277 };
1278
1279 let commitment = finalization.proposal.payload;
1280 let block_cfg = V::block_cfg(&self.block_codec_config, commitment);
1281 let Ok(block) = V::Block::decode_cfg(value, &block_cfg) else {
1282 response.send_lossy(false);
1283 return false;
1284 };
1285
1286 if block.height() != height
1287 || V::commitment(&block) != commitment
1288 || finalization.epoch() != bounds.epoch()
1289 {
1290 response.send_lossy(false);
1291 return false;
1292 }
1293 delivers.push(PendingVerification::Finalized {
1294 finalization,
1295 block,
1296 response,
1297 });
1298 false
1299 }
1300 Key::Notarized { round } => {
1301 let Some(scheme) = self.get_scheme_certificate_verifier(round.epoch()) else {
1302 debug!(
1303 ?round,
1304 floor = %self.floor.processed_height(),
1305 "ignoring stale delivery"
1306 );
1307 response.send_lossy(true);
1308 return false;
1309 };
1310
1311 let certificate_codec_config = scheme.certificate_codec_config();
1312 let Ok(notarization) =
1313 Notarization::read_cfg(&mut value, &certificate_codec_config)
1314 else {
1315 response.send_lossy(false);
1316 return false;
1317 };
1318
1319 let commitment = notarization.proposal.payload;
1320 let block_cfg = V::block_cfg(&self.block_codec_config, commitment);
1321 let Ok(block) = V::Block::decode_cfg(value, &block_cfg) else {
1322 response.send_lossy(false);
1323 return false;
1324 };
1325
1326 if notarization.round() != round
1327 || V::commitment(&block) != notarization.proposal.payload
1328 {
1329 response.send_lossy(false);
1330 return false;
1331 }
1332 delivers.push(PendingVerification::Notarized {
1333 notarization,
1334 block,
1335 response,
1336 });
1337 false
1338 }
1339 }
1340 }
1341
1342 async fn verify_delivered<Buf: Buffer<V>>(
1345 &mut self,
1346 mut delivers: Vec<PendingVerification<P::Scheme, V>>,
1347 buffer: &mut Buf,
1348 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1349 resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
1350 ) -> bool {
1351 delivers.retain(|item| !item.response_closed());
1352 if delivers.is_empty() {
1353 return false;
1354 }
1355
1356 let certs: Vec<_> = delivers
1358 .iter()
1359 .map(|item| match item {
1360 PendingVerification::Finalized { finalization, .. } => (
1361 Subject::Finalize {
1362 proposal: &finalization.proposal,
1363 },
1364 &finalization.certificate,
1365 ),
1366 PendingVerification::Notarized { notarization, .. } => (
1367 Subject::Notarize {
1368 proposal: ¬arization.proposal,
1369 },
1370 ¬arization.certificate,
1371 ),
1372 })
1373 .collect();
1374
1375 let verified = if let Some(scheme) = self.provider.all() {
1378 verify_certificates(
1379 self.context.as_mut(),
1380 scheme.as_ref(),
1381 &certs,
1382 &self.strategy,
1383 )
1384 } else {
1385 let mut verified = vec![false; delivers.len()];
1386
1387 let mut by_epoch: BTreeMap<Epoch, Vec<usize>> = BTreeMap::new();
1389 for (i, item) in delivers.iter().enumerate() {
1390 let epoch = match item {
1391 PendingVerification::Notarized { notarization, .. } => notarization.epoch(),
1392 PendingVerification::Finalized { finalization, .. } => finalization.epoch(),
1393 };
1394 by_epoch.entry(epoch).or_default().push(i);
1395 }
1396
1397 for (epoch, indices) in &by_epoch {
1399 let Some(scheme) = self.provider.scoped(*epoch) else {
1400 continue;
1401 };
1402 let group: Vec<_> = indices.iter().map(|&i| certs[i]).collect();
1403 let results = verify_certificates(
1404 self.context.as_mut(),
1405 scheme.as_ref(),
1406 &group,
1407 &self.strategy,
1408 );
1409 for (j, &idx) in indices.iter().enumerate() {
1410 verified[idx] = results[j];
1411 }
1412 }
1413 verified
1414 };
1415
1416 let mut wrote = false;
1418 for (index, item) in delivers.drain(..).enumerate() {
1419 if !verified[index] {
1420 match item {
1421 PendingVerification::Finalized { response, .. }
1422 | PendingVerification::Notarized { response, .. } => {
1423 response.send_lossy(false);
1424 }
1425 }
1426 continue;
1427 }
1428 match item {
1429 PendingVerification::Finalized {
1430 finalization,
1431 block,
1432 response,
1433 } => {
1434 response.send_lossy(true);
1436 let round = finalization.round();
1437 let height = block.height();
1438 let digest = block.digest();
1439 debug!(?round, %height, "received finalization");
1440
1441 if self
1444 .apply_floor_anchor(&block, buffer, application, resolver)
1445 .await
1446 {
1447 continue;
1448 }
1449
1450 self.update_processed_round_floor(height, round, resolver)
1451 .await;
1452
1453 wrote |= self
1454 .store_finalization(height, digest, block, Some(finalization), application)
1455 .await;
1456 }
1457 PendingVerification::Notarized {
1458 notarization,
1459 block,
1460 response,
1461 } => {
1462 response.send_lossy(true);
1464 let round = notarization.round();
1465 let commitment = notarization.proposal.payload;
1466 let digest = V::commitment_to_inner(commitment);
1467 debug!(?round, ?digest, "received notarization");
1468
1469 let height = block.height();
1471 self.cache_block(round, digest, block.clone()).await;
1472 self.cache
1473 .put_notarization(round, digest, notarization)
1474 .await;
1475
1476 if self
1479 .apply_floor_anchor(&block, buffer, application, resolver)
1480 .await
1481 {
1482 continue;
1483 }
1484
1485 if let Some(finalization) = self.cache.get_finalization_for(digest).await {
1490 self.update_processed_round_floor(height, finalization.round(), resolver)
1491 .await;
1492
1493 wrote |= self
1496 .store_finalization(
1497 height,
1498 digest,
1499 block.clone(),
1500 Some(finalization),
1501 application,
1502 )
1503 .await;
1504 }
1505 }
1506 }
1507 }
1508
1509 wrote
1510 }
1511
1512 fn get_scheme_certificate_verifier(&self, epoch: Epoch) -> Option<Arc<P::Scheme>> {
1517 self.provider.all().or_else(|| self.provider.scoped(epoch))
1518 }
1519
1520 async fn try_dispatch_blocks(
1557 &mut self,
1558 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1559 ) {
1560 if self.floor.blocks_progress() {
1562 return;
1563 }
1564
1565 while self.pending_acks.has_capacity() {
1566 let next_height = self
1567 .pending_acks
1568 .next_dispatch_height(self.stream.next_height());
1569 let Some(block) = self.get_finalized_block(next_height).await else {
1570 return;
1571 };
1572 assert_eq!(
1573 block.height(),
1574 next_height,
1575 "finalized block height mismatch"
1576 );
1577
1578 let (height, commitment) = (block.height(), V::commitment(&block));
1579 let (ack, ack_waiter) = A::handle();
1580 application.report(Update::Block(V::into_inner(block), ack));
1581 self.pending_acks.enqueue(PendingAck {
1582 height,
1583 commitment,
1584 receiver: ack_waiter,
1585 });
1586 }
1587 }
1588
1589 async fn cache_verified(
1593 &mut self,
1594 round: Round,
1595 digest: <V::Block as Digestible>::Digest,
1596 block: V::Block,
1597 ) {
1598 self.block_subscriptions.notify(&block);
1599 self.cache.put_verified(round, digest, block.into()).await;
1600 }
1601
1602 fn take_proposed(&mut self, round: Round, commitment: V::Commitment) -> Option<V::Block> {
1605 let (cached_round, cached_commitment, _) = self.last_proposed_block.as_ref()?;
1606 if *cached_round != round || *cached_commitment != commitment {
1607 return None;
1608 }
1609 self.last_proposed_block.take().map(|(_, _, block)| block)
1610 }
1611
1612 async fn cache_block(
1614 &mut self,
1615 round: Round,
1616 digest: <V::Block as Digestible>::Digest,
1617 block: V::Block,
1618 ) {
1619 self.block_subscriptions.notify(&block);
1620 self.cache.put_block(round, digest, block.into()).await;
1621 }
1622
1623 async fn sync_finalized(&mut self) {
1632 if let Err(e) = try_join!(
1633 async {
1634 self.finalized_blocks.sync().await.map_err(Box::new)?;
1635 Ok::<_, BoxedError>(())
1636 },
1637 async {
1638 self.finalizations_by_height
1639 .sync()
1640 .await
1641 .map_err(Box::new)?;
1642 Ok::<_, BoxedError>(())
1643 },
1644 ) {
1645 panic!("failed to sync finalization archives: {e}");
1646 }
1647 }
1648
1649 async fn get_finalized_block(&self, height: Height) -> Option<V::Block> {
1653 match self
1654 .finalized_blocks
1655 .get(ArchiveID::Index(height.get()))
1656 .await
1657 {
1658 Ok(stored) => stored.map(|stored| stored.into()),
1659 Err(e) => panic!("failed to get block: {e}"),
1660 }
1661 }
1662
1663 async fn get_finalization_by_height(
1665 &self,
1666 height: Height,
1667 ) -> Option<Finalization<P::Scheme, V::Commitment>> {
1668 match self
1669 .finalizations_by_height
1670 .get(ArchiveID::Index(height.get()))
1671 .await
1672 {
1673 Ok(finalization) => finalization,
1674 Err(e) => panic!("failed to get finalization: {e}"),
1675 }
1676 }
1677
1678 async fn get_info_by_height(
1681 &self,
1682 height: Height,
1683 ) -> Option<(Height, <V::Block as Digestible>::Digest)> {
1684 if let Some(finalization) = self.get_finalization_by_height(height).await {
1685 return Some((
1686 height,
1687 V::commitment_to_inner(finalization.proposal.payload),
1688 ));
1689 }
1690
1691 self.get_finalized_block(height)
1692 .await
1693 .map(|block| (block.height(), block.digest()))
1694 }
1695
1696 async fn store_finalization(
1709 &mut self,
1710 height: Height,
1711 digest: <V::Block as Digestible>::Digest,
1712 block: V::Block,
1713 finalization: Option<Finalization<P::Scheme, V::Commitment>>,
1714 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1715 ) -> bool {
1716 if height <= self.floor.processed_height() {
1720 debug!(
1721 %height,
1722 floor = %self.floor.processed_height(),
1723 ?digest,
1724 "dropping finalization at or below processed height floor"
1725 );
1726 return false;
1727 }
1728 self.block_subscriptions.notify(&block);
1729
1730 let stored: V::StoredBlock = block.into();
1732 let round = finalization.as_ref().map(|f| f.round());
1733
1734 if let Err(e) = try_join!(
1736 async {
1738 self.finalized_blocks.put(stored).await.map_err(Box::new)?;
1739 Ok::<_, BoxedError>(())
1740 },
1741 async {
1743 if let Some(finalization) = finalization {
1744 self.finalizations_by_height
1745 .put(height, digest, finalization)
1746 .await
1747 .map_err(Box::new)?;
1748 }
1749 Ok::<_, BoxedError>(())
1750 }
1751 ) {
1752 panic!("failed to finalize: {e}");
1753 }
1754
1755 if let Some(round) = round.filter(|_| height > self.tip) {
1757 application.report(Update::Tip(round, height, digest));
1758 self.tip = height;
1759 let _ = self.finalized_height.try_set(height.get());
1760 }
1761
1762 true
1763 }
1764
1765 async fn get_latest(&mut self) -> Option<(Height, <V::Block as Digestible>::Digest, Round)> {
1778 let height = self.finalizations_by_height.last_index()?;
1779 let finalization = self
1780 .get_finalization_by_height(height)
1781 .await
1782 .expect("finalization missing");
1783 Some((
1784 height,
1785 V::commitment_to_inner(finalization.proposal.payload),
1786 finalization.round(),
1787 ))
1788 }
1789
1790 async fn find_block_in_storage(
1794 &self,
1795 digest: <V::Block as Digestible>::Digest,
1796 ) -> Option<V::Block> {
1797 if let Some(block) = self.cache.find_block(digest).await {
1799 return Some(block.into());
1800 }
1801 match self.finalized_blocks.get(ArchiveID::Key(&digest)).await {
1803 Ok(stored) => stored.map(|stored| stored.into()),
1804 Err(e) => panic!("failed to get block: {e}"),
1805 }
1806 }
1807
1808 async fn find_block_in_storage_matching(
1811 &self,
1812 digest: <V::Block as Digestible>::Digest,
1813 mut predicate: impl FnMut(&V::Block) -> bool,
1814 ) -> Option<V::Block> {
1815 if let Some(block) = self
1816 .cache
1817 .find_block_matching(digest, |stored| {
1818 let block = stored.clone().into();
1819 predicate(&block)
1820 })
1821 .await
1822 {
1823 return Some(block.into());
1824 }
1825
1826 match self.finalized_blocks.get(ArchiveID::Key(&digest)).await {
1827 Ok(Some(stored)) => {
1828 let block = stored.into();
1829 predicate(&block).then_some(block)
1830 }
1831 Ok(None) => None,
1832 Err(e) => panic!("failed to get block: {e}"),
1833 }
1834 }
1835
1836 async fn find_block_by_digest<Buf: Buffer<V>>(
1841 &self,
1842 buffer: &Buf,
1843 digest: <V::Block as Digestible>::Digest,
1844 ) -> Option<V::Block> {
1845 if let Some(block) = buffer.find_by_digest(digest).await {
1846 return Some(block);
1847 }
1848 self.find_block_in_storage(digest).await
1849 }
1850
1851 async fn find_block_by_commitment<Buf: Buffer<V>>(
1856 &self,
1857 buffer: &Buf,
1858 commitment: V::Commitment,
1859 ) -> Option<V::Block> {
1860 if let Some(block) = buffer.find_by_commitment(commitment).await {
1861 return Some(block);
1862 }
1863 self.find_block_in_storage_matching(V::commitment_to_inner(commitment), |block| {
1864 V::commitment(block) == commitment
1865 })
1866 .await
1867 }
1868
1869 async fn try_repair_gaps<Buf: Buffer<V>>(
1881 &mut self,
1882 buffer: &mut Buf,
1883 resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
1884 application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
1885 ) -> bool {
1886 if self.floor.blocks_progress() {
1889 return false;
1890 }
1891
1892 let mut wrote = false;
1893 let start = self.floor.processed_height().next();
1894
1895 if let Some(last_finalized) = self.finalizations_by_height.last_index() {
1898 let have_block = self
1899 .finalized_blocks
1900 .last_index()
1901 .is_some_and(|last| last >= last_finalized);
1902 if last_finalized > self.floor.processed_height() && !have_block {
1903 let finalization = self
1905 .get_finalization_by_height(last_finalized)
1906 .await
1907 .expect("finalization missing");
1908 let commitment = finalization.proposal.payload;
1909 if let Some(block) = self.find_block_by_commitment(buffer, commitment).await {
1910 let digest = block.digest();
1912 wrote |= self
1913 .store_finalization(
1914 last_finalized,
1915 digest,
1916 block,
1917 Some(finalization),
1918 application,
1919 )
1920 .await;
1921 } else {
1922 self.floor
1924 .fetch_if_permitted(
1925 resolver,
1926 Request::finalized_block_by_height(commitment, last_finalized),
1927 )
1928 .ignore();
1929 }
1930 }
1931 }
1932
1933 'cache_repair: loop {
1935 let (gap_start, Some(gap_end)) = self.finalized_blocks.next_gap(start) else {
1936 return wrote;
1938 };
1939
1940 let Some(mut cursor) = self.get_finalized_block(gap_end).await else {
1943 panic!("gapped block missing that should exist: {gap_end}");
1944 };
1945
1946 let gap_start = gap_start.map(Height::next).unwrap_or(start);
1950
1951 while cursor.height() > gap_start {
1953 let parent_digest = cursor.parent();
1954 let parent_commitment = V::parent_commitment(&cursor);
1955 if let Some(block) = self
1956 .find_block_by_commitment(buffer, parent_commitment)
1957 .await
1958 {
1959 let finalization = self.cache.get_finalization_for(parent_digest).await;
1960 wrote |= self
1961 .store_finalization(
1962 block.height(),
1963 parent_digest,
1964 block.clone(),
1965 finalization,
1966 application,
1967 )
1968 .await;
1969 debug!(height = %block.height(), "repaired block");
1970 cursor = block;
1971 } else {
1972 let parent_height = cursor
1978 .height()
1979 .previous()
1980 .expect("cursor above gap start has a parent");
1981 self.floor
1982 .fetch_if_permitted(
1983 resolver,
1984 Request::finalized_block_by_height(parent_commitment, parent_height),
1985 )
1986 .ignore();
1987 break 'cache_repair;
1988 }
1989 }
1990 }
1991
1992 let missing_items = self
1998 .finalized_blocks
1999 .missing_items(start, self.max_repair.get());
2000 let requests: Vec<_> = missing_items.into_iter().map(Request::finalized).collect();
2001 if !requests.is_empty() {
2002 self.floor
2003 .fetch_all_if_permitted(resolver, requests)
2004 .ignore();
2005 }
2006 wrote
2007 }
2008
2009 fn update_processed_height(
2012 &mut self,
2013 height: Height,
2014 resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
2015 ) {
2016 self.stream.acknowledge(height);
2017 self.floor.set_processed_height(height);
2018 let _ = self
2019 .processed_height
2020 .try_set(self.floor.processed_height().get());
2021
2022 resolver.retain(handler::above_height_floor::<V::Commitment>(height));
2024 }
2025
2026 async fn latest_processed_round(finalizations_by_height: &FC, height: Option<Height>) -> Round {
2028 let Some(height) = height else {
2029 return Round::zero();
2030 };
2031 let Some(finalization_height) = finalizations_by_height
2032 .ranges_from(Height::zero())
2033 .filter_map(|(start, end)| (start <= height).then_some(end.min(height)))
2034 .max()
2035 else {
2036 return Round::zero();
2037 };
2038
2039 match finalizations_by_height
2040 .get(ArchiveID::Index(finalization_height.get()))
2041 .await
2042 {
2043 Ok(Some(finalization)) => finalization.round(),
2044 Ok(None) => panic!("processed finalization missing from stored range"),
2045 Err(err) => panic!("failed to get processed finalization: {err}"),
2046 }
2047 }
2048
2049 async fn update_processed_round(
2051 &mut self,
2052 height: Height,
2053 resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
2054 ) {
2055 let Some(finalization) = self.get_finalization_by_height(height).await else {
2056 return;
2057 };
2058 self.update_processed_round_floor(height, finalization.round(), resolver)
2059 .await;
2060 }
2061
2062 async fn update_processed_round_floor(
2064 &mut self,
2065 height: Height,
2066 round: Round,
2067 resolver: &mut impl Resolver<Key = ResolverRequestFor<V>, Subscriber = Annotation>,
2068 ) {
2069 if height > self.floor.processed_height() || round <= self.floor.processed_round() {
2070 return;
2071 }
2072
2073 let previous = self.floor.processed_round();
2074 self.floor.set_processed_round(round);
2075
2076 let prune_round = Round::new(
2079 previous.epoch(),
2080 previous.view().saturating_sub(self.view_retention_timeout),
2081 );
2082 self.cache.prune_by_view(prune_round).await;
2083
2084 resolver.retain(handler::above_round_floor::<V::Commitment>(
2086 self.floor.processed_round(),
2087 ));
2088 }
2089
2090 async fn prune_finalized_archives(&mut self, height: Height) -> Result<(), BoxedError> {
2092 try_join!(
2094 async {
2095 self.finalized_blocks
2096 .prune(height)
2097 .await
2098 .map_err(Box::new)?;
2099 Ok::<_, BoxedError>(())
2100 },
2101 async {
2102 self.finalizations_by_height
2103 .prune(height)
2104 .await
2105 .map_err(Box::new)?;
2106 Ok::<_, BoxedError>(())
2107 }
2108 )?;
2109 Ok(())
2110 }
2111
2112 async fn prune_after_floor(&mut self, height: Height) -> Result<(), BoxedError> {
2114 let cache = &mut self.cache;
2115 let finalized_blocks = &mut self.finalized_blocks;
2116 let finalizations_by_height = &mut self.finalizations_by_height;
2117 try_join!(
2118 async {
2119 cache.prune_by_height(height).await;
2120 Ok::<_, BoxedError>(())
2121 },
2122 async {
2123 finalized_blocks.prune(height).await.map_err(Box::new)?;
2124 Ok::<_, BoxedError>(())
2125 },
2126 async {
2127 finalizations_by_height
2128 .prune(height)
2129 .await
2130 .map_err(Box::new)?;
2131 Ok::<_, BoxedError>(())
2132 }
2133 )?;
2134 Ok(())
2135 }
2136}