1use super::Variant;
2use crate::{
3 marshal::{
4 ancestry::{AncestorStream, Ancestry, BlockProvider},
5 Identifier,
6 },
7 simplex::types::{Activity, Finalization, Notarization},
8 types::{Height, Round},
9 Reporter,
10};
11use commonware_actor::{
12 mailbox::{Overflow, Policy, Sender},
13 Feedback,
14};
15use commonware_cryptography::{certificate::Scheme, Digestible};
16use commonware_p2p::Recipients;
17use commonware_runtime::{telemetry::metrics::histogram::Timed, Clock};
18use commonware_utils::{channel::oneshot, vec::NonEmptyVec};
19use std::{
20 collections::{btree_map::Entry, BTreeMap, VecDeque},
21 sync::Arc,
22};
23
/// Requests and notifications that a [`Mailbox`] enqueues for its consumer.
///
/// Variants carrying a `response` sender are queries answered over that channel; variants
/// carrying an optional `ack` sender can be acknowledged over it; the remaining variants are
/// fire-and-forget.
pub(crate) enum Message<S: Scheme, V: Variant> {
    /// Query the height and digest of the block selected by `identifier`.
    GetInfo {
        /// Selector of the block being queried.
        identifier: Identifier<<V::Block as Digestible>::Digest>,
        /// Channel on which the optional `(height, digest)` answer is sent.
        response: oneshot::Sender<Option<(Height, <V::Block as Digestible>::Digest)>>,
    },
    /// Query the block selected by `identifier`.
    GetBlock {
        /// Selector of the block being queried.
        identifier: Identifier<<V::Block as Digestible>::Digest>,
        /// Channel on which the optional block is sent.
        response: oneshot::Sender<Option<V::Block>>,
    },
    /// Query the finalization for `height`.
    GetFinalization {
        /// Height whose finalization is requested.
        height: Height,
        /// Channel on which the optional finalization is sent.
        response: oneshot::Sender<Option<Finalization<S, V::Commitment>>>,
    },
    /// Query answered with an optional [`Height`].
    ///
    /// NOTE(review): by its name this is the consumer's processed height — confirm there.
    GetProcessedHeight {
        /// Channel on which the optional height is sent.
        response: oneshot::Sender<Option<Height>>,
    },
    /// Hint about a finalized `height`, naming a non-empty set of peers.
    ///
    /// While buffered in [`Pending`], hints for the same height are merged into one entry.
    HintFinalized {
        /// Height the hint refers to.
        height: Height,
        /// Peers attached to the hint (presumably the peers to fetch from — TODO confirm).
        targets: NonEmptyVec<S::PublicKey>,
    },
    /// Subscribe to the block identified by `digest`.
    SubscribeByDigest {
        /// Digest of the awaited block.
        digest: <V::Block as Digestible>::Digest,
        /// Fallback behaviour selected by the subscriber.
        fallback: DigestFallback,
        /// Channel on which the block is delivered.
        response: oneshot::Sender<V::Block>,
    },
    /// Subscribe to the block identified by `commitment`.
    SubscribeByCommitment {
        /// Commitment of the awaited block.
        commitment: V::Commitment,
        /// Fallback behaviour selected by the subscriber.
        fallback: CommitmentFallback,
        /// Channel on which the block is delivered.
        response: oneshot::Sender<V::Block>,
    },
    /// Hint carrying a `round` and a `commitment` (by its name, one notarized in that round —
    /// TODO confirm against the consumer).
    HintNotarized {
        /// Round the hint refers to.
        round: Round,
        /// Commitment the hint refers to.
        commitment: V::Commitment,
    },
    /// Query the block associated with `round` (by its name, the verified one — TODO confirm).
    GetVerified {
        /// Round being queried.
        round: Round,
        /// Channel on which the optional block is sent.
        response: oneshot::Sender<Option<V::Block>>,
    },
    /// Forwarding request for `commitment` at `round`, addressed to `recipients`.
    Forward {
        /// Round of the forwarded item.
        round: Round,
        /// Commitment of the forwarded item.
        commitment: V::Commitment,
        /// Peers the request is addressed to.
        recipients: Recipients<S::PublicKey>,
    },
    /// A block reported as proposed in `round`.
    Proposed {
        /// Round of the block.
        round: Round,
        /// The reported block.
        block: V::Block,
        /// Optional acknowledgement channel (awaited by [`Mailbox::proposed`]).
        ack: Option<oneshot::Sender<()>>,
    },
    /// A block reported as verified in `round`.
    Verified {
        /// Round of the block.
        round: Round,
        /// The reported block.
        block: V::Block,
        /// Optional acknowledgement channel (awaited by [`Mailbox::verified`]).
        ack: Option<oneshot::Sender<()>>,
    },
    /// A block reported as certified in `round`.
    Certified {
        /// Round of the block.
        round: Round,
        /// The reported block.
        block: V::Block,
        /// Optional acknowledgement channel (awaited by [`Mailbox::certified`]).
        ack: Option<oneshot::Sender<()>>,
    },
    /// Floor update carrying a finalization.
    ///
    /// While buffered in [`Pending`], only the finalization with the highest round is kept.
    SetFloor {
        /// Finalization describing the new floor.
        finalization: Finalization<S, V::Commitment>,
    },
    /// Prune command for `height`.
    ///
    /// While buffered in [`Pending`], only the highest height is kept.
    Prune {
        /// Height carried by the prune command (inclusive/exclusive not visible here).
        height: Height,
    },
    /// A notarization reported through [`Reporter::report`].
    Notarization {
        /// The reported notarization.
        notarization: Notarization<S, V::Commitment>,
    },
    /// A finalization reported through [`Reporter::report`].
    Finalization {
        /// The reported finalization.
        finalization: Finalization<S, V::Commitment>,
    },
}
182
/// Fallback choice attached to a digest subscription (see [`Mailbox::subscribe_by_digest`]).
///
/// Converts losslessly into the matching [`CommitmentFallback`] variant.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DigestFallback {
    /// No extra data is supplied (presumably: only wait for the block — TODO confirm).
    Wait,
    /// Supplies the `round` to use for the fallback (presumably a fetch by round — TODO confirm).
    FetchByRound { round: Round },
}
194
195impl From<DigestFallback> for CommitmentFallback {
196 fn from(fallback: DigestFallback) -> Self {
197 match fallback {
198 DigestFallback::Wait => Self::Wait,
199 DigestFallback::FetchByRound { round } => Self::FetchByRound { round },
200 }
201 }
202}
203
/// Fallback choice attached to a commitment subscription
/// (see [`Mailbox::subscribe_by_commitment`]).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CommitmentFallback {
    /// No extra data is supplied (presumably: only wait for the block — TODO confirm).
    Wait,
    /// Supplies the `round` to use for the fallback (presumably a fetch by round — TODO confirm).
    FetchByRound { round: Round },
    /// Supplies the `height` to use for the fallback (presumably a fetch keyed by the
    /// subscribed commitment — TODO confirm).
    FetchByCommitment { height: Height },
}
237
238impl<S: Scheme, V: Variant> Message<S, V> {
239 fn stale(&self, current: Option<Height>) -> bool {
240 match self {
241 Self::GetInfo {
243 identifier: Identifier::Height(height),
244 ..
245 }
246 | Self::GetBlock {
247 identifier: Identifier::Height(height),
248 ..
249 }
250 | Self::GetFinalization { height, .. } => Some(*height) < current,
251 Self::HintFinalized { height, .. } => Some(*height) <= current,
253 Self::Proposed { .. } | Self::Verified { .. } | Self::Certified { .. } => false,
255 Self::GetBlock {
257 identifier: Identifier::Digest(_) | Identifier::Latest,
258 ..
259 }
260 | Self::GetInfo {
261 identifier: Identifier::Digest(_) | Identifier::Latest,
262 ..
263 }
264 | Self::GetProcessedHeight { .. } => false,
265 Self::HintNotarized { .. } => false,
266 Self::SubscribeByDigest { .. }
267 | Self::SubscribeByCommitment { .. }
268 | Self::GetVerified { .. }
269 | Self::Forward { .. }
270 | Self::SetFloor { .. }
271 | Self::Prune { .. }
272 | Self::Notarization { .. }
273 | Self::Finalization { .. } => false,
274 }
275 }
276
277 pub(crate) fn response_closed(&self) -> bool {
278 match self {
279 Self::GetInfo { response, .. } => response.is_closed(),
280 Self::GetBlock { response, .. } | Self::GetVerified { response, .. } => {
281 response.is_closed()
282 }
283 Self::GetFinalization { response, .. } => response.is_closed(),
284 Self::GetProcessedHeight { response } => response.is_closed(),
285 Self::SubscribeByDigest { response, .. }
286 | Self::SubscribeByCommitment { response, .. } => response.is_closed(),
287 Self::HintNotarized { .. } => false,
288 Self::HintFinalized { .. }
289 | Self::Forward { .. }
290 | Self::Proposed { .. }
291 | Self::Verified { .. }
292 | Self::Certified { .. }
293 | Self::SetFloor { .. }
294 | Self::Prune { .. }
295 | Self::Notarization { .. }
296 | Self::Finalization { .. } => false,
297 }
298 }
299}
300
/// Overflow buffer used as the [`Policy::Overflow`] of [`Message`].
///
/// Floor and prune updates are coalesced into single slots, finalized hints are merged per
/// height, and every other message is queued in arrival order.
pub(crate) struct Pending<S: Scheme, V: Variant> {
    /// Buffered floor update; only the finalization with the highest round is kept.
    floor: Option<Finalization<S, V::Commitment>>,
    /// Buffered prune height; only the highest is kept. Also serves as the staleness cutoff.
    prune: Option<Height>,
    /// Targets of buffered finalized hints, merged per height.
    hints: BTreeMap<Height, NonEmptyVec<S::PublicKey>>,
    /// FIFO queue of buffered messages and of markers giving each hint its queue position.
    messages: VecDeque<PendingMessage<S, V>>,
}
307
/// An entry in the FIFO queue of [`Pending`].
enum PendingMessage<S: Scheme, V: Variant> {
    /// A buffered message stored as-is.
    Message(Message<S, V>),
    /// Position marker for a finalized hint; its targets are stored in `Pending::hints`
    /// under the same height.
    HintFinalized(Height),
}
312
313impl<S: Scheme, V: Variant> Default for Pending<S, V> {
314 fn default() -> Self {
315 Self {
316 floor: None,
317 prune: None,
318 hints: BTreeMap::new(),
319 messages: VecDeque::new(),
320 }
321 }
322}
323
324impl<S: Scheme, V: Variant> Pending<S, V> {
325 const fn height(&self) -> Option<Height> {
328 self.prune
329 }
330
331 fn retain(&mut self) {
332 let current = self.height();
333 self.hints.retain(|height, _| Some(*height) > current);
334
335 let hints = &self.hints;
336 self.messages.retain(|message| match message {
337 PendingMessage::Message(message) => {
338 !message.response_closed() && !message.stale(current)
339 }
340 PendingMessage::HintFinalized(height) => hints.contains_key(height),
341 });
342 }
343
344 fn set_floor(&mut self, finalization: Finalization<S, V::Commitment>) {
345 let round = finalization.round();
346 if self
347 .floor
348 .as_ref()
349 .is_some_and(|floor| floor.round() >= round)
350 {
351 return;
352 }
353
354 self.floor = Some(finalization);
355 }
356
357 fn prune(&mut self, height: Height) {
358 let current = self.height();
359 let prune = Some(height);
360 if self.prune >= prune {
361 return;
362 }
363
364 self.prune = self.prune.max(prune);
365 if self.height() > current {
366 self.retain();
367 }
368 }
369
370 fn extend_hint_targets(
371 pending: &mut NonEmptyVec<S::PublicKey>,
372 targets: NonEmptyVec<S::PublicKey>,
373 ) {
374 for target in targets {
375 if !pending.contains(&target) {
376 pending.push(target);
377 }
378 }
379 }
380
381 fn hint_finalized(&mut self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
382 let current = self.height();
384 if current.is_some_and(|current| height <= current) {
385 return;
386 }
387
388 match self.hints.entry(height) {
389 Entry::Vacant(entry) => {
390 entry.insert(targets);
391 self.messages
392 .push_back(PendingMessage::HintFinalized(height));
393 }
394 Entry::Occupied(mut entry) => {
395 Self::extend_hint_targets(entry.get_mut(), targets);
396 }
397 }
398 }
399
400 fn restore_hint(&mut self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
401 match self.hints.entry(height) {
402 Entry::Vacant(entry) => {
403 entry.insert(targets);
404 }
405 Entry::Occupied(mut entry) => {
406 Self::extend_hint_targets(entry.get_mut(), targets);
407 }
408 }
409 self.messages
410 .push_front(PendingMessage::HintFinalized(height));
411 }
412
413 fn drain_one<F>(&mut self, message: Message<S, V>, push: &mut F) -> bool
414 where
415 F: FnMut(Message<S, V>) -> Option<Message<S, V>>,
416 {
417 let Some(message) = push(message) else {
419 return true;
420 };
421
422 match message {
424 Message::SetFloor { finalization } => self.set_floor(finalization),
425 Message::Prune { height } => self.prune(height),
426 Message::HintFinalized { height, targets } => self.restore_hint(height, targets),
427 message => self.messages.push_front(PendingMessage::Message(message)),
428 }
429 false
430 }
431}
432
433impl<S: Scheme, V: Variant> Overflow<Message<S, V>> for Pending<S, V> {
434 fn is_empty(&self) -> bool {
435 self.floor.is_none()
436 && self.prune.is_none()
437 && self.hints.is_empty()
438 && self.messages.is_empty()
439 }
440
441 fn drain<F>(&mut self, mut push: F)
442 where
443 F: FnMut(Message<S, V>) -> Option<Message<S, V>>,
444 {
445 if let Some(finalization) = self.floor.take() {
448 if !self.drain_one(Message::SetFloor { finalization }, &mut push) {
449 return;
450 }
451 }
452 if let Some(height) = self.prune.take() {
453 if !self.drain_one(Message::Prune { height }, &mut push) {
454 return;
455 }
456 }
457
458 while let Some(pending) = self.messages.pop_front() {
460 match pending {
461 PendingMessage::Message(message) => {
462 if message.response_closed() {
463 continue;
464 }
465 if !self.drain_one(message, &mut push) {
466 break;
467 }
468 }
469 PendingMessage::HintFinalized(hint_height) => {
470 let Some(targets) = self.hints.remove(&hint_height) else {
471 continue;
472 };
473 let message = Message::HintFinalized {
474 height: hint_height,
475 targets,
476 };
477 if !self.drain_one(message, &mut push) {
478 break;
479 }
480 }
481 }
482 }
483 }
484}
485
486impl<S: Scheme, V: Variant> Policy for Message<S, V> {
487 type Overflow = Pending<S, V>;
488
489 fn handle(overflow: &mut Self::Overflow, message: Self) {
490 if message.response_closed() {
492 return;
493 }
494 match message {
495 Self::HintFinalized { height, targets } => {
497 overflow.hint_finalized(height, targets);
498 }
499 Self::SetFloor { finalization } => {
502 overflow.set_floor(finalization);
503 }
504 Self::Prune { height } => {
505 overflow.prune(height);
506 }
507 message => {
509 if message.stale(overflow.height()) {
510 return;
511 }
512 overflow
513 .messages
514 .push_back(PendingMessage::Message(message));
515 }
516 }
517 }
518}
519
/// Cloneable handle that enqueues [`Message`]s through the wrapped sender.
#[derive(Clone)]
pub struct Mailbox<S: Scheme, V: Variant> {
    /// Sender every method of this mailbox enqueues on.
    sender: Sender<Message<S, V>>,
}
525
526impl<S: Scheme, V: Variant> Mailbox<S, V> {
527 pub(crate) const fn new(sender: Sender<Message<S, V>>) -> Self {
529 Self { sender }
530 }
531
532 pub(crate) fn ancestor_stream<I, C>(
542 &self,
543 clock: Arc<C>,
544 initial: I,
545 fetch_duration: Timed,
546 ) -> impl Ancestry<V::ApplicationBlock> + use<S, V, I, C>
547 where
548 Self: BlockProvider<Block = V::ApplicationBlock>,
549 I: IntoIterator<Item = V::Block>,
550 C: Clock,
551 {
552 AncestorStream::new(
553 clock,
554 self.clone(),
555 initial.into_iter().map(V::into_inner),
556 fetch_duration,
557 )
558 }
559
560 pub async fn get_info(
562 &self,
563 identifier: impl Into<Identifier<<V::Block as Digestible>::Digest>>,
564 ) -> Option<(Height, <V::Block as Digestible>::Digest)> {
565 let identifier = identifier.into();
566 let (response, receiver) = oneshot::channel();
567 let _ = self.sender.enqueue(Message::GetInfo {
568 identifier,
569 response,
570 });
571 receiver.await.ok().flatten()
572 }
573
574 pub async fn get_block(
577 &self,
578 identifier: impl Into<Identifier<<V::Block as Digestible>::Digest>>,
579 ) -> Option<V::Block> {
580 let identifier = identifier.into();
581 let (response, receiver) = oneshot::channel();
582 let _ = self.sender.enqueue(Message::GetBlock {
583 identifier,
584 response,
585 });
586 receiver.await.ok().flatten()
587 }
588
589 pub async fn get_finalization(&self, height: Height) -> Option<Finalization<S, V::Commitment>> {
592 let (response, receiver) = oneshot::channel();
593 let _ = self
594 .sender
595 .enqueue(Message::GetFinalization { height, response });
596 receiver.await.ok().flatten()
597 }
598
599 pub async fn get_processed_height(&self) -> Option<Height> {
601 let (response, receiver) = oneshot::channel();
602 let _ = self
603 .sender
604 .enqueue(Message::GetProcessedHeight { response });
605 receiver.await.ok().flatten()
606 }
607
608 pub fn hint_finalized(&self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
628 let _ = self
629 .sender
630 .enqueue(Message::HintFinalized { height, targets });
631 }
632
633 pub fn subscribe_by_digest(
646 &self,
647 digest: <V::Block as Digestible>::Digest,
648 fallback: DigestFallback,
649 ) -> oneshot::Receiver<V::Block> {
650 let (tx, rx) = oneshot::channel();
651 let _ = self.sender.enqueue(Message::SubscribeByDigest {
652 digest,
653 fallback,
654 response: tx,
655 });
656 rx
657 }
658
659 pub fn subscribe_by_commitment(
671 &self,
672 commitment: V::Commitment,
673 fallback: CommitmentFallback,
674 ) -> oneshot::Receiver<V::Block> {
675 let (tx, rx) = oneshot::channel();
676 let _ = self.sender.enqueue(Message::SubscribeByCommitment {
677 fallback,
678 commitment,
679 response: tx,
680 });
681 rx
682 }
683
684 pub fn hint_notarized(&self, round: Round, commitment: V::Commitment) {
693 let _ = self
694 .sender
695 .enqueue(Message::HintNotarized { round, commitment });
696 }
697
698 pub async fn ancestry<C>(
706 &self,
707 clock: Arc<C>,
708 (fallback, start_digest): (DigestFallback, <V::Block as Digestible>::Digest),
709 fetch_duration: Timed,
710 ) -> Option<impl Ancestry<V::ApplicationBlock> + use<S, V, C>>
711 where
712 Self: BlockProvider<Block = V::ApplicationBlock>,
713 C: Clock,
714 {
715 let receiver = self.subscribe_by_digest(start_digest, fallback);
716 receiver
717 .await
718 .ok()
719 .map(|block| self.ancestor_stream(clock, [block], fetch_duration))
720 }
721
722 pub async fn get_verified(&self, round: Round) -> Option<V::Block> {
724 let (response, receiver) = oneshot::channel();
725 let _ = self
726 .sender
727 .enqueue(Message::GetVerified { round, response });
728 receiver.await.ok().flatten()
729 }
730
731 #[must_use = "callers must consider block durability before proceeding"]
735 pub async fn proposed(&self, round: Round, block: V::Block) -> bool {
736 let (ack, receiver) = oneshot::channel();
737 let _ = self.sender.enqueue(Message::Proposed {
738 round,
739 block,
740 ack: Some(ack),
741 });
742 receiver.await.is_ok()
743 }
744
745 #[must_use = "callers must consider block durability before proceeding"]
749 pub async fn verified(&self, round: Round, block: V::Block) -> bool {
750 let (ack, receiver) = oneshot::channel();
751 let _ = self.sender.enqueue(Message::Verified {
752 round,
753 block,
754 ack: Some(ack),
755 });
756 receiver.await.is_ok()
757 }
758
759 #[must_use = "callers must consider block durability before proceeding"]
763 pub async fn certified(&self, round: Round, block: V::Block) -> bool {
764 let (ack, receiver) = oneshot::channel();
765 let _ = self.sender.enqueue(Message::Certified {
766 round,
767 block,
768 ack: Some(ack),
769 });
770 receiver.await.is_ok()
771 }
772
773 pub fn set_floor(&self, finalization: Finalization<S, V::Commitment>) {
783 let _ = self.sender.enqueue(Message::SetFloor { finalization });
784 }
785
786 pub fn prune(&self, height: Height) {
791 let _ = self.sender.enqueue(Message::Prune { height });
792 }
793
794 pub fn forward(
796 &self,
797 round: Round,
798 commitment: V::Commitment,
799 recipients: Recipients<S::PublicKey>,
800 ) -> Feedback {
801 self.sender.enqueue(Message::Forward {
802 round,
803 commitment,
804 recipients,
805 })
806 }
807}
808
809impl<S: Scheme, V: Variant> Reporter for Mailbox<S, V> {
810 type Activity = Activity<S, V::Commitment>;
811
812 fn report(&mut self, activity: Self::Activity) -> Feedback {
813 let message = match activity {
814 Activity::Notarization(notarization) => Message::Notarization { notarization },
815 Activity::Finalization(finalization) => Message::Finalization { finalization },
816 _ => return Feedback::Ok,
817 };
818 self.sender.enqueue(message)
819 }
820}
821
822#[cfg(test)]
823mod tests {
824 use super::*;
825 use crate::{
826 marshal::{mocks::harness, standard::Standard},
827 simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Proposal},
828 types::{Epoch, View},
829 Heightable,
830 };
831 use commonware_cryptography::{
832 certificate::mocks::Fixture, ed25519::PrivateKey, Digest as _, Signer as _,
833 };
834 use commonware_utils::{channel::oneshot::error::TryRecvError, test_rng_seeded};
835
836 type TestMessage = Message<harness::S, Standard<harness::B>>;
837 type TestPending = Pending<harness::S, Standard<harness::B>>;
838
839 fn public_key(seed: u64) -> harness::K {
840 PrivateKey::from_seed(seed).public_key()
841 }
842
843 fn round(height: u64) -> Round {
844 Round::new(Epoch::zero(), View::new(height))
845 }
846
847 fn block(height: u64) -> harness::B {
848 harness::make_raw_block(harness::D::EMPTY, Height::new(height), height)
849 }
850
851 fn commitment(height: u64) -> harness::D {
852 <Standard<harness::B> as Variant>::commitment(&block(height))
853 }
854
855 fn finalization(height: u64) -> Finalization<harness::S, harness::D> {
856 let mut rng = test_rng_seeded(height);
857 let Fixture { schemes, .. } = bls12381_threshold_vrf::fixture::<harness::V, _>(
858 &mut rng,
859 harness::NAMESPACE,
860 harness::NUM_VALIDATORS,
861 );
862 let proposal = Proposal::new(round(height), View::zero(), commitment(height));
863 <harness::StandardHarness as harness::TestHarness>::make_finalization(
864 proposal,
865 &schemes,
866 harness::QUORUM,
867 )
868 }
869
870 fn get_info(height: u64) -> (TestMessage, oneshot::Receiver<Option<(Height, harness::D)>>) {
871 let (response, receiver) = oneshot::channel();
872 (
873 TestMessage::GetInfo {
874 identifier: Identifier::Height(Height::new(height)),
875 response,
876 },
877 receiver,
878 )
879 }
880
881 fn proposed(height: u64) -> (TestMessage, oneshot::Receiver<()>) {
882 let (ack, receiver) = oneshot::channel();
883 (
884 TestMessage::Proposed {
885 round: round(height),
886 block: block(height),
887 ack: Some(ack),
888 },
889 receiver,
890 )
891 }
892
893 fn verified(height: u64) -> (TestMessage, oneshot::Receiver<()>) {
894 let (ack, receiver) = oneshot::channel();
895 (
896 TestMessage::Verified {
897 round: round(height),
898 block: block(height),
899 ack: Some(ack),
900 },
901 receiver,
902 )
903 }
904
905 fn certified(height: u64) -> (TestMessage, oneshot::Receiver<()>) {
906 let (ack, receiver) = oneshot::channel();
907 (
908 TestMessage::Certified {
909 round: round(height),
910 block: block(height),
911 ack: Some(ack),
912 },
913 receiver,
914 )
915 }
916
917 fn get_block(height: u64) -> (TestMessage, oneshot::Receiver<Option<harness::B>>) {
918 let (response, receiver) = oneshot::channel();
919 (
920 TestMessage::GetBlock {
921 identifier: Identifier::Height(Height::new(height)),
922 response,
923 },
924 receiver,
925 )
926 }
927
928 fn get_finalization(
929 height: u64,
930 ) -> (
931 TestMessage,
932 oneshot::Receiver<Option<Finalization<harness::S, harness::D>>>,
933 ) {
934 let (response, receiver) = oneshot::channel();
935 (
936 TestMessage::GetFinalization {
937 height: Height::new(height),
938 response,
939 },
940 receiver,
941 )
942 }
943
944 fn subscribe_by_digest(height: u64) -> (TestMessage, oneshot::Receiver<harness::B>) {
945 let (response, receiver) = oneshot::channel();
946 (
947 TestMessage::SubscribeByDigest {
948 digest: block(height).digest(),
949 fallback: DigestFallback::FetchByRound {
950 round: round(height),
951 },
952 response,
953 },
954 receiver,
955 )
956 }
957
958 fn subscribe_by_commitment_message(
959 height: u64,
960 fallback: CommitmentFallback,
961 ) -> (TestMessage, oneshot::Receiver<harness::B>) {
962 let (response, receiver) = oneshot::channel();
963 (
964 TestMessage::SubscribeByCommitment {
965 commitment: commitment(height),
966 fallback,
967 response,
968 },
969 receiver,
970 )
971 }
972
973 fn hint_finalized(height: u64, target: harness::K) -> TestMessage {
974 TestMessage::HintFinalized {
975 height: Height::new(height),
976 targets: NonEmptyVec::new(target),
977 }
978 }
979
980 fn set_floor(height: u64) -> TestMessage {
981 TestMessage::SetFloor {
982 finalization: finalization(height),
983 }
984 }
985
986 fn prune(height: u64) -> TestMessage {
987 TestMessage::Prune {
988 height: Height::new(height),
989 }
990 }
991
992 fn pending() -> TestPending {
993 TestPending::default()
994 }
995
996 fn drain(overflow: &mut TestPending) -> VecDeque<TestMessage> {
997 let mut drained = VecDeque::new();
998 overflow.drain(|message| {
999 drained.push_back(message);
1000 None
1001 });
1002 drained
1003 }
1004
1005 fn has_get_info(overflow: &TestPending, height: u64) -> bool {
1006 overflow.messages.iter().any(|message| {
1007 matches!(
1008 message,
1009 PendingMessage::Message(TestMessage::GetInfo {
1010 identifier: Identifier::Height(found),
1011 response,
1012 ..
1013 }) if *found == Height::new(height) && !response.is_closed()
1014 )
1015 })
1016 }
1017
1018 fn has_get_block(overflow: &TestPending, height: u64) -> bool {
1019 overflow.messages.iter().any(|message| {
1020 matches!(
1021 message,
1022 PendingMessage::Message(TestMessage::GetBlock {
1023 identifier: Identifier::Height(found),
1024 response,
1025 ..
1026 }) if *found == Height::new(height) && !response.is_closed()
1027 )
1028 })
1029 }
1030
1031 fn has_get_finalization(overflow: &TestPending, height: u64) -> bool {
1032 overflow.messages.iter().any(|message| {
1033 matches!(
1034 message,
1035 PendingMessage::Message(TestMessage::GetFinalization {
1036 height: found,
1037 response,
1038 }) if *found == Height::new(height) && !response.is_closed()
1039 )
1040 })
1041 }
1042
1043 fn hint_targets(overflow: &TestPending, height: u64) -> Option<&NonEmptyVec<harness::K>> {
1044 overflow.hints.get(&Height::new(height))
1045 }
1046
1047 fn has_block_message(overflow: &TestPending, height: u64) -> bool {
1048 overflow.messages.iter().any(|message| {
1049 matches!(
1050 message,
1051 PendingMessage::Message(
1052 TestMessage::Proposed { block, .. }
1053 | TestMessage::Verified { block, .. }
1054 | TestMessage::Certified { block, .. }
1055 )
1056 if block.height() == Height::new(height)
1057 )
1058 })
1059 }
1060
1061 fn has_prune(overflow: &TestPending, height: u64) -> bool {
1062 overflow.prune == Some(Height::new(height))
1063 }
1064
1065 fn has_subscription(overflow: &TestPending, height: u64) -> bool {
1066 let expected_digest = block(height).digest();
1067 let expected_commitment = commitment(height);
1068 overflow.messages.iter().any(|message| {
1069 matches!(
1070 message,
1071 PendingMessage::Message(TestMessage::SubscribeByDigest { digest, response, .. })
1072 if *digest == expected_digest && !response.is_closed()
1073 ) || matches!(
1074 message,
1075 PendingMessage::Message(TestMessage::SubscribeByCommitment {
1076 commitment,
1077 response,
1078 ..
1079 }) if *commitment == expected_commitment && !response.is_closed()
1080 )
1081 })
1082 }
1083
#[test]
fn policy_coalesces_hint_targets() {
    let mut overflow = pending();
    let (first, second) = (public_key(1), public_key(2));

    // Same height three times: a duplicate target followed by a new one.
    for target in [&first, &first, &second] {
        <TestMessage as Policy>::handle(&mut overflow, hint_finalized(10, target.clone()));
    }

    // A single queue marker and a de-duplicated target list are expected.
    assert_eq!(overflow.messages.len(), 1);
    let targets = hint_targets(&overflow, 10).expect("expected hint");
    assert_eq!(targets.len().get(), 2);
    for expected in [&first, &second] {
        assert!(targets.contains(expected));
    }
}
1100
#[test]
fn policy_preserves_commitment_subscription_fallbacks() {
    let mut overflow = pending();
    let cases = [
        (1, CommitmentFallback::Wait),
        (2, CommitmentFallback::FetchByRound { round: round(2) }),
        (
            3,
            CommitmentFallback::FetchByCommitment {
                height: Height::new(3),
            },
        ),
    ];

    // Receivers are kept alive so the subscriptions are not discarded as closed.
    let mut receivers = Vec::new();
    for (height, fallback) in cases {
        let (message, receiver) = subscribe_by_commitment_message(height, fallback);
        receivers.push(receiver);
        <TestMessage as Policy>::handle(&mut overflow, message);
    }

    // Each subscription must come back in order with its fallback untouched.
    let drained = drain(&mut overflow);
    assert_eq!(drained.len(), 3);
    for (message, (_, expected)) in drained.iter().zip(cases) {
        assert!(matches!(
            message,
            TestMessage::SubscribeByCommitment { fallback, .. } if *fallback == expected
        ));
    }
}
1145
#[test]
fn policy_handles_closed_subscriptions() {
    let mut overflow = pending();

    // Queue one abandoned and one live subscription directly.
    let (abandoned, abandoned_rx) = subscribe_by_digest(1);
    drop(abandoned_rx);
    let (live, mut live_rx) = subscribe_by_commitment_message(
        2,
        CommitmentFallback::FetchByRound { round: round(2) },
    );
    overflow
        .messages
        .extend([abandoned, live].map(PendingMessage::Message));

    // Submit a subscription through the policy after its receiver is gone.
    let (incoming, incoming_rx) = subscribe_by_digest(3);
    drop(incoming_rx);
    <TestMessage as Policy>::handle(&mut overflow, incoming);

    assert!(!has_subscription(&overflow, 1));
    assert!(has_subscription(&overflow, 2));
    assert!(!has_subscription(&overflow, 3));
    assert!(matches!(live_rx.try_recv(), Err(TryRecvError::Empty)));
}
1176
#[test]
fn policy_handles_closed_responses() {
    let mut overflow = pending();

    // Queue one abandoned and one live query directly.
    let (abandoned, abandoned_rx) = get_block(1);
    drop(abandoned_rx);
    let (live, mut live_rx) = get_info(2);
    overflow
        .messages
        .extend([abandoned, live].map(PendingMessage::Message));

    // Submit a query through the policy after its receiver is gone.
    let (incoming, incoming_rx) = get_finalization(3);
    drop(incoming_rx);
    <TestMessage as Policy>::handle(&mut overflow, incoming);

    assert!(!has_get_block(&overflow, 1));
    assert!(has_get_info(&overflow, 2));
    assert!(!has_get_finalization(&overflow, 3));
    assert!(matches!(live_rx.try_recv(), Err(TryRecvError::Empty)));
}
1204
#[test]
fn policy_drain_stops_after_returned_response_closes() {
    let mut overflow = pending();
    let (first, first_rx) = get_block(1);
    let (second, mut second_rx) = get_info(2);
    overflow
        .messages
        .extend([first, second].map(PendingMessage::Message));

    // The sink rejects the first message and closes its response channel while doing so.
    let mut held_rx = Some(first_rx);
    let mut attempts = 0;
    overflow.drain(|rejected| {
        attempts += 1;
        drop(held_rx.take());
        Some(rejected)
    });
    assert_eq!(attempts, 1);

    // The next drain discards the now-closed first message and delivers only the second.
    let drained = drain(&mut overflow);
    assert_eq!(drained.len(), 1);
    assert!(matches!(
        &drained[0],
        TestMessage::GetInfo {
            identifier: Identifier::Height(height),
            response,
        } if *height == Height::new(2) && !response.is_closed()
    ));
    assert!(matches!(second_rx.try_recv(), Err(TryRecvError::Empty)));
}
1233
#[test]
fn policy_keeps_coalesced_hints_in_fifo_position() {
    let mut overflow = pending();
    let (first, second) = (public_key(1), public_key(2));
    let (get_block_9, _get_block_9_rx) = get_block(9);
    let (get_info_11, _get_info_11_rx) = get_info(11);

    // The second hint for height 10 arrives last but must merge into the earlier slot.
    let arrivals = [
        get_block_9,
        hint_finalized(10, first.clone()),
        get_info_11,
        hint_finalized(10, second.clone()),
    ];
    for message in arrivals {
        <TestMessage as Policy>::handle(&mut overflow, message);
    }

    let drained = drain(&mut overflow);
    assert_eq!(drained.len(), 3);
    assert!(matches!(
        &drained[0],
        TestMessage::GetBlock {
            identifier: Identifier::Height(height),
            ..
        } if *height == Height::new(9)
    ));
    let TestMessage::HintFinalized { height, targets } = &drained[1] else {
        panic!("expected hint");
    };
    assert_eq!(*height, Height::new(10));
    assert_eq!(targets.len().get(), 2);
    for expected in [&first, &second] {
        assert!(targets.contains(expected));
    }
    assert!(matches!(
        &drained[2],
        TestMessage::GetInfo {
            identifier: Identifier::Height(height),
            ..
        } if *height == Height::new(11)
    ));
}
1271
#[test]
fn policy_keeps_highest_floor_and_prune() {
    let mut overflow = pending();

    // Submit floors and prunes out of order; only the highest of each should survive.
    for height in [5, 3, 8] {
        <TestMessage as Policy>::handle(&mut overflow, set_floor(height));
    }
    for height in [4, 2, 7] {
        <TestMessage as Policy>::handle(&mut overflow, prune(height));
    }

    let floor_round = overflow.floor.as_ref().map(Finalization::round);
    assert_eq!(floor_round, Some(round(8)));
    assert_eq!(overflow.prune, Some(Height::new(7)));
    assert!(overflow.messages.is_empty());

    // Draining delivers the floor first, then the prune.
    let drained = drain(&mut overflow);
    assert_eq!(drained.len(), 2);
    assert!(matches!(
        &drained[0],
        TestMessage::SetFloor { finalization } if finalization.round() == round(8)
    ));
    assert!(matches!(
        &drained[1],
        TestMessage::Prune { height } if *height == Height::new(7)
    ));
}
1301
#[test]
fn policy_replaces_floor_and_prune_and_drops_stale_pending_on_drain() {
    // Scenario 1: a higher floor replaces the buffered one and a prune purges stale entries.
    {
        let mut overflow = pending();
        overflow.floor = Some(finalization(5));

        let (info_4, _info_4_rx) = get_info(4);
        let (block_7, _block_7_rx) = get_block(7);
        let (block_8, _block_8_rx) = get_block(8);
        overflow
            .messages
            .extend([info_4, block_7].map(PendingMessage::Message));
        overflow.hint_finalized(Height::new(8), NonEmptyVec::new(public_key(1)));
        overflow
            .messages
            .push_back(PendingMessage::Message(block_8));

        for message in [set_floor(8), prune(8)] {
            <TestMessage as Policy>::handle(&mut overflow, message);
        }

        let floor_round = overflow.floor.as_ref().map(Finalization::round);
        assert_eq!(floor_round, Some(round(8)));
        assert_eq!(overflow.messages.len(), 1);
        assert!(!has_get_info(&overflow, 4));
        assert!(!has_get_block(&overflow, 7));
        assert!(has_get_block(&overflow, 8));
        assert!(hint_targets(&overflow, 8).is_none());

        let drained = drain(&mut overflow);
        assert_eq!(drained.len(), 3);
        assert!(matches!(
            &drained[0],
            TestMessage::SetFloor { finalization } if finalization.round() == round(8)
        ));
        assert!(matches!(
            &drained[1],
            TestMessage::Prune { height } if *height == Height::new(8)
        ));
        assert!(matches!(
            &drained[2],
            TestMessage::GetBlock {
                identifier: Identifier::Height(height),
                ..
            } if *height == Height::new(8)
        ));
    }

    // Scenario 2: a higher prune replaces the buffered one and purges stale entries.
    {
        let mut overflow = pending();
        overflow.prune = Some(Height::new(5));

        let (finalization_4, _finalization_4_rx) = get_finalization(4);
        let (block_6, _block_6_rx) = get_block(6);
        let (block_7, _block_7_rx) = get_block(7);
        overflow
            .messages
            .extend([finalization_4, block_6].map(PendingMessage::Message));
        overflow.hint_finalized(Height::new(6), NonEmptyVec::new(public_key(2)));
        overflow
            .messages
            .push_back(PendingMessage::Message(block_7));

        <TestMessage as Policy>::handle(&mut overflow, prune(7));

        assert_eq!(overflow.prune, Some(Height::new(7)));
        assert_eq!(overflow.messages.len(), 1);
        assert!(!has_get_finalization(&overflow, 4));
        assert!(!has_get_block(&overflow, 6));
        assert!(has_get_block(&overflow, 7));
        assert!(hint_targets(&overflow, 6).is_none());

        let drained = drain(&mut overflow);
        assert_eq!(drained.len(), 2);
        assert!(matches!(
            &drained[0],
            TestMessage::Prune { height } if *height == Height::new(7)
        ));
        assert!(matches!(
            &drained[1],
            TestMessage::GetBlock {
                identifier: Identifier::Height(height),
                ..
            } if *height == Height::new(7)
        ));
    }
}
1385
#[test]
fn policy_prune_drops_closed_pending() {
    // Scenario 1: two block requests for the same height, one of which has
    // already lost its receiver.
    let mut block_queue = pending();
    let (abandoned, abandoned_rx) = get_block(8);
    drop(abandoned_rx);
    let (awaited, mut awaited_rx) = get_block(8);
    for request in [abandoned, awaited] {
        block_queue
            .messages
            .push_back(PendingMessage::Message(request));
    }

    // After the prune exactly one request for height 8 is expected to remain,
    // and the still-awaited receiver must not have been answered or closed.
    <TestMessage as Policy>::handle(&mut block_queue, prune(7));
    assert_eq!(block_queue.messages.len(), 1);
    assert!(has_get_block(&block_queue, 8));
    assert!(matches!(awaited_rx.try_recv(), Err(TryRecvError::Empty)));

    // Scenario 2: the same setup, using finalization requests instead.
    let mut finalization_queue = pending();
    let (abandoned, abandoned_rx) = get_finalization(8);
    drop(abandoned_rx);
    let (awaited, mut awaited_rx) = get_finalization(8);
    for request in [abandoned, awaited] {
        finalization_queue
            .messages
            .push_back(PendingMessage::Message(request));
    }

    <TestMessage as Policy>::handle(&mut finalization_queue, prune(7));
    assert_eq!(finalization_queue.messages.len(), 1);
    assert!(has_get_finalization(&finalization_queue, 8));
    assert!(matches!(awaited_rx.try_recv(), Err(TryRecvError::Empty)));
}
1422
#[test]
fn policy_skips_retain_when_prune_height_does_not_increase() {
    // Establish a pending prune at height 10 before anything is queued.
    let mut queue = pending();
    <TestMessage as Policy>::handle(&mut queue, prune(10));

    // Queue a request whose receiver is already gone; it is inserted directly
    // so that it sits in the queue when the later updates arrive.
    let (abandoned, abandoned_rx) = get_block(11);
    drop(abandoned_rx);
    queue.messages.push_back(PendingMessage::Message(abandoned));

    // A floor or prune at height 9 does not raise the pending prune height,
    // and the abandoned request is expected to still be queued afterwards.
    for update in [set_floor(9), prune(9)] {
        <TestMessage as Policy>::handle(&mut queue, update);
        assert_eq!(queue.messages.len(), 1);
    }

    // Raising the prune height to 12 is expected to clear the queue.
    <TestMessage as Policy>::handle(&mut queue, prune(12));
    assert!(queue.messages.is_empty());
}
1443
#[test]
fn policy_drops_stale_requests_against_pending_floor_and_prune() {
    let mut queue = pending();

    // Every receiver is bound to a named local so that each request stays
    // open until the end of the test.
    let (info_4, _info_4_rx) = get_info(4);
    let (info_5, _info_5_rx) = get_info(5);
    let (info_6, _info_6_rx) = get_info(6);
    let (info_7, _info_7_rx) = get_info(7);
    let (block_4, _block_4_rx) = get_block(4);
    let (block_5, _block_5_rx) = get_block(5);
    let (block_6, _block_6_rx) = get_block(6);
    let (block_7, _block_7_rx) = get_block(7);
    let (finalization_4, _finalization_4_rx) = get_finalization(4);
    let (finalization_6, _finalization_6_rx) = get_finalization(6);

    // Phase 1: a floor at height 5, followed by requests and hints at or
    // around that height.
    let first_batch = [
        set_floor(5),
        info_4,
        info_5,
        block_4,
        block_5,
        finalization_4,
        hint_finalized(5, public_key(1)),
        hint_finalized(6, public_key(2)),
    ];
    for message in first_batch {
        <TestMessage as Policy>::handle(&mut queue, message);
    }

    // Phase 2: a prune at height 7, followed by requests at heights 6 and 7.
    <TestMessage as Policy>::handle(&mut queue, prune(7));
    assert!(has_prune(&queue, 7));
    <TestMessage as Policy>::handle(&mut queue, info_6);
    <TestMessage as Policy>::handle(&mut queue, finalization_6);
    assert!(!has_get_finalization(&queue, 6));
    <TestMessage as Policy>::handle(&mut queue, block_6);
    <TestMessage as Policy>::handle(&mut queue, info_7);
    assert!(has_get_info(&queue, 7));
    <TestMessage as Policy>::handle(&mut queue, block_7);
    assert!(has_get_block(&queue, 7));

    // Only the floor, the prune, and the two height-7 requests are expected
    // to come out, in that order.
    let survivor_height = Height::new(7);
    let drained = drain(&mut queue);
    assert_eq!(drained.len(), 4);
    assert!(matches!(
        &drained[0],
        TestMessage::SetFloor { finalization } if finalization.round() == round(5)
    ));
    assert!(matches!(
        &drained[1],
        TestMessage::Prune { height } if *height == survivor_height
    ));
    assert!(matches!(
        &drained[2],
        TestMessage::GetInfo {
            identifier: Identifier::Height(height),
            ..
        } if *height == survivor_height
    ));
    assert!(matches!(
        &drained[3],
        TestMessage::GetBlock {
            identifier: Identifier::Height(height),
            ..
        } if *height == survivor_height
    ));
}
1503
#[test]
fn policy_keeps_block_messages_and_waiters() {
    let mut queue = pending();

    // Seed the queue with one block-carrying message of each kind.
    let (proposed_at_4, mut proposed_ack) = proposed(4);
    let (verified_at_6, mut verified_ack) = verified(6);
    let (certified_at_8, mut certified_ack) = certified(8);
    for message in [proposed_at_4, verified_at_6, certified_at_8] {
        queue.messages.push_back(PendingMessage::Message(message));
    }

    // A floor at 7 is expected to leave all three queued, including the ones
    // below it, with none of their acks answered or closed.
    <TestMessage as Policy>::handle(&mut queue, set_floor(7));
    for height in [4, 6, 8] {
        assert!(has_block_message(&queue, height));
    }
    assert!(matches!(proposed_ack.try_recv(), Err(TryRecvError::Empty)));
    assert!(matches!(verified_ack.try_recv(), Err(TryRecvError::Empty)));
    assert!(matches!(certified_ack.try_recv(), Err(TryRecvError::Empty)));

    // A prune at 9 is likewise expected to keep the height-8 message.
    <TestMessage as Policy>::handle(&mut queue, prune(9));
    assert!(has_block_message(&queue, 8));
    assert!(matches!(certified_ack.try_recv(), Err(TryRecvError::Empty)));

    // A block message arriving below the pending prune height: a height-8
    // message is still queued and the new ack stays pending.
    let (below_prune, mut below_prune_ack) = proposed(8);
    <TestMessage as Policy>::handle(&mut queue, below_prune);
    assert!(has_block_message(&queue, 8));
    assert!(matches!(below_prune_ack.try_recv(), Err(TryRecvError::Empty)));

    // A block message at the pending prune height is queued as well.
    let (at_prune, mut at_prune_ack) = verified(9);
    <TestMessage as Policy>::handle(&mut queue, at_prune);
    assert!(has_block_message(&queue, 9));
    assert!(matches!(at_prune_ack.try_recv(), Err(TryRecvError::Empty)));

    // The floor and prune updates are expected to drain first.
    let drained = drain(&mut queue);
    assert!(matches!(drained[0], TestMessage::SetFloor { .. }));
    assert!(matches!(drained[1], TestMessage::Prune { .. }));
}
1547}